Business Logic Scripting#
Triton's ensemble feature supports many use cases where multiple models are composed into a pipeline (or, more generally, a DAG, directed acyclic graph). However, there are many other use cases that are not supported because, as part of the model pipeline, they require loops, conditionals (if-then-else), data-dependent control flow, and other custom logic to be intermixed with model execution. We call this combination of custom logic and model execution Business Logic Scripting (BLS).
Starting from the 21.08 release, you can implement BLS in your Python model. A new set of utility functions allows you to execute inference requests on other models being served by Triton as part of executing your Python model. Note that BLS can only be used inside the execute function and is not supported in the initialize or finalize methods. The example below shows how to use this feature:
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    ...

    def execute(self, requests):
        ...
        # Create an InferenceRequest object. `model_name`,
        # `requested_output_names`, and `inputs` are the required arguments and
        # must be provided when constructing an InferenceRequest object. Make
        # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
        # objects.
        inference_request = pb_utils.InferenceRequest(
            model_name='model_name',
            requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
            inputs=[<pb_utils.Tensor object>])

        # `pb_utils.InferenceRequest` supports request_id, correlation_id,
        # model version, timeout and preferred_memory in addition to the
        # arguments described above.
        # Note: Starting from the 24.03 release, the `correlation_id` parameter
        # supports both string and unsigned integer values.
        # These arguments are optional. An example containing all the arguments:
        # inference_request = pb_utils.InferenceRequest(model_name='model_name',
        #     requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
        #     inputs=[<list of pb_utils.Tensor objects>],
        #     request_id="1", correlation_id=4, model_version=1, flags=0, timeout=5,
        #     preferred_memory=pb_utils.PreferredMemory(
        #         pb_utils.TRITONSERVER_MEMORY_GPU,  # or pb_utils.TRITONSERVER_MEMORY_CPU
        #         0))

        # Execute the inference_request and wait for the response
        inference_response = inference_request.exec()

        # Check if the inference response has an error
        if inference_response.has_error():
            raise pb_utils.TritonModelException(
                inference_response.error().message())
        else:
            # Extract the output tensors from the inference response.
            output1 = pb_utils.get_output_tensor_by_name(
                inference_response, 'REQUESTED_OUTPUT_1')
            output2 = pb_utils.get_output_tensor_by_name(
                inference_response, 'REQUESTED_OUTPUT_2')
            # Decide the next steps for model execution based on the received
            # output tensors. It is possible to use the same output tensors
            # for the final inference response too.
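As a hedged illustration of the custom logic described above (this sketch is not part of the original example), the snippet below routes a request to one of two downstream models based on the output of a first BLS call. The model names ('classifier', 'model_a', 'model_b'), the tensor names ('SCORE', 'INPUT', 'OUTPUT'), and the input_tensor variable are hypothetical placeholders.

# Hypothetical sketch of data-dependent control flow with BLS: the
# intermediate result of one model decides which model is called next.
score_request = pb_utils.InferenceRequest(
    model_name='classifier',
    requested_output_names=['SCORE'],
    inputs=[input_tensor])
score_response = score_request.exec()
if score_response.has_error():
    raise pb_utils.TritonModelException(
        score_response.error().message())
score = pb_utils.get_output_tensor_by_name(
    score_response, 'SCORE').as_numpy()

# Route to a different model depending on the intermediate result.
next_model = 'model_a' if score.max() > 0.5 else 'model_b'
next_request = pb_utils.InferenceRequest(
    model_name=next_model,
    requested_output_names=['OUTPUT'],
    inputs=[input_tensor])
next_response = next_request.exec()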
In addition to the inference_request.exec function, which allows you to execute blocking inference requests, inference_request.async_exec allows you to execute asynchronous inference requests. This can be useful when you do not need the result of the inference immediately. Using the async_exec function, it is possible to have multiple inflight inference requests and wait for the responses only when needed. The example below shows how to use async_exec:
import triton_python_backend_utils as pb_utils
import asyncio


class TritonPythonModel:
    ...

    # You must add the Python 'async' keyword to the beginning of `execute`
    # function if you want to use `async_exec` function.
    async def execute(self, requests):
        ...
        # Create an InferenceRequest object. `model_name`,
        # `requested_output_names`, and `inputs` are the required arguments and
        # must be provided when constructing an InferenceRequest object. Make
        # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
        # objects.
        inference_request = pb_utils.InferenceRequest(
            model_name='model_name',
            requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
            inputs=[<pb_utils.Tensor object>])

        infer_response_awaits = []
        for i in range(4):
            # async_exec function returns an
            # [Awaitable](https://docs.pythonlang.cn/3/library/asyncio-task.html#awaitables)
            # object.
            infer_response_awaits.append(inference_request.async_exec())

        # Wait for all of the inference requests to complete.
        infer_responses = await asyncio.gather(*infer_response_awaits)
        for infer_response in infer_responses:
            # Check if the inference response has an error
            if infer_response.has_error():
                raise pb_utils.TritonModelException(
                    infer_response.error().message())
            else:
                # Extract the output tensors from the inference response.
                output1 = pb_utils.get_output_tensor_by_name(
                    infer_response, 'REQUESTED_OUTPUT_1')
                output2 = pb_utils.get_output_tensor_by_name(
                    infer_response, 'REQUESTED_OUTPUT_2')

                # Decide the next steps for model execution based on the received
                # output tensors.
Complete examples of sync and async BLS in the Python backend are included in the Examples section.
Using BLS with Decoupled Models#
Starting from the 23.03 release, you can execute inference requests on decoupled models in both default mode and decoupled mode. By setting the decoupled parameter to True, the exec and async_exec functions will return an iterator of the inference responses returned by the decoupled model. If the decoupled parameter is set to False, the exec and async_exec functions will return a single response, as shown in the example above. In addition, you can set the timeout, in microseconds, via the 'timeout' parameter of the InferenceRequest constructor. If the request times out, the request will respond with an error. The default value of 'timeout' is 0, which indicates that the request has no timeout.
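For instance, a minimal hedged sketch of a BLS call with a five-second timeout (the timeout value and the model/tensor names are illustrative placeholders):

# Hypothetical example: fail the BLS call if the downstream model does not
# respond within 5 seconds (the 'timeout' value is expressed in microseconds).
inference_request = pb_utils.InferenceRequest(
    model_name='model_name',
    requested_output_names=['REQUESTED_OUTPUT_1'],
    inputs=[<pb_utils.Tensor object>],
    timeout=5000000)
inference_response = inference_request.exec()
if inference_response.has_error():
    # A timed-out request is reported through the regular error path.
    raise pb_utils.TritonModelException(
        inference_response.error().message())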
Additionally, starting from the 23.04 release, you have the flexibility to select a specific device to receive the output tensors from a BLS call. This can be achieved by setting the optional preferred_memory parameter in the InferenceRequest constructor. To do this, you can create a PreferredMemory object and specify preferred_memory_type as TRITONSERVER_MEMORY_GPU or TRITONSERVER_MEMORY_CPU, along with preferred_device_id as an integer, to indicate the memory type and device ID on which you wish to receive the output tensors. If you do not specify the preferred_memory parameter, the output tensors will be allocated on the same device where the output tensors were received from the model targeted by the BLS call.
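As a short hedged sketch (placeholder model/tensor names), requesting that the BLS output tensors be placed in CPU memory on device 0 might look like this:

# Hypothetical example: ask Triton to place the BLS output tensors in CPU
# memory (device id 0), regardless of where the called model produced them.
inference_request = pb_utils.InferenceRequest(
    model_name='model_name',
    requested_output_names=['REQUESTED_OUTPUT_1'],
    inputs=[<pb_utils.Tensor object>],
    preferred_memory=pb_utils.PreferredMemory(
        pb_utils.TRITONSERVER_MEMORY_CPU, 0))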
The example below shows how to run a BLS request on a decoupled model:
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    ...

    def execute(self, requests):
        ...
        # Create an InferenceRequest object. `model_name`,
        # `requested_output_names`, and `inputs` are the required arguments and
        # must be provided when constructing an InferenceRequest object. Make
        # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
        # objects.
        inference_request = pb_utils.InferenceRequest(
            model_name='model_name',
            requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
            inputs=[<pb_utils.Tensor object>])

        # `pb_utils.InferenceRequest` supports request_id, correlation_id,
        # model version, timeout and preferred_memory in addition to the
        # arguments described above.
        # Note: Starting from the 24.03 release, the `correlation_id` parameter
        # supports both string and unsigned integer values.
        # These arguments are optional. An example containing all the arguments:
        # inference_request = pb_utils.InferenceRequest(model_name='model_name',
        #     requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
        #     inputs=[<list of pb_utils.Tensor objects>],
        #     request_id="1", correlation_id="ex-4", model_version=1, flags=0, timeout=5,
        #     preferred_memory=pb_utils.PreferredMemory(
        #         pb_utils.TRITONSERVER_MEMORY_GPU,  # or pb_utils.TRITONSERVER_MEMORY_CPU
        #         0))

        # Execute the inference_request and wait for the response. Here we are
        # running a BLS request on a decoupled model, hence setting the parameter
        # 'decoupled' to 'True'.
        inference_responses = inference_request.exec(decoupled=True)
        for inference_response in inference_responses:
            # Check if the inference response has an error
            if inference_response.has_error():
                raise pb_utils.TritonModelException(
                    inference_response.error().message())

            # For some models, it is possible that the last response is empty
            if len(inference_response.output_tensors()) > 0:
                # Extract the output tensors from the inference response.
                output1 = pb_utils.get_output_tensor_by_name(
                    inference_response, 'REQUESTED_OUTPUT_1')
                output2 = pb_utils.get_output_tensor_by_name(
                    inference_response, 'REQUESTED_OUTPUT_2')

                # Decide the next steps for model execution based on the received
                # output tensors. It is possible to use the same output tensors
                # for the final inference response too.
In addition to the inference_request.exec(decoupled=True) function, which allows you to execute blocking inference requests on decoupled models, inference_request.async_exec(decoupled=True) allows you to execute asynchronous inference requests. This can be useful when you do not need the result of the inference immediately. Using the async_exec function, it is possible to have multiple inflight inference requests and wait for the responses only when needed. The example below shows how to use async_exec:
import triton_python_backend_utils as pb_utils
import asyncio


class TritonPythonModel:
    ...

    # You must add the Python 'async' keyword to the beginning of `execute`
    # function if you want to use `async_exec` function.
    async def execute(self, requests):
        ...
        # Create an InferenceRequest object. `model_name`,
        # `requested_output_names`, and `inputs` are the required arguments and
        # must be provided when constructing an InferenceRequest object. Make
        # sure to replace `inputs` argument with a list of `pb_utils.Tensor`
        # objects.
        inference_request = pb_utils.InferenceRequest(
            model_name='model_name',
            requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
            inputs=[<pb_utils.Tensor object>])

        infer_response_awaits = []
        for i in range(4):
            # async_exec function returns an
            # [Awaitable](https://docs.pythonlang.cn/3/library/asyncio-task.html#awaitables)
            # object.
            infer_response_awaits.append(
                inference_request.async_exec(decoupled=True))

        # Wait for all of the inference requests to complete.
        async_responses = await asyncio.gather(*infer_response_awaits)
        for infer_responses in async_responses:
            for infer_response in infer_responses:
                # Check if the inference response has an error
                if infer_response.has_error():
                    raise pb_utils.TritonModelException(
                        infer_response.error().message())

                # For some models, it is possible that the last response is empty
                if len(infer_response.output_tensors()) > 0:
                    # Extract the output tensors from the inference response.
                    output1 = pb_utils.get_output_tensor_by_name(
                        infer_response, 'REQUESTED_OUTPUT_1')
                    output2 = pb_utils.get_output_tensor_by_name(
                        infer_response, 'REQUESTED_OUTPUT_2')

                    # Decide the next steps for model execution based on the received
                    # output tensors.
Complete examples of sync and async BLS for decoupled models are included in the Examples section.
Starting from the 22.04 release, the lifetime of BLS output tensors has been improved: if a tensor is no longer needed in your Python model, it will be deallocated automatically. This can increase the number of BLS requests that you can execute in your model without running into GPU or shared-memory out-of-memory errors.
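As a hedged sketch of what this means in practice (the model name, tensor names, and the request_inputs list are hypothetical), keeping only small derived results instead of the BLS output tensors themselves lets each iteration's tensors be released:

# Hypothetical sketch: because BLS output tensors are deallocated once they are
# no longer referenced, a loop that keeps only the values it needs (rather than
# the tensors) can issue many BLS calls without exhausting GPU or shared memory.
results = []
for request_input in request_inputs:  # placeholder list of pb_utils.Tensor objects
    bls_request = pb_utils.InferenceRequest(
        model_name='model_name',
        requested_output_names=['REQUESTED_OUTPUT_1'],
        inputs=[request_input])
    bls_response = bls_request.exec()
    if bls_response.has_error():
        raise pb_utils.TritonModelException(bls_response.error().message())
    output = pb_utils.get_output_tensor_by_name(
        bls_response, 'REQUESTED_OUTPUT_1')
    # Keep an independent NumPy copy of the value we need; the BLS output
    # tensor itself goes out of scope at the end of the iteration.
    results.append(output.as_numpy().copy())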
Note: Async BLS is not supported on Python 3.6 or lower because the async keyword and asyncio.run were introduced in Python 3.7.
Model Loading API#
Starting from the 23.07 release, you can use the model loading API to load models required by your BLS model. The model loading API is equivalent to the Triton C API for loading models documented in tritonserver.h. Below is an example of how to use the model loading API:
import triton_python_backend_utils as pb_utils


class TritonPythonModel:

    def initialize(self, args):
        self.model_name = "onnx_model"
        # Check if the model is ready, and load the model if it is not ready.
        # You can specify the model version in string format. The version is
        # optional, and if not provided, the server will choose a version based
        # on the model and internal policy.
        if not pb_utils.is_model_ready(model_name=self.model_name,
                                       model_version="1"):
            # Load the model from the model repository
            pb_utils.load_model(model_name=self.model_name)

            # Load the model with an optional override model config in JSON
            # representation. If provided, this config will be used for
            # loading the model.
            config = "{\"backend\":\"onnxruntime\", \"version_policy\":{\"specific\":{\"versions\":[1]}}}"
            pb_utils.load_model(model_name=self.model_name, config=config)
            # Load the model with optional override files. The override files are
            # specified as a dictionary where the key is the file path (with
            # "file:" prefix) and the value is the file content as bytes. The
            # files will form the model directory that the model will be loaded
            # from. If specified, 'config' must be provided to be the model
            # configuration of the override model directory.
            with open('models/onnx_int32_int32_int32/1/model.onnx', 'rb') as file:
                data = file.read()
            files = {"file:1/model.onnx": data}
            pb_utils.load_model(model_name=self.model_name,
                                config=config, files=files)

    def execute(self, requests):
        # Execute the model
        ...
        # If the model is no longer needed, you can unload it. You can also
        # specify whether the dependents of the model should also be unloaded by
        # setting the 'unload_dependents' parameter to True. The default value
        # is False. Need to be careful when unloading the model as it can affect
        # other model instances or other models that depend on it.
        pb_utils.unload_model(model_name=self.model_name,
                              unload_dependents=True)
Note that the model loading API is only supported when the server is running in explicit model control mode. Additionally, the model loading API should only be used after the server is running, which means that the BLS model should not be loaded during server startup. You can use the different client endpoints to load the model after the server is running. The model loading API is currently not supported during the auto_complete_config and finalize functions.
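For instance, a hedged client-side sketch (the server address and the "bls_model" name are placeholders; this uses the tritonclient Python package, but any client or the HTTP repository endpoint works) of loading the BLS model itself after the server is up:

# Hypothetical client-side example (not part of the Python model): load the
# BLS model through the model-repository endpoint once the server has started.
# Requires the server to run with --model-control-mode=explicit.
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="localhost:8000")
client.load_model("bls_model")                  # placeholder model name
assert client.is_model_ready("bls_model")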
Using BLS with Stateful Models#
Stateful models require setting additional flags in the inference request to indicate the start and end of a sequence. The flags argument of the pb_utils.InferenceRequest object can be used to indicate whether the request is the first or last request in the sequence. The example below indicates that the request is starting the sequence:
inference_request = pb_utils.InferenceRequest(
    model_name='model_name',
    requested_output_names=['REQUESTED_OUTPUT_1', 'REQUESTED_OUTPUT_2'],
    inputs=[<list of pb_utils.Tensor objects>],
    request_id="1", correlation_id=4,
    flags=pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START)
For indicating the end of the sequence, you can use the pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END flag. If the request is both starting and ending a sequence at the same time (i.e., the sequence has only a single request), you can use the bitwise OR operator to enable both flags:
flags = pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START | pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END
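Putting these flags together, a hedged sketch of sending several BLS requests that belong to one sequence might look like this (the model name, tensor names, the sequence_inputs list, and the correlation id are placeholders):

# Hypothetical sketch: send every tensor in `sequence_inputs` (a placeholder
# list of pb_utils.Tensor objects) to a stateful model as one sequence,
# marking the first and last requests with the corresponding flags.
for i, sequence_input in enumerate(sequence_inputs):
    flags = 0
    if i == 0:
        flags |= pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_START
    if i == len(sequence_inputs) - 1:
        flags |= pb_utils.TRITONSERVER_REQUEST_FLAG_SEQUENCE_END
    inference_request = pb_utils.InferenceRequest(
        model_name='model_name',
        requested_output_names=['REQUESTED_OUTPUT_1'],
        inputs=[sequence_input],
        correlation_id=4,
        flags=flags)
    inference_response = inference_request.exec()
    if inference_response.has_error():
        raise pb_utils.TritonModelException(
            inference_response.error().message())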
Limitations#
You need to make sure that the inference requests performed as part of your model do not create a circular dependency. For example, if model A performs an inference request on itself and there are no more model instances ready to execute the inference request, the model will block on the inference execution forever.
Async BLS is not supported when running a Python model in decoupled mode.