
2.3 Join Operator

The JoinOperator combines the outputs of multiple upstream tasks into a single output. For example, if a task has two parents, a JoinOperator can merge the data from both parents into one value.

There is one way to use the JoinOperator:

Build a JoinOperator with a combine function

from dbgpt.core.awel import DAG, JoinOperator

with DAG("awel_join_operator") as dag:
    task = JoinOperator(combine_function=lambda x, y: x + y)
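
To make the role of the combine function concrete, the following is a minimal plain-Python sketch of the join idea: wait for every parent's result, then pass the results to the combine function in parent order. It is an illustration only and does not reflect how dbgpt implements JoinOperator internally; `join_outputs`, `parent`, and `run_join_demo` are hypothetical names introduced here.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def join_outputs(
    combine_function: Callable[..., Any], *parents: Awaitable[Any]
) -> Any:
    """Hypothetical helper: await all parent results, then combine them."""
    # asyncio.gather preserves argument order, so the results line up
    # with the positional parameters of combine_function.
    results = await asyncio.gather(*parents)
    return combine_function(*results)


async def parent(value: Any) -> Any:
    """Hypothetical stand-in for an upstream task that produces a value."""
    await asyncio.sleep(0)
    return value


def run_join_demo() -> Any:
    # Two parents produce 5 and 8; the combine function adds them.
    return asyncio.run(join_outputs(lambda x, y: x + y, parent(5), parent(8)))


print(run_join_demo())  # 13
```

In this sketch the combine function takes one positional argument per parent, which matches the two-argument lambda used above.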

Examples

Two Sum

In this example, we will create a JoinOperator that sums the data from two parents.

Create a new file named join_operator_sum_numbers.py in the awel_tutorial directory and add the following code:

import asyncio
from dbgpt.core.awel import (
    DAG, JoinOperator, MapOperator, InputOperator, SimpleCallDataInputSource
)

with DAG("sum_numbers_dag") as dag:
    # Create an input task to receive data from call_data
    input_task = InputOperator(input_source=SimpleCallDataInputSource())
    # Each map task extracts one number from the input dictionary
    task1 = MapOperator(map_function=lambda x: x["t1"])
    task2 = MapOperator(map_function=lambda x: x["t2"])
    # The join task receives both numbers and adds them
    sum_task = JoinOperator(combine_function=lambda x, y: x + y)
    input_task >> task1 >> sum_task
    input_task >> task2 >> sum_task

if asyncio.run(sum_task.call(call_data={"t1": 5, "t2": 8})) == 13:
    print("Success!")
else:
    print("Failed")

Run the following command to execute the code:

poetry run python awel_tutorial/join_operator_sum_numbers.py

You should see "Success!" printed to the console:

Success!
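
The data flow of this example (one input fanned out to two branches, then joined) can also be sketched without the framework. The snippet below is a plain asyncio approximation under that assumption, not dbgpt code; `run_map` and `sum_numbers` are hypothetical names that stand in for the map tasks and the whole DAG.

```python
import asyncio
from typing import Any, Callable, Dict


async def run_map(map_function: Callable[[Any], Any], data: Any) -> Any:
    """Hypothetical stand-in for a map task: apply a function to one input."""
    return map_function(data)


async def sum_numbers(call_data: Dict[str, int]) -> int:
    # Fan out: both branches read the same input, like task1 and task2.
    branch1 = run_map(lambda x: x["t1"], call_data)
    branch2 = run_map(lambda x: x["t2"], call_data)
    # Fan in: wait for both branches, then combine them, like sum_task.
    x, y = await asyncio.gather(branch1, branch2)
    return x + y


print(asyncio.run(sum_numbers({"t1": 5, "t2": 8})))  # 13
```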

The graph of the DAG looks like this: input_task fans out to task1 and task2, and both feed into sum_task.