2.2 Reduce Operator
The ReduceStreamOperator reduces streaming data to a single non-streaming value.
There are two ways to use the ReduceStreamOperator:
Build a ReduceStreamOperator with a reduce function
from dbgpt.core.awel import DAG, ReduceStreamOperator

with DAG("awel_reduce_operator") as dag:
    task = ReduceStreamOperator(reduce_function=lambda x, y: x + y)
Implement a custom ReduceStreamOperator
from dbgpt.core.awel import DAG, ReduceStreamOperator

class MySumOperator(ReduceStreamOperator[int, int]):
    async def reduce(self, x: int, y: int) -> int:
        return x + y

with DAG("awel_reduce_operator") as dag:
    task = MySumOperator()
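To make the idea of reducing a stream to one value concrete, here is a minimal sketch in plain asyncio that does not use AWEL at all. The helper `fold_stream` and the functions `chunks`, `concat`, and `run_demo` are hypothetical names invented for this illustration. The sketch assumes a left-to-right fold seeded with the first item of the stream; the operator's actual internals may differ.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, TypeVar

T = TypeVar("T")


async def fold_stream(
    stream: AsyncIterator[T], reduce_fn: Callable[[T, T], Awaitable[T]]
) -> T:
    """Fold an async stream into one value (hypothetical helper, not AWEL API)."""
    iterator = stream.__aiter__()
    try:
        # Assumption: the first item seeds the accumulator.
        acc = await iterator.__anext__()
    except StopAsyncIteration:
        raise ValueError("cannot reduce an empty stream") from None
    # Combine the accumulator with each remaining item, in arrival order.
    async for item in iterator:
        acc = await reduce_fn(acc, item)
    return acc


async def chunks() -> AsyncIterator[str]:
    """A small stream of text pieces, standing in for streamed output."""
    for piece in ["Hello", ", ", "AWEL"]:
        yield piece


async def concat(x: str, y: str) -> str:
    """Reduce function with the same (x, y) -> value shape as `reduce` above."""
    return x + y


def run_demo() -> str:
    return asyncio.run(fold_stream(chunks(), concat))


if __name__ == "__main__":
    print(run_demo())
```

String concatenation is used here because it is order-sensitive, which makes it clear that the items are combined in the order they arrive.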
Examples
Sum the numbers
Create a new file named reduce_operator_sum_numbers.py in the awel_tutorial directory and add the following code:
import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, ReduceStreamOperator, StreamifyAbsOperator

class NumberProducerOperator(StreamifyAbsOperator[int, int]):
    """Create a stream of numbers from 0 to `n-1`"""
    async def streamify(self, n: int) -> AsyncIterator[int]:
        for i in range(n):
            yield i

class MySumOperator(ReduceStreamOperator[int, int]):
    async def reduce(self, x: int, y: int) -> int:
        return x + y

with DAG("sum_numbers_dag") as dag:
    task = NumberProducerOperator()
    sum_task = MySumOperator()
    task >> sum_task

o1 = asyncio.run(sum_task.call(call_data=5))
if o1 == sum(range(5)):
    print(f"Success! n is 5, sum is {o1}")
else:
    print("Failed")

o2 = asyncio.run(sum_task.call(call_data=10))
if o2 == sum(range(10)):
    print(f"Success! n is 10, sum is {o2}")
else:
    print("Failed")
Then run the following command to execute the code:
poetry run python awel_tutorial/reduce_operator_sum_numbers.py
You should see the following output printed to the console:
Success! n is 5, sum is 10
Success! n is 10, sum is 45
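The expected sums can be checked independently of AWEL. The sketch below folds `range(n)` with the same `lambda x, y: x + y` using the standard library's `functools.reduce`; `expected_sum` is a hypothetical helper name used only for this check.

```python
from functools import reduce


def expected_sum(n: int) -> int:
    """Fold 0..n-1 pairwise with addition (hypothetical helper for this check)."""
    # Note: functools.reduce raises TypeError on an empty range (n == 0)
    # because no initial value is supplied.
    return reduce(lambda x, y: x + y, range(n))


if __name__ == "__main__":
    for n in (5, 10):
        print(f"n is {n}, sum is {expected_sum(n)}")
```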