Version: v0.6.0

2.2 Reduce Operator

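The ReduceStreamOperator reduces streaming data to non-streaming data: it consumes the items of an input stream and combines them, two at a time, with a reduce function until a single value remains.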
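
To make the idea concrete, here is a minimal plain-Python sketch of folding an async stream into one value. It does not use AWEL at all: `number_stream` and `reduce_stream` are hypothetical helper names introduced for illustration, and seeding the result with the first item of the stream is an assumption of this sketch, not a statement about how ReduceStreamOperator is implemented.

```python
import asyncio
from typing import AsyncIterator, Callable, Optional, TypeVar

T = TypeVar("T")


async def number_stream(n: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for a streaming upstream: yields 0..n-1."""
    for i in range(n):
        yield i


async def reduce_stream(
    stream: AsyncIterator[T], fn: Callable[[T, T], T]
) -> Optional[T]:
    """Fold a stream into one value.

    Assumption of this sketch: the first item seeds the result,
    and an empty stream yields None.
    """
    result: Optional[T] = None
    first = True
    async for item in stream:
        if first:
            # Nothing to combine yet, so the first item becomes the result.
            result, first = item, False
        else:
            # Combine the running result with the next item.
            result = fn(result, item)
    return result


def run_example() -> int:
    # Same reduce function as the lambda used in this section: x + y.
    return asyncio.run(reduce_stream(number_stream(5), lambda x, y: x + y))


if __name__ == "__main__":
    print(run_example())
```

The stream side is an async generator and the result is an ordinary value, which is the streaming to non-streaming transition the operator performs inside a DAG.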

There are two ways to use the ReduceStreamOperator:

Build a ReduceStreamOperator with a reduce function

from dbgpt.core.awel import DAG, ReduceStreamOperator

with DAG("awel_reduce_operator") as dag:
    task = ReduceStreamOperator(reduce_function=lambda x, y: x + y)

Implement a custom ReduceStreamOperator

from dbgpt.core.awel import DAG, ReduceStreamOperator

class MySumOperator(ReduceStreamOperator[int, int]):
    async def reduce(self, x: int, y: int) -> int:
        return x + y

with DAG("awel_reduce_operator") as dag:
    task = MySumOperator()

Examples

Sum the numbers

Create a new file named reduce_operator_sum_numbers.py in the awel_tutorial directory and add the following code:

import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, ReduceStreamOperator, StreamifyAbsOperator

class NumberProducerOperator(StreamifyAbsOperator[int, int]):
    """Create a stream of numbers from 0 to `n-1`"""
    async def streamify(self, n: int) -> AsyncIterator[int]:
        for i in range(n):
            yield i

class MySumOperator(ReduceStreamOperator[int, int]):
    async def reduce(self, x: int, y: int) -> int:
        return x + y

with DAG("sum_numbers_dag") as dag:
    task = NumberProducerOperator()
    sum_task = MySumOperator()
    task >> sum_task

o1 = asyncio.run(sum_task.call(call_data=5))
if o1 == sum(range(5)):
    print(f"Success! n is 5, sum is {o1}")
else:
    print("Failed")
o2 = asyncio.run(sum_task.call(call_data=10))
if o2 == sum(range(10)):
    print(f"Success! n is 10, sum is {o2}")
else:
    print("Failed")

Then run the following command to execute the code:

poetry run python awel_tutorial/reduce_operator_sum_numbers.py

If both checks pass, the console prints:

Success! n is 5, sum is 10
Success! n is 10, sum is 45
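
The expected sums can be cross-checked without AWEL. The sketch below uses the standard library's `functools.reduce` with the same pairwise addition as `MySumOperator.reduce`; `expected_sum` is a hypothetical helper name used only for this check.

```python
from functools import reduce
import operator


def expected_sum(n: int) -> int:
    """Sum 0..n-1 by pairwise addition, mirroring `x + y` in the example."""
    return reduce(operator.add, range(n))


if __name__ == "__main__":
    for n in (5, 10):
        print(f"n is {n}, sum is {expected_sum(n)}")
```

For n = 5 the fold is ((((0 + 1) + 2) + 3) + 4) = 10, and for n = 10 it is 45, matching the output above.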