2.6 Unstreamify Operator
The UnstreamifyAbsOperator
is the opposite of the StreamifyAbsOperator
. It converts
a stream of data into a single data.
There are one way to use the UnstreamifyAbsOperator
:
Implement A Custom UnstreamifyAbsOperator
Just override the unstreamify
method to return a single data.
from typing import AsyncIterator
from dbgpt.core.awel import DAG, UnstreamifyAbsOperator
class SumOperator(UnstreamifyAbsOperator[int, int]):
"""Unstreamify the stream of numbers"""
async def unstreamify(self, it: AsyncIterator[int]) -> int:
return sum([i async for i in it])
with DAG("sum_dag") as dag:
task = SumOperator()
Examples
Sum The Numbers
Create a new file named unstreamify_operator_sum_numbers.py
in the awel_tutorial
directory and add the following code:
import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, UnstreamifyAbsOperator, StreamifyAbsOperator
class NumberProducerOperator(StreamifyAbsOperator[int, int]):
"""Create a stream of numbers from 0 to `n-1`"""
async def streamify(self, n: int) -> AsyncIterator[int]:
for i in range(n):
yield i
class SumOperator(UnstreamifyAbsOperator[int, int]):
"""Unstreamify the stream of numbers"""
async def unstreamify(self, it: AsyncIterator[int]) -> int:
return sum([i async for i in it])
with DAG("sum_dag") as dag:
task = NumberProducerOperator()
sum_task = SumOperator()
task >> sum_task
print(asyncio.run(sum_task.call(call_data=5)))
print(asyncio.run(sum_task.call(call_data=10)))
And run the following command to execute the code:
poetry run python awel_tutorial/unstreamify_operator_sum_numbers.py
And you will see the following output printed to the console.
10
45