ACE Agent 中的流式传输#

大多数基于 ACE Agent 构建的机器人使用 LLM 来生成机器人响应。LLM 响应可能来自 NVIDIA NeMo Guardrails (例如,基于 Colang 的机器人,如 Stock 机器人) 或来自插件服务器 (例如,基于 RAG 或 LangChain 的机器人)。 在这两种情况下,都可以通过在 ACE Agent 生成响应时将其流式传输到客户端来减少感知延迟。

ACE Agent 事件接口和 Colang 2.0 机器人仅支持句子级流式传输,而 Colang 1.0 机器人支持令牌级流式传输。

Colang 2.0 机器人中的流式传输#

文本流式传输#

在 Colang 2.0 脚本中,所有机器人动作都使用 UMIM 事件进行通信。这些事件被各种微服务拾取,以执行这些动作事件 (微服务使用 Redis 流通信事件)。来自机器人的文本响应使用 UtteranceBotAction 事件进行通信。Chat Controller 微服务至少需要一个句子或部分句子才能生成 TTS。因此,Colang 2.0 机器人不使用令牌级流式传输来响应机器人,而是依赖于机器人开发人员将机器人响应分解为多个 UtteranceBotAction 事件。来自 Colang llm 库的流 (如 llm continue interaction) 默认会将机器人响应分解为多个动作。

如果您正在使用插件服务器生成文本响应,则插件端点应返回流式响应以最大限度地减少延迟。

  • 如果插件服务器被机器人调用并返回非流式响应,您应该使用 InvokeFulfillmentAction 动作来获取响应。

  • 如果来自插件服务器的响应是文本流,则可以使用 InvokeStreamingFulfillmentAction 动作开始收集流式文本块。您还可以调用 StreamingResponseFulfillmentAction 以接收作为单个响应的所有块。如果您想将响应分解为句子甚至部分句子,则可以使用相同的正则表达式模式。

    # Invoke endpoint from plugin
      $started =  await InvokeStreamingFulfillmentAction(question=$transcript,endpoint="your/endpoint")
      if $started
        # Get first sentence from plugin response
        $response = await StreamingResponseFulfillmentAction(endpoint="your/endpoint",pattern=r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<![0-9]\.)(?<![A-Z]\.)(?<=\.|\?|\!)\s")
        while $response
          bot say $response
          # Check for next sentence
          $response = await StreamingResponseFulfillmentAction(endpoint="your/endpoint",pattern=r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<![0-9]\.)(?<![A-Z]\.)(?<=\.|\?|\!)\s")
    
  • 如果来自插件服务器的响应是聊天引擎响应模式中的 JSON 响应流,则 JSON 块将与其响应一起解析。您可以使用 InvokeStreamingChatAction 动作开始收集流式 JSON 块。您还可以重复调用 StreamingResponseChatAction 动作以接收下一个句子。

    # Invoke /chat endpoint from plugin
      $started =  await InvokeStreamingChatAction(question=$transcript,endpoint="rag/chat",chat_history=True)
      if $started
        # Get first sentence from RAG response
        $response = await StreamingResponseChatAction(endpoint="rag/chat")
        while $response
          bot say $response
          # Check for next sentence
          $response = await StreamingResponseChatAction(endpoint="rag/chat")
    

在此示例以及上面文本流示例中用于断句的模式是 r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<![0-9]\.)(?<![A-Z]\.)(?<=\.|\?|\!)\s"。您可以在 StreamingResponseChatActionStreamingResponseFulfillmentAction 中设置 pattern 参数,以用于您自己的拆分模式。

Colang 1.0 机器人中的流式传输#

文本流式传输#

ACE Agent 聊天引擎使用流式处理程序作为 NeMo Guardrails 和插件服务器之间的通用接口。如果启用流式传输,则流式处理程序将附加到每个查询的 NeMo Guardrails 上下文。推送到流式处理程序的任何块都将进行后处理,并根据请求端点转换为 /chat/event 端点的响应模式。

对于使用 NeMo Guardrails 形成响应的情况,将调用 generate_bot_message 动作。如果 LLM 用于机器人响应生成,则会调用 LLM 的流式端点,并将流式块添加到流式处理程序。

要从插件服务器形成响应,插件端点应返回流式响应。

Streaming in ACE Agent
  • 如果插件服务器被机器人调用并返回非流式响应,则响应不会添加到流中。

  • 如果来自插件服务器的响应是文本流,则默认情况下,流式块会添加到流式处理程序。

  • 如果来自插件服务器的响应是聊天引擎响应模式中的 JSON 响应流,则将解析 JSON 块及其响应。默认情况下,文本属性将添加到流式处理程序。

来自插件服务器的流式响应的默认行为是将块添加到流式处理程序。但是,可以使用插件的 streaming 参数或 Colang 文件中的 chat_plugin 动作禁用此行为。

define flow
  user   $answer = execute plugin(endpoint="/your/endpoint", streaming=False)
  

ACE Agent 聊天引擎具有内置保护机制,可以处理机器人使用静态响应模板或根本不创建文本响应的情况。如果您的机器人在某些情况下使用流式响应,而在其他情况下使用非流式响应,则保持启用流式传输仍然是有益的 - 聊天引擎会将静态响应作为单个块推送到流式处理程序。

流式传输异常#

在某些情况下,ACE Agent 中会禁用流式传输。

  • 如果机器人使用的 LLM 不支持流式传输。

  • 如果在 NeMo Guardrails 中启用了输出护栏。

在这两种情况下,即使在机器人配置文件中启用了流式传输,也会在机器人初始化期间禁用流式传输。

TTS 流式传输#

对于使用语音或头像的机器人,ACE Agent Chat Controller 负责与 ACE Agent 交互并处理流式响应。ACE Agent Chat Controller 默认使用流式传输,但这可以在机器人目录的 speech_config.yaml 文件中被覆盖。

dialog_manager:
  DialogManager:
    server: "http://127.0.0.1:9000"
    use_streaming: false

如果在机器人和 Chat Controller 中都启用了流式传输,则 Chat Controller 会读取传入的流,并在每个句子边界处中断流。然后,每个句子都作为文本流式传输到客户端。如果查询是语音查询,则每个句子都会传递到 TTS 模块,并且 TTS 音频缓冲区也会流式传输到客户端。实际上,对于较长的 LLM 响应,第一个文本和 TTS 块是在 LLM 生成第一个句子之后而不是在 LLM 生成完整令牌之后接收的。