快速入门指南#

了解 Pipeline 和 Flow#

用户使用 pyservicemaker 需要了解的第一件事是 pipelinepipeline 由链接在一起的各种处理节点组成,用于处理或操作数据流。每个节点实例执行特定的功能,例如捕获数据、解码、处理或渲染数据。这些节点以链式方式连接,创建 pipeline,数据通过该 pipeline 流动。

可以使用给定的名称创建一个简单的空 pipeline,如下所示

from pyservicemaker import Pipeline

pipeline = Pipeline("sample-pipeline")

到目前为止,pipeline 还不能做任何事情。为了使其发挥作用,开发人员可以通过为 pipeline 创建 flow 来定义 pipeline 的数据流。Flow API 为开发人员提供了灵活性,可以根据其特定目标随后追加操作。这种方法允许模块化和可定制的工作流程,可以在需要时添加操作。每次调用 Flow 方法时,它都会获取上一个 flow 的预期输出流,并将其假定为当前 flow 的输入。因此,Flow 方法的顺序决定了 pipeline 启动时 flow 的工作方式。

创建一个简单的视频播放器应用程序#

下面的代码片段演示了如何创建一个简单的 flow 来解码和显示视频文件。

from pyservicemaker import Pipeline, Flow

pipeline = Pipeline("playback")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).render()()

一旦 flow 完成,就可以使用 '()' 调用它,因为它可调用。此调用使 pipeline 保持运行,直到处理完成。此外,开发人员可以选择调用 Pipeline.start() 和 Pipeline.wait()。下面的代码片段实际上与上面的代码片段等效。

from pyservicemaker import Pipeline, Flow

pipeline = Pipeline("playback")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).render()
pipeline.start()
pipeline.wait()

创建一个对象检测应用程序#

可以使用 infer 方法进行推理,在示例视频播放器应用程序的基础上构建更高级的对象检测 pipeline,如下所示

from pyservicemaker import Pipeline, Flow

pipeline = Pipeline("detector")
infer_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).batch_capture([video_file]).infer(infer_config).render()()

该应用程序在捕获的视频上执行基于 ResNet18 的对象检测模型。模型的配置在 infer_config 中指定,检测到的对象会自动转换为边界框,这些边界框会叠加在显示器上。

自定义对象检测示例应用程序#

在许多用例中,开发人员需要直接访问推理结果以进行各种分析。为了实现这一点,可以通过将探针附加到推理结果来增强应用程序(要深入了解探针,请参阅 处理缓冲区)。此探针允许开发人员定义自定义元数据操作符,这些操作符从与输出缓冲区关联的元数据中提取输出张量。(有关元数据的更多详细信息,请参阅 利用元数据。)

下面是上面代码片段的增强版本。

from pyservicemaker import Pipeline, Flow, Probe, BatchMetadataOperator
import torch

class TensorOutput(BatchMetadataOperator):
    def handle_metadata(self, batch_meta):
        for frame_meta in batch_meta.frame_items:
            for user_meta in frame_meta.tensor_items:
                for n, tensor in user_meta.as_tensor_output().get_layers().items():
                    print(f"tensor name: {n}")
                    print(f"tensor object: {tensor}")
                    # operations on tensors:
                    torch_tensor = torch.utils.dlpack.from_dlpack(tensor.clone())

pipeline = Pipeline("detector")
infer_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
probe = Probe('tensor_retriver', TensorOutput())
Flow(pipeline).batch_capture([video_file]).infer(infer_config, output_tensor_meta=True).attach(probe).render()()

增强的代码从与 infer flow 的输出缓冲区关联的用户元数据中检索推理结果作为张量。这是通过一个探针完成的,该探针利用自定义的 BatchMetadataOperator 来提取和处理元数据。

了解 Buffer 和 Tensor#

在 pyservicemaker 中,Buffer 表示通过 pipeline 从一个节点流向另一个节点的数据块。它包含实际的媒体数据,例如音频或视频片段,并在节点之间传递以进行处理或输出。在某些情况下,开发人员希望创建自己的缓冲区并将其注入到 pipeline 中,或者从 pipeline 中使用缓冲区。pyservicemaker 提供了两个抽象类来实现这些目标。

BufferProvider#

BufferProvider 指定了一个供开发人员实现的接口,以便生成缓冲区。下面的代码片段演示了如何使用 BufferProvider 的子类从字节生成缓冲区(为了演示,该方法生成灰度图片)

from pyservicemaker import BufferProvider, Buffer

class MyBufferProvider(BufferProvider):

    def __init__(self, width, height, device='cpu', framerate=30, format="RGB"):
        super().__init__()
        self.width = width
        self.height = height
        self.format = format
        self.framerate = framerate
        self.device = device

    def generate(self, size):
        data = [128]*(self.width*self.height*3)
        return Buffer() if self.count == self.expected else Buffer(data)

建议提供额外信息,以便 pipeline 更好地理解缓冲区中的数据表示什么,例如格式、宽度、高度等。

开发人员可以创建一个没有任何参数的空缓冲区,并将空缓冲区注入到 pipeline 中将结束它。

为了将缓冲区注入到 pipeline 中,开发人员可以使用带有自定义 BufferProvider 的 inject flow。下面的代码片段展示了如何将视频缓冲区注入到 pipeline 中并显示它

Flow(Pipeline("playback")).inject([MyBufferProvider(640, 480)]).render()()

BufferRetriever#

BufferRetriever 指定了一个可自定义的接口,供开发人员使用缓冲区。它必须与 retrieve flow 一起使用。下面的代码片段展示了如何从 pipeline 获取缓冲区数据。

# Read the decoded video buffers from a sample mp4 file
from pyservicemaker import Pipeline, Flow, BufferRetriever

class MyBufferRetriever(BufferRetriever):
    def consume(self, buffer):
        tensor = buffer.extract(0)
        return 1

为了利用缓冲区数据,开发人员必须将其提取到张量中。如果数据是批处理的,则可能会提取多个张量,需要使用批次 ID。如果没有批处理,则应始终传递 0 给 extract 方法。BufferRetriever 的子类可以与 retrieve flow 一起使用,以从 pipeline 获取缓冲区

video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("retrieve")).capture([video_file]).retrieve(MyBufferRetriever())()

Tensor#

Tensor 本质上是一个多维数组,广泛用于深度学习领域。pyservicemaker Tensor 方便了 GPU 上的数据处理,并通过与 DLPack 的兼容性提供了与其他框架的无缝互操作性。

重用 pyservicemaker Tensor 中数据的常见做法是克隆张量并将其传递给 PyTorch Tensor。这确保了原始数据保持不变,同时实现了与其他框架的高效集成,以便进行进一步处理。下面的代码片段展示了如何将 pyservicemaker Tensor 转换为 pytorch 张量

torch_tensor = torch.utils.dlpack.from_dlpack(tensor.clone())

类似地,开发人员也可以从 pytorch 张量创建 pyservicemaker Tensor,然后将其包装到缓冲区中。下面的代码片段演示了如何在自定义 BufferProvider 中从 pytorch 张量生成缓冲区,稍后可以由 inject flow 使用。

from pyservicemaker import BufferProvider, ColorFormat, as_tensor
import torch

class MyBufferProvider(BufferProvider):
    def generate(self, size):
        torch_tensor = torch.load('tensor_data.pt')
        ds_tensor = as_tensor(torch_tensor, "HWC")
        return ds_tensor.wrap(ColorFormat.RGB)