2.5 Streamify Operator
The StreamifyAbsOperator
is used to convert a single data into a stream of data.
There are one way to use the StreamifyAbsOperator
:
Implement A Custom StreamifyAbsOperator
Just override the streamify
method to return an async iterable.
from typing import AsyncIterator
from dbgpt.core.awel import DAG, StreamifyAbsOperator
class NumberProducerOperator(StreamifyAbsOperator[int, int]):
"""Create a stream of numbers from 0 to `n-1`"""
async def streamify(self, n: int) -> AsyncIterator[int]:
for i in range(n):
yield i
with DAG("numbers_dag") as dag:
task = NumberProducerOperator()
In above example, the NumberProducerOperator
is a custom StreamifyAbsOperator
that
creates a stream of numbers from 0 to n-1
. It receives a single data n
and returns
a stream.
Examples
Create A Stream Of Numbers
Create a new file named streamify_operator_numbers.py
in the awel_tutorial
directory and add the following code:
import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, StreamifyAbsOperator
class NumberProducerOperator(StreamifyAbsOperator[int, int]):
"""Create a stream of numbers from 0 to `n-1`"""
async def streamify(self, n: int) -> AsyncIterator[int]:
for i in range(n):
yield i
with DAG("numbers_dag") as dag:
task = NumberProducerOperator()
async def print_stream(t, n: int):
# Call the streaming operator by `call_stream` method
async for i in await t.call_stream(call_data=n):
print(i)
asyncio.run(print_stream(task, 10))
And run the following command to execute the code:
poetry run python awel_tutorial/streamify_operator_numbers.py
And you will see the following output printed to the console.
0
1
2
3
4
5
6
7
8
9
Mock A Streaming LLM Service
Create a new file named streamify_operator_mock_llm_service.py
in the awel_tutorial
directory and add the following code:
import asyncio
from typing import AsyncIterator, List
from dbgpt.core.awel import DAG, StreamifyAbsOperator
class MockLLMService(StreamifyAbsOperator[str, str]):
"""Mock a streaming LLM service"""
def __init__(self, mock_data: List[str], **kwargs):
self.mock_data = mock_data
super().__init__(**kwargs)
async def streamify(self, user_input: str) -> AsyncIterator[str]:
for data in self.mock_data:
yield data
with DAG("mock_llm_service_dag") as dag:
task = MockLLMService(mock_data=["Hello, ", "how ", "can ", "I ", "help ", "you?"])
async def print_stream(t, user_input: str):
# Call the streaming operator by `call_stream` method
async for i in await t.call_stream(call_data=user_input):
print(i, end="")
asyncio.run(print_stream(task, "Hi"))
And run the following command to execute the code:
poetry run python awel_tutorial/streamify_operator_mock_llm_service.py
And you will see the following output printed to the console.
Hello, how can I help you?