2.5 Streamify Operator
The StreamifyAbsOperator is used to convert a single piece of data into a stream of data.
There is one way to use the StreamifyAbsOperator:
Implement A Custom StreamifyAbsOperator
Just override the streamify method to return an async iterable.
from typing import AsyncIterator

from dbgpt.core.awel import DAG, StreamifyAbsOperator


class NumberProducerOperator(StreamifyAbsOperator[int, int]):
    """Create a stream of numbers from 0 to `n-1`"""

    async def streamify(self, n: int) -> AsyncIterator[int]:
        # Yield each number in turn; the async generator is the output stream.
        for i in range(n):
            yield i


with DAG("numbers_dag") as dag:
    task = NumberProducerOperator()
In the above example, NumberProducerOperator is a custom StreamifyAbsOperator that creates a stream of numbers from 0 to n-1. It receives a single value n and returns a stream.
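To see what such a stream looks like when consumed, the async generator pattern behind streamify can be tried with plain asyncio. This is a minimal sketch that does not use dbgpt at all: number_stream and collect are hypothetical names introduced only for illustration, with number_stream standing in for the body of the streamify method.

```python
import asyncio
from typing import AsyncIterator, List


async def number_stream(n: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for `streamify`: yield the numbers 0 to n-1."""
    for i in range(n):
        yield i


async def collect(n: int) -> List[int]:
    """Drain the stream into a list by iterating it with `async for`."""
    return [i async for i in number_stream(n)]


result = asyncio.run(collect(5))
print(result)  # [0, 1, 2, 3, 4]
```

A single input (5) becomes five separate items that a consumer receives one at a time. Inside AWEL the operator would be driven by the DAG rather than called directly; that invocation is not shown here.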