Pipeline API 简介#

对于已经熟悉 DeepStream SDK 的开发人员,pyservicemaker 提供了 Pipeline API,使他们能够充分利用 DeepStream 的功能。

使用 Pipeline API 在 Python 中创建示例 Deepstream 应用程序与使用 C++ API 的过程非常相似,明显的区别在于它不需要 Makefile 或构建过程。

from pyservicemaker import Pipeline
import sys

CONFIG_FILE_PATH = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"

if __name__ == '__main__':
    pipeline = Pipeline("sample-pipeline")
    pipeline.add("nvurisrcbin", "src", {"uri": sys.argv[1]})
    pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720})
    pipeline.add("nvinferbin", "infer", {"config-file-path": CONFIG_FILE_PATH})
    pipeline.add("nvosdbin", "osd").add("nveglglessink", "sink")
    pipeline.link(("src", "mux"), ("", "sink_%u")).link("mux", "infer", "osd", "sink")
    pipeline.start().wait()

一个功能性 pipeline 需要添加、配置和正确链接来自 Deepstream 插件的适当元素。这可以使用 Pipeline API 以流畅的方式无缝实现

Pipeline pipeline("sample-pipeline")
// nvstreammux is the factory name in Deepstream to create a streammuxer Element
// mux is the name of the Element instance
// multiple key value pairs can be appended to configure the added Element
pipeline.add("nvstreammux", "mux", "batch-size", 1, "width", 1280, "height", 720)

“add”方法用于将所有必要的元素节点合并到 pipeline 实例中。此方法采用元素的注册名称和节点名称作为参数,后跟一个字典,用于指定元素的属性。在添加期间赋予元素的节点名称可用于在 pipeline 中引用该元素。

有关每个属性如何影响相应元素的详细信息,DS 插件概述 是主要的和最全面的资源。开发人员还可以使用元素注册名称运行 gst-inspect-1.0 来检查其技术规范,例如,在查找插件手册中的 nvstreammux 后,我们知道该元素用于批处理来自多个输入的缓冲区,并且需要设置“batch-size”、“width”和“height”。

在将元素节点添加到 pipeline 后,“link”方法完成流式传输路径的构建。此方法提供两种变体

  • 更简单的一种接受要按顺序链接的所有实例的名称。

pipeline.link("mux", "infer", "osd", "sink")
  • 更复杂的一种链接两个实例,利用两个元组来指定源名称和目标名称,以及源和目标 pad 以指示特定的媒体流。

pipeline.link(("src", "mux"), ("", "sink_%u"))

第二种“link”方法主要解决动态路径,例如“nvstreammux”元素遇到的那些路径。此元素具有动态输入和一个名为“sink_%u”的模板 pad,这需要使用此方法来建立适当的连接。

要启动 pipeline 并等待流到达其末尾,需要按顺序调用“start”方法和“wait”方法。

pipeline.start().wait()

现在,让我们将代码另存为 sample_app.py 并从控制台运行应用程序

$ python3 sample_app.py file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4

推理正在按预期使用提供的模型配置运行。但是,如果我们希望检查结果,则需要通过附加探针来执行其他步骤。

探针是用于拦截已处理的缓冲区和元数据的预定义类。通过为其提供 BatchMetadataOperator 的合适实现,我们可以从“nvinfer”生成的元数据中提取对象检测结果。

以下是 BatchMetadataOperator 的示例实现,它计算对象并在下游“osd”元素中显示数字。“handle_metadata”方法在每个缓冲区批处理的 pipeline 中调用,其中“batch_meta”对象包装批处理元数据。开发人员可以迭代“batch_meta”对象的帧元数据,然后迭代对象元数据的帧元数据。

在示例代码中,handle_metadata 检查批处理中每帧的对象信息,计算每个类别的计数,并将包含文本标签的显示元数据对象附加到帧。因此,数字会显示在视频输出中。有关元数据用法的更多详细信息,请参阅 利用元数据

from pyservicemaker import BatchMetadataOperator, Probe, osd

class ObjectCounterMarker(BatchMetadataOperator):
    def handle_metadata(self, batch_meta):
        for frame_meta in batch_meta.frame_items:
            vehcle_count = 0
            person_count = 0
            for object_meta in frame_meta.object_items:
                class_id = object_meta.class_id
                if class_id == 0:
                    vehcle_count += 1
                elif class_id == 2:
                    person_count += 1
            print(f"Object Counter: Pad Idx={frame_meta.pad_index},"
                f"Frame Number={frame_meta.frame_number},"
                f"Vehicle Count={vehcle_count}, Person Count={person_count}")
            text = f"Person={person_count},Vehicle={vehcle_count}"
            display_meta = batch_meta.acquire_display_meta()
            label = osd.Text()
            label.display_text = text.encode('ascii')
            label.x_offset = 10
            label.y_offset = 12
            label.font.name = osd.FontFamily.Serif
            label.font.size = 12
            label.font.color = osd.Color(1.0, 1.0, 1.0, 1.0)
            label.set_bg_color = True
            label.bg_color = osd.Color(0.0, 0.0, 0.0, 1.0)
            display_meta.add_text(label)
            frame_meta.append(display_meta)

通过在启动 pipeline 之前将上述缓冲区探针附加到现有 pipeline 中的推理插件中,我们从视频流的每一帧中提取对象计数信息,并在控制台输出和视频上的叠加层中显示它

pipeline.attach("infer", Probe("counter", ObjectCounterMarker()))

现在,让我们再次运行 python 应用程序,我们将看到打印出的对象计数

