Version: v0.6.0

2.7 Transform Stream Operator

The TransformStreamAbsOperator transforms one data stream into another data stream.

There is one way to use the TransformStreamAbsOperator:

Implement a custom TransformStreamAbsOperator

Override the transform_stream method to return a new async iterable.

from typing import AsyncIterator
from dbgpt.core.awel import DAG, TransformStreamAbsOperator

class NumberDoubleOperator(TransformStreamAbsOperator[int, int]):
    async def transform_stream(self, it: AsyncIterator) -> AsyncIterator[int]:
        async for i in it:
            # Double the number
            yield i * 2

with DAG("numbers_dag") as dag:
    task = NumberDoubleOperator()
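
The shape of transform_stream can be illustrated without DB-GPT at all. The following is a minimal sketch using plain Python async generators; the function names produce, double, and collect are hypothetical helpers invented for this illustration and are not part of the AWEL API. It only shows the underlying pattern, assuming the operator behaves like an async generator that consumes one stream and yields another.

```python
import asyncio
from typing import AsyncIterator, List


async def produce(n: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for an upstream streaming operator."""
    for i in range(n):
        yield i


async def double(it: AsyncIterator[int]) -> AsyncIterator[int]:
    """Same shape as transform_stream: consume one stream, yield another."""
    async for i in it:
        # Each item is transformed as it arrives, one at a time
        yield i * 2


async def collect(n: int) -> List[int]:
    # Chain the two generators and gather the transformed items into a list
    return [i async for i in double(produce(n))]


result = asyncio.run(collect(5))
print(result)  # [0, 2, 4, 6, 8]
```

Because double is itself an async generator, each item is pulled from the upstream stream only when the consumer asks for the next value, so the transformation does not need the whole input up front.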

Examples

Double The Numbers

Create a new file named transform_stream_operator_double_numbers.py in the awel_tutorial directory and add the following code:

import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, TransformStreamAbsOperator, StreamifyAbsOperator

class NumberProducerOperator(StreamifyAbsOperator[int, int]):
    """Create a stream of numbers from 0 to `n-1`"""
    async def streamify(self, n: int) -> AsyncIterator[int]:
        for i in range(n):
            yield i

class NumberDoubleOperator(TransformStreamAbsOperator[int, int]):
    async def transform_stream(self, it: AsyncIterator) -> AsyncIterator[int]:
        async for i in it:
            # Double the number
            yield i * 2

with DAG("numbers_dag") as dag:
    task = NumberProducerOperator()
    double_task = NumberDoubleOperator()
    task >> double_task

async def print_stream(t, n: int):
    # Call the streaming operator by `call_stream` method
    async for i in await t.call_stream(call_data=n):
        print(i)

asyncio.run(print_stream(double_task, 10))

Run the following command to execute the code:

poetry run python awel_tutorial/transform_stream_operator_double_numbers.py

You should see the following output printed to the console:

0
2
4
6
8
10
12
14
16
18