Skip to main content
Version: v0.6.0

1.3 Custom Operator

Your First Custom Operator

It is easy to create a custom operator in AWEL. In this section, we will create a custom operator that prints the "Hello, world!" message.

In most cases, you just need to inherit basic operators and override the corresponding methods.

Create a new file named hello_world_custom_operator.py in the awel_tutorial directory and add the following code:

import asyncio
from dbgpt.core.awel import DAG, MapOperator

class HelloWorldOperator(MapOperator[str, None]):
async def map(self, x: str) -> None:
print(f"Hello, {x}!")

with DAG("awel_hello_world") as dag:
task = HelloWorldOperator()

asyncio.run(task.call(call_data="world"))

And run the following command to execute the code:

poetry run python awel_tutorial/hello_world_custom_operator.py

And you will see "Hello, world!" printed to the console.

Hello, world!

Your First Streaming Operator

Let's create a streaming operator that creates a stream of numbers from 0 to n-1, then doubles each number in another streaming operator.

Create a new file named custom_streaming_operator.py in the awel_tutorial

import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, StreamifyAbsOperator, TransformStreamAbsOperator

class NumberProducerOperator(StreamifyAbsOperator[int, int]):
async def streamify(self, n: int) -> AsyncIterator[int]:
for i in range(n):
yield i

class NumberDoubleOperator(TransformStreamAbsOperator[int, int]):
async def transform_stream(self, it: AsyncIterator) -> AsyncIterator[int]:
async for i in it:
# Double the number
yield i * 2

with DAG("numbers_dag") as dag:
task = NumberProducerOperator()
double_task = NumberDoubleOperator()
task >> double_task

async def helper_call_fn(t, n: int):
# Call the streaming operator by `call_stream` method
async for i in await t.call_stream(call_data=n):
print(i)

asyncio.run(helper_call_fn(double_task, 10))

And run the following command to execute the code:

poetry run python awel_tutorial/custom_streaming_operator.py

And you will see the following output printed to the console.

0
2
4
6
8
10
12
14
16
18

In this example, we call the call_stream method to execute the streaming operator, please don't forget to use await to get the streaming result.