2.8 Input Operator
The input operator reads a value from an input source. It is always the first operator in a DAG, and it makes it easy to plug in your own input source.
The input operator is a special operator: it has no input and exactly one output.
There is one way to use the input operator:
Build An InputOperator With An Input Source
Just pass the input source to the InputOperator constructor.
from dbgpt.core.awel import DAG, InputOperator, SimpleInputSource
with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data="Hello, World!")
    input_task = InputOperator(input_source=input_source)
Examples
Print The Input Data
This example shows how to use the InputOperator to print the input data. It uses a
SimpleInputSource built from a string as the input source.
Create a new file named input_operator_print_data.py in the awel_tutorial directory
and add the following code:
import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleInputSource
with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data="Hello, World!")
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))
    input_task >> print_task
asyncio.run(print_task.call())
And run the following command to execute the code:
poetry run python awel_tutorial/input_operator_print_data.py
And you will see the following output:
Hello, World!
Print Stream Data
This example shows how to use the InputOperator to print stream data. It uses a
SimpleInputSource built from a stream (an async generator) as the input source.
Create a new file named input_operator_print_stream_data.py in the awel_tutorial
directory and add the following code:
import asyncio
from dbgpt.core.awel import DAG, InputOperator, SimpleInputSource
async def stream_data():
    for i in range(10):
        yield i
with DAG("awel_input_operator") as dag:
    input_source = SimpleInputSource(data=stream_data())
    input_task = InputOperator(input_source=input_source)
async def print_stream(t: InputOperator):
    # Await call_stream() to obtain the stream, then iterate over it asynchronously.
    async for i in await t.call_stream():
        print(i)
asyncio.run(print_stream(input_task))
And run the following command to execute the code:
poetry run python awel_tutorial/input_operator_print_stream_data.py
And you will see the following output printed to the console.
0
1
2
3
4
5
6
7
8
9
Print Call Data
The call data is the data that is passed to the call method or call_stream method
of the operator.
This example shows how to use the InputOperator to print the call data. It uses a
SimpleCallDataInputSource, which takes the call data as its input.
Create a new file named input_operator_print_call_data.py in the awel_tutorial directory and add the following code:
import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource
with DAG("awel_input_operator") as dag:
    input_source = SimpleCallDataInputSource()
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))
    input_task >> print_task
asyncio.run(print_task.call(call_data="Hello, World!"))
asyncio.run(print_task.call(call_data="AWEL is cool!"))
And run the following command to execute the code:
poetry run python awel_tutorial/input_operator_print_call_data.py
And you will see the following output printed to the console.
Hello, World!
AWEL is cool!
Input Source
There are two built-in input sources: SimpleInputSource and SimpleCallDataInputSource.
SimpleInputSource
SimpleInputSource creates an input source from a single value or from a stream of data.
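To make the difference between a single value and a stream concrete, here is a minimal standard-library sketch. It is not how SimpleInputSource is implemented; the helper names is_stream and collect are hypothetical and exist only for this illustration. It assumes that "stream data" means an object that can be consumed with async for, as in the stream example above.

```python
import asyncio
from typing import Any, AsyncIterator, List


def is_stream(data: Any) -> bool:
    """Return True if data can be consumed with `async for`."""
    return hasattr(data, "__aiter__")


async def collect(data: Any) -> List[Any]:
    """Collect a single value or an async stream into a list."""
    if is_stream(data):
        # Stream data: pull items one at a time until the stream ends.
        return [item async for item in data]
    # Single value: there is exactly one item to read.
    return [data]


async def numbers() -> AsyncIterator[int]:
    """A small async generator, similar in shape to stream_data()."""
    for i in range(3):
        yield i


single_result = asyncio.run(collect("Hello, World!"))
stream_result = asyncio.run(collect(numbers()))
print(single_result)
print(stream_result)
```

The point of the sketch is only that a consumer treats the two kinds of data differently: a single value is read once, while a stream is iterated until it is exhausted.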
SimpleCallDataInputSource
SimpleCallDataInputSource creates an input source from the call data, which
is passed to the call method or call_stream method of the operator.
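The contrast between a source fixed at construction time and a source that reads per-call data can be sketched with plain Python classes. This is a toy model under stated assumptions, not DB-GPT code: ToyContext, ToyFixedSource, and ToyCallDataSource are hypothetical names, and the real library passes its own context object to the input source.

```python
from typing import Any, Optional


class ToyContext:
    """Hypothetical stand-in for a per-call context (not DB-GPT's TaskContext)."""

    def __init__(self, call_data: Optional[Any] = None) -> None:
        self.call_data = call_data


class ToyFixedSource:
    """Returns the value given at construction time and ignores the context."""

    def __init__(self, data: Any) -> None:
        self._data = data

    def read(self, ctx: ToyContext) -> Any:
        return self._data


class ToyCallDataSource:
    """Returns whatever the caller supplied for this particular call."""

    def read(self, ctx: ToyContext) -> Any:
        if ctx.call_data is None:
            raise ValueError("no call data was provided for this call")
        return ctx.call_data


fixed = ToyFixedSource("Hello, World!")
per_call = ToyCallDataSource()

# The fixed source yields the same value regardless of the call data.
fixed_value = fixed.read(ToyContext(call_data="ignored"))
# The call-data source yields a different value on each call.
first = per_call.read(ToyContext(call_data="Hello, World!"))
second = per_call.read(ToyContext(call_data="AWEL is cool!"))
print(fixed_value, first, second, sep="\n")
```

In this model the same source object serves many calls with different data, which mirrors why the call data example above can invoke the same DAG twice with two different strings.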
Create Your Own Input Source
The simplest way to create your own input source is to subclass BaseInputSource and override the _read_data method.
Create a new file named my_input_source.py in the awel_tutorial directory and add the following code:
import asyncio
from dbgpt.core.awel import DAG, InputOperator, MapOperator, BaseInputSource, TaskContext
class MyInputSource(BaseInputSource):
    """An input source that always returns a single string."""
    def _read_data(self, ctx: TaskContext) -> str:
        return "Hello, World!"
with DAG("awel_input_operator") as dag:
    input_source = MyInputSource()
    input_task = InputOperator(input_source=input_source)
    print_task = MapOperator(map_function=lambda x: print(x))
    input_task >> print_task
asyncio.run(print_task.call())
And run the following command to execute the code:
poetry run python awel_tutorial/my_input_source.py
And you will see the following output printed to the console.
Hello, World!