Pipeline API 简介#
对于已经熟悉 DeepStream SDK 的开发人员,pyservicemaker 提供了 Pipeline API,使他们能够充分利用 DeepStream 的功能。
使用 Pipeline API 在 Python 中创建示例 Deepstream 应用程序与使用 C++ API 的过程非常相似,明显的区别在于它不需要 Makefile 或构建过程。
from pyservicemaker import Pipeline
import sys
CONFIG_FILE_PATH = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
if __name__ == '__main__':
pipeline = Pipeline("sample-pipeline")
pipeline.add("nvurisrcbin", "src", {"uri": sys.argv[1]})
pipeline.add("nvstreammux", "mux", {"batch-size": 1, "width": 1280, "height": 720})
pipeline.add("nvinferbin", "infer", {"config-file-path": CONFIG_FILE_PATH})
pipeline.add("nvosdbin", "osd").add("nveglglessink", "sink")
pipeline.link(("src", "mux"), ("", "sink_%u")).link("mux", "infer", "osd", "sink")
pipeline.start().wait()
一个功能性 pipeline 需要添加、配置和正确链接来自 Deepstream 插件的适当元素。这可以使用 Pipeline API 以流畅的方式无缝实现
Pipeline pipeline("sample-pipeline")
// nvstreammux is the factory name in Deepstream to create a streammuxer Element
// mux is the name of the Element instance
// multiple key value pairs can be appended to configure the added Element
pipeline.add("nvstreammux", "mux", "batch-size", 1, "width", 1280, "height", 720)
“add”方法用于将所有必要的元素节点合并到 pipeline 实例中。此方法采用元素的注册名称和节点名称作为参数,后跟一个字典,用于指定元素的属性。在添加期间赋予元素的节点名称可用于在 pipeline 中引用该元素。
有关每个属性如何影响相应元素的详细信息,DS 插件概述 是主要的和最全面的资源。开发人员还可以使用元素注册名称运行 gst-inspect-1.0 来检查其技术规范,例如,在查找插件手册中的 nvstreammux 后,我们知道该元素用于批处理来自多个输入的缓冲区,并且需要设置“batch-size”、“width”和“height”。
在将元素节点添加到 pipeline 后,“link”方法完成流式传输路径的构建。此方法提供两种变体
更简单的一种接受要按顺序链接的所有实例的名称。
pipeline.link("mux", "infer", "osd", "sink")
更复杂的一种链接两个实例,利用两个元组来指定源名称和目标名称,以及源和目标 pad 以指示特定的媒体流。
pipeline.link(("src", "mux"), ("", "sink_%u"))
第二种“link”方法主要解决动态路径,例如“nvstreammux”元素遇到的那些路径。此元素具有动态输入和一个名为“sink_%u”的模板 pad,这需要使用此方法来建立适当的连接。
要启动 pipeline 并等待流到达其末尾,需要按顺序调用“start”方法和“wait”方法。
pipeline.start().wait()
现在,让我们将代码另存为 sample_app.py 并从控制台运行应用程序
$ python3 sample_app.py file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4
推理正在按预期使用提供的模型配置运行。但是,如果我们希望检查结果,则需要通过附加探针来执行其他步骤。
探针是用于拦截已处理的缓冲区和元数据的预定义类。通过为其提供 BatchMetadataOperator 的合适实现,我们可以从“nvinfer”生成的元数据中提取对象检测结果。
以下是 BatchMetadataOperator 的示例实现,它计算对象并在下游“osd”元素中显示数字。“handle_metadata”方法在每个缓冲区批处理的 pipeline 中调用,其中“batch_meta”对象包装批处理元数据。开发人员可以迭代“batch_meta”对象的帧元数据,然后迭代对象元数据的帧元数据。
在示例代码中,handle_metadata 检查批处理中每帧的对象信息,计算每个类别的计数,并将包含文本标签的显示元数据对象附加到帧。因此,数字会显示在视频输出中。有关元数据用法的更多详细信息,请参阅 利用元数据。
from pyservicemaker import BatchMetadataOperator, Probe, osd
class ObjectCounterMarker(BatchMetadataOperator):
def handle_metadata(self, batch_meta):
for frame_meta in batch_meta.frame_items:
vehcle_count = 0
person_count = 0
for object_meta in frame_meta.object_items:
class_id = object_meta.class_id
if class_id == 0:
vehcle_count += 1
elif class_id == 2:
person_count += 1
print(f"Object Counter: Pad Idx={frame_meta.pad_index},"
f"Frame Number={frame_meta.frame_number},"
f"Vehicle Count={vehcle_count}, Person Count={person_count}")
text = f"Person={person_count},Vehicle={vehcle_count}"
display_meta = batch_meta.acquire_display_meta()
label = osd.Text()
label.display_text = text.encode('ascii')
label.x_offset = 10
label.y_offset = 12
label.font.name = osd.FontFamily.Serif
label.font.size = 12
label.font.color = osd.Color(1.0, 1.0, 1.0, 1.0)
label.set_bg_color = True
label.bg_color = osd.Color(0.0, 0.0, 0.0, 1.0)
display_meta.add_text(label)
frame_meta.append(display_meta)
通过在启动 pipeline 之前将上述缓冲区探针附加到现有 pipeline 中的推理插件中,我们从视频流的每一帧中提取对象计数信息,并在控制台输出和视频上的叠加层中显示它
pipeline.attach("infer", Probe("counter", ObjectCounterMarker()))
现在,让我们再次运行 python 应用程序,我们将看到打印出的对象计数
Object Counter: Pad Idx=0,Frame Number=0,Vehicle Count=12, Person Count=6
Object Counter: Pad Idx=0,Frame Number=1,Vehicle Count=15, Person Count=7
Object Counter: Pad Idx=0,Frame Number=2,Vehicle Count=13, Person Count=5
Object Counter: Pad Idx=0,Frame Number=3,Vehicle Count=12, Person Count=6
Object Counter: Pad Idx=0,Frame Number=4,Vehicle Count=15, Person Count=8
Object Counter: Pad Idx=0,Frame Number=5,Vehicle Count=15, Person Count=5
Object Counter: Pad Idx=0,Frame Number=6,Vehicle Count=11, Person Count=5
Object Counter: Pad Idx=0,Frame Number=7,Vehicle Count=13, Person Count=5
Object Counter: Pad Idx=0,Frame Number=8,Vehicle Count=19, Person Count=4
Object Counter: Pad Idx=0,Frame Number=9,Vehicle Count=15, Person Count=5
Object Counter: Pad Idx=0,Frame Number=10,Vehicle Count=13, Person Count=4
除了从 Python 代码创建 Probe 实例外,“attach”方法还可以通过指定模块名称从共享库附加探针。以下代码附加了一个预构建的探针,用于在“osd”上显示对象信息
pipeline.attach("infer", "sample_video_probe", "my_probe")
支持用于 pipeline 构建的 YAML 配置文件,遵循与 C++ API 使用的规范相同的规范。以下是如何在 YAML 配置中定义上述 pipeline
deepstream:
nodes:
- type: nvurisrcbin
name: src
properties:
uri: file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4
- type: nvstreammux
name: mux
properties:
batch-size: 1
width: 1280
height: 720
- type: nvinferbin
name: infer
properties:
config-file-path: /opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml
- type: nvosdbin
name: osd
- type: nveglglessink
name: sink
edges:
src: mux
mux: infer
infer: osd
osd: sink
并且在应用 YAML 配置后,应用程序源代码可以简化为单行
Pipeline(name="sample-pipeline", config_file="my_config.yaml").start().wait()
PipelineAPI 示例应用程序参考表#
参考测试应用程序
service-maker/sources 目录内的路径
描述
示例测试应用程序 1
apps/python/pipeline_api/deepstream_test1_app
如何使用 DeepStream 元素进行使用 pipelineAPI 的单 H.264 流推理的示例:filesrc -> decode -> nvstreammux -> nvinfer 或 nvinferserver (主检测器) -> nvdsosd -> renderer。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。
示例测试应用程序 2
apps/python/pipeline_api/deepstream_test2_app
如何使用 DeepStream 元素进行使用 pipelineAPI 的单 H.264 流级联推理的示例:filesrc -> decode -> nvstreammux -> nvinfer 或 nvinferserver (主检测器) -> nvtracker -> nvinfer 或 nvinferserver (辅助分类器) -> nvdsosd -> renderer。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测和 2 个分类器模型 (即,resnet18_vehiclemakenet_pruned.onnx、resnet18_vehicletypenet_pruned.onnx)。
示例测试应用程序 3
apps/python/pipeline_api/deepstream_test3_app
在 pipeline_api/deepstream_test1 (示例测试应用程序 1) 的基础上构建,以演示如何
在 pipeline 中使用多个源进行推理。
使用 uridecodebin 接受任何类型的输入 (例如 RTSP/File)。
配置 nvstreammux 以生成一批帧并在其上进行推理,以更好地利用资源。
提取流元数据,其中包含有关批处理缓冲区中帧的有用信息。
此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。
示例测试应用程序 4
apps/python/pipeline_api/deepstream_test4_app
在 pipeline_api/deepstream_test1 的基础上构建,用于单 H.264 流推理:filesrc、decode、nvstreammux、nvinfer 或 nvinferserver、nvdsosd、renderer,以演示如何
在 pipeline 中使用 nvmsgconv 和 nvmsgbroker 插件。
创建 NVDS_META_EVENT_MSG 类型元数据并将其附加到缓冲区。
将 NVDS_META_EVENT_MSG 用于不同类型的对象,例如车辆和人员。
此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。
示例测试应用程序 5
apps/python/pipeline_api/deepstream_test5_app
使用 pipelineAPI 构建。演示了
在 pipeline 中使用 nvmsgconv 和 nvmsgbroker 插件进行多流推理。
如何从配置文件将 nvmsgbroker 插件配置为 sink 插件 (对于 KAFKA、Azure 等)。
如何使用远程 Kafka 服务器作为生产者和消费者。
利用 nvmultiurisrcbin 进行动态源管理
此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。
Appsrc 和 Appsink 示例
apps/python/pipeline_api/deepstream_appsrc_test_app
演示如何为 Feeder 类创建 BufferProvider 以及如何为 receiver 类创建 BufferRetriever。具有自定义 BufferProvider 的 Feeder 可用于将用户数据注入到 DS pipeline,而具有自定义 BufferRetriever 的 receiver 可用于从 pipeline 中提取缓冲区数据。
Kafka 自定义数据示例
apps/python/pipeline_api/deepstream_kafka_test_app
如何使用 DeepStream 元素进行单 H.264 流推理并将自定义推理数据直接发送到 kafka 服务器(使用 pipelineAPI)的示例:filesrc -> decode -> nvstreammux -> nvinfer 或 nvinferserver (主检测器) -> nvdsosd -> renderer。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。