构建基于 LangChain 的机器人#

在本节中,让我们构建一个机器人,它使用 LangChain runnable 来驱动对话。我们将构建一个 LangChain 代理,它执行 DuckDuckGo 搜索并在对话的上下文中回答问题。您可以在本教程中将此代理替换为您自己的链/代理。您可以通过以下链接了解 LangChain Runnable InterfaceLangServeLangGraph 以及 LangChain 文档 中提到的其他一些术语。您可以交叉参考DuckDuckGo LangChain 示例机器人以获取代码或教程步骤中的任何问题。

与 LangChain runnable 的交互将通过自定义插件服务器进行。在本教程中,我们将直接在自定义插件服务器中定义 LangChain runnable。但是,您也可以选择通过 LangServe 部署您的 LangChain runnable,并让您的自定义插件使用远程 runnable 与 LangServe API 进行交互。

LangChain 机器人的最小文件结构如下所示

samples
└── langchain_tutorial_bot
    └── plugin
        └── langchain_agent.py
        └── schemas.py
    └── plugin_config.yaml
    └── speech_config.yaml
    └── model_config.yaml

创建一个名为 samples/langchain_tutorial_bot 的空目录,并执行以下步骤来更新机器人配置。

对于不需要使用 Colang 进行处理或防护的 LangChain 或自定义 RAG 管线,您可以直接将聊天控制器连接到插件服务器。请关注 插件服务器架构 部分以获取更多信息。我们将在下面演示此方法。

创建 LangChain 插件#

