2.3 Join Operator
The JoinOperator combines the data from multiple inputs into a single output.
For example, if a task has two parents, you can join the data from both parents into a single value.
There is one way to use the JoinOperator:
Build a JoinOperator with a combine function
from dbgpt.core.awel import DAG, JoinOperator

with DAG("awel_join_operator") as dag:
    task = JoinOperator(combine_function=lambda x, y: x + y)
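As a rough mental model of what a join does, the following sketch uses plain asyncio rather than AWEL. The names join, parent, and run_join_demo are hypothetical helpers, not part of dbgpt. The sketch only illustrates the idea of waiting for every parent and passing their results positionally to the combine function; it makes no claim about how JoinOperator is implemented internally.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def join(
    combine_function: Callable[..., Any], *parents: Awaitable[Any]
) -> Any:
    """Hypothetical helper: wait for all parents, then combine their results."""
    # gather preserves argument order, so results line up with the parents.
    results = await asyncio.gather(*parents)
    return combine_function(*results)


async def parent(value: int) -> int:
    """Hypothetical stand-in for an upstream task that produces a value."""
    await asyncio.sleep(0)  # yield control once, as a real task might
    return value


def run_join_demo() -> int:
    # Two parents produce 5 and 8; the combine function adds them.
    return asyncio.run(join(lambda x, y: x + y, parent(5), parent(8)))


if __name__ == "__main__":
    print(run_join_demo())  # prints 13
```

In this sketch the combine function takes one positional argument per parent, which mirrors the two-argument lambda passed to JoinOperator above.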
Examples
Two Sum
In this example, we will create a JoinOperator that sums the data from two parents.
Create a new file named join_operator_sum_numbers.py in the awel_tutorial directory and add the following code:
import asyncio
from dbgpt.core.awel import (
    DAG, JoinOperator, MapOperator, InputOperator, SimpleCallDataInputSource
)

with DAG("sum_numbers_dag") as dag:
    # Create an input task to receive data from call_data
    input_task = InputOperator(input_source=SimpleCallDataInputSource())
    # Each map task extracts one operand from the input dict
    task1 = MapOperator(map_function=lambda x: x["t1"])
    task2 = MapOperator(map_function=lambda x: x["t2"])
    # Join the outputs of both parents by adding them
    sum_task = JoinOperator(combine_function=lambda x, y: x + y)
    input_task >> task1 >> sum_task
    input_task >> task2 >> sum_task

if asyncio.run(sum_task.call(call_data={"t1": 5, "t2": 8})) == 13:
    print("Success!")
else:
    print("Failed")
Run the following command to execute the code:
poetry run python awel_tutorial/join_operator_sum_numbers.py
You will see "Success!" printed to the console:
Success!
The graph of the DAG looks like this: input_task fans out to task1 and task2, and both feed into sum_task.