Skip to main content
Version: v0.6.0

2.5 Streamify Operator

The StreamifyAbsOperator is used to convert a single data into a stream of data.

There are one way to use the StreamifyAbsOperator:

Implement A Custom StreamifyAbsOperator

Just override the streamify method to return an async iterable.

from typing import AsyncIterator
from dbgpt.core.awel import DAG, StreamifyAbsOperator

class NumberProducerOperator(StreamifyAbsOperator[int, int]):
"""Create a stream of numbers from 0 to `n-1`"""
async def streamify(self, n: int) -> AsyncIterator[int]:
for i in range(n):
yield i

with DAG("numbers_dag") as dag:
task = NumberProducerOperator()

In above example, the NumberProducerOperator is a custom StreamifyAbsOperator that creates a stream of numbers from 0 to n-1. It receives a single data n and returns a stream.

Examples

Create A Stream Of Numbers

Create a new file named streamify_operator_numbers.py in the awel_tutorial directory and add the following code:

import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, StreamifyAbsOperator

class NumberProducerOperator(StreamifyAbsOperator[int, int]):
"""Create a stream of numbers from 0 to `n-1`"""
async def streamify(self, n: int) -> AsyncIterator[int]:
for i in range(n):
yield i

with DAG("numbers_dag") as dag:
task = NumberProducerOperator()

async def print_stream(t, n: int):
# Call the streaming operator by `call_stream` method
async for i in await t.call_stream(call_data=n):
print(i)

asyncio.run(print_stream(task, 10))

And run the following command to execute the code:

poetry run python awel_tutorial/streamify_operator_numbers.py

And you will see the following output printed to the console.

0
1
2
3
4
5
6
7
8
9

Mock A Streaming LLM Service

Create a new file named streamify_operator_mock_llm_service.py in the awel_tutorial directory and add the following code:

import asyncio
from typing import AsyncIterator, List
from dbgpt.core.awel import DAG, StreamifyAbsOperator

class MockLLMService(StreamifyAbsOperator[str, str]):
"""Mock a streaming LLM service"""
def __init__(self, mock_data: List[str], **kwargs):
self.mock_data = mock_data
super().__init__(**kwargs)

async def streamify(self, user_input: str) -> AsyncIterator[str]:
for data in self.mock_data:
yield data

with DAG("mock_llm_service_dag") as dag:
task = MockLLMService(mock_data=["Hello, ", "how ", "can ", "I ", "help ", "you?"])

async def print_stream(t, user_input: str):
# Call the streaming operator by `call_stream` method
async for i in await t.call_stream(call_data=user_input):
print(i, end="")

asyncio.run(print_stream(task, "Hi"))

And run the following command to execute the code:

poetry run python awel_tutorial/streamify_operator_mock_llm_service.py

And you will see the following output printed to the console.

Hello, how can I help you?