在本节中,让我们构建自定义 LangChain 插件并将其作为独立组件进行测试。

  1. 定义 ACE Agent 用于与插件服务器和聊天控制器通信的输入和输出 API 模式。使用以下内容更新 plugin/schemas.py

    from pydantic import BaseModel, Field
    from typing import Optional, Dict, List, Any
    
    
    class ChatRequest(BaseModel):
        Query: Optional[str] = Field(default="", description="The user query which needs to be processed.")
        UserId: str = Field(
            description="Mandatory unique identifier to recognize which user is interacting with the Chat Engine."
        )
    Metadata: Optional[Dict[str, Any]] = Field(
        default={},
        description="Any additional information related to the request.",
    )
    
    
    class EventRequest(BaseModel):
        EventType: str = Field(default="", description="The event name which needs to be processed.")
        UserId: str = Field(
            description="Mandatory unique identifier to recognize which user is interacting with the Chat Engine."
        )
    
    
    class ResponseField(BaseModel):
        Text: str = Field(
            default="",
            description="Text response to be sent out. This field will also be picked by a Text to Speech Synthesis module if enabled for speech based bots.",
        )
        CleanedText: str = Field(
            default="", description="Text response from the Chat Engine with all SSML/HTML tags removed."
        )
        NeedUserResponse: Optional[bool] = Field(
            default=True,
            description="This field can be used by end user applications to deduce if user response is needed or not for a dialog initiated query. This is set to true automatically if form filling is active and one or more slots are missing.",
        )
        IsFinal: bool = Field(
            default=False,
            description="This field to indicate the final response chunk when streaming. The chunk with IsFinal=true will contain the full Chat Engine response attributes.",
        )
    
    
    class ChatResponse(BaseModel):
        UserId: str = Field(
            default="",
            description="Unique identifier to recognize which user is interacting with the Chat Engine. This is populated from the request JSON.",
        )
        QueryId: str = Field(
            default="",
            description="Unique identifier for the user query assigned automatically by the Chat Engine unless specified in request JSON.",
        )
        Response: ResponseField = Field(
            default=ResponseField(),
            description="Final response template from the Chat Engine. This field can be picked up from domain rule files or can be formulated directly from custom plugin modules.",
        )
        Metadata: Optional[Dict[str, Any]] = Field(
            default={"SessionId": "", "StreamId": ""},
            description="Any additional information related to the request.",
        )
    
    
    class EventResponse(BaseModel):
        UserId: str = Field(
            default="",
            description="Unique identifier to recognize which user is interacting with the Chat Engine. This is populated from the request JSON.",
        )
        Events: List[Dict[str, Any]] = Field(
            default=[], description="The generated event list for the provided EventType from Chat Engine."
        )
        Response: ResponseField = Field(
            default=ResponseField(),
            description="Final response template from the Chat Engine. This field can be picked up from domain rule files or can be formulated directly from custom plugin modules.",
        )
    

    让我们稍微理解一下 API 及其模式。聊天控制器微服务使用了两个 API:/chat/event

    1. 管道创建和删除等事件由聊天控制器发送到 /event 端点。在大多数情况下,您不需要修改我们将在下一步中定义的默认 /event 端点。

    2. 用户问题由聊天控制器发送到 /chat 端点,以及 UserId 和可选的 QueryId。此 API 的响应模式包含一个 Response 属性,其中包含响应的详细信息。在本教程中,我们只需要管理两个子字段:Response.Text(包含我们正在流式传输的块)和 Response.IsFinal(指示流是否完成)。

  2. 使用 LangChain 代理以及 /chat/event API 创建实际的自定义插件。使用以下代码更新 plugin/langchain_agent.py

    from fastapi import APIRouter, status, Body, Response
    from fastapi.responses import StreamingResponse
    import logging
    import os
    import sys
    from typing_extensions import Annotated
    from typing import Union, Dict
    import json
    
    from langchain_community.chat_models import ChatOpenAI
    from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.runnables.history import RunnableWithMessageHistory
    from langchain.memory import ChatMessageHistory
    from langchain.tools.ddg_search import DuckDuckGoSearchRun
    from langchain_core.runnables import (
        RunnableParallel,
        RunnablePassthrough,
    )
    from langchain.tools import tool
    
    logger = logging.getLogger("plugin")
    router = APIRouter()
    
    sys.path.append(os.path.dirname(__file__))
    
    from schemas import ChatRequest, EventRequest, EventResponse, ChatResponse
    
    EVENTS_NOT_REQUIRING_RESPONSE = [
        "system.event_pipeline_acquired",
        "system.event_pipeline_released",
        "system.event_exit",
    ]
    
    duckduckgo = DuckDuckGoSearchRun()
    
    
    @tool
    def ddg_search(query: str):
        """Performs a duckduck go search"""
    
        logger.info(f"Input to DDG: {query}")
        answer = duckduckgo.run(query)
        logger.info(f"Answer from DDG: {answer}")
        return answer
    
    
    rephraser_prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                f"You are an assistant whose job is to rephrase the question into a standalone question, based on the conversation history."
                f"The rephrased question should be as short and simple as possible. Do not attempt to provide an answer of your own!",
            ),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{query}"),
        ]
    )
    
    wiki_prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "Answer the given question from the provided context. Only use the context to form an answer.\nContext: {context}",
            ),
            ("user", "{query}"),
        ]
    )
    
    chat_history_map = {}
    llm = ChatOpenAI(model="gpt-4-turbo")
    output_parser = StrOutputParser()
    
    chain = (
        rephraser_prompt
        | llm
        | output_parser
        | RunnableParallel({"context": ddg_search, "query": RunnablePassthrough()})
        | wiki_prompt
        | llm
        | output_parser
    )
    
    chain_with_history = RunnableWithMessageHistory(
        chain,
        lambda session_id: chat_history_map.get(session_id),
        input_messages_key="query",
        history_messages_key="history",
    )
    
    
    @router.post(
        "/chat",
        status_code=status.HTTP_200_OK,
    )
    async def chat(
        request: Annotated[
            ChatRequest,
            Body(
                description="Chat Engine Request JSON. All the fields populated as part of this JSON is also available as part of request JSON."
            ),
        ],
        response: Response,
    ) -> StreamingResponse:
        """
        This endpoint can be used to provide response to query driven user request.
        """
    
        req = request.dict(exclude_none=True)
        logger.info(f"Received request JSON at /chat endpoint: {json.dumps(req, indent=4)}")
    
        try:
    
            session_id = req["UserId"]
            question = req["Query"]
    
            if session_id not in chat_history_map:
                chat_history_map[session_id] = ChatMessageHistory(messages=[])
    
            def generator(question: str, session_id: str):
                full_response = ""
                if question:
                    for chunk in chain_with_history.stream(
                        {"query": question}, config={"configurable": {"session_id": session_id}}
                    ):
                        if not chunk:
                            continue
                        full_response += chunk
    
                        json_chunk = ChatResponse()
                        json_chunk.Response.Text = chunk
                        json_chunk = json.dumps(json_chunk.dict())
                        yield json_chunk
    
                json_chunk = ChatResponse()
                json_chunk.Response.IsFinal = True
                json_chunk.Response.CleanedText = full_response
                json_chunk = json.dumps(json_chunk.dict())
                yield json_chunk
    
            return StreamingResponse(generator(question, session_id), media_type="text/event-stream")
    
        except Exception as e:
            response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
            return {"StatusMessage": str(e)}
    
    
    @router.post("/event", status_code=status.HTTP_200_OK)
    async def event(
        request: Annotated[
            EventRequest,
            Body(
                description="Chat Engine Request JSON. All the fields populated as part of this JSON is also available as part of request JSON."
            ),
        ],
        response: Response,
    ) -> Union[EventResponse, Dict[str, str]]:
        """
        This endpoint can be used to provide response to an event driven user request.
        """
    
        req = request.dict(exclude_none=True)
        logger.info(f"Received request JSON at /event endpoint: {json.dumps(req, indent=4)}")
    
        try:
            resp = EventResponse()
            resp.UserId = req["UserId"]
            resp.Response.IsFinal = True
    
            if req["EventType"] in EVENTS_NOT_REQUIRING_RESPONSE:
                resp.Response.NeedUserResponse = False
    
            return resp
        except Exception as e:
            response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
            return {"StatusMessage": str(e)}
    

    让我们理解一下上面的代码。

    1. ddg_search 工具接收查询,执行 DuckDuckGo 搜索,并返回最相关的响应。

    2. rephraser_prompt 用于根据对话历史(由 history 占位符表示)将当前问题改述为单个简单问题。

    3. chain runnable 执行查询的改述,将改述后的查询传递给 ddg_search 工具,最后再次调用 LLM 以从 DuckDuckGo 搜索结果中获取答案。

    4. chain_with_history 是我们将用于生成答案的实际 runnable。它根据 session_id 获取正确的对话历史,并使用正确的输入填充 rephraser_prompt

    5. 最后,/chat 端点使用正确的输入调用 chain_with_history。在本例中,我们返回一个生成器,我们在其中调用 runnable 的 stream 方法,将响应格式化为 ChatResponse 模式,并将块推送到响应流中。

    如果您正在使用自定义 LangChain runnable,或者如果您想修改调用 runnable 的方式,您只需要在 generator 方法中进行更改。

  3. 注册此插件。将以下内容添加到 plugin_config.yaml

    config:
      workers: 1
      timeout: 30
    
    plugins:
      - name: langchain
        path: ./plugin/langchain_agent.py
    
  4. deploy/docker/dockerfiles/plugin_server.Dockerfile 中添加此 runnable 使用的 Python 依赖项。这将在构建插件服务器时安装自定义依赖项。

    ##############################
    # Install custom dependencies
    ##############################
    RUN pip install langchain==0.1.1 \
        langchain-community==0.0.13 \
        langchain-core==0.1.12 \
        duckduckgo-search==5.3.1b1
    

