插件服务器#

插件服务器是一个基于 FastAPI 的服务器,它使 ACE Agent 能够通过 REST 接口与第三方应用程序或 API 进行交互。它公开了一个 Swagger 端点,允许开发人员在沙箱环境中轻松编写和验证插件服务器。请参阅插件配置部分,了解如何在您的 bot 中添加插件。插件服务器允许您集成使用 LangChain 或 LlamaIndex 或任何其他工具构建的您自己的 agent,并通过简单的接口允许您添加语音 AI 和虚拟化身 AI 微服务。

将插件与 Colang 结合使用#

默认情况下,ACE Agent 假定插件托管在 http://127.0.0.1:9002 上。当插件服务器托管在不同的 IP 上时,可以使用 bot_config.yaml 文件中的 plugin_server_url 提供其 URL。

configs:
    plugin_server_url: http://<workstation-ip>:<port>

插件服务器作为自定义动作集成。可以调用作为插件一部分托管的端点,并指定给定的端点。

要从插件服务器形成响应,您可以使用以下预构建的自定义动作。

  • 如果 Bot 调用插件服务器并返回非流式响应,您应该使用 InvokeFulfillmentAction 动作来获取响应。

    flow tell time
      user asked current time
      $time = await InvokeFulfillmentAction(request_type="get", endpoint="/time/get_current_time")
      bot say "Current time is {$time}"
    
  • 如果来自插件服务器的响应是文本流,则可以使用 InvokeStreamingFulfillmentAction 动作,该动作开始收集流式文本块。您也可以调用 StreamingResponseFulfillmentAction 以接收作为单个响应的所有块。如果您想将响应分解为句子甚至部分句子,则可以使用相同的正则表达式模式。

    # Invoke endpoint from plugin
      $started =  await InvokeStreamingFulfillmentAction(question=$transcript,endpoint="your/endpoint")
      if $started
        # Get first sentence from plugin response
        $response = await StreamingResponseFulfillmentAction(endpoint="your/endpoint",pattern=r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<![0-9]\.)(?<![A-Z]\.)(?<=\.|\?|\!)\s")
        while $response
          bot say $response
          # Check for next sentence
          $response = await StreamingResponseFulfillmentAction(endpoint="your/endpoint",pattern=r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<![0-9]\.)(?<![A-Z]\.)(?<=\.|\?|\!)\s")
    
  • 如果来自插件服务器的响应是聊天引擎响应模式中的 JSON 响应流,则将使用其 Response 解析 JSON 块。您可以使用 InvokeStreamingChatAction 动作,该动作开始收集流式 JSON 块。您也可以重复调用 StreamingResponseChatAction 动作以接收下一个句子。

    # Invoke /chat endpoint from plugin
      $started =  await InvokeStreamingChatAction(question=$transcript,endpoint="rag/chat",chat_history=True)
      if $started
        # Get first sentence from RAG response
        $response = await StreamingResponseChatAction(endpoint="rag/chat")
        while $response
          bot say $response
          # Check for next sentence
          $response = await StreamingResponseChatAction(endpoint="rag/chat")
    

用于分解句子的模式是 r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<![0-9]\.)(?<![A-Z]\.)(?<=\.|\?|\!)\s"。您可以在 StreamingResponseChatAction 中设置 pattern 参数,以用于您自己的自定义逻辑模式。将 chat_history 设置为 True 将当前用户聊天历史记录作为元数据字段的一部分发送到 /chat 请求中。

传递 URL 参数#

我们可以使用自定义插件动作,使用逗号分隔的参数来提供 URL 参数。考虑一个示例,其中一个端点提供给定位置的温度,单位为华氏度。您可以通过逗号分隔的参数 weather 插件的 get_temperature 端点将信息传递给插件。

@router.get("/get_temperature")
def get_temperature(location: str, unit: str) -> Optional[int]:
    """Return temperature for given location"""

    # Temperature extraction logic
    return temperature_at_location

