2.5 Streamify Operator
The StreamifyAbsOperator converts a single input value into a stream of data.
There is one way to use the StreamifyAbsOperator:
Implement A Custom StreamifyAbsOperator
Override the streamify method so that it returns an async iterator. Writing it as an async generator that uses yield is the simplest way to do this.
from typing import AsyncIterator

from dbgpt.core.awel import DAG, StreamifyAbsOperator


class NumberProducerOperator(StreamifyAbsOperator[int, int]):
    """Create a stream of numbers from 0 to `n-1`"""

    async def streamify(self, n: int) -> AsyncIterator[int]:
        for i in range(n):
            yield i


with DAG("numbers_dag") as dag:
    task = NumberProducerOperator()
In the example above, NumberProducerOperator is a custom StreamifyAbsOperator that
creates a stream of numbers from 0 to n-1. It receives a single input value n and
returns a stream of integers.
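The "one value in, many values out" shape of streamify can be tried without DB-GPT installed. The following is a minimal sketch using only the standard library; number_stream and collect are hypothetical names introduced for illustration and are not part of the AWEL API.

```python
import asyncio
from typing import AsyncIterator, List


async def number_stream(n: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for `streamify`: one input value, many outputs."""
    for i in range(n):
        yield i  # each `yield` emits one item of the stream


async def collect(stream: AsyncIterator[int]) -> List[int]:
    """Drain an async iterator into a list."""
    return [item async for item in stream]


if __name__ == "__main__":
    print(asyncio.run(collect(number_stream(5))))  # prints [0, 1, 2, 3, 4]
```

Calling an async generator function does not run its body. It returns an async iterator immediately, and the loop body only advances as the consumer asks for items.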
Examples
Create A Stream Of Numbers
Create a new file named streamify_operator_numbers.py in the awel_tutorial directory and add the following code:
import asyncio
from typing import AsyncIterator

from dbgpt.core.awel import DAG, StreamifyAbsOperator


class NumberProducerOperator(StreamifyAbsOperator[int, int]):
    """Create a stream of numbers from 0 to `n-1`"""

    async def streamify(self, n: int) -> AsyncIterator[int]:
        for i in range(n):
            yield i


with DAG("numbers_dag") as dag:
    task = NumberProducerOperator()


async def print_stream(t, n: int):
    # Call the streaming operator by `call_stream` method
    async for i in await t.call_stream(call_data=n):
        print(i)


asyncio.run(print_stream(task, 10))
Then run the following command to execute the code:
poetry run python awel_tutorial/streamify_operator_numbers.py
You will see the following output printed to the console:
0
1
2
3
4
5
6
7
8
9
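The `async for i in await t.call_stream(...)` line in print_stream combines two steps: the await resolves the call to a stream, and async for then iterates that stream. The sketch below reproduces the pattern with the standard library only. It assumes, as the await in the example suggests, that call_stream is a coroutine returning an async iterator; fake_call_stream, _numbers, and consume are hypothetical names, not AWEL functions.

```python
import asyncio
from typing import AsyncIterator, List


async def _numbers(n: int) -> AsyncIterator[int]:
    """Async generator that yields 0..n-1."""
    for i in range(n):
        yield i


async def fake_call_stream(call_data: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for `call_stream`: a coroutine that returns a stream."""
    await asyncio.sleep(0)  # placeholder for any setup done before streaming
    return _numbers(call_data)


async def consume(n: int) -> List[int]:
    seen = []
    # `await` resolves the coroutine to the stream; `async for` then iterates it.
    async for i in await fake_call_stream(call_data=n):
        seen.append(i)
    return seen


if __name__ == "__main__":
    print(asyncio.run(consume(4)))  # prints [0, 1, 2, 3]
```

Dropping the await would hand async for a coroutine object rather than an async iterator, which raises a TypeError.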