Version: v0.6.0

2.6 Unstreamify Operator

The UnstreamifyAbsOperator is the opposite of the StreamifyAbsOperator: it converts a stream of data into a single value.

There is one way to use the UnstreamifyAbsOperator:

Implement A Custom UnstreamifyAbsOperator

Override the unstreamify method so that it consumes the input stream and returns a single value.

from typing import AsyncIterator
from dbgpt.core.awel import DAG, UnstreamifyAbsOperator

class SumOperator(UnstreamifyAbsOperator[int, int]):
    """Unstreamify the stream of numbers"""
    async def unstreamify(self, it: AsyncIterator[int]) -> int:
        return sum([i async for i in it])

with DAG("sum_dag") as dag:
    task = SumOperator()
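To see what the unstreamify method does in isolation, here is a minimal sketch that uses only the standard library and no AWEL classes. The names produce_numbers and sum_stream are hypothetical stand-ins chosen for illustration, not part of dbgpt; the body of sum_stream mirrors the unstreamify method above.

```python
import asyncio
from typing import AsyncIterator


async def produce_numbers(n: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for a streaming upstream: yields 0 .. n-1."""
    for i in range(n):
        yield i


async def sum_stream(it: AsyncIterator[int]) -> int:
    """Drain the whole stream, then return one value.

    Same body as the unstreamify method above, outside of any operator.
    """
    return sum([i async for i in it])


if __name__ == "__main__":
    # 0 + 1 + 2 + 3 + 4
    print(asyncio.run(sum_stream(produce_numbers(5))))
```

The async list comprehension only finishes once the stream is exhausted, so the single result is not available until the upstream has produced its last item.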

Examples

Sum The Numbers

Create a new file named unstreamify_operator_sum_numbers.py in the awel_tutorial directory and add the following code:

import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, UnstreamifyAbsOperator, StreamifyAbsOperator

class NumberProducerOperator(StreamifyAbsOperator[int, int]):
    """Create a stream of numbers from 0 to `n-1`"""
    async def streamify(self, n: int) -> AsyncIterator[int]:
        for i in range(n):
            yield i

class SumOperator(UnstreamifyAbsOperator[int, int]):
    """Unstreamify the stream of numbers"""
    async def unstreamify(self, it: AsyncIterator[int]) -> int:
        return sum([i async for i in it])

with DAG("sum_dag") as dag:
    task = NumberProducerOperator()
    sum_task = SumOperator()
    task >> sum_task

print(asyncio.run(sum_task.call(call_data=5)))
print(asyncio.run(sum_task.call(call_data=10)))

Run the following command to execute the code:

poetry run python awel_tutorial/unstreamify_operator_sum_numbers.py

You should see the following output printed to the console (the sums of 0 to 4 and of 0 to 9):

10
45