2.8 Input Operator
The input operator reads a value from an input source. It is always the first operator in a DAG, and it makes it easy to plug in your own input source.
The input operator is a special operator: it has no input and exactly one output.
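To make the "no input, one output" idea concrete, here is a minimal sketch that uses only the standard library. ToyNode is a hypothetical stand-in invented for this illustration; it is not how dbgpt implements operators, and it only shows why a chain has to start at a node that produces a value without reading from an upstream node.

```python
import asyncio
from typing import Any, Callable, Optional


class ToyNode:
    """Hypothetical stand-in for an operator: at most one upstream, one output."""

    def __init__(self, fn: Callable[[Any], Any], upstream: Optional["ToyNode"] = None):
        self.fn = fn
        self.upstream = upstream

    async def call(self) -> Any:
        # A root node has no upstream, so it receives no input value (None here).
        upstream_value = await self.upstream.call() if self.upstream else None
        return self.fn(upstream_value)


# The root node plays the role of the input operator: it only produces a value.
source = ToyNode(lambda _: "Hello, World!")
# A downstream node transforms whatever the root produced.
upper = ToyNode(lambda text: text.upper(), upstream=source)

result = asyncio.run(upper.call())
print(result)
```

Calling the last node pulls the value through the chain starting from the root, which mirrors how the examples in this section call the final task rather than the input task.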
There is one way to use the input operator:
Build an InputOperator With an Input Source
Just pass the input source to the InputOperator constructor.
from dbgpt.core.awel import DAG, InputOperator, SimpleInputSource

with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data="Hello, World!")
    input_task = InputOperator(input_source=input_source)
Examples
Print The Input Data
This example shows how to use the InputOperator to print the input data. It uses a SimpleInputSource built from a string.
Create a new file named input_operator_print_data.py in the awel_tutorial directory and add the following code:
import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleInputSource

with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data="Hello, World!")
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))
    input_task >> print_task

asyncio.run(print_task.call())
And run the following command to execute the code:
poetry run python awel_tutorial/input_operator_print_data.py
And you will see the following output:
Hello, World!
Print Stream Data
This example shows how to use the InputOperator to print stream data. It uses a SimpleInputSource built from a stream of data (here, an async generator).
Create a new file named input_operator_print_stream_data.py in the awel_tutorial directory and add the following code:
import asyncio
from dbgpt.core.awel import DAG, InputOperator, SimpleInputSource

async def stream_data():
    for i in range(10):
        yield i

with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data=stream_data())
    input_task = InputOperator(input_source=input_source)

async def print_stream(t: InputOperator):
    async for i in await t.call_stream():
        print(i)

asyncio.run(print_stream(input_task))
And run the following command to execute the code:
poetry run python awel_tutorial/input_operator_print_stream_data.py
And you will see the following output printed to the console.
0
1
2
3
4
5
6
7
8
9
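The `async for i in await t.call_stream()` line in the example combines two steps: awaiting a coroutine to obtain a stream, then iterating over that stream. The sketch below reproduces the pattern with asyncio alone. call_stream_like is a hypothetical stand-in, written on the assumption (suggested by the `await` in the example) that call_stream is a coroutine that returns an async iterator; it is not dbgpt code.

```python
import asyncio
from typing import AsyncIterator, List


async def numbers(limit: int) -> AsyncIterator[int]:
    """An async generator, like stream_data in the example above."""
    for i in range(limit):
        yield i


async def call_stream_like(limit: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for call_stream: a coroutine that returns a stream."""
    # Calling an async generator function does not run its body;
    # it only creates the iterator object that is returned here.
    return numbers(limit)


async def collect(limit: int) -> List[int]:
    items = []
    # Step 1: await the coroutine to get the iterator.
    # Step 2: iterate over it with async for.
    async for item in await call_stream_like(limit):
        items.append(item)
    return items


collected = asyncio.run(collect(3))
print(collected)
```

Dropping the `await` would hand `async for` a coroutine object instead of an async iterator, which raises a TypeError at runtime.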
Print Call Data
Call data is the data passed to the operator's call or call_stream method.
This example shows how to use the InputOperator to print the call data. It uses SimpleCallDataInputSource, which takes its data from the call data.
Create a new file named input_operator_print_call_data.py in the awel_tutorial directory and add the following code:
import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource

with DAG("awel_input_operator") as dag:
    input_source = SimpleCallDataInputSource()
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))
    input_task >> print_task

asyncio.run(print_task.call(call_data="Hello, World!"))
asyncio.run(print_task.call(call_data="AWEL is cool!"))
And run the following command to execute the code:
poetry run python awel_tutorial/input_operator_print_call_data.py
And you will see the following output printed to the console.
Hello, World!
AWEL is cool!
Input Source
There are two built-in input sources: SimpleInputSource and SimpleCallDataInputSource.
SimpleInputSource
SimpleInputSource creates an input source from a single value or a stream of data.
SimpleCallDataInputSource
SimpleCallDataInputSource creates an input source from the call data, which is passed to the operator's call or call_stream method.
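The difference between the two sources comes down to when the data is supplied: at construction time or at call time. The toy classes below illustrate that distinction with plain Python. FixedSource, CallDataSource, and their read method are hypothetical names made up for this sketch; how each toy class treats missing or extra call data is an assumption of the sketch, not a description of dbgpt behaviour.

```python
from typing import Any, Optional


class FixedSource:
    """Toy source: the data is fixed when the source is built."""

    def __init__(self, data: Any):
        self._data = data

    def read(self, call_data: Optional[Any] = None) -> Any:
        # In this toy model, anything passed at call time is ignored.
        return self._data


class CallDataSource:
    """Toy source: the data arrives with each call."""

    def read(self, call_data: Optional[Any] = None) -> Any:
        # Assumption of this sketch: a missing value is treated as an error.
        if call_data is None:
            raise ValueError("this source needs call data")
        return call_data


fixed = FixedSource("Hello, World!")
per_call = CallDataSource()

fixed_results = [fixed.read(), fixed.read(call_data="ignored")]
per_call_results = [
    per_call.read(call_data="Hello, World!"),
    per_call.read(call_data="AWEL is cool!"),
]
print(fixed_results)
print(per_call_results)
```

A fixed source suits constant or pre-built inputs, while a call-data source lets the same DAG be reused with a different input on every call, as in the Print Call Data example.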
Create Your Own Input Source
The simplest way to create your own input source is to subclass BaseInputSource and override the _read_data method.
Create a new file named my_input_source.py in the awel_tutorial directory and add the following code:
import asyncio
from dbgpt.core.awel import DAG, InputOperator, MapOperator, BaseInputSource, TaskContext

class MyInputSource(BaseInputSource):
    """An input source that returns a single value."""

    def _read_data(self, ctx: TaskContext) -> str:
        return "Hello, World!"

with DAG("awel_input_operator") as dag:
    input_source = MyInputSource()
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))
    input_task >> print_task

asyncio.run(print_task.call())
And run the following command to execute the code:
poetry run python awel_tutorial/my_input_source.py
And you will see the following output printed to the console.
Hello, World!