Object Counter: Pad Idx=0,Frame Number=0,Vehicle Count=12, Person Count=6
Object Counter: Pad Idx=0,Frame Number=1,Vehicle Count=15, Person Count=7
Object Counter: Pad Idx=0,Frame Number=2,Vehicle Count=13, Person Count=5
Object Counter: Pad Idx=0,Frame Number=3,Vehicle Count=12, Person Count=6
Object Counter: Pad Idx=0,Frame Number=4,Vehicle Count=15, Person Count=8
Object Counter: Pad Idx=0,Frame Number=5,Vehicle Count=15, Person Count=5
Object Counter: Pad Idx=0,Frame Number=6,Vehicle Count=11, Person Count=5
Object Counter: Pad Idx=0,Frame Number=7,Vehicle Count=13, Person Count=5
Object Counter: Pad Idx=0,Frame Number=8,Vehicle Count=19, Person Count=4
Object Counter: Pad Idx=0,Frame Number=9,Vehicle Count=15, Person Count=5
Object Counter: Pad Idx=0,Frame Number=10,Vehicle Count=13, Person Count=4

除了从 Python 代码创建 Probe 实例外,“attach”方法还可以通过指定模块名称从共享库附加探针。以下代码附加了一个预构建的探针,用于在“osd”上显示对象信息

pipeline.attach("infer", "sample_video_probe", "my_probe")

支持用于 pipeline 构建的 YAML 配置文件,遵循与 C++ API 使用的规范相同的规范。以下是如何在 YAML 配置中定义上述 pipeline

deepstream:
  nodes:
  - type: nvurisrcbin
    name: src
    properties:
        uri: file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4
  - type: nvstreammux
    name: mux
    properties:
      batch-size: 1
      width: 1280
      height: 720
  - type: nvinferbin
    name: infer
    properties:
      config-file-path: /opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml
  - type: nvosdbin
    name: osd
  - type: nveglglessink
    name: sink
  edges:
    src: mux
    mux: infer
    infer: osd
    osd: sink

并且在应用 YAML 配置后,应用程序源代码可以简化为单行

Pipeline(name="sample-pipeline", config_file="my_config.yaml").start().wait()

PipelineAPI 示例应用程序参考表#

参考测试应用程序

service-maker/sources 目录内的路径

描述

示例测试应用程序 1

apps/python/pipeline_api/deepstream_test1_app

如何使用 DeepStream 元素进行使用 pipelineAPI 的单 H.264 流推理的示例:filesrc -> decode -> nvstreammux -> nvinfer 或 nvinferserver (主检测器) -> nvdsosd -> renderer。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。

示例测试应用程序 2

apps/python/pipeline_api/deepstream_test2_app

如何使用 DeepStream 元素进行使用 pipelineAPI 的单 H.264 流级联推理的示例:filesrc -> decode -> nvstreammux -> nvinfer 或 nvinferserver (主检测器) -> nvtracker -> nvinfer 或 nvinferserver (辅助分类器) -> nvdsosd -> renderer。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测和 2 个分类器模型 (即,resnet18_vehiclemakenet_pruned.onnx、resnet18_vehicletypenet_pruned.onnx)。

示例测试应用程序 3

apps/python/pipeline_api/deepstream_test3_app

在 pipeline_api/deepstream_test1 (示例测试应用程序 1) 的基础上构建,以演示如何

  • 在 pipeline 中使用多个源进行推理。

  • 使用 uridecodebin 接受任何类型的输入 (例如 RTSP/File)。

  • 配置 nvstreammux 以生成一批帧并在其上进行推理,以更好地利用资源。

  • 提取流元数据,其中包含有关批处理缓冲区中帧的有用信息。

此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。

示例测试应用程序 4

apps/python/pipeline_api/deepstream_test4_app

在 pipeline_api/deepstream_test1 的基础上构建,用于单 H.264 流推理:filesrc、decode、nvstreammux、nvinfer 或 nvinferserver、nvdsosd、renderer,以演示如何

  • 在 pipeline 中使用 nvmsgconv 和 nvmsgbroker 插件。

  • 创建 NVDS_META_EVENT_MSG 类型元数据并将其附加到缓冲区。

  • 将 NVDS_META_EVENT_MSG 用于不同类型的对象,例如车辆和人员。

此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。

示例测试应用程序 5

apps/python/pipeline_api/deepstream_test5_app

使用 pipelineAPI 构建。演示了

  • 在 pipeline 中使用 nvmsgconv 和 nvmsgbroker 插件进行多流推理。

  • 如何从配置文件将 nvmsgbroker 插件配置为 sink 插件 (对于 KAFKA、Azure 等)。

  • 如何使用远程 Kafka 服务器作为生产者和消费者。

  • 利用 nvmultiurisrcbin 进行动态源管理

此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。

Appsrc 和 Appsink 示例

apps/python/pipeline_api/deepstream_appsrc_test_app

演示如何为 Feeder 类创建 BufferProvider 以及如何为 receiver 类创建 BufferRetriever。具有自定义 BufferProvider 的 Feeder 可用于将用户数据注入到 DS pipeline,而具有自定义 BufferRetriever 的 receiver 可用于从 pipeline 中提取缓冲区数据。

Kafka 自定义数据示例

apps/python/pipeline_api/deepstream_kafka_test_app

如何使用 DeepStream 元素进行单 H.264 流推理并将自定义推理数据直接发送到 kafka 服务器(使用 pipelineAPI)的示例:filesrc -> decode -> nvstreammux -> nvinfer 或 nvinferserver (主检测器) -> nvdsosd -> renderer。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。