Version: dev

2.8 Input Operator

The input operator reads a value from an input source. It is always the first operator in a DAG, and it makes it easy to plug in your own input source.

The input operator is a special operator: it has no input and exactly one output.

There is one way to use the input operator:

Build an InputOperator With an Input Source

Just pass the input source to the InputOperator constructor.

from dbgpt.core.awel import DAG, InputOperator, SimpleInputSource

with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data="Hello, World!")
    input_task = InputOperator(input_source=input_source)

Examples

This example shows how to use the InputOperator to print the input data. It uses a SimpleInputSource built from a string as the input source.

Create a new file named input_operator_print_data.py in the awel_tutorial directory and add the following code:

import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleInputSource

with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data="Hello, World!")
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))
    input_task >> print_task

asyncio.run(print_task.call())

And run the following command to execute the code:

poetry run python awel_tutorial/input_operator_print_data.py

And you will see the following output:

Hello, World!
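
One detail of the example above: the map function is a lambda that calls print, and in Python print always returns None, so None is what the lambda hands back. If a later step needed the value, the function would have to return it explicitly. The sketch below shows this in plain Python without any DB-GPT imports; print_and_pass is a hypothetical helper name chosen for illustration.

```python
def print_and_pass(x):
    """Print the value and return it unchanged (hypothetical helper)."""
    print(x)
    return x


# The lambda from the example returns whatever print() returns, which is None.
lambda_result = (lambda x: print(x))("Hello, World!")

# The helper prints the same text but keeps the value available to the caller.
passthrough_result = print_and_pass("Hello, World!")

print(lambda_result)       # None
print(passthrough_result)  # Hello, World!
```

A function like this could be passed as map_function in place of the lambda when the printed value should also be the task's result.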

This example shows how to use the InputOperator to print stream data. It uses a SimpleInputSource built from an async generator as the input source.

Create a new file named input_operator_print_stream_data.py in the awel_tutorial directory and add the following code:

import asyncio
from dbgpt.core.awel import DAG, InputOperator, SimpleInputSource

async def stream_data():
    for i in range(10):
        yield i

with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data=stream_data())
    input_task = InputOperator(input_source=input_source)

async def print_stream(t: InputOperator):
    async for i in await t.call_stream():
        print(i)

asyncio.run(print_stream(input_task))

And run the following command to execute the code:

poetry run python awel_tutorial/input_operator_print_stream_data.py

And you will see the following output printed to the console.

0
1
2
3
4
5
6
7
8
9
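
The example passes stream_data(), an async generator object, as the data. In plain Python such an object can be iterated only once, and a second pass yields nothing. The sketch below demonstrates only this Python-level behavior with the standard library. How DB-GPT handles a second run of the same DAG is not covered here, and collect is a hypothetical helper.

```python
import asyncio


async def stream_data():
    # A shorter stream than in the example, to keep the output small.
    for i in range(3):
        yield i


async def collect(agen):
    """Drain an async iterator into a list (hypothetical helper)."""
    return [item async for item in agen]


async def main():
    gen = stream_data()          # one generator object, as in the example
    first = await collect(gen)   # consumes every item
    second = await collect(gen)  # the generator is already exhausted
    return first, second


first_pass, second_pass = asyncio.run(main())
print(first_pass)   # [0, 1, 2]
print(second_pass)  # []
```

To stream the same values again, one option is to call stream_data() again and build a new input source from the fresh generator.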

The call data is the data that is passed to the call method or call_stream method of the operator.

This example shows how to use the InputOperator to print the call data. It uses SimpleCallDataInputSource, which takes the call data as its input.

Create a new file named input_operator_print_call_data.py in the awel_tutorial directory and add the following code:

import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource

with DAG("awel_input_operator") as dag:
    input_source = SimpleCallDataInputSource()
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))
    input_task >> print_task

asyncio.run(print_task.call(call_data="Hello, World!"))
asyncio.run(print_task.call(call_data="AWEL is cool!"))

And run the following command to execute the code:

poetry run python awel_tutorial/input_operator_print_call_data.py

And you will see the following output printed to the console.

Hello, World!
AWEL is cool!

Input Source

There are two built-in input sources: SimpleInputSource and SimpleCallDataInputSource.

SimpleInputSource

SimpleInputSource creates an input source from a single value or from a stream of data.

SimpleCallDataInputSource

SimpleCallDataInputSource creates an input source from the call data, which is passed to the call method or call_stream method of the operator.
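
The difference between the two kinds of source can be pictured with a toy model: one source is given its data when it is constructed, the other reads it from each call. This is a simplified sketch in plain Python, not DB-GPT's implementation. ToyContext, ToyFixedSource and ToyCallDataSource are hypothetical names, and the error raised for missing call data is an assumption made for the sketch.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToyContext:
    """Hypothetical per-call context; a stand-in, not DB-GPT's TaskContext."""
    call_data: Optional[Any] = None


class ToyFixedSource:
    """Returns the value given at construction time, whatever the call passes."""

    def __init__(self, data: Any) -> None:
        self._data = data

    def read(self, ctx: ToyContext) -> Any:
        return self._data


class ToyCallDataSource:
    """Returns the value supplied with each individual call."""

    def read(self, ctx: ToyContext) -> Any:
        if ctx.call_data is None:
            # Assumption for this sketch: missing call data is an error.
            raise ValueError("no call data was supplied for this call")
        return ctx.call_data


fixed = ToyFixedSource("Hello, World!")
per_call = ToyCallDataSource()

fixed_results = [fixed.read(ToyContext(call_data=x)) for x in ("a", "b")]
per_call_results = [per_call.read(ToyContext(call_data=x)) for x in ("a", "b")]

print(fixed_results)     # ['Hello, World!', 'Hello, World!']
print(per_call_results)  # ['a', 'b']
```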

Create Your Own Input Source

The simplest way to create your own input source is to subclass BaseInputSource and override the _read_data method.

Create a new file named my_input_source.py in the awel_tutorial directory and add the following code:

import asyncio
from dbgpt.core.awel import DAG, InputOperator, MapOperator, BaseInputSource, TaskContext

class MyInputSource(BaseInputSource):
    """Create an input source with a single data"""
    def _read_data(self, ctx: TaskContext) -> str:
        return "Hello, World!"

with DAG("awel_input_operator") as dag:
    input_source = MyInputSource()
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))

    input_task >> print_task

asyncio.run(print_task.call())

And run the following command to execute the code:

poetry run python awel_tutorial/my_input_source.py

And you will see the following output printed to the console.

Hello, World!
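
The subclass-and-override pattern used by MyInputSource can be sketched with Python's abc module: a base class exposes a public read method that delegates to a hook, and each subclass supplies the hook. This is a generic illustration with hypothetical class names. Whether BaseInputSource enforces the override in the same way is an assumption, not something the text above states.

```python
from abc import ABC, abstractmethod


class ToyBaseSource(ABC):
    """Hypothetical base class; a stand-in, not DB-GPT's BaseInputSource."""

    def read(self):
        # The public entry point delegates to the hook that subclasses override.
        return self._read_data()

    @abstractmethod
    def _read_data(self):
        """Return the data this source provides."""


class GreetingSource(ToyBaseSource):
    def _read_data(self):
        return "Hello, World!"


class IncompleteSource(ToyBaseSource):
    """Forgets to override _read_data."""


greeting = GreetingSource().read()

try:
    IncompleteSource()
    instantiation_error = None
except TypeError as exc:
    # abc refuses to instantiate a class that still has abstract methods.
    instantiation_error = exc

print(greeting)                          # Hello, World!
print(type(instantiation_error).__name__)  # TypeError
```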