Flow API 简介#
Flow API 有效地抽象了底层 pipeline 细节,使开发人员能够专注于以 pythonic 风格完成特定任务的目标。这些高级 API 强调“做什么”而不是“如何做”,使开发人员能够以更直观和简洁的方式表达他们的意图。这种抽象简化了开发过程,并提高了代码的可读性和可维护性。
除了显式声明的参数外,Flow API 还允许经验丰富的用户通过 kwargs 覆盖标准 Deepstream Element 属性,唯一的技巧是将属性名称中的所有“-”替换为“_”。
例如,用户可以在为捕获创建 flow 时指定他们想要使用的 GPU
# Create a capture flow on gpu 1
flow = Flow(Pipeline('caputure')).capture([video_file], gpu_id=1)
在 service maker 中,Flow 支持多媒体和深度学习领域中的各种常见操作。
Capture#
“capture”方法将捕获操作附加到空 flow。此操作将 URI 列表作为输入,从中将在 flow 内创建多个携带解码数据的流。
# Flow for playback a mp4 video
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render()()
Inject#
“inject”方法将注入操作附加到空 flow。该操作接受 BufferProvider 对象列表,并在之后创建多个流。
除了覆盖“generate”方法以创建缓冲区之外,BufferProvider 对象还必须携带以下成员来指示注入过程
成员名称 |
描述 |
---|---|
width |
视频帧的宽度 |
height |
视频帧的高度 |
format |
格式:RGB/I420 |
framerate |
视频的帧率 |
use_gpu |
布尔值,指示数据是否在 GPU 内存中 |
from pyservicemaker import Pipeline, Flow, BufferProvider, Buffer
class MyBufferProvider(BufferProvider):
def __init__(self, width, height, device='cpu', framerate=30, format="RGB"):
super().__init__()
self.width = width
self.height = height
self.format = format
self.framerate = framerate
self.device = device
self.count = 0
self.expected = 255
def generate(self, size):
data = [self.count]*(self.width*self.height*3)
if self.count < self.expected:
self.count += 1
return Buffer() if self.count == self.expected else Buffer(data)
p = MyBufferProvider(320, 240)
# playback a mp4 video
Flow(Pipeline("playback")).inject([p]).render()()
Retrieve#
“retrieve”方法将数据检索器附加到当前 flow。该操作接受 BufferRetriever 的实例,该实例实现“consume”方法以访问缓冲区。调用该方法会导致 flow 结束,并且不能再附加任何操作。
# Read the decoded video buffers from a sample mp4 file
from pyservicemaker import Pipeline, Flow, BufferRetriever
class MyBufferRetriever(BufferRetriever):
def __init__(self):
super().__init__()
self.frames = 0
def consume(self, buffer):
tensor = buffer.extract(0)
assert len(tensor.shape) == 3
self.frames += 1
return 1
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("retrieve")).capture([video_file]).retrieve(MyBufferRetriever())()
Decode#
“decode”方法将解码操作附加到当前 flow。该操作为每个上游添加一个解码器。在通过 buffer provider 注入数据的情况下,这非常有用。
class JpegBufferProvider(BufferProvider):
def __init__(self, file_path:str):
super().__init__()
self._file_path = file_path
self.format = "JPEG"
self.width = 1280
self.height = 720
self.framerate = 0
self.count = 0
self.expected = 255
def generate(self, size):
data = []
with open(self._file_path, "rb") as f:
bytes = f.read()
data = [int(b) for b in bytes]
if self.count < self.expected:
self.count += 1
return Buffer() if self.count == self.expected else Buffer(data)
# decode jpeg from a binary buffer
jpeg_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.jpg"
Flow(Pipeline("test")).inject([JpegBufferProvider(jpeg_file)]).decode().render()()
Batch#
“batch”方法将批处理操作附加到当前 flow,以将所有流组合成单个批处理流。此操作采用“batch_size”、“width”和“height”作为参数。如果未给出这些参数,则该操作将批大小设置为流的数量,并将宽度 x 高度默认设置为 1920 x 1080。
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# playback all the source in a tiled display
Flow(Pipeline("playback")).capture(uri_list).batch().render()()
Batched Capture#
“batch_capture”方法将捕获操作附加到空 flow 并批处理输入。该操作采用 URI 列表或源配置文件作为输入,并在之后形成批处理流。
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# playback 4 mp4 videos at the same time
Flow(Pipeline("playback")).batch_capture(uri_list).render()()
Render#
“render”方法将渲染器附加到当前 flow,以显示或丢弃视频。该操作采用渲染模式来决定如何渲染数据,目前支持 DISPLAY(默认)和 DISCARD。此外,可选的命名参数涵盖所有标准 sink 控制参数。调用该方法会导致 flow 结束,并且不能再附加任何操作。
# discard the video frames
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render(mode=RenderMode.DISCARD)()
Encode#
“encode”方法将编码器附加到当前 flow,以将视频数据编码为文件或 rtsp 流。该操作采用以“file://”或“rtsp://”为前缀的目标 URI。如果缺少前缀,则暗示“file://”。在 RTSP 流的情况下,URI 中必须出现端口号。此外,也支持用于编码控制的可选参数。
名称 |
描述 |
---|---|
profile |
profile:0 代表 baseline(默认),1 代表 constrainted baseline,2 代表 main,4 代表 high |
iframeinterval |
编码帧内帧发生频率,默认为 10 |
bitrate |
比特率,默认为 2000000 |
# streaming a udp stream via rtsp
pipeline = Pipeline("test")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).encode("output.mp4", sync=True)()
Infer#
“infer”方法在当前 flow 中启用推理。该操作采用“config”参数作为模型配置文件。可以添加可选的标准 nvinfer 参数来覆盖配置文件中的值。
pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
# object detection using resnet18 for 4 streams
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
Flow(Pipeline("infer")).batch_capture(uri_list).infer(pgie_config).render()()
Track#
“track”方法将跟踪器附加到当前 flow,以跟踪检测到的对象。该操作必须在“infer”之后进行,以获取检测数据。必须适当地设置标准 nvtracker 参数,以使跟踪器正常工作。
pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# object detection and tracking using nvmultiobjecttracker for 4 streams
Flow(Pipeline("tracker")).batch_capture(uri_list).infer(pgie_config).track(
ll_config_file="/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml",
ll_lib_file="/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so"
).render()()
Publish#
“publish”方法将一个过程附加到当前 flow,用于将事件发布到远程服务器。该操作采用以下参数来设置 pipeline 和远程服务器之间的通信。
名称 |
描述 |
---|---|
msg_broker_proto_lib |
消息代理使用的底层库 |
msg_broker_conn_str |
服务器的连接字符串 |
topic |
主题名称 |
msg_conv_config |
用于源信息的消息转换器配置 |
# publish the object data to a kafka server
Flow(Pipeline("publish")).batch_capture(
"/opt/nvidia/deepstream/deepstream/service-maker/sources/apps/deepstream_test5_app/source_list_dynamic.yaml"
).infer(
"/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
).attach(
what="add_message_meta_probe",
name="message_generator"
).publish(
msg_broker_proto_lib="/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so",
msg_broker_conn_str="qvs-ds-kafka.nvidia.com;9092",
topic="test4app",
)()
调用该方法会导致 flow 结束,并且不能再附加任何操作。
Attach#
“attach”方法将 Probe 附加到当前 flow。需要两个参数
名称 |
描述 |
---|---|
what |
可以是 Probe 对象或共享库中 probe 的名称 |
名称 |
如果 probe 来自共享库,则为实例名称 |
Fork#
“fork”方法 fork 当前 flow,以便可以附加多个 flow。
# Initiate a pipeline to read a mp4 file
# transcode the video to both a local file and via rtsp
# at the same time, do the playback
pipeline = Pipeline("test")
dest = "rtsp://127.0.0.1:8554"
flow = Flow(pipeline).capture(["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h265.mp4"]).batch().fork()
flow.encode(dest, sync=True)
flow.encode("/tmp/sample.mp4")
flow.render(sync=True)
flow()
输出 RTSP 流可以从“rtsp://127.0.0.1:8554/ds-test”接收
FlowAPI 示例应用程序参考表#
参考测试应用程序
service-maker/sources 目录内的路径
描述
示例测试应用程序 1
apps/python/flow_api/deepstream_test1_app
如何为单个 H.264 流推理使用 flowAPI 方法的示例:batch_capture -> infer -> render。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。
示例测试应用程序 2
apps/python/flow_api/deepstream_test2_app
如何为单个 H.264 流级联推理使用 flowAPI 方法的示例:batch_capture -> infer(主要检测器) -> track -> infer(辅助分类器) -> render。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测和 2 个分类器模型(即 resnet18_vehiclemakenet_pruned.onnx、resnet18_vehicletypenet_pruned.onnx)。
示例测试应用程序 3
apps/python/flow_api/deepstream_test3_app
基于 flow_api/deepstream_test1(示例测试应用程序 1)构建,以演示如何
在 pipeline 中使用多个源进行推理。
提取流元数据,其中包含有关批处理缓冲区中帧的有用信息。
此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。
示例测试应用程序 4
apps/python/flow_api/deepstream_test4_app
基于 flow_api/deepstream_test1 的单个 H.264 流推理,演示如何使用 publish 方法将消息发布到远程服务器,以及使用 fork 方法同时渲染输出。此应用程序使用 resnet18_trafficcamnet_pruned.onnx 进行检测。
Inject 和 Retrieve 示例
apps/python/flow_api/deepstream_appsrc_test_app
演示如何为 retrieve 方法创建 BufferRetriever。带有自定义 BufferRetriever 的 retrieve 方法可用于从 pipeline 中提取缓冲区数据。