Business Logic Scripting

Triton's ensemble feature supports many use cases where multiple models are composed into a pipeline (or, more generally, a DAG, directed acyclic graph). However, there are many other use cases that are not supported because, as part of the model pipeline, they require loops, conditionals (if-then-else), data-dependent control flow, and other custom logic to be intermixed with model execution. We call this combination of custom logic and model execution Business Logic Scripting (BLS).

Starting from the 21.08 release, you can implement BLS in your Python model. A new set of utility functions allows you to execute inference requests on other models being served by Triton as part of executing your Python model. Note that BLS can only be used inside the execute function and is not supported in the initialize or finalize methods. The example below shows how to use this feature:

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
  ...
    def execute(self, requests):
      ...
      # Create an InferenceRequest object. `model_name`,
      # `requested_output_names`, and `inputs` are the required arguments and
      # must be provided when constructing an InferenceRequest object. Make
      # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
      # objects.
      inference_request = pb_utils.InferenceRequest(
          model_name='model_name',
          requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
          inputs=[<pb_utils.Tensor object>])

      # `pb_utils.InferenceRequest` supports request_id, correlation_id,
      # model version, timeout and preferred_memory in addition to the
      # arguments described above.
      # Note: Starting from the 24.03 release, the `correlation_id` parameter
      # supports both string and unsigned integer values.
      # These arguments are optional. An example containing all the arguments:
      # inference_request = pb_utils.InferenceRequest(model_name='model_name',
      #   requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
      #   inputs=[<list of pb_utils.Tensor objects>],
      #   request_id="1", correlation_id=4, model_version=1, flags=0, timeout=5,
      #   preferred_memory=pb_utils.PreferredMemory(
      #     pb_utils.TRITONSERVER_MEMORY_GPU, # or pb_utils.TRITONSERVER_MEMORY_CPU
      #     0))

      # Execute the inference_request and wait for the response
      inference_response = inference_request.exec()

      # Check if the inference response has an error
      if inference_response.has_error():
          raise pb_utils.TritonModelException(
            inference_response.error().message())
      else:
          # Extract the output tensors from the inference response.
          output1 = pb_utils.get_output_tensor_by_name(
            inference_response, 'REQUESTED_OUTPUT_1')
          output2 = pb_utils.get_output_tensor_by_name(
            inference_response, 'REQUESTED_OUTPUT_2')

          # Decide the next steps for model execution based on the received
          # output tensors. It is possible to use the same output tensors
          # for the final inference response too.
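
As a concrete illustration of the data-dependent control flow mentioned at the beginning of this section, here is a minimal sketch that routes each request to one of two downstream models based on an intermediate output. The model names ('router_model', 'model_a', 'model_b') and the tensor names are hypothetical and only serve to show the pattern:

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    def execute(self, requests):
        responses = []
        for request in requests:
            input_tensor = pb_utils.get_input_tensor_by_name(request, 'INPUT0')

            # First BLS call: ask a (hypothetical) routing model which branch
            # to take for this request.
            route_request = pb_utils.InferenceRequest(
                model_name='router_model',
                requested_output_names=['ROUTE'],
                inputs=[input_tensor])
            route_response = route_request.exec()
            if route_response.has_error():
                raise pb_utils.TritonModelException(
                    route_response.error().message())
            route = pb_utils.get_output_tensor_by_name(
                route_response, 'ROUTE').as_numpy()

            # Data-dependent control flow: choose the downstream model based
            # on the routing output.
            target_model = 'model_a' if route[0] == 0 else 'model_b'
            final_request = pb_utils.InferenceRequest(
                model_name=target_model,
                requested_output_names=['OUTPUT0'],
                inputs=[input_tensor])
            final_response = final_request.exec()
            if final_response.has_error():
                raise pb_utils.TritonModelException(
                    final_response.error().message())

            # Reuse the downstream output tensor in the final response.
            output = pb_utils.get_output_tensor_by_name(
                final_response, 'OUTPUT0')
            responses.append(pb_utils.InferenceResponse(output_tensors=[output]))
        return responses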

In addition to the inference_request.exec function, which allows you to execute blocking inference requests, inference_request.async_exec allows you to perform asynchronous inference requests. This can be useful when you do not need the inference result immediately. Using the async_exec function, it is possible to have multiple in-flight inference requests and wait for the responses only when needed. The example below shows how to use async_exec:

import triton_python_backend_utils as pb_utils
import asyncio


class TritonPythonModel:
  ...

    # You must add the Python 'async' keyword to the beginning of `execute`
    # function if you want to use `async_exec` function.
    async def execute(self, requests):
      ...
      # Create an InferenceRequest object. `model_name`,
      # `requested_output_names`, and `inputs` are the required arguments and
      # must be provided when constructing an InferenceRequest object. Make
      # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
      # objects.
      inference_request = pb_utils.InferenceRequest(
          model_name='model_name',
          requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
          inputs=[<pb_utils.Tensor object>])

      infer_response_awaits = []
      for i in range(4):
        # async_exec function returns an
        # [Awaitable](https://docs.pythonlang.cn/3/library/asyncio-task.html#awaitables)
        # object.
        infer_response_awaits.append(inference_request.async_exec())

      # Wait for all of the inference requests to complete.
      infer_responses = await asyncio.gather(*infer_response_awaits)

      for infer_response in infer_responses:
        # Check if the inference response has an error
        if infer_response.has_error():
            raise pb_utils.TritonModelException(
              infer_response.error().message())
        else:
            # Extract the output tensors from the inference response.
            output1 = pb_utils.get_output_tensor_by_name(
              infer_response, 'REQUESTED_OUTPUT_1')
            output2 = pb_utils.get_output_tensor_by_name(
              infer_response, 'REQUESTED_OUTPUT_2')

            # Decide the next steps for model execution based on the received
            # output tensors.

Complete examples of synchronous and asynchronous BLS in the Python backend are included in the Examples section.

Using BLS with Decoupled Models

Starting from the 23.03 release, you can execute inference requests on decoupled models in both default mode and decoupled mode. By setting the decoupled parameter to True, the exec and async_exec functions will return an iterator of the inference responses returned by the decoupled model. If the decoupled parameter is set to False, the exec and async_exec functions will return a single response, as shown in the examples above. In addition, you can set a timeout, in microseconds, via the 'timeout' parameter in the InferenceRequest constructor. If the request times out, the request will respond with an error. The default value of 'timeout' is 0, which indicates that the request has no timeout.

Additionally, starting from the 23.04 release, you have the flexibility to select a specific device to receive the output tensors from a BLS call. This can be done by setting the optional preferred_memory parameter in the InferenceRequest constructor. To do so, create a PreferredMemory object and specify preferred_memory_type as TRITONSERVER_MEMORY_GPU or TRITONSERVER_MEMORY_CPU, along with preferred_device_id as an integer, to indicate the memory type and device ID on which you wish to receive the output tensors. If you do not specify the preferred_memory parameter, the output tensors will be allocated on the same device as the output tensors of the model that the BLS call targets.

The example below shows how to use this feature:

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
  ...
    def execute(self, requests):
      ...
      # Create an InferenceRequest object. `model_name`,
      # `requested_output_names`, and `inputs` are the required arguments and
      # must be provided when constructing an InferenceRequest object. Make
      # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
      # objects.
      inference_request = pb_utils.InferenceRequest(
          model_name='model_name',
          requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
          inputs=[<pb_utils.Tensor object>])

      # `pb_utils.InferenceRequest` supports request_id, correlation_id,
      # model version, timeout and preferred_memory in addition to the
      # arguments described above.
      # Note: Starting from the 24.03 release, the `correlation_id` parameter
      # supports both string and unsigned integer values.
      # These arguments are optional. An example containing all the arguments:
      # inference_request = pb_utils.InferenceRequest(model_name='model_name',
      #   requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
      #   inputs=[<list of pb_utils.Tensor objects>],
      #   request_id="1", correlation_id="ex-4", model_version=1, flags=0, timeout=5,
      #   preferred_memory=pb_utils.PreferredMemory(
      #     pb_utils.TRITONSERVER_MEMORY_GPU, # or pb_utils.TRITONSERVER_MEMORY_CPU
      #     0))

      # Execute the inference_request and wait for the response. Here we are
      # running a BLS request on a decoupled model, hence setting the parameter
      # 'decoupled' to 'True'.
      inference_responses = inference_request.exec(decoupled=True)

      for inference_response in inference_responses:
        # Check if the inference response has an error
        if inference_response.has_error():
            raise pb_utils.TritonModelException(
              inference_response.error().message())

        # For some models, it is possible that the last response is empty
        if len(inference_response.output_tensors()) > 0:
          # Extract the output tensors from the inference response.
          output1 = pb_utils.get_output_tensor_by_name(
            inference_response, 'REQUESTED_OUTPUT_1')
          output2 = pb_utils.get_output_tensor_by_name(
            inference_response, 'REQUESTED_OUTPUT_2')

          # Decide the next steps for model execution based on the received
          # output tensors. It is possible to use the same output tensors
          # for the final inference response too.
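
For the preferred_memory parameter specifically, a minimal sketch of a non-decoupled BLS call that asks for the output tensors in CPU memory might look like the following (the model name, tensor names, and the input_tensor variable are assumed to exist for illustration):

      inference_request = pb_utils.InferenceRequest(
          model_name='model_name',
          requested_output_names=['REQUESTED_OUTPUT_1'],
          inputs=[input_tensor],
          preferred_memory=pb_utils.PreferredMemory(
              pb_utils.TRITONSERVER_MEMORY_CPU, 0))

      inference_response = inference_request.exec()
      if not inference_response.has_error():
          output1 = pb_utils.get_output_tensor_by_name(
              inference_response, 'REQUESTED_OUTPUT_1')
          # Tensor.is_cpu() reports whether the returned tensor ended up in
          # CPU memory.
          print('output on CPU:', output1.is_cpu())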

In addition to the inference_request.exec(decoupled=True) function, which allows you to execute blocking inference requests on decoupled models, inference_request.async_exec(decoupled=True) allows you to perform asynchronous inference requests. This can be useful when you do not need the inference result immediately. Using the async_exec function, it is possible to have multiple in-flight inference requests and wait for the responses only when needed. The example below shows how to use async_exec:

import triton_python_backend_utils as pb_utils
import asyncio


class TritonPythonModel:
  ...

    # You must add the Python 'async' keyword to the beginning of `execute`
    # function if you want to use `async_exec` function.
    async def execute(self, requests):
      ...
      # Create an InferenceRequest object. `model_name`,
      # `requested_output_names`, and `inputs` are the required arguments and
      # must be provided when constructing an InferenceRequest object. Make
      # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
      # objects.
      inference_request = pb_utils.InferenceRequest(
          model_name='model_name',
          requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
          inputs=[<pb_utils.Tensor object>])

      infer_response_awaits = []
      for i in range(4):
        # async_exec function returns an
        # [Awaitable](https://docs.pythonlang.cn/3/library/asyncio-task.html#awaitables)
        # object.
        infer_response_awaits.append(
          inference_request.async_exec(decoupled=True))

      # Wait for all of the inference requests to complete.
      async_responses = await asyncio.gather(*infer_response_awaits)

      for infer_responses in async_responses:
        for infer_response in infer_responses:
          # Check if the inference response has an error
          if infer_response.has_error():
              raise pb_utils.TritonModelException(
                infer_response.error().message())

          # For some models, it is possible that the last response is empty
          if len(infer_response.output_tensors()) > 0:
              # Extract the output tensors from the inference response.
              output1 = pb_utils.get_output_tensor_by_name(
                infer_response, 'REQUESTED_OUTPUT_1')
              output2 = pb_utils.get_output_tensor_by_name(
                infer_response, 'REQUESTED_OUTPUT_2')

              # Decide the next steps for model execution based on the received
              # output tensors.

Complete examples of synchronous and asynchronous BLS for decoupled models are included in the Examples section.

Starting from the 22.04 release, the lifetime of BLS output tensors has been improved so that a tensor is automatically deallocated once it is no longer needed in your Python model. This can increase the number of BLS requests that you can execute in your model without running into GPU or shared-memory out-of-memory errors.
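
As a rough sketch of what this means in practice (hypothetical model and tensor names, and an input_tensor that is assumed to already exist), the loop below issues many BLS requests; because no reference to the returned pb_utils.Tensor objects is kept across iterations, their backing memory can be reclaimed automatically as the loop progresses:

      results = []
      for _ in range(100):
          bls_request = pb_utils.InferenceRequest(
              model_name='model_name',
              requested_output_names=['REQUESTED_OUTPUT_1'],
              inputs=[input_tensor])
          bls_response = bls_request.exec()
          if bls_response.has_error():
              raise pb_utils.TritonModelException(
                  bls_response.error().message())

          output = pb_utils.get_output_tensor_by_name(
              bls_response, 'REQUESTED_OUTPUT_1')
          # Keep only a small numpy-derived value; the pb_utils.Tensor goes out
          # of scope at the end of the iteration, so it can be deallocated.
          results.append(output.as_numpy().sum())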

Note: Async BLS is not supported on Python 3.6 or lower because the async keyword and asyncio.run were introduced in Python 3.7.

Model Loading API

Starting from the 23.07 release, you can use the model loading API to load models required by your BLS model. The model loading API is equivalent to the Triton C API for loading models, documented in tritonserver.h. Below is an example of how to use the model loading API:

import triton_python_backend_utils as pb_utils

class TritonPythonModel:
    def initialize(self, args):
        self.model_name="onnx_model"
        # Check if the model is ready, and load the model if it is not ready.
        # You can specify the model version in string format. The version is
        # optional, and if not provided, the server will choose a version based
        # on the model and internal policy.
        if not pb_utils.is_model_ready(model_name=self.model_name,
                                       model_version="1"):
            # Load the model from the model repository
            pb_utils.load_model(model_name=self.model_name)

            # Load the model with an optional override model config in JSON
            # representation. If provided, this config will be used for
            # loading the model.
            config = "{\"backend\":\"onnxruntime\", \"version_policy\":{\"specific\":{\"versions\":[1]}}}"
            pb_utils.load_model(model_name=self.model_name, config=config)

            # Load the model with optional override files. The override files are
            # specified as a dictionary where the key is the file path (with
            # "file:" prefix) and the value is the file content as bytes. The
            # files will form the model directory that the model will be loaded
            # from. If specified, 'config' must be provided to be the model
            # configuration of the override model directory.
            with open('models/onnx_int32_int32_int32/1/model.onnx', 'rb') as file:
                data = file.read()
            files = {"file:1/model.onnx": data}
            pb_utils.load_model(model_name=self.model_name,
                                config=config, files=files)

    def execute(self, requests):
        # Execute the model
        ...
        # If the model is no longer needed, you can unload it. You can also
        # specify whether the dependents of the model should also be unloaded by
        # setting the 'unload_dependents' parameter to True. The default value
        # is False. Need to be careful when unloading the model as it can affect
        # other model instances or other models that depend on it.
        pb_utils.unload_model(model_name=self.model_name,
                              unload_dependents=True)

Note that the model loading API is supported only if the server is running in explicit model control mode. Additionally, the model loading API should only be used after the server is running, which means the BLS model should not be loaded during server startup. You can use the different client endpoints to load the model after the server has started. The model loading API is currently not supported in the auto_complete_config and finalize functions.

Using BLS with Stateful Models

Stateful models require setting additional flags in the inference request to indicate the start and end of a sequence. The flags argument in the pb_utils.InferenceRequest object can be used to indicate whether the request is the first or last request in a sequence. The example below indicates that the request is starting a sequence:

inference_request = pb_utils.InferenceRequest(model_name='model_name',
  requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
  inputs=[<list of pb_utils.Tensor objects>],
  request_id="1", correlation_id=4,
  flags=pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START)

To indicate the end of a sequence, you can use the pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END flag. If a request both starts and ends a sequence (i.e., the sequence consists of a single request), you can use the bitwise OR operator to enable both flags:

flags = pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START | pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END
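
For example, a sequence that consists of a single request could be issued as follows (a sketch with hypothetical tensor names and an input_tensor that is assumed to already exist):

flags = (pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START |
         pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END)

inference_request = pb_utils.InferenceRequest(
    model_name='model_name',
    requested_output_names=['REQUESTED_OUTPUT_1'],
    inputs=[input_tensor],
    request_id="1", correlation_id=4, flags=flags)
inference_response = inference_request.exec()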

Limitations

  • You need to make sure that the inference requests performed as part of your model do not create a circular dependency. For example, if model A performs an inference request on itself and there are no more model instances ready to execute the inference request, the model will block on the inference execution forever.

  • Async BLS is not supported when running a Python model in decoupled mode.