Building a Bot Using LangChain#
In this section, let's build a bot that uses a LangChain runnable to drive the conversation. We will build a LangChain agent that performs a DuckDuckGo search and answers questions in the context of the conversation. You can swap this agent out for your own chain or agent while following this tutorial. You can read about the LangChain Runnable Interface, LangServe, LangGraph, and some of the other terms mentioned here in the LangChain documentation. You can cross-reference the DuckDuckGo LangChain sample bot for the code, or for any issues with the tutorial steps.
Interactions with the LangChain runnable happen through the custom Plugin server. In this tutorial, we will define the LangChain runnable directly inside the custom Plugin server. However, you can also choose to deploy your LangChain runnable with LangServe and have your custom plugin talk to the LangServe APIs through a remote runnable.
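If you take the LangServe route, only the construction of the runnable changes; the rest of the plugin can stay the same. Below is a minimal sketch, assuming a LangServe deployment that serves your chain at a hypothetical URL such as http://localhost:8001/ddg-agent (the host, port, and path are illustrative, not part of this tutorial):

```python
# Hedged sketch: use a runnable served remotely by LangServe instead of building
# the chain locally inside the plugin. The URL below is hypothetical.
from langserve import RemoteRunnable

remote_chain = RemoteRunnable("http://localhost:8001/ddg-agent")

# The plugin can treat `remote_chain` like the local `chain` defined later in this
# tutorial, for example by wrapping it in RunnableWithMessageHistory and calling .stream().
```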
The minimal file structure of the LangChain bot looks like this:
```
samples
└── langchain_tutorial_bot
    └── plugin
        └── langchain_agent.py
        └── schemas.py
    └── plugin_config.yaml
    └── speech_config.yaml
    └── model_config.yaml
```
Create an empty directory called samples/langchain_tutorial_bot and perform the following steps to update the bot configurations.
For LangChain or custom RAG pipelines that do not need Colang for processing or guardrailing, you can connect the Chat Controller directly to the Plugin server. Refer to the Plugin Server Architecture section for more information. We will showcase this approach below.
Creating the LangChain Plugin#
In this section, let's build the custom LangChain plugin and test it as a standalone component.
Define the input and output API schemas that ACE Agent uses to communicate with the Plugin server and the Chat Controller. Update plugin/schemas.py with the following content.

```python
from pydantic import BaseModel, Field
from typing import Optional, Dict, List, Any


class ChatRequest(BaseModel):
    Query: Optional[str] = Field(default="", description="The user query which needs to be processed.")
    UserId: str = Field(
        description="Mandatory unique identifier to recognize which user is interacting with the Chat Engine."
    )
    Metadata: Optional[Dict[str, Any]] = Field(
        default={},
        description="Any additional information related to the request.",
    )


class EventRequest(BaseModel):
    EventType: str = Field(default="", description="The event name which needs to be processed.")
    UserId: str = Field(
        description="Mandatory unique identifier to recognize which user is interacting with the Chat Engine."
    )


class ResponseField(BaseModel):
    Text: str = Field(
        default="",
        description="Text response to be sent out. This field will also be picked by a Text to Speech Synthesis module if enabled for speech based bots.",
    )
    CleanedText: str = Field(
        default="", description="Text response from the Chat Engine with all SSML/HTML tags removed."
    )
    NeedUserResponse: Optional[bool] = Field(
        default=True,
        description="This field can be used by end user applications to deduce if user response is needed or not for a dialog initiated query. This is set to true automatically if form filling is active and one or more slots are missing.",
    )
    IsFinal: bool = Field(
        default=False,
        description="This field to indicate the final response chunk when streaming. The chunk with IsFinal=true will contain the full Chat Engine response attributes.",
    )


class ChatResponse(BaseModel):
    UserId: str = Field(
        default="",
        description="Unique identifier to recognize which user is interacting with the Chat Engine. This is populated from the request JSON.",
    )
    QueryId: str = Field(
        default="",
        description="Unique identifier for the user query assigned automatically by the Chat Engine unless specified in request JSON.",
    )
    Response: ResponseField = Field(
        default=ResponseField(),
        description="Final response template from the Chat Engine. This field can be picked up from domain rule files or can be formulated directly from custom plugin modules.",
    )
    Metadata: Optional[Dict[str, Any]] = Field(
        default={"SessionId": "", "StreamId": ""},
        description="Any additional information related to the request.",
    )


class EventResponse(BaseModel):
    UserId: str = Field(
        default="",
        description="Unique identifier to recognize which user is interacting with the Chat Engine. This is populated from the request JSON.",
    )
    Events: List[Dict[str, Any]] = Field(
        default=[], description="The generated event list for the provided EventType from Chat Engine."
    )
    Response: ResponseField = Field(
        default=ResponseField(),
        description="Final response template from the Chat Engine. This field can be picked up from domain rule files or can be formulated directly from custom plugin modules.",
    )
```
Let's take a moment to understand the APIs and their schemas. The Chat Controller microservice uses two APIs: /chat and /event.

Events such as pipeline creation and deletion are sent by the Chat Controller to the /event endpoint. In most cases, you will not need to modify the default /event endpoint that we define in the next step.

User questions are sent by the Chat Controller to the /chat endpoint, along with the UserId and an optional QueryId. The response schema of this API contains a Response attribute that holds the details of the response. For this tutorial, we only need to manage two sub-fields: Response.Text (the chunk we are currently streaming) and Response.IsFinal (which indicates whether the stream is complete).
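To make the wire format concrete, here is a small illustration of how chunks conforming to the ChatResponse schema above would be serialized. The question and answer text are invented purely for illustration, and the snippet assumes it is run from the plugin directory so that schemas.py is importable:

```python
# Illustrative only: how /chat response chunks map onto the ChatResponse schema.
import json

from schemas import ChatResponse  # assumes the working directory is the plugin folder

# An intermediate streaming chunk carries a text fragment in Response.Text.
chunk = ChatResponse()
chunk.Response.Text = "Paris is the capital "  # hypothetical fragment
print(json.dumps(chunk.dict()))

# The final chunk sets Response.IsFinal and carries the fully assembled answer.
final = ChatResponse()
final.Response.IsFinal = True
final.Response.CleanedText = "Paris is the capital of France."  # hypothetical full answer
print(json.dumps(final.dict()))
```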
Create the actual custom plugin with the LangChain agent as well as the /chat and /event APIs. Update plugin/langchain_agent.py with the following code.

```python
from fastapi import APIRouter, status, Body, Response
from fastapi.responses import StreamingResponse
import logging
import os
import sys
from typing_extensions import Annotated
from typing import Union, Dict
import json

from langchain_community.chat_models import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain.memory import ChatMessageHistory
from langchain.tools.ddg_search import DuckDuckGoSearchRun
from langchain_core.runnables import (
    RunnableParallel,
    RunnablePassthrough,
)
from langchain.tools import tool

logger = logging.getLogger("plugin")
router = APIRouter()

# Allow importing schemas.py from the same directory
sys.path.append(os.path.dirname(__file__))

from schemas import ChatRequest, EventRequest, EventResponse, ChatResponse

EVENTS_NOT_REQUIRING_RESPONSE = [
    "system.event_pipeline_acquired",
    "system.event_pipeline_released",
    "system.event_exit",
]

duckduckgo = DuckDuckGoSearchRun()


@tool
def ddg_search(query: str):
    """Performs a duckduck go search"""
    logger.info(f"Input to DDG: {query}")
    answer = duckduckgo.run(query)
    logger.info(f"Answer from DDG: {answer}")
    return answer


rephraser_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            f"You are an assistant whose job is to rephrase the question into a standalone question, based on the conversation history."
            f"The rephrased question should be as short and simple as possible. Do not attempt to provide an answer of your own!",
        ),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{query}"),
    ]
)

wiki_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "Answer the given question from the provided context. Only use the context to form an answer.\nContext: {context}",
        ),
        ("user", "{query}"),
    ]
)

chat_history_map = {}
llm = ChatOpenAI(model="gpt-4-turbo")
output_parser = StrOutputParser()

# Rephrase the query, search DuckDuckGo with it, then answer from the search results
chain = (
    rephraser_prompt
    | llm
    | output_parser
    | RunnableParallel({"context": ddg_search, "query": RunnablePassthrough()})
    | wiki_prompt
    | llm
    | output_parser
)

chain_with_history = RunnableWithMessageHistory(
    chain,
    lambda session_id: chat_history_map.get(session_id),
    input_messages_key="query",
    history_messages_key="history",
)


@router.post(
    "/chat",
    status_code=status.HTTP_200_OK,
)
async def chat(
    request: Annotated[
        ChatRequest,
        Body(
            description="Chat Engine Request JSON. All the fields populated as part of this JSON is also available as part of request JSON."
        ),
    ],
    response: Response,
) -> StreamingResponse:
    """
    This endpoint can be used to provide response to query driven user request.
    """

    req = request.dict(exclude_none=True)
    logger.info(f"Received request JSON at /chat endpoint: {json.dumps(req, indent=4)}")

    try:
        session_id = req["UserId"]
        question = req["Query"]

        if session_id not in chat_history_map:
            chat_history_map[session_id] = ChatMessageHistory(messages=[])

        def generator(question: str, session_id: str):
            full_response = ""
            if question:
                for chunk in chain_with_history.stream(
                    {"query": question}, config={"configurable": {"session_id": session_id}}
                ):
                    if not chunk:
                        continue
                    full_response += chunk
                    json_chunk = ChatResponse()
                    json_chunk.Response.Text = chunk
                    json_chunk = json.dumps(json_chunk.dict())
                    yield json_chunk

            json_chunk = ChatResponse()
            json_chunk.Response.IsFinal = True
            json_chunk.Response.CleanedText = full_response
            json_chunk = json.dumps(json_chunk.dict())
            yield json_chunk

        return StreamingResponse(generator(question, session_id), media_type="text/event-stream")

    except Exception as e:
        response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
        return {"StatusMessage": str(e)}


@router.post("/event", status_code=status.HTTP_200_OK)
async def event(
    request: Annotated[
        EventRequest,
        Body(
            description="Chat Engine Request JSON. All the fields populated as part of this JSON is also available as part of request JSON."
        ),
    ],
    response: Response,
) -> Union[EventResponse, Dict[str, str]]:
    """
    This endpoint can be used to provide response to an event driven user request.
    """

    req = request.dict(exclude_none=True)
    logger.info(f"Received request JSON at /event endpoint: {json.dumps(req, indent=4)}")

    try:
        resp = EventResponse()
        resp.UserId = req["UserId"]
        resp.Response.IsFinal = True

        if req["EventType"] in EVENTS_NOT_REQUIRING_RESPONSE:
            resp.Response.NeedUserResponse = False

        return resp

    except Exception as e:
        response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
        return {"StatusMessage": str(e)}
```
Let's walk through the code above.

- The ddg_search tool takes a query, performs a DuckDuckGo search, and returns the most relevant responses.
- The rephraser_prompt is used to rephrase the current question into a single, standalone question based on the conversation history, represented by the history placeholder.
- The chain runnable rephrases the query, passes the rephrased query to the ddg_search tool, and finally calls the LLM again to form an answer from the DuckDuckGo search results.
- chain_with_history is the actual runnable that we will use to generate answers. It fetches the correct conversation history based on the session_id and populates the rephraser_prompt with the correct inputs.
- Finally, the /chat endpoint invokes chain_with_history with the correct inputs. In this case, we return a generator, in which we call the runnable's stream method, format the responses into the ChatResponse schema, and push the chunks onto the response stream.
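You can also exercise the chain directly from a Python shell before deploying the Plugin server. This is a minimal sketch, assuming it is run from the samples/langchain_tutorial_bot/plugin directory with the dependencies installed and OPENAI_API_KEY exported; the question and session id are arbitrary examples:

```python
# Hedged sketch: drive chain_with_history outside the Plugin server for a quick sanity check.
from langchain_agent import chain_with_history, chat_history_map
from langchain.memory import ChatMessageHistory

session_id = "test-user"  # arbitrary session identifier
chat_history_map[session_id] = ChatMessageHistory(messages=[])  # seed an empty history

for chunk in chain_with_history.stream(
    {"query": "Who wrote Pride and Prejudice?"},  # example question
    config={"configurable": {"session_id": session_id}},
):
    print(chunk, end="", flush=True)
```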
If you are using a custom LangChain runnable, or if you want to change how the runnable is invoked, you only need to modify the generator method (see the sketch below).
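For example, if your runnable ends with the chat model instead of a StrOutputParser, its stream method yields message chunks rather than plain strings, and the generator has to read chunk.content. The following is a hedged sketch of that variation; it is meant as a drop-in replacement for the inner generator defined inside the chat endpoint, so the surrounding names (chain_with_history, ChatResponse, json) come from that module:

```python
# Hedged sketch: generator variation for a runnable whose stream() yields
# message chunks (e.g. AIMessageChunk) instead of plain strings.
def generator(question: str, session_id: str):
    full_response = ""
    if question:
        for chunk in chain_with_history.stream(
            {"query": question}, config={"configurable": {"session_id": session_id}}
        ):
            # Message chunks expose their text via .content; plain strings pass through.
            text = getattr(chunk, "content", chunk) or ""
            if not text:
                continue
            full_response += text
            json_chunk = ChatResponse()
            json_chunk.Response.Text = text
            yield json.dumps(json_chunk.dict())

    # Emit the final chunk with the fully assembled response.
    json_chunk = ChatResponse()
    json_chunk.Response.IsFinal = True
    json_chunk.Response.CleanedText = full_response
    yield json.dumps(json_chunk.dict())
```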
Register this plugin by adding the following content to plugin_config.yaml.

```yaml
config:
  workers: 1
  timeout: 30

plugins:
  - name: langchain
    path: ./plugin/langchain_agent.py
```
Add the Python dependencies required by this runnable in deploy/docker/dockerfiles/plugin_server.Dockerfile so that the custom dependencies are installed when the Plugin server image is built.

```dockerfile
##############################
# Install custom dependencies
##############################
RUN pip install langchain==0.1.1 \
    langchain-community==0.0.13 \
    langchain-core==0.1.12 \
    duckduckgo-search==5.3.1b1
```
Note

If you observe crashes in the Plugin server or issues getting responses from DuckDuckGo, try using a more recent version of duckduckgo-search.
Deploy the Plugin server for testing.
Set the OpenAI API key if it is not already set.

```bash
export OPENAI_API_KEY=...
```
Run the bot using the Docker environment.

```bash
export BOT_PATH=./samples/langchain_tutorial_bot
source deploy/docker/docker_init.sh
docker compose -f deploy/docker/docker-compose.yml up --build plugin-server -d
```
You can test the endpoint by running the following CURL command:

```bash
curl -X 'POST' \
  'http://127.0.0.1:9002/langchain/chat' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "Query": "Who is the president of the United States?",
  "UserId": "user1"
}'
```
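If you prefer to consume the stream from Python rather than curl, the sketch below prints each chunk as it arrives. It only assumes the requests package and the Plugin server started above; note that chunk boundaries are not strictly guaranteed to align with the JSON objects yielded by the plugin:

```python
# Hedged sketch: read the streaming /chat response with requests instead of curl.
import requests

payload = {"Query": "Who is the president of the United States?", "UserId": "user1"}
with requests.post("http://127.0.0.1:9002/langchain/chat", json=payload, stream=True) as resp:
    resp.raise_for_status()
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        # Typically one JSON-serialized ChatResponse per chunk.
        print(chunk)
```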
Once you have finished testing the plugin, stop the server by running:

```bash
docker compose -f deploy/docker/docker-compose.yml down
```
Connecting the Plugin to the Chat Controller#
Now that the plugin is working, let's create the configurations to connect the Chat Controller to the Plugin server and enable speech.
Copy the model_config.yaml and speech_config.yaml files from samples/chitchat_bot. They represent the common settings for the speech pipeline.

Update the server URL in the dialog_manager component of the speech_config.yaml file to point to the plugin we created in the previous section.

```yaml
dialog_manager:
  DialogManager:
    server: "http://127.0.0.1:9002/langchain"
    use_streaming: true
```
With this change, the Chat Controller will directly call the /chat and /event endpoints of the Plugin server.

Deploy the bot using the Docker environment.
Set the environment variables required by the docker-compose.yml file.

```bash
export BOT_PATH=./samples/langchain_tutorial_bot/
source deploy/docker/docker_init.sh
```
For bots using the Plugin server architecture, we need to use the speech_lite pipeline configuration for the Chat Controller microservice. Update the PIPELINE variable in deploy/docker/docker_init.sh, or override it by setting the PIPELINE environment variable manually.

```bash
export PIPELINE=speech_lite
```
Deploy the Riva ASR and TTS speech models.

```bash
docker compose -f deploy/docker/docker-compose.yml up model-utils-speech
```
Deploy the Plugin server with the LangChain plugin.

```bash
docker compose -f deploy/docker/docker-compose.yml up --build plugin-server -d
```
Deploy the Chat Controller microservice with the gRPC interface.

```bash
docker compose -f deploy/docker/docker-compose.yml up chat-controller -d
```
Deploy the speech sample frontend application.

```bash
docker compose -f deploy/docker/docker-compose.yml up bot-web-ui-client bot-web-ui-speech -d
```
Notice that we are not deploying the Chat Engine microservice at all.
You can interact with the bot using your browser at http://<YOUR_IP_ADDRESS>:7006.
Here is a sample conversation with the bot.

Note

If you want to use Colang along with your LangChain agent, or if you want to add more modalities, refer to the Building a Bot Using Colang 2.0 and Event Interface and Building a Low Latency Speech-to-Speech RAG Bot tutorials.