注意

如果您在插件服务器中看到崩溃或从 DuckDuckGo 获取响应时遇到问题,请尝试使用更新版本的 duckduckgo-search

  1. 部署插件服务器以进行测试。

    1. 如果尚未设置,请设置 OpenAI API 密钥。

      export OPENAI_API_KEY=...
      
    2. 使用 Docker 环境 运行机器人。

      export BOT_PATH=./samples/langchain_tutorial_bot
      source deploy/docker/docker_init.sh
      docker compose -f deploy/docker/docker-compose.yml up --build plugin-server -d
      
  2. 我们可以通过运行以下 CURL 命令来测试端点

    curl -X 'POST' \
      'http://127.0.0.1:9002/langchain/chat' \
      -H 'accept: application/json' \
      -H 'Content-Type: application/json' \
      -d '{
      "Query": "Who is the president of the United States?",
      "UserId": "user1"
    }'
    
  3. 完成插件测试后,要停止服务器,请运行

    docker compose -f deploy/docker/docker-compose.yml down
    

将插件连接到聊天控制器#

现在插件已正常工作,让我们创建配置以将聊天控制器连接到插件服务器,并启用语音。

  1. samples/chitchat_bot 复制 model_config.yamlspeech_config.yaml 文件。它们代表语音管线的通用设置。

  2. 更新 speech_config.yaml 文件中 dialog_manager 组件中的 server URL,以指向我们在上一节中创建的插件。

    dialog_manager:
      DialogManager:
        server: "http://127.0.0.1:9002/langchain"
        use_streaming: true
    

    通过此更改,聊天控制器将直接调用插件服务器的 /chat/event 端点。

  3. 使用 Docker 环境部署机器人。

  4. 设置 docker-compose.yml 文件所需的环境变量。

    export BOT_PATH=./samples/langchain_tutorial_bot/
    source deploy/docker/docker_init.sh
    
  5. 对于插件服务器架构机器人,我们需要为聊天控制器微服务使用 speech_lite 管线配置。更新 deploy/docker/docker_init.sh 中的 PIPELINE 变量,或手动设置 PIPELINE 环境变量来覆盖它。

    export PIPELINE=speech_lite
    
  6. 部署 Riva ASR 和 TTS 语音模型。

    docker compose -f deploy/docker/docker-compose.yml up model-utils-speech
    
  7. 部署带有 LangChain 插件的插件服务器。

    docker compose -f deploy/docker/docker-compose.yml up --build plugin-server -d
    
  8. 部署带有 gRPC 接口的聊天控制器微服务。

    docker compose -f deploy/docker/docker-compose.yml up chat-controller -d
    
  9. 部署语音示例前端应用程序。

    docker compose -f deploy/docker/docker-compose.yml up bot-web-ui-client bot-web-ui-speech -d
    

    请注意,我们根本没有部署聊天引擎微服务。

    您可以使用浏览器在 http://<YOUR_IP_ADDRESS>:7006 与机器人进行交互。

这是一个与机器人的示例对话。

LangChain Bot

注意

如果您想将 Colang 与 LangChain Agent 一起使用,或者您想添加更多模态,请参考 使用 Colang 2.0 和事件接口构建机器人构建低延迟语音到语音 RAG 机器人 教程。