Flow API 简介#

Flow API 有效地抽象了底层 pipeline 细节,使开发人员能够专注于以 pythonic 风格完成特定任务的目标。这些高级 API 强调“做什么”而不是“如何做”,使开发人员能够以更直观和简洁的方式表达他们的意图。这种抽象简化了开发过程,并提高了代码的可读性和可维护性。

除了显式声明的参数外,Flow API 还允许经验丰富的用户通过 kwargs 覆盖标准 Deepstream Element 属性,唯一的技巧是将属性名称中的所有“-”替换为“_”。

例如,用户可以在为捕获创建 flow 时指定他们想要使用的 GPU

# Create a capture flow on gpu 1
flow = Flow(Pipeline('caputure')).capture([video_file], gpu_id=1)

在 service maker 中,Flow 支持多媒体和深度学习领域中的各种常见操作。

Capture#

“capture”方法将捕获操作附加到空 flow。此操作将 URI 列表作为输入,从中将在 flow 内创建多个携带解码数据的流。

# Flow for playback a mp4 video
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render()()

Inject#

“inject”方法将注入操作附加到空 flow。该操作接受 BufferProvider 对象列表,并在之后创建多个流。

除了覆盖“generate”方法以创建缓冲区之外,BufferProvider 对象还必须携带以下成员来指示注入过程

成员名称

描述

width

视频帧的宽度

height

视频帧的高度

format

格式:RGB/I420

framerate

视频的帧率

use_gpu

布尔值,指示数据是否在 GPU 内存中

from pyservicemaker import Pipeline, Flow, BufferProvider, Buffer

class MyBufferProvider(BufferProvider):

    def __init__(self, width, height, device='cpu', framerate=30, format="RGB"):
        super().__init__()
        self.width = width
        self.height = height
        self.format = format
        self.framerate = framerate
        self.device = device
        self.count = 0
        self.expected = 255

    def generate(self, size):
        data = [self.count]*(self.width*self.height*3)
        if self.count < self.expected:
            self.count += 1
        return Buffer() if self.count == self.expected else Buffer(data)

p = MyBufferProvider(320, 240)
# playback a mp4 video
Flow(Pipeline("playback")).inject([p]).render()()

Retrieve#

“retrieve”方法将数据检索器附加到当前 flow。该操作接受 BufferRetriever 的实例,该实例实现“consume”方法以访问缓冲区。调用该方法会导致 flow 结束,并且不能再附加任何操作。

# Read the decoded video buffers from a sample mp4 file
from pyservicemaker import Pipeline, Flow, BufferRetriever

class MyBufferRetriever(BufferRetriever):
    def __init__(self):
        super().__init__()
        self.frames = 0

    def consume(self, buffer):
        tensor = buffer.extract(0)
        assert len(tensor.shape) == 3
        self.frames += 1
        return 1
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("retrieve")).capture([video_file]).retrieve(MyBufferRetriever())()

Decode#

“decode”方法将解码操作附加到当前 flow。该操作为每个上游添加一个解码器。在通过 buffer provider 注入数据的情况下,这非常有用。

class JpegBufferProvider(BufferProvider):

    def __init__(self, file_path:str):
        super().__init__()
        self._file_path = file_path
        self.format = "JPEG"
        self.width = 1280
        self.height = 720
        self.framerate = 0
        self.count = 0
        self.expected = 255

    def generate(self, size):
        data = []
        with open(self._file_path, "rb") as f:
            bytes = f.read()
            data = [int(b) for b in bytes]
        if self.count < self.expected:
            self.count += 1
        return Buffer() if self.count == self.expected else Buffer(data)

# decode jpeg from a binary buffer
jpeg_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.jpg"
Flow(Pipeline("test")).inject([JpegBufferProvider(jpeg_file)]).decode().render()()

Batch#

“batch”方法将批处理操作附加到当前 flow,以将所有流组合成单个批处理流。此操作采用“batch_size”、“width”和“height”作为参数。如果未给出这些参数,则该操作将批大小设置为流的数量,并将宽度 x 高度默认设置为 1920 x 1080。

uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# playback all the source in a tiled display
Flow(Pipeline("playback")).capture(uri_list).batch().render()()

Batched Capture#

“batch_capture”方法将捕获操作附加到空 flow 并批处理输入。该操作采用 URI 列表或源配置文件作为输入,并在之后形成批处理流。

uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# playback 4 mp4 videos at the same time
Flow(Pipeline("playback")).batch_capture(uri_list).render()()

Render#

“render”方法将渲染器附加到当前 flow,以显示或丢弃视频。该操作采用渲染模式来决定如何渲染数据,目前支持 DISPLAY(默认)和 DISCARD。此外,可选的命名参数涵盖所有标准 sink 控制参数。调用该方法会导致 flow 结束,并且不能再附加任何操作。

# discard the video frames
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render(mode=RenderMode.DISCARD)()

Encode#

“encode”方法将编码器附加到当前 flow,以将视频数据编码为文件或 rtsp 流。该操作采用以“file://”或“rtsp://”为前缀的目标 URI。如果缺少前缀,则暗示“file://”。在 RTSP 流的情况下,URI 中必须出现端口号。此外,也支持用于编码控制的可选参数。

名称

描述

profile

profile:0 代表 baseline(默认),1 代表 constrainted baseline,2 代表 main,4 代表 high

iframeinterval

编码帧内帧发生频率,默认为 10

bitrate

比特率,默认为 2000000

# streaming a udp stream via rtsp
pipeline = Pipeline("test")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).encode("output.mp4", sync=True)()

Infer#

“infer”方法在当前 flow 中启用推理。该操作采用“config”参数作为模型配置文件。可以添加可选的标准 nvinfer 参数来覆盖配置文件中的值。

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
# object detection using resnet18 for 4 streams
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
Flow(Pipeline("infer")).batch_capture(uri_list).infer(pgie_config).render()()

Track#

“track”方法将跟踪器附加到当前 flow,以跟踪检测到的对象。该操作必须在“infer”之后进行,以获取检测数据。必须适当地设置标准 nvtracker 参数,以使跟踪器正常工作。

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# object detection and tracking using nvmultiobjecttracker for 4 streams
Flow(Pipeline("tracker")).batch_capture(uri_list).infer(pgie_config).track(
    ll_config_file="/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml",
    ll_lib_file="/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so"
).render()()

Publish#

“publish”方法将一个过程附加到当前 flow,用于将事件发布到远程服务器。该操作采用以下参数来设置 pipeline 和远程服务器之间的通信。

名称

描述

msg_broker_proto_lib

消息代理使用的底层库

msg_broker_conn_str

服务器的连接字符串

topic

主题名称

msg_conv_config

用于源信息的消息转换器配置

# publish the object data to a kafka server
Flow(Pipeline("publish")).batch_capture(
    "/opt/nvidia/deepstream/deepstream/service-maker/sources/apps/deepstream_test5_app/source_list_dynamic.yaml"
).infer(
    "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
).attach(
    what="add_message_meta_probe",
    name="message_generator"
).publish(
    msg_broker_proto_lib="/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so",
    msg_broker_conn_str="qvs-ds-kafka.nvidia.com;9092",
    topic="test4app",
)()

调用该方法会导致 flow 结束,并且不能再附加任何操作。

Attach#

“attach”方法将 Probe 附加到当前 flow。需要两个参数

名称

描述

what

可以是 Probe 对象或共享库中 probe 的名称

名称

如果 probe 来自共享库,则为实例名称

Fork#

“fork”方法 fork 当前 flow,以便可以附加多个 flow。

# Initiate a pipeline to read a mp4 file
# transcode the video to both a local file and via rtsp
# at the same time, do the playback
pipeline = Pipeline("test")
dest = "rtsp://127.0.0.1:8554"
flow = Flow(pipeline).capture(["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h265.mp4"]).batch().fork()
flow.encode(dest, sync=True)
flow.encode("/tmp/sample.mp4")
flow.render(sync=True)
flow()

输出 RTSP 流可以从“rtsp://127.0.0.1:8554/ds-test”接收

FlowAPI 示例应用程序参考表#

参考测试应用程序

service-maker/sources 目录内的路径

描述

示例测试应用程序 1

apps/python/flow_api/deepstream_test1_app

如何为单个 H.264 流推理使用 flowAPI 方法的示例:batch_capture -> infer -> render。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。

示例测试应用程序 2

apps/python/flow_api/deepstream_test2_app

如何为单个 H.264 流级联推理使用 flowAPI 方法的示例:batch_capture -> infer(主要检测器) -> track -> infer(辅助分类器) -> render。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测和 2 个分类器模型(即 resnet18_vehiclemakenet_pruned.onnx、resnet18_vehicletypenet_pruned.onnx)。

示例测试应用程序 3

apps/python/flow_api/deepstream_test3_app

基于 flow_api/deepstream_test1(示例测试应用程序 1)构建,以演示如何

  • 在 pipeline 中使用多个源进行推理。

  • 提取流元数据,其中包含有关批处理缓冲区中帧的有用信息。

此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。

示例测试应用程序 4

apps/python/flow_api/deepstream_test4_app

基于 flow_api/deepstream_test1 的单个 H.264 流推理,演示如何使用 publish 方法将消息发布到远程服务器,以及使用 fork 方法同时渲染输出。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。

Inject 和 Retrieve 示例

apps/python/flow_api/deepstream_appsrc_test_app

演示如何为 retrieve 方法创建 BufferRetriever。带有自定义 BufferRetriever 的 retrieve 方法可用于从 pipeline 中提取缓冲区数据。