1.2 How AWEL Works
Introduction
Let us look again at the DAG from the previous section:
import asyncio

from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource

with DAG("awel_hello_world") as dag:
    # Source of data: reads the value passed in as call_data.
    input_task = InputOperator(
        input_source=SimpleCallDataInputSource()
    )
    # Transformation: prints a greeting for the input value.
    task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
    # Declare the dependency: input_task runs before task.
    input_task >> task

dag.visualize_dag()
asyncio.run(task.call(call_data="world"))
This code contains a few new concepts: DAG, Operator, Task, and Runner.
- DAG: DAG is a class that represents a Directed Acyclic Graph. It defines the structure of the tasks and their dependencies.
- Operator: InputOperator and MapOperator are examples of operators. An operator is a node in the DAG. It can be a source of data, a transformation, or a sink of data. In this example, InputOperator is a source of data, and MapOperator is a transformation.
- Task: A task is an instance of an operator; it is a dynamic concept.
- Runner: A runner executes the tasks in the DAG. When we call task.call(call_data="world"), we are using a runner to execute the task. The DefaultWorkflowRunner runs your task in the same process, and the RayWorkflowRunner runs your task in a Ray cluster (not yet implemented in the community version).
DAG
What is a DAG?
A Directed Acyclic Graph (DAG) is a graph that has a set of vertices and a set of directed edges. The edges are directed from one vertex to another, and there are no cycles in the graph. In the context of AWEL, the vertices are the operators, and the edges are the dependencies between the operators.
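To make the definition concrete, here is a minimal sketch that models operators as vertices and dependencies as directed edges, using only the Python standard library (graphlib, Python 3.9+). It does not use the AWEL classes; the helper names execution_order and has_cycle are hypothetical and exist only for this illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a vertex (an operator); its value is the set of vertices
# it depends on. The node names mirror the hello-world example.
dependencies = {
    "input_task": set(),
    "task": {"input_task"},
}


def execution_order(deps):
    """Return the vertices in an order that respects every dependency edge."""
    return list(TopologicalSorter(deps).static_order())


def has_cycle(deps):
    """Return True if the graph contains a cycle, i.e. it is not a DAG."""
    try:
        execution_order(deps)
    except CycleError:
        return True
    return False


print(execution_order(dependencies))  # ['input_task', 'task']
print(has_cycle({"a": {"b"}, "b": {"a"}}))  # True
```

Because the graph has no cycles, a valid execution order always exists: every operator can run after all of the operators it depends on.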
Operator
What is an Operator?
An operator is a node in the DAG. It can be a source of data, a transformation, or a call to an LLM service. In the context of AWEL, an operator is a class that inherits from the dbgpt.core.awel.BaseOperator class.
According to the type of output data, there are two types of operators: streaming operators and non-streaming operators.
Basic Operators
There are a few basic operators that are used to build up the more complex operators.
- InputOperator: This non-streaming operator gets data from an input source.
- MapOperator: This non-streaming operator applies a function to the input data and returns the transformed data.
- BranchOperator: This non-streaming operator decides which path to run based on the input data.
- JoinOperator: This non-streaming operator joins the data from multiple paths into a single path.
- StreamifyAbsOperator: This streaming operator converts non-streaming data to streaming data.
- UnstreamifyAbsOperator: This non-streaming operator converts streaming data to non-streaming data.
- TransformStreamAbsOperator: This streaming operator transforms one stream of data into another stream of data.
- ReduceStreamOperator: This non-streaming operator reduces streaming data to non-streaming data.
- TriggerOperator: This non-streaming operator triggers a task. It is a special InputOperator.
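The difference between streaming and non-streaming operators can be sketched with plain asyncio, without the AWEL classes. In this sketch a non-streaming step returns a single value, while a streaming step is an async generator that yields many values. The function names are hypothetical; they only mirror the roles of MapOperator, StreamifyAbsOperator, TransformStreamAbsOperator, and ReduceStreamOperator described above and are not the library's implementation.

```python
import asyncio
from typing import AsyncIterator


async def map_step(x: int) -> int:
    """Non-streaming: one value in, one value out (the MapOperator role)."""
    return x * 2


async def streamify_step(n: int) -> AsyncIterator[int]:
    """One value in, a stream out (the StreamifyAbsOperator role)."""
    for i in range(n):
        yield i


async def transform_stream_step(stream: AsyncIterator[int]) -> AsyncIterator[int]:
    """Stream in, stream out (the TransformStreamAbsOperator role)."""
    async for item in stream:
        yield item * item


async def reduce_stream_step(stream: AsyncIterator[int]) -> int:
    """Stream in, one value out (the ReduceStreamOperator role)."""
    total = 0
    async for item in stream:
        total += item
    return total


async def pipeline(x: int) -> int:
    doubled = await map_step(x)                # 2 -> 4
    numbers = streamify_step(doubled)          # 0, 1, 2, 3
    squares = transform_stream_step(numbers)   # 0, 1, 4, 9
    return await reduce_stream_step(squares)   # 0 + 1 + 4 + 9


print(asyncio.run(pipeline(2)))  # 14
```

Note that the streaming steps do no work until a downstream step iterates over them, which is why the final reduce step is the one that is awaited.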
High-level Operators
- RequestBuilderOperator: This non-streaming operator builds a model request from the input data.
- LLMOperator: This non-streaming operator calls an LLM service.
- StreamingLLMOperator: This streaming operator calls an LLM service and expects a streaming response.
- LLMBranchOperator: This non-streaming operator decides which path to run based on the input data.
- OpenAIStreamingOutputOperator: This streaming operator transforms the model output into OpenAI-compatible streaming data.
- ChatHistoryPromptComposerOperator: This non-streaming operator is a high-level task that composes a chat history prompt.
Task
What is a Task?
A task is an instance of an operator. Tasks are stateless by design, which means that the same task can be executed multiple times with different input data.
Every task can receive multiple inputs from its parent tasks and returns a single output to its child tasks.
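The two properties above can be illustrated with a small stand-in class. This is a sketch under stated assumptions, not the AWEL task implementation: SketchTask and its run method are hypothetical names. The point is that the task keeps no result between runs, accepts several inputs, and produces exactly one output.

```python
from typing import Any, Callable


class SketchTask:
    """Hypothetical stand-in for a task; not the dbgpt implementation."""

    def __init__(self, func: Callable[..., Any]):
        self.func = func

    def run(self, *inputs: Any) -> Any:
        # Nothing is stored on self, so repeated runs cannot interfere.
        return self.func(*inputs)


# A task with two parent inputs and a single output.
describe = SketchTask(lambda text, count: f"{text} has {count} characters")

print(describe.run("WORLD", 5))  # WORLD has 5 characters
print(describe.run("AWEL", 4))   # AWEL has 4 characters
```

The second call is unaffected by the first because the only state the task holds is the function it wraps.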
Runner
What is a Runner?
A runner is a class that executes the tasks in the DAG. When we call a task with task.call(call_data="world"), we are using a runner to execute the task. The runner first triggers all the parent tasks of the task, and then executes the task itself.
The DefaultWorkflowRunner runs your task in the same process, and the RayWorkflowRunner runs your task in a Ray cluster (not yet implemented in the community version). You can also implement your own runner to run your task in your own environment.
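The behavior described above (trigger the parents first, then run the task) can be sketched as a small in-process runner. Everything here is an assumption made for illustration: SketchNode, InProcessRunner, and their methods are hypothetical names and do not reflect the actual DefaultWorkflowRunner code or the interface a custom AWEL runner must implement.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional, Tuple


class SketchNode:
    """Hypothetical node: a function plus the parent nodes that feed it."""

    def __init__(self, name: str, func: Callable[..., Any],
                 parents: Optional[List["SketchNode"]] = None):
        self.name = name
        self.func = func
        self.parents = parents or []


class InProcessRunner:
    """Runs a node in the current process after running all of its parents."""

    async def run(self, node: SketchNode, call_data: Any) -> Tuple[Any, List[str]]:
        results: Dict[str, Any] = {}  # per-call cache: each node runs once
        order: List[str] = []         # records the execution order
        output = await self._execute(node, call_data, results, order)
        return output, order

    async def _execute(self, node, call_data, results, order):
        if node.name in results:
            return results[node.name]
        if node.parents:
            # Trigger every parent first and collect their outputs.
            inputs = [await self._execute(p, call_data, results, order)
                      for p in node.parents]
        else:
            # A source node receives the call data directly.
            inputs = [call_data]
        results[node.name] = node.func(*inputs)
        order.append(node.name)
        return results[node.name]


input_node = SketchNode("input_task", lambda data: data)
greet_node = SketchNode("task", lambda x: f"Hello, {x}!", [input_node])

output, order = asyncio.run(InProcessRunner().run(greet_node, "world"))
print(output)  # Hello, world!
print(order)   # ['input_task', 'task']
```

Keeping the traversal inside the runner, rather than inside the nodes, is what makes the runner swappable: a different runner could walk the same graph but dispatch each node to another process or cluster.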