在 Colang 流程中使用 get_temperature 端点

flow bot responds temperature
    user asks temperature
    $result = execute InvokeFulfillmentAction(endpoint="/weather/get_temperature", location='santa clara', unit='fahrenheit')
    bot say "Current temperature is {$result}"

传递上下文信息和对话历史记录#

Colang 存储上下文信息,其中包含流程中使用的所有上下文变量。此上下文信息始终在 POST 请求中发送。您可以从插件接受上下文并访问此信息。可以接受存储在 ACE Agent 中的对话历史记录并在插件服务器中使用。

@router.post("/get_temperature")
def get_prompt(location: str, context: Optional[Dict[str, Any]] = {}, conv_history: Optional[List[Dict[str, str]]] = []) -> str:

    unit = context.get("unit", "fahrenheit")
    # Temperature extraction logic
    return temperature_at_location

在 Colang 流程中使用 get_temperature 端点,并带有上下文

flow bot responds temperature
    user asks temperature
    $result = execute InvokeFulfillmentAction(endpoint="/weather/get_temperature", location='santa clara', unit='fahrenheit')
    bot say "Current temperature is {$result}"

维护插件内存#

插件服务器公开了一个自定义装饰器,可以将其附加到插件的端点,以维护每个与该插件对应的 userId 的自定义内存。端点可以通过按引用修改内存对象来更新内存。

from fastapi import APIRouter
from typing import Optional, Dict, Any

from plugin_server.utils import memory_handler

router = APIRouter()

@router.get("/count")
@memory_handler
def count(memory: Optional[Dict[str, Any]] = {}, context: Optional[Dict[str, Any]] = {}) -> int:
    """
    Count the number of times this endpoint has been called by the given user/session.
    Modifies the count in the memory (by reference) and returns the count.
    """
    memory["count"] = memory.get("count", 0) + 1
    return memory["count"]

访问自定义元数据#

在某些情况下,发送到聊天引擎服务器的请求可能在正文的 Metadata 字段中包含某些自定义信息。聊天引擎在发送到插件服务器的任何 POST 请求中传递此信息。可以在请求正文的 metadata 字段中访问元数据。

@router.post("/endpoint")
def endpoint(question: str, context: Dict[str, Any] = {}, metadata: Dict[str, Any] = {}) -> str:

    logger.info(f"Metadata: {metadata}")
    ...

参数列表#

插件支持的命名参数#

参数

描述

endpoint

需要发送的插件路径。

request_type

GET/POST 请求类型。

request_timeout

等待响应的最长时间。

日志记录和监控#

插件服务器包含日志记录功能,使您能够高效地管理和访问日志数据。要启动日志记录支持,您可以使用以下 Python 代码访问名为 plugin 的记录器

import logging
logger = logging.getLogger("fulfillment")

日志文件管理

  • 插件服务器日志写入由服务器管理的日志文件。

  • 默认情况下,日志文件存储在当前工作目录 ($PWD) 中名为 log 的子目录中。

  • 每个日志文件都以以下格式命名:plugin_<host_machine_name>_<current_time>.log

  • 创建一个名为 plugin.log 的符号链接,以便于访问最新的日志文件。

  • 日志文件在达到 10 MB 大小时会进行轮换,确保日志数据保持可管理状态,并且不会占用过多的存储空间。

添加插件要求#

可以将插件要求添加到名为 requirements.txt 的文件中。此文件应放置在插件入口点所在的目录中。requirements.txt 文件应包含插件所需的所有要求。

当插件服务器启动时,它将安装 requirements.txt 文件中的所有要求。安装要求后,插件服务器将启动。

与插件服务器交互#

要启动插件服务器,请使用 aceagent 工具。

aceagent plugin-server deploy --config <plugin_config.yaml>

默认情况下,插件服务器在端口 9002 上启动。您可以在 http://<workstation-ip>:<port>/docs 访问 Swagger API。

Plugin Server

有关带有示例的详细 API 架构,请参阅插件服务器