Skip to main content
Version: v0.6.0

RAG With AWEL

In this example, we will show how to use the AWEL library to create a RAG program.

Now, let us create a python file first_rag_with_awel.py.

In this example, we will load your knowledge from a URL and store it in a vector store.

Install Dependencies

First, you need to install the dbgpt library.

pip install "dbgpt[rag]>=0.5.2"

Prepare Embedding Model

To store the knowledge in a vector store, we need an embedding model, DB-GPT supports a lot of embedding models, here are some of them:

from dbgpt.rag.embedding import DefaultEmbeddingFactory

embeddings = DefaultEmbeddingFactory.openai()

Load Knowledge And Store In Vector Store

Then we can create a DAG which loads the knowledge from a URL and stores it in a vector store.

import asyncio
import shutil
from dbgpt.core.awel import DAG
from dbgpt.rag import ChunkParameters
from dbgpt.rag.knowledge import KnowledgeType
from dbgpt.rag.operators import EmbeddingAssemblerOperator, KnowledgeOperator
from dbgpt.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig

# Delete old vector store directory(/tmp/awel_rag_test_vector_store)
shutil.rmtree("/tmp/awel_rag_test_vector_store", ignore_errors=True)

vector_store = ChromaStore(
vector_store_config=ChromaVectorConfig(
name="test_vstore",
persist_path="/tmp/awel_rag_test_vector_store",
embedding_fn=embeddings
)
)

with DAG("load_knowledge_dag") as knowledge_dag:
# Load knowledge from URL
knowledge_task = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
assembler_task = EmbeddingAssemblerOperator(
index_store=vector_store,
chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE")
)
knowledge_task >> assembler_task

chunks = asyncio.run(assembler_task.call("https://docs.dbgpt.site/docs/awel/"))
print(f"Chunk length: {len(chunks)}")

Retrieve Knowledge From Vector Store

Then you can retrieve the knowledge from the vector store.


from dbgpt.core.awel import MapOperator
from dbgpt.rag.operators import EmbeddingRetrieverOperator

with DAG("retriever_dag") as retriever_dag:
retriever_task = EmbeddingRetrieverOperator(
top_k=3,
index_store=vector_store,
)
content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))
retriever_task >> content_task

chunks = asyncio.run(content_task.call("What is the AWEL?"))
print(chunks)

Prepare LLM

To build a RAG program, we need a LLM, here are some of the LLMs that DB-GPT supports:

First, you should install the openai library.

pip install openai

Then set your API key in the environment OPENAI_API_KEY.

from dbgpt.model.proxy import OpenAILLMClient

llm_client = OpenAILLMClient()

Create RAG Program

Lastly, we can create a RAG with the retrieved knowledge.


from dbgpt.core.awel import InputOperator, JoinOperator, InputSource
from dbgpt.core.operators import PromptBuilderOperator, RequestBuilderOperator
from dbgpt.model.operators import LLMOperator

prompt = """Based on the known information below, provide users with professional and concise answers to their questions.
If the answer cannot be obtained from the provided content, please say:
"The information provided in the knowledge base is not sufficient to answer this question.".
It is forbidden to make up information randomly. When answering, it is best to summarize according to points 1.2.3.
known information:
{context}
question:
{question}
"""

with DAG("llm_rag_dag") as rag_dag:
input_task = InputOperator(input_source=InputSource.from_callable())
retriever_task = EmbeddingRetrieverOperator(
top_k=3,
index_store=vector_store,
)
content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))

merge_task = JoinOperator(lambda context, question: {"context": context, "question": question})

prompt_task = PromptBuilderOperator(prompt)
# The model is gpt-3.5-turbo, you can replace it with other models.
req_build_task = RequestBuilderOperator(model="gpt-3.5-turbo")
llm_task = LLMOperator(llm_client=llm_client)
result_task = MapOperator(lambda r: r.text)

input_task >> retriever_task >> content_task >> merge_task
input_task >> merge_task

merge_task >> prompt_task >> req_build_task >> llm_task >> result_task

print(asyncio.run(result_task.call("What is the AWEL?")))

The output will be:

AWEL stands for Agentic Workflow Expression Language, which is a set of intelligent agent workflow expression language designed for large model application development. It simplifies the process by providing functionality and flexibility through its layered API design architecture, including the operator layer, AgentFrame layer, and DSL layer. Its goal is to allow developers to focus on business logic for LLMs applications without having to deal with intricate model and environment details.

Congratulations! You have created a RAG program with AWEL.

Full Code

And let's look the full code of first_rag_with_awel.py:

import asyncio
import shutil
from dbgpt.core.awel import DAG, MapOperator, InputOperator, JoinOperator, InputSource
from dbgpt.core.operators import PromptBuilderOperator, RequestBuilderOperator
from dbgpt.rag import ChunkParameters
from dbgpt.rag.knowledge import KnowledgeType
from dbgpt.rag.operators import EmbeddingAssemblerOperator, KnowledgeOperator,
EmbeddingRetrieverOperator
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig
from dbgpt.model.operators import LLMOperator
from dbgpt.model.proxy import OpenAILLMClient

# Here we use the openai embedding model, if you want to use other models, you can
# replace it according to the previous example.
embeddings = DefaultEmbeddingFactory.openai()
# Here we use the openai LLM model, if you want to use other models, you can replace
# it according to the previous example.
llm_client = OpenAILLMClient()

# Delete old vector store directory(/tmp/awel_rag_test_vector_store)
shutil.rmtree("/tmp/awel_rag_test_vector_store", ignore_errors=True)

vector_store = ChromaStore(
vector_store_config=ChromaVectorConfig(
name="test_vstore",
persist_path="/tmp/awel_rag_test_vector_store",
embedding_fn=embeddings
),
)

with DAG("load_knowledge_dag") as knowledge_dag:
# Load knowledge from URL
knowledge_task = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
assembler_task = EmbeddingAssemblerOperator(
index_store=vector_store,
chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE")
)
knowledge_task >> assembler_task

chunks = asyncio.run(assembler_task.call("https://docs.dbgpt.site/docs/awel/"))
print(f"Chunk length: {len(chunks)}\n")

prompt = """Based on the known information below, provide users with professional and concise answers to their questions.
If the answer cannot be obtained from the provided content, please say:
"The information provided in the knowledge base is not sufficient to answer this question.".
It is forbidden to make up information randomly. When answering, it is best to summarize according to points 1.2.3.
known information:
{context}
question:
{question}
"""

with DAG("llm_rag_dag") as rag_dag:
input_task = InputOperator(input_source=InputSource.from_callable())
retriever_task = EmbeddingRetrieverOperator(
top_k=3,
index_store=vector_store,
)
content_task = MapOperator(lambda cks: "\n".join(c.content for c in cks))

merge_task = JoinOperator(
lambda context, question: {"context": context, "question": question})

prompt_task = PromptBuilderOperator(prompt)
# The model is gpt-3.5-turbo, you can replace it with other models.
req_build_task = RequestBuilderOperator(model="gpt-3.5-turbo")
llm_task = LLMOperator(llm_client=llm_client)
result_task = MapOperator(lambda r: r.text)

input_task >> retriever_task >> content_task >> merge_task
input_task >> merge_task

merge_task >> prompt_task >> req_build_task >> llm_task >> result_task

print(asyncio.run(result_task.call("What is the AWEL?")))

Visualize DAGs

And we can visualize the DAGs with the following code:

knowledge_dag.visualize_dag()
rag_dag.visualize_dag()

If you execute the code in Jupyter Notebook, you can see the DAGs in the notebook.

display(knowledge_dag.show())
display(rag_dag.show())

The graph of the knowledge_dag is:

And the graph of the rag_dag is: