Gst-nvmsgbroker#

此插件使用指定的通信协议将有效负载消息发送到服务器。它接受附加了 NvDsPayload 元数据的任何缓冲区,并使用 nvds_msgapi_* 接口将消息发送到服务器。您必须为要使用的协议实现 nvds_msgapi_* 接口,并在 proto-lib 属性中指定实现库。

Gst-nvmsgbroker

输入和输出#

  • 输入

    • 带有 NvDsPayload 的 Gst 缓冲区

  • 控制参数

    • 配置

    • conn-str

    • proto-lib

    • comp-id

    • topic

    • new-api

  • 输出

    • 无,因为这是一个 sink 组件

特性#

下表总结了 Gst-nvmsgbroker 插件的特性。

Gst-nvmsgbroker 插件特性#

特性

描述

发布版本

JSON 格式的有效负载

接受 JSON 格式的消息有效负载

DS 3.0

Kafka 协议支持

Kafka 协议适配器实现

DS 3.0

Azure IOT 支持

与 Azure IOT 框架集成

DS 4.0

AMQP 支持

AMQP 0-9-1 协议适配器实现

DS 4.0

REDIS 支持

使用 Redis Streams 的 Redis 协议适配器

DS 5.1

MQTT 支持

使用 Eclipse mosquitto 的 MQTT 协议适配器

DS 6.3

自定义协议支持

通过适配器接口的自定义实现来支持自定义协议的规定

DS 3.0

可配置参数

通过配置文件配置特定于协议的选项

DS 3.0

Gst 属性#

下表描述了 Gst-nvmsgbroker 插件的 Gst 属性。

Gst-nvmsgbroker 插件 gst 属性#

属性

含义

类型和范围

示例注释

平台

config

nvds_msgapi_* 接口所需的配置文件的绝对路径名

字符串

config=<msgapi_­config.txt>

dGPU Jetson

conn-str

作为与服务器通信的端点的连接字符串

字符串格式必须为 <name>;<port>;<specifier>

conn-str= foo.bar.com;80 ;user-id

dGPU Jetson

proto-lib

包含协议适配器作为 nvds_msgapi_* 实现的库的绝对路径名

字符串

proto-lib=<libnvds_kafka_proto.so>

dGPU Jetson

comp-id

应从中处理元数据的组件的 ID

整数,0 到 4,294,967,295

comp-id=3 默认值:插件处理来自任何组件的元数据

dGPU Jetson

topic

消息主题名称

字符串

topic=dsapp1

dGPU Jetson

new-api

直接使用协议适配器库 api 或使用新的 msgbroker 库包装器 api

整数

0:直接使用适配器 api

1:msgbroker lib 包装器 api(请参阅 nv_msgbroker:消息代理接口

new-api = 0

dGPU Jetson

sleep-time

连续 do_work 调用之间的休眠时间,以毫秒为单位

整数 >= 0。对于 Azure,根据 IoT Hub 服务层消息速率限制,使用值 >= 10。警告:休眠时间过长(例如 10000000 毫秒)可能会导致失败

sleep-time=10

dGPU Jetson

nvds_msgapi:协议适配器接口#

您可以使用 DeepStream 消息传递接口 nvds_msgapi 来实现自定义协议消息处理程序,并将其与 DeepStream 应用程序集成。这种消息处理程序(称为协议适配器)使您能够将 DeepStream 应用程序与后端数据源(例如云中存储的数据)集成。DeepStream 7.1 的新增功能,DeepStream 提供的所有协议适配器的源代码都位于 /opt/nvidia/deepstream/deepstream/sources/libs/*_protocol_adaptor

Gst-nvmsgbroker-nvds_msgapi

Gst-nvmsgbroker 插件调用协议适配器中的函数,如上图所示。这些函数支持

  • 创建连接

  • 通过同步或异步方式发送消息

  • 终止连接

  • 协调客户端和协议适配器对 CPU 资源和线程的使用

  • 获取协议适配器的版本号

nvds_msgapi 接口在头文件 /opt/nvidia/deepstream/deepstream/sources/includes/nvds_msgapi.h 中定义。此头文件定义了一组函数指针,这些函数指针提供类似于 C++ 中接口的接口。以下各节描述了 nvds_msgapi 接口定义的方法。

nvds_msgapi_connect():创建连接#

NvDsMsgApiHandle nvds_msgapi_connect(char *connection_str,
nvds_msgapi_connect_cb_t connect_cb, char *config_path
);

该函数接受连接字符串并配置连接。适配器实现可以选择该函数是否建立连接以适应无连接协议(如 HTTP)。

参数

  • connection_str:指向字符串的指针,该字符串以通用格式 <url>;<port>;<specifier> 指定连接参数。

    • <url><port> 指定远程实体的网络地址。

    • <specifier> 指定特定于协议的信息。其内容取决于协议的实现。例如,它可能是用于建立连接的客户端标识符。

    请注意,此连接字符串格式不是绑定的,适配器可以从其格式中省略某些字段(例如:specifier),前提是省略在文档中进行了描述。这种连接字符串适配的一个特例是适配器期望所有连接参数都在配置文件中指定为字段(请参阅下面的 config path),在这种情况下,连接字符串作为 NULL 传递。

  • connect_cb:用于与连接关联的事件的回调函数。

  • config_path:配置文件的路径名,该配置文件定义适配器使用的协议参数。

返回值

如果成功,则用于后续接口调用的句柄;否则为 NULL。

nvds_msgapi_send() 和 nvds_msgapi_send_async():发送事件#

NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle *h_ptr,
char *topic, uint8_t *payload, size_t nbuf
);
NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr,
char *topic, const uint8_t *payload, size_t nbuf,
nvds_msgapi_send_cb_t send_callback, void *user_ptr
 );

这两个函数都将数据发送到连接的端点。它们接受消息主题和消息有效负载。nvds_send() 函数是同步的。nvds_msgapi_send_async() 函数是异步的;它接受一个回调函数,该函数在“发送”操作完成时调用。这两个函数都允许 API 客户端通过调用 nvds_msgapi_do_work() 来控制适配器逻辑的执行。请参阅 nvds_msgapi_do_work() 函数的描述。

参数

  • h_ptr:连接的句柄,通过调用 nvds_msgapi_connect() 获得。

  • topic:指向字符串的指针,该字符串指定消息的主题;如果主题对于协议适配器的语义没有意义,则可以为 NULL。

  • payload:指向包含消息有效负载的字节数组的指针。

  • nbuf:要发送的字节数。

  • send_callback:指向异步函数在“发送”操作完成时调用的回调函数的指针。回调函数的签名类型为 nvds_msgapi_send_cb_t,定义为

    typedef void (*nvds_msgapi_send_cb_t)(void *user_ptr,
    NvDsMsgApiErrorType completion_flag
    );
    

其中回调的参数为

  • user_ptr:来自调用 nvds_msgapi_send() 或 ``nvds_msgapi_send_async()`` 的用户指针 (user_ptr),它启动了“发送”操作。使回调函数能够识别启动调用。

  • completion_flag:指示异步发送操作完成状态的代码。

nvds_msgapi_subscribe():通过订阅主题来消费数据#

NvDsMsgApiErrorType nvds_msgapi_subscribe (NvDsMsgApiHandle h_ptr, char ** topics, int num_topics, nvds_msgapi_subscribe_request_cb_t  cb, void *user_ctx);

此 API 用于订阅主题并从外部实体消费消息。该 API 是异步的,必须使用已创建的有效 Kafka 连接句柄作为参数来调用。调用者还必须提供指向回调函数的指针,以接收来自连接端点的已消费消息,以及可选的 user_ctx 指针以指定用户上下文

参数

  • h_ptr:连接的句柄,通过调用 nvds_msgapi_connect() 获得

  • topics:指向主题名称字符数组的 2d 指针

  • num_topics:要订阅的主题数量

  • cb:指向回调函数的指针,用于接收订阅主题上已消费的消息的通知

  • user_ctx:要传递给回调以获取上下文的用户指针

指定为订阅 API 中的参数的回调函数的指针的类型为 nvds_msgapi_subscribe_request_cb_t,定义为

    typedef void (*nvds_msgapi_subscribe_request_cb_t)(NvDsMsgApiErrorType flag, void *msg, int msg_len, char *topic, void *user_ptr);

where the callback’s parameters are:

* ``flag``: To specify the error status of message consumed
* ``msg``: Consumed message / payload
* ``msg_len``: Length of message in bytes
* ``topic``: Topic name where the message was received
* ``user_ptr``: pointer passed during subscribe() for context

nvds_msgapi_do_work():适配器逻辑的增量执行#

void nvds_msgapi_do_work();

协议适配器必须在处理 nvds_msgapi_send()nvds_msgapi_send_async() 调用的过程中定期将控制权让给客户端。客户端必须定期调用 nvsd_msgapi_do_work(),以使协议适配器恢复执行。这确保了协议适配器接收到足够的 CPU 资源。客户端可以使用此约定来控制协议适配器对多线程和线程调度的使用。如果底层协议需要,协议适配器可以使用它来支持心跳功能。

当协议适配器在客户端线程中执行时,需要 nvds_msgapi_do_work() 约定。或者,协议适配器可以在其自己的线程中执行耗时的操作。在这种情况下,协议适配器无需将控制权让给客户端,客户端无需调用 nvsd_msgapi_do_work(),并且 nvds_msgapi_do_work() 的实现可以是空操作。协议适配器的文档必须指定客户端是否必须调用 nvds_msgapi_do_work(),以及何时调用。

nvds_msgapi_disconnect():终止连接#

NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr);

如果底层协议需要,该函数将终止连接,并释放与 h_ptr 关联的资源。

参数

  • h_ptr:连接的句柄,通过调用 nvds_msgapi_connect() 获得。

nvds_msgapi_getversion():获取版本号#

char *nvds_msgapi_getversion();

此函数返回一个字符串,用于标识此协议适配器实现支持的 nvds_msgapi 版本。该字符串必须使用格式 <major>.<minor>,其中 <major> 是主版本号,<minor> 是次版本号。主版本号的更改表示 API 更改,这可能会导致不兼容。当主版本号更改时,次版本号将重置为 1。

nvds_msgapi_get_protocol_name():获取协议名称#

char *nvds_msgapi_get_protocol_name(void);

此函数返回一个字符串,用于标识适配器库中使用的底层协议。例如,"KAFKA”“AMQP”“AZURE_DEVICE_CLIENT”“AZURE_MODULE_CLIENT”

nvds_msgapi_connection_signature():获取连接签名#

NvDsMsgApiErrorType nvds_msgapi_connection_signature(char *broker_str, char *cfg, char *output_str, int max_len);

此函数返回一个字符串,该字符串唯一标识传递给适配器库以建立连接的连接参数的签名。如果成功,则返回连接签名字符串(由 SHA256 生成)。如果出现错误或连接参数无效,则返回空字符串 ""。

参数

  • broker_str:用于创建连接的 Broker 连接字符串

  • cfg:配置文件的路径

  • output_str:输出连接签名

  • max_len:output_str 的最大长度

nvds_kafka_proto:Kafka 协议适配器#

DeepStream 包括一个支持 Apache Kafka 的协议适配器。该适配器为 DeepStream 应用程序提供了开箱即用的功能,可以将消息发布到 Kafka brokers。

安装依赖项#

Kafka 适配器使用 librdkafka 进行底层协议实现。必须先安装此库才能使用。要安装 librdkakfa,请输入以下命令

git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
git checkout tags/v2.2.0
./configure --enable-ssl
make
sudo make install
sudo cp /usr/local/lib/librdkafka* /opt/nvidia/deepstream/deepstream/lib/
sudo ldconfig

安装其他依赖项

sudo apt-get install libglib2.0 libglib2.0-dev
sudo apt-get install libjansson4 libjansson-dev

使用适配器#

您可以通过将 Gst-nvmsgbroker 插件的 proto-lib 属性设置为适配器的共享库 libnvds_kafka_proto.so 的路径名,在应用程序中使用 Kafka 适配器。插件的 conn-str 属性必须设置为格式为以下内容的字符串

<kafka broker address>;<port>

这将实例化 Gst-nvmsgbroker 插件,并使其使用 Kafka 协议适配器来发布应用程序发送到指定 broker 地址和主题的 broker 的消息。

配置协议设置#

您可以按照 edenhill/librdkafka 的文档中所述,定义 Kafka 协议适配器的配置设置。您可以在 Gst-nvmsgbroker 配置文件中设置这些选项。与 DeepStream 的其余部分一样,配置文件使用 gkey 格式。Kafka 设置必须位于名为 [message-broker] 的组中,并且必须指定为名为 proto-cfg 的键的一部分。这些设置可以是以分号分隔的一系列键值对。例如

[message-broker]
proto-cfg="message.timeout.ms=2000;retries=5"
consumer-group-id = groupid
partition-key = keyid

Kafka 适配器允许您指定消息中要用于定义分区键的字段的名称。对于每条消息,都会提取指定的消息字段,并将其与消息一起发送到主题分区器。分区器使用它来识别处理消息的 Kafka 集群中的分区。分区键信息必须在使用名为 partition-key 的条目的 Gst-nvmsgbroker 配置文件的 [message-broker] 组中指定。使用点表示法指定嵌入在 JSON 对象层次结构中的字段。例如,对于下面显示的示例 JSON 消息,传感器对象中的 id 字段被标识为 sensor.id

{
 "sensor" {
 "id": "cam1"
 }
}

此外,Kafka 适配器允许您指定消费者组 ID。消费者组是一个字符串,用于唯一标识此 Kafka 消费者所属的消费者进程组。键名 consumer-group-id 可以在 Gst-nvmsgbroker 配置文件的 [message-broker] 组中指定。如果未指定此字段,则将使用默认消费者组名称“test-consumer-group”。

注意

对于 DeepStream 参考应用程序和随 DeepStream SDK 分发的 360 D 应用程序,您可以将 proto-cfg 设置添加到传递给应用程序的顶级配置文件的 [message-broker] 组中。

程序化集成#

您可以使用 nvds_msgapi 接口调用其函数,将 Kafka 适配器集成到自定义用户代码中。请注意有关接口定义的函数的以下几点

  • 传递给 nvdm_msgapi_connect() 的连接字符串的格式为 <kafka broker address>;<port>

  • 对于“发送”函数,主题名称必须作为参数传递给 nvds_msgapi_send()nvds_msgapi_send_async()

  • 对于订阅 API,必须将指向主题名称字符数组的 2D 指针作为参数传递。此外,必须提供指向用户回调函数的指针。当有来自远程实体的新消息时,Kafka 消费者将通过调用用户回调函数将消息转发给应用程序。

  • 应用程序必须至少每秒调用一次 nvds_msgapi_do_work(),最好更频繁地调用。调用 nvds_msgapi_do_work() 的频率决定了处理等待发送的消息的速率。

  • 多个应用程序线程共享连接句柄是安全的。librdkafka 库是线程安全的,因此 Kafka 协议适配器不需要为直接调用此库的函数实现单独的锁定机制。

  • Kafka 协议适配器希望客户端管理连接句柄的使用和停用。客户端必须确保一旦句柄断开连接,就不能将其用于“发送”调用或调用 nvds_msgapi_do_work()。虽然库尝试确保在应用程序使用已停用的句柄调用这些函数时正常失败,但它不会以线程安全的方式执行此操作。

Kafka 的安全性#

要了解有关 Kafka 安全性的更多信息,请参阅 NVIDIA DeepStream SDK 开发者指南 6.0 版本 中的 安全边缘到云消息传递 部分。 监控适配器执行 Kafka 适配器基于 nvds_logger 框架生成日志消息,以帮助您监控执行情况。适配器为 INFO、DEBUG 和 ERROR 严重级别生成单独的日志,如 nvds_logger:日志记录框架 中所述。您可以通过设置日志记录设置脚本中过滤日志消息的级别来限制生成的日志消息。

注意

如果严重级别设置为 DEBUG,则 nvds_logger 框架会记录 Kafka 协议适配器发送的每条消息的完整内容。

Azure MQTT 协议适配器库#

DeepStream 包括协议适配器,这些适配器支持从设备到云的直接消息传递(使用 Azure 设备客户端适配器)以及通过 Azure IoT Edge 运行时(使用 Azure 模块客户端适配器)。这些适配器为 DeepStream 应用程序提供了开箱即用的功能,可以使用 MQTT 协议将消息发布到 Azure IoT Hub。Azure IoT 协议适配器封装在其各自的共享库中,这些共享库位于 DeepStream 包中的 /opt/nvidia/deepstream/deepstream/lib 中。Azure 设备客户端适配器库名为 libnvds_azure_proto.so。Azure 模块客户端适配器库名为 libnvds_azure_edge_proto.so

安装依赖项#

Azure 适配器使用 Azure IoT C SDK (v1.11.0) 中的 libiothub_client.so 进行底层协议实现。安装 DeepStream 包后,您可以在以下位置找到预编译的库:/opt/nvidia/deepstream/deepstream/lib/libiothub_client.so。您也可以通过输入以下命令手动编译 libiothub_client.so

git clone https://github.com/Azure/azure-iot-sdk-c.git
cd azure-iot-sdk-c
git checkout tags/1.11.0
git submodule update --init
mkdir cmake
cd cmake
cmake -Dbuild_as_dynamic:BOOL=ON -Duse_edge_modules:BOOL=ON ..
cmake --build .  # append '-- -j <n>' to run <n> jobs in parallel

要安装其他一些必需的依赖项,请输入以下命令之一。

  • 对于使用 Ubuntu 22.04 的 x86 计算机

    sudo apt-get install -y libcurl3 libssl-dev uuid-dev libglib2.0 libglib2.0-dev
    
  • 对于其他平台或操作系统

    sudo apt-get install -y libcurl4-openssl-dev libssl-dev uuid-dev libglib2.0 libglib2.0-dev
    

设置 Azure IoT#

Azure IoT 适配器需要一个正常运行的 Azure IoT Hub 实例,它可以将消息发布到该实例。要设置 Azure IoT Hub 实例(如果需要),请参阅以下说明:https://docs.microsoft.com/en-us/azure/iot-hub/tutorial-connectivity。创建 Azure IoT 实例后,创建与运行 DeepStream 的设备对应的设备条目。

要在边缘设备上设置 Azure IoT Edge 运行时,请参阅 https://docs.microsoft.com/en-us/azure/iot-edge/how-to-install-iot-edge-linux 中的说明。

注意

请注意您的 IoT Hub 服务层级的消息速率限制:https://learn.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-quotas-throttling#standard-tier-operations。请相应地为 nvmsgbroker 插件设置 sleep-time 属性,以防止节流。

配置适配器设置#

将 Azure IoT 特定信息放置在自定义配置文件中,例如,命名为 cfg_azure.txt。设备客户端和模块客户端的配置文件条目略有不同。

  • 对于 Azure 设备客户端

    [message-broker]
    connection_str = HostName=<my-hub>.azure-devices.net;DeviceId=<device_id>;
    custom_msg_properties =  <key1>=<value1>; <key2>=<value2>; <key3>=<value3>;
    
  • 对于 Azure 模块客户端

    [message-broker]
    #custom_msg_properties =  <key1>=<value1>; <key2>=<value2>; <key3>=<value3>;
    

以下是一些关于配置文件属性的有用信息

  • connection_str:您可以从 Azure IoT Hub Web 界面获取 Azure 连接字符串。连接字符串唯一标识与 IoT Hub 实例关联的每个设备。它位于“设备详细信息”部分下的“主连接字符串”条目中。

  • custom_msg_properties:使用此属性将自定义键/值对嵌入从设备发送到 Azure IoT 的 MQTT 消息中。您可以嵌入多个键值对,用分号分隔,如本例所示

    custom_msg_properties = ex2: key1=value1;key2=value2;key3=value3;
    

注意

connection_strcustom_msg_properties 字符串均限制为 512 个字符。

使用适配器#

要在应用程序中使用 Azure 设备客户端适配器,请将 Gst-nvmsgbroker 插件的 proto-lib 属性设置为适配器共享库的路径名 - 设备客户端情况为 libnvds_azure_proto.so,模块客户端情况为 libnvds_azure_edge_proto.so。使用适配器的下一步是指定连接详细信息。指定连接详细信息的过程对于 Azure 设备客户端和模块客户端情况有所不同,如下节所述。

设备客户端适配器的连接详细信息#

将插件的 conn-str 属性设置为完整 Azure 连接字符串,格式如下

HostName=<my-hub>.azure-devices.net;DeviceId=<device_id>;SharedAccessKey=<my-policy-key>

或者,您可以在 Azure 配置文件中指定连接字符串详细信息

[message-broker]
connection_str = HostName=<my-hub>.azure-devices.net;DeviceId=<device_id>;SharedAccessKey=<my-policy-key>

模块客户端适配器的连接详细信息#

将连接字符串留空,因为 Azure IoT Edge 库会自动从文件 /etc/iotedge/config.yaml 中获取连接字符串。配置连接详细信息后,您可以使用 nvds_msgapi 接口调用其函数,将 Azure 设备客户端和模块客户端适配器集成到自定义用户代码中。请注意关于接口定义的函数的以下几点

  • 传递给 nvds_msgapi_connect() 的连接字符串对于 Azure 设备客户端和模块客户端都可以为 NULL。对于设备客户端,Azure 配置文件有一个选项可以指定连接字符串。对于模块客户端,连接字符串始终在 /etc/iotedge/config.yaml 中指定。

  • “发送”函数都使用 Gst-nvmsgbroker 插件的属性“topic”中指定的主题名称。它可能为空。

  • 应用程序必须在每次调用 nvds_msgapi_send_async() 后调用 nvds_msgapi_do_work()。调用 nvds_msgapi_do_work() 的频率决定了处理等待发送的消息的速率。

  • 多个应用程序线程共享连接句柄是安全的。libiothubclient 库是线程安全的,因此 Azure 协议适配器无需为直接调用此库的函数实现单独的锁定机制。

  • Azure 协议适配器期望客户端管理连接句柄的使用和停用。客户端必须确保一旦句柄断开连接,它就不会用于“发送”调用或调用 nvds_msgapi_do_work()。虽然库尝试确保在应用程序使用已停用的句柄调用这些函数时能够优雅地失败,但它并非以线程安全的方式执行此操作。

监控适配器执行#

Azure 设备客户端和模块客户端使用不同的日志记录机制。

Azure 设备客户端库日志消息#

Azure 设备客户端适配器使用 nvds_logger 框架生成日志消息,这些消息可以帮助您监控执行情况。适配器为 INFO、DEBUG 和 ERROR 严重性级别生成单独的日志,如 nvds_logger:日志记录框架 中所述。您可以通过在日志记录设置脚本中设置过滤日志消息的级别来限制生成的日志消息。

注意

如果严重性级别设置为 DEBUG,nvds_logger 框架将记录 Azure 设备客户端协议适配器发送的每条消息的完整内容。

Azure 模块客户端库日志消息#

来自 Azure 模块客户端适配器库的日志消息会输出到 stdout,并且日志输出会捕获在 docker/iotedge 模块日志中。

消息主题和路由#

您可以在 GStreamer 属性 topic 中指定消息主题。但是,Azure 设备客户端和模块客户端以不同的方式使用 topic 属性。Azure 设备客户端不支持主题。因此,将忽略 topic 属性的值,并且您无法使用它来过滤 Azure IoT Hub 上的消息。Azure 模块客户端使用 topic 属性来确定消息的路由,即消息在系统内如何传递。有关消息路由的更多信息,请参阅:https://docs.microsoft.com/en-us/azure/iot-edge/module-composition#declare-routes)

AMQP 协议适配器#

DeepStream 包含一个 AMQP 协议适配器,DeepStream 应用程序可以直接使用它来发布使用 AMQP 0-9-1 消息协议的消息。AMQP 协议适配器共享库位于 deepstream 包中:/opt/nvidia/deepstream/deepstream/lib/libnvds_amqp_proto.so

安装依赖项#

DeepStream 的 AMQP 协议适配器使用 librabbitmq.so 库。

安装 rabbitmq-c 库

sudo apt-get install librabbitmq-dev

安装 glib 2.0

sudo apt-get install libglib2.0 libglib2.0-dev

AMQP 消息代理#

AMQP 协议与兼容 AMQP 0-9-1 的消息代理通信。如果您还没有正常运行的代理,可以通过安装 rabbitmq-server 包来部署一个,该包可在以下网址获取:https://rabbitmq.cn/install-debian.html 您可以在本地系统或要安装代理的远程计算机上安装此软件包。要安装软件包,请输入命令

sudo apt-get install rabbitmq-server

要确定 rabbitmq 服务是否正在运行,请输入命令

sudo service rabbitmq-server status

如果 rabbitmq 未运行,请输入此命令启动它

sudo service rabbitmq-server start

配置适配器设置#

您可以将 AMQP 协议适配器特定信息放置在自定义配置中,例如,cfg_amqp.txt。以下是在本地计算机上安装的 AMQP 代理的配置文件条目示例

[message-broker]
hostname = localhost
username = guest
password = guest
port = 5672
exchange = amq.topic
topic = topicname
amqp-framesize = 131072
#amqp-heartbeat = 0
#share-connection = 1

配置文件中的属性为

  • hostname:安装 AMQP 代理的主机的主机名

  • username:用于登录代理的用户名 – 即将被弃用,请使用环境变量

  • password:用于登录代理的密码 – 即将被弃用,请使用环境变量

  • port:用于与 AMQP 代理通信的端口

  • exchange:要在其上发布消息的交换机的名称

  • topic:消息主题

  • amqp-framesize:要从此连接向代理请求的线路上的 AMQP 帧的最大大小。4096 是最小值,2^31-1 是最大值,一个好的默认值是 131072 (128KB)。

  • amqp-heartbeat:取消注释 cfg_amqp.txt 中的字段 amqp-heartbeat,并将其值设置为大于 0 的整数以启用心跳协议。

    此整数是代理请求的心跳帧之间的秒数。在错过 2 个帧后,rabbitmq 会认为连接不可达。值为 0 将禁用心跳协议。

  • share-connection:如果您需要生成连接签名,请取消注释 cfg_amqp.txt 中的字段 share-connection,并将其值设置为 1。

    此签名是一个唯一的字符串,通过解析用于建立连接的所有 amqp 连接相关参数生成。取消注释此字段表示创建的连接可以与同一进程中的其他组件共享。

使用适配器#

要在 DeepStream 应用程序中使用 AMQP 协议客户端适配器,请将 Gst-nvmsgbroker 插件的 proto-lib 属性设置为适配器共享库的路径名,libnvds_amqp_proto.so

proto-lib = <path to libnvds_amqp_proto.so>

您可以在 AMQP 适配器特定配置文件(例如,cfg_amqp.txt)中指定 AMQP 连接详细信息,如上所述。这是推荐的方法。AMQP 配置文件的路径由 Gst 属性 config 指定

config = <path to cfg_amqp.txt>

或者,您可以在 Gst 插件的 conn-str 属性中指定 AMQP 协议的主机名、端口号和用户名,并在配置文件中指定密码。在 Gst 属性中

conn-str = hostname;5672;username
config   = <pathname of AMQP configuration file>

在 AMPQ 配置文件中

[message-broker]
password = <password>

您可以设置 Gst-nvmsgbroker 插件的 topic 属性来指定消息主题。

topic =  <topicname>

或者,您可以在 AMQP 配置文件 (cfg_amqp.txt) 中指定主题。在 Gst 属性中,设置

config    = <path to cfg_amqp.txt>

在 AMQP 配置文件中

[message-broker]
Topic = topicname

注意:通过配置文件和连接字符串传入用户名和密码的选项即将被弃用。请按照以下步骤在环境变量中设置用户凭据。弃用后,唯一可接受的连接字符串格式将为“url;port”。

用户身份验证#

要设置用户名和密码,请导出以下环境变量

export USER_AMQP=username
export PASSWORD_AMQP=password

如果用户名和密码也在配置中设置,适配器将根据环境变量而不是配置进行设置。

程序化集成#

配置连接后,您可以使用 nvds_msgapi 接口调用其函数,将 AMQP 协议适配器集成到您的应用程序中。请注意关于接口定义的函数的以下几点

  • 传递给 nvds_msgapi_connect() 的连接字符串格式为 ``Hostname;<port>;username``。

  • 对于“发送”函数,主题名称由 Gst-nvmsgbroker 插件的 topic 属性或 AMQP 配置文件中的 topic 参数指定。

  • 应用程序必须在每次调用 nvds_msgapi_send_async() 后调用 nvds_msgapi_do_work()。调用 nvds_msgapi_do_work() 的频率决定了处理等待发送的消息的速率。

AMQP 协议适配器期望客户端管理连接句柄的使用和停用。客户端必须确保一旦句柄断开连接,它就不会用于“发送”调用或调用 nvds_msgapi_do_work()。虽然库尝试确保优雅地失败,但如果应用程序使用已停用的句柄调用这些函数,则它并非以线程安全的方式执行此操作。

注意

正如 alanxz/rabbitmq-c 中所述,您不能在使用 librabbitmq 库的线程之间共享套接字、amqp_connection_state_t 或通道。此库专为事件驱动的单线程应用程序设计,尚不满足线程应用程序的要求。为了解决此限制,您的应用程序必须为每个线程打开一个 AMQP 连接(以及一个关联的套接字)。如果需要从多个线程访问单个 AMQP 连接或其任何通道,则必须实现适当的锁定机制。通常,为每个线程分配专用连接更简单。

监控适配器执行#

AMQP 协议适配器使用 nvds_logger 框架生成日志消息,这些消息可以帮助您监控执行情况。适配器为 INFO、DEBUG 和 ERROR 严重性级别生成单独的日志,如 nvds_logger:日志记录框架中所述。您可以通过在日志记录设置脚本中设置过滤日志消息的级别来限制生成的日志消息。

注意

如果严重性级别设置为 DEBUG,nvds_logger 将记录 AMQP 协议适配器发送的每条消息的完整内容。

REDIS 协议适配器#

DeepStream 包含一个 REDIS 协议适配器,DeepStream 应用程序可以直接使用它来发布使用 REDIS 流的消息 https://redis.ac.cn/topics/streams-intro REDIS 协议适配器共享库位于 deepstream 包中:/opt/nvidia/deepstream/deepstream/lib/libnvds_redis_proto.so

安装依赖项#

DeepStream 的 REDIS 协议适配器使用 libhiredis.so 库,该库从 Hiredis v1.0.2 构建。

构建具有安装说明的依赖项

  • libhiredis

    git clone https://github.com/redis/hiredis.git
    cd hiredis
    git checkout tags/v1.2.0
    make USE_SSL=1
    sudo cp libhiredis* /opt/nvidia/deepstream/deepstream/lib/
    sudo ln -sf /opt/nvidia/deepstream/deepstream/lib/libhiredis.so /opt/nvidia/deepstream/deepstream/lib/libhiredis.so.1.1.0
    sudo ldconfig
    

    请注意,为了使用 TLS/SSL 安全性,请确保构建 libhiredis 时启用了 SSL 支持,方法是使用 README 中指定的 USE_SSL 选项:redis/hiredis

  • glib 2.0

    apt-get install libglib2.0 libglib2.0-dev
    

REDIS 服务器#

  • 在您的机器上安装并设置 redis-server。按照此处的说明操作并下载 redis 版本 (6.0.8):https://redis.ac.cn/download

    wget http://download.redis.io/releases/redis-6.0.8.tar.gz
    tar xzf redis-6.0.8.tar.gz
    cd redis-6.0.8
    make
    
  • 运行服务器

    src/redis-server &
    

配置适配器设置#

您可以将 REDIS 协议适配器特定信息放置在自定义配置文件中,例如,cfg_redis.txt。以下是在本地计算机上安装的 redis 服务器的配置文件条目示例

[message-broker]
hostname=localhost
port=6379
password=password
payloadkey=metadata
consumergroup=mygroup
consumername=myname
streamsize=10000

配置文件中的属性为

用户身份验证#

在 redis 服务器配置文件中启用 requirepass 以在代理中设置密码(请参阅 https://redis.ac.cn/docs/latest/operate/oss_and_stack/management/security/)。

通过设置以下环境变量在适配器中设置密码

export PASSWORD_REDIS=password

如果密码也在配置中设置,适配器将根据环境变量而不是配置进行设置。

使用适配器#

要在 DeepStream 应用程序中使用 REDIS 协议客户端适配器,请将 Gst-nvmsgbroker 插件的 proto-lib 属性设置为适配器共享库的路径名,libnvds_redis_proto.so

proto-lib = <path to libnvds_redis_proto.so>

您可以在 REDIS 适配器特定配置文件(例如,cfg_redis.txt)中指定 REDIS 连接详细信息,如上所述。这是推荐的方法。REDIS 配置文件的路径由 Gst 属性 config 指定

config = <path to cfg_redis.txt>

或者,您可以在 Gst 插件的 conn-str 属性中指定 REDIS 协议的主机名、端口号,并在配置文件中指定密码。在 Gst 属性中

conn-str = hostname;6379
config   = <pathname of REDIS configuration file>

您可以设置 Gst-nvmsgbroker 插件的 topic 属性来指定消息主题。

topic =  <redis streamname>

程序化集成#

配置连接后,您可以使用 nvds_msgapi 接口调用其函数,将 REDIS 协议适配器集成到您的应用程序中。请注意关于接口定义的函数的以下几点

  • 传递给 nvds_msgapi_connect() 的连接字符串格式为 ``Hostname;<port>``。

  • 对于“发送”函数,主题名称由 Gst-nvmsgbroker 插件的 topic 属性指定。

  • 应用程序必须在每次调用 nvds_msgapi_send_async() 后调用 nvds_msgapi_do_work()。调用 nvds_msgapi_do_work() 的频率决定了处理等待发送的消息的速率。

REDIS 协议适配器期望客户端管理连接句柄的使用和停用。客户端必须确保一旦句柄断开连接,它就不会用于“发送”调用或调用 nvds_msgapi_do_work()。虽然库尝试确保优雅地失败,但如果应用程序使用已停用的句柄调用这些函数,则它并非以线程安全的方式执行此操作。

监控适配器执行#

REDIS 协议适配器使用 nvds_logger 框架生成日志消息,这些消息可以帮助您监控执行情况。适配器为 INFO、DEBUG 和 ERROR 严重性级别生成单独的日志,如 nvds_logger:日志记录框架中所述。您可以通过在日志记录设置脚本中设置过滤日志消息的级别来限制生成的日志消息。

注意

如果严重性级别设置为 DEBUG,nvds_logger 将记录 REDIS 协议适配器发送的每条消息的完整内容。

MQTT 协议适配器#

DeepStream 6.4 包括 MQTT 协议适配器的发布,DeepStream 应用程序可以直接使用它来发布使用 MQTT V5.0 协议的消息。MQTT 协议适配器共享库位于 deepstream 包中:/opt/nvidia/deepstream/deepstream/lib/libnvds_mqtt_proto.so

安装依赖项#

DeepStream 的 MQTT 协议适配器使用 libmosquitto.so 库,该库从 mosquitto 2.0.15 构建。

  • glib 2.0

    sudo apt-get install libglib2.0 libglib2.0-dev
    
  • ssl

    sudo apt-get install libssl-dev
    
  • cjson

    sudo apt-get install libcjson-dev
    
  • mosquitto 2.0.15

    wget https://mosquitto.org/files/source/mosquitto-2.0.15.tar.gz
    tar -xvf mosquitto-2.0.15.tar.gz
    cd mosquitto-2.0.15
    make
    make install
    sudo cp /usr/local/lib/libmosquitto* /opt/nvidia/deepstream/deepstream/lib/
    sudo ldconfig
    

Eclipse Mosquitto#

Eclipse mosquitto 是一个轻量级和开源消息代理,实现了 MQTT 协议。在此处阅读更多相关信息:https://mosquitto.org/。代理、mosquitto_pub 和 mosquitto_sub 命令行客户端以及协议适配器依赖的 C 库都通过上一节中的说明安装。

运行 mosquitto 代理

adduser --system mosquitto
mosquitto

要使用特定的 mosquitto 代理配置文件运行

mosquitto -c <path_to_config_file>

例如

mosquitto -c /etc/mosquitto/mosquitto.conf

启用身份验证

  1. 创建一个简单的文本文件,并输入用户名和密码对,每行一个,如下所示

    user1:password1
    user2:password2
    
  2. 使用以下命令加密密码文件: :: mosquitto_passwd -U <密码文件路径>

  3. 更改 /etc/mosquitto/mosquitto.conf 中的以下配置(或编写新的配置文件)

    allow_anonymous false
    password_file <path_to_password_file>
    
  4. 使用更新的配置文件运行 mosquitto 代理

要在协议适配器中设置用户和密码

导出以下环境变量

export USER_MQTT=username
export PASSWORD_MQTT=password

如果用户名和密码也在配置中设置,适配器将根据环境变量而不是配置进行设置。

启用 TLS 加密

mosquitto 提供 TLS 支持,用于加密网络连接和身份验证。可以将代理配置为接受 CA 证书、代理服务器证书和代理服务器密钥,并要求尝试连接的客户端进行认证。CA 证书、客户端服务器证书和客户端服务器密钥可以通过适配器配置文件传递给 mqtt 协议适配器。

  1. 更改 /etc/mosquitto/mosquitto.conf 中的以下配置(或编写新的配置文件)

    # At least one of cafile or capath must be defined
    # cafile is a file ending in ".crt" containing PEM encoded CA certificates
    cafile <path to CA crt file>
    # capath is a directory contaning PEM encoded CA certificate files ending in ".pem".
    # For capath to work correctly, the certificates files must have ".pem" as the file ending
    # and you must run "openssl rehash <path to capath>" each time you add/remove a certificate.
    capath <path to directory containing CA certificates>
    
    # Path to the PEM encoded server certificate for the broker.
    certfile <path to broker crt file>
    
    # Path to the PEM encoded keyfile for the broker.
    keyfile <path to broker key file>
    
    # By setting require_certificate to true,
    # the client must provide a valid certificate in order for the network
    # connection to proceed. This allows access to the broker to be controlled
    # outside of the mechanisms provided by MQTT.
    require_certificate true
    
  2. 使用更新的配置文件运行代理

  3. 使用 mosquitto_pub 测试代理配置

    $ mosquitto_pub -p 1883 --cafile <path to CA crt file> --cert <path to client crt file> --key <path to client key file> -h localhost -m hello -t /world
    or
    $ mosquitto_pub -p 1883 --capath <path to directory containing CA certificates> --cert <path to client crt file> --key <path to client key file> -h localhost -m hello -t /world
    
  4. 使用 cafile/capath 更新 mqtt 协议适配器配置。如果在代理配置中 require_certificate 设置为 true,则还必须提供客户端证书文件和客户端密钥文件。

配置适配器设置#

您可以将 MQTT 协议适配器特定信息放置在自定义配置文件中,例如,命名为 cfg_mqtt.txt。以下是在本地计算机上安装的 MQTT 代理的配置文件条目示例

[message-broker]
username = user
password = password
client-id = uniqueID
#enable-tls = 1
#tls-cafile =
#tls-capath =
#tls-certfile =
#tls-keyfile =
#share-connection = 1
#loop-timeout = 2000
#keep-alive = 60

配置文件中的属性为

  • username - 要设置为与 mosquitto 代理进行身份验证的用户名(如果已启用)– 即将被弃用,请使用环境变量

  • password - 要设置为与 mosquitto 代理进行身份验证的密码(如果已启用)– 即将被弃用,请使用环境变量

  • client-id - 用作客户端 ID 的字符串。如果为空,将生成随机客户端 ID。每个客户端 ID 必须是唯一的才能连接到同一代理

  • enable-tls - 0 表示否,1 表示是

    启用 TLS 加密。如果启用,则必须设置 tls-cafile 和 tls-capath 中的至少一个。

  • tls-cafile - 包含 PEM 编码的受信任 CA 证书文件的文件路径。必须提供 cafile 或 capath。

  • tls-capath - 包含 PEM 编码的受信任 CA 证书文件的目录路径。必须提供 cafile 或 capath。

    为了使 capath 正常工作,证书文件必须以“.pem”作为文件结尾,并且每次添加/删除证书时,都必须运行“openssl rehash <capath 路径>”。

  • tls-certfile - 包含此客户端的 PEM 编码证书文件的文件路径。如果未提供,则不会使用客户端证书。

  • tls-keyfile - 包含此客户端的 PEM 编码私钥的文件路径。如果未提供,则不会使用客户端证书。

  • share-connection - 0 表示否,1 表示是

    如果您需要生成连接签名,请取消注释 cfg_mqtt.txt 中的字段 share-connection,并将其值设置为 1。此签名是一个唯一的字符串,通过解析用于建立连接的所有 mqtt 连接相关参数生成。取消注释此字段表示创建的连接可以与同一进程中的其他组件共享。

  • loop-timeout - 传递给 mosquitto_loop() 调用的超时时间(以毫秒为单位),该调用由 nvds_msgapi_do_work() 调用。默认为 2000 毫秒。

    这是指在调用超时之前等待网络活动的最长时间(以毫秒为单位)。设置为 0 表示立即返回。

  • keep-alive - 在此时间段内未交换其他消息的情况下,代理应向客户端发送 PING 消息的秒数。

使用适配器#

要在 DeepStream 应用程序中使用 MQTT 协议客户端适配器,请将 Gst-nvmsgbroker 插件的 proto-lib 属性设置为适配器共享库的路径名,libnvds_mqtt_proto.so

proto-lib = <path to libnvds_mqtt_proto.so>

您可以在 Gst 插件的 conn-str 属性中指定 MQTT 协议的主机名和端口号,并在配置文件中指定用户名和密码。在 Gst 属性中

conn-str = localhost;1883
config   = <pathname of MQTT configuration file>

在 MQTT 配置文件中

[message-broker]
username = user
password = password

您可以设置 Gst-nvmsgbroker 插件的 topic 属性来指定消息主题。

topic =  <topicname>

程序化集成#

配置连接后,您可以使用 nvds_msgapi 接口调用其函数,将 MQTT 协议适配器集成到您的应用程序中。请注意关于接口定义的函数的以下几点

  • 传递给 nvds_msgapi_connect() 的连接字符串格式为 Hostname;<port>

  • 对于 nvds_msgapi_send_async(),主题名称由 Gst-nvmsgbroker 插件的 topic 属性指定。

  • 应用程序必须在每次调用 nvds_msgapi_send_async() 后调用 nvds_msgapi_do_work()。调用 nvds_msgapi_do_work() 的频率决定了处理等待发送的消息的速率。

MQTT 协议适配器期望客户端管理连接句柄的使用和停用。客户端必须确保一旦句柄断开连接,它就不会用于“发送”调用或调用 nvds_msgapi_do_work()。虽然库尝试确保优雅地失败,但如果应用程序使用已停用的句柄调用这些函数,则它并非以线程安全的方式执行此操作。

注意

MQTT 协议适配器不支持同步发送函数 nvds_msgapi_send()。只能调用 nvds_msgapi_send_async() 发送消息。

监控适配器执行#

MQTT 协议适配器使用 nvds_logger 框架生成日志消息,这些消息可以帮助您监控执行情况。适配器为 INFO、DEBUG 和 ERROR 严重性级别生成单独的日志,如 nvds_logger:日志记录框架中所述。您可以通过在日志记录设置脚本中设置过滤日志消息的级别来限制生成的日志消息。

注意

如果严重性级别设置为 DEBUG,nvds_logger 将记录 MQTT 协议适配器发送的每条消息的完整内容。

nv_msgbroker:消息代理接口#

Deepstream 5.0 引入了一个新的消息代理库,可用于与多个外部代理建立连接。此库充当上一节中描述的消息适配器库的包装器,并提供自己的 API。Gst-msgbroker 插件可以选择直接调用适配器库 API 以连接到外部实体,或使用 nvmsgbroker 库接口以同时连接到多个外部实体。此外,Deepstream 6.0 中引入了 nvmsgbroker 库中的自动重连功能,其中定期尝试重新建立与外部实体的连接。此库的源代码现在位于 /opt/nvidia/deepstream/deepstream/sources/libs/nvmsgbroker,自 DeepStream 7.1 起。

自 Deepstream-6.2 起,提供了一个新的配置 work-interval,允许用户设置适配器逻辑执行之间的等待时间。

Gst-nvmsgbroker-nv_msgbroker

Gst-nvmsgbroker 插件可以调用 nvmsgbroker 库中的 Api,如上图所示。Api 支持

  • 创建连接

  • 通过异步方式发送消息

  • 终止连接

  • 获取 nvmsgbroker 库版本号

nvmsgbroker 接口在头文件 /opt/nvidia/deepstream/deepstream/sources/includes/nvmsgbroker.h 中定义。此头文件定义了一组函数指针,这些指针提供类似于 C++ 中接口的接口。以下部分描述了 nvmsgbroker 接口定义的方法。

nv_msgbroker_connect():创建连接#

NvMsgBrokerClientHandle   nv_msgbroker_connect(char *broker_conn_str, char *broker_proto_lib, nv_msgbroker_connect_cb_t connect_cb, char *cfg);

该函数接受连接字符串并配置连接。代理适配器 proto lib 实现可以选择该函数是否建立连接以适应无连接协议(如 HTTP)。

参数

  • broker_conn_str:协议适配器特定的格式的连接字符串

  • broker_proto_lib:消息协议适配器库的完整路径

  • connect_cb:指向与连接关联的事件的回调函数的指针。

  • Cfg:传递给协议适配器的配置文件的路径名

    连接回调

    typedef void (*nv_msgbroker_connect_cb_t)(NvMsgBrokerClientHandle h_ptr, NvMsgBrokerErrorType status );

    其中回调的参数为

    • h_ptr:连接句柄,用于标识发起调用的调用。

    • status:指示连接状态的代码。

返回值

成功时用于后续接口调用的连接句柄,否则为 NULL。

nv_msgbroker_send_async(): 异步发送事件#

NvMsgBrokerErrorType  nv_msgbroker_send_async (NvMsgBrokerClientHandle h_ptr, NvMsgBrokerClientMsg message, nv_msgbroker_send_cb_t cb, void *user_ctx);

此 API 将数据发送到连接的端点。它接受消息主题和消息负载。“发送”操作是异步的;它接受一个回调函数,该函数在“发送”操作完成时被调用。

参数

  • h_ptr:消息代理库的连接句柄

  • message:消息包,其中包含消息负载、负载长度、主题的详细信息

  • cb:回调函数,用于通知发送状态

  • user_ctx:传递给回调函数的上下文指针

    消息包结构

    typedef struct {
    char *topic;
    void *payload;
    size_t payload_len;
    } NvMsgBrokerClientMsg;
    

    发送回调

    typedef void (*nv_msgbroker_send_cb_t)(void *user_ptr,  NvMsgBrokerErrorType flag);

    其中回调的参数为

    • user_ptr:用户指针 (user_ptr),来自调用 nv_msgbroker_send_async() 发起的“发送”操作。 使回调函数能够识别发起调用的来源。

    • flag:指示发送操作完成状态的代码。

nv_msgbroker_subscribe(): 通过订阅主题消费数据#

NvMsgBrokerErrorType nv_msgbroker_subscribe(NvMsgBrokerClientHandle h_ptr, char ** topics, int num_topics,  nv_msgbroker_subscribe_cb_t cb, void *user_ctx);

此 API 用于订阅主题并从外部实体消费消息。此 API 是异步的,必须使用已创建的有效连接句柄作为参数调用。调用者还必须提供指向回调函数的指针,以接收来自连接端点的已消费消息,以及一个可选的 user_ctx 指针,用于指定用户上下文。

参数

  • h_ptr:连接句柄,通过调用 nv_msgbroker_connect() 获得

  • topics:指向主题名称字符数组的 2d 指针

  • num_topics:要订阅的主题数量

  • cb:指向回调函数的指针,用于接收订阅主题上已消费的消息的通知

  • user_ctx:要传递给回调以获取上下文的用户指针

    在订阅 API 中指定为参数的回调函数指针类型为 nv_msgbroker_subscribe_cb_t,定义如下:

    typedef void (*nv_msgbroker_subscribe_cb_t)(NvMsgBrokerErrorType flag, void *msg, int msglen, char *topic, void *user_ptr);

    其中回调的参数为

    • flag:用于指定消费消息的错误状态

    • msg:已消费的消息/负载

    • msg_len:消息长度,以字节为单位

    • topic:接收到消息的主题名称

    • user_ptr:在 subscribe() 期间传递的上下文指针

nv_msgbroker_disconnect(): 终止连接#

NvMsgBrokerErrorType nv_msgbroker_disconnect(NvMsgBrokerClientHandle h_ptr);

如果底层协议需要,该函数将终止连接,并释放与 h_ptr 关联的资源。

参数

  • h_ptr:连接句柄,通过调用 nv_msgbroker_connect() 获得

nv_msgbroker_version(): 获取版本号#

char *nv_msgbroker_version();

此函数返回一个字符串,用于标识此协议适配器实现支持的 nv_msgbroker 版本。该字符串必须使用 <major>.<minor> 格式,其中 <major> 是主版本号,<minor> 是次版本号。主版本号的更改表示 API 更改,可能会导致不兼容。当主版本号更改时,次版本号将重置为 1。

自动重连功能#

nvmsgbroker 库具有自动重连功能,其中在与端点的网络连接断开后,会定期尝试与外部实体重新连接。cfg_nvmsgbroker.txt 中列出了 nvmsgbroker 库使用此功能适用的配置。

(0):disable
(1):enable
auto-reconnect=1

#connection retry interval in seconds
retry-interval=<value>

#connection max retry limit in seconds
max-retry-limit=<value>

#interval at which to perform work, in microseconds
work-interval=10000

注意

自动重连功能目前不支持与 MQTT 协议适配器一起使用。

工作间隔配置#

用户现在可以通过更改 cfg_nvmsgbroker.txt 中的 work-interval 配置来设置协议适配器执行工作的间隔(以微秒为单位)。默认值为 10000 微秒,即 10 毫秒。

...
#interval at which to perform work, in microseconds
work-interval=10000

nvds_logger:日志记录框架#

DeepStream 提供了一个名为 nvds_logger 的日志记录框架。Kafka 协议适配器使用此框架生成运行时日志。nvds_logger 基于 syslog,并提供许多相关功能,包括:

  • 优先级选择(日志级别)

  • 日志过滤和重定向

  • 跨不同并发运行的 DeepStream 实例的共享日志记录

  • 使用 logrotate 进行日志轮换和管理

  • 跨平台支持

启用日志记录#

要启用日志记录,请运行 setup_nvds_logger.sh 脚本。请注意,此脚本必须使用 sudo 运行。您可能需要修改与此脚本关联的权限以使其可执行。该脚本接受一个可选参数,用于指定要写入的日志文件的路径名。默认情况下,路径名为 /tmp/nvds/ds.log。启用日志记录后,您可以通过读取日志文件来访问生成的日志消息。默认情况下,您必须具有 sudo 权限才能读取日志文件。基于 syslog 的日志配置的标准技术可以消除此要求。

过滤日志#

nvds_logger 允许日志与类似于 syslog 提供的严重性级别相关联。您可以通过修改设置脚本来根据严重性级别过滤日志消息。默认情况下,该脚本启用 INFO 级别(级别 6)及以上级别的消息的日志记录。您可以按照脚本注释中的说明进行修改:# Modify log severity level as required and rerun this script

#              0       Emergency: system is unusable
#              1       Alert: action must be taken immediately
#              2       Critical: critical conditions
#              3       Error: error conditions
#              4       Warning: warning conditions
#              5       Notice: normal but significant condition
#              6       Informational: informational messages
#              7       Debug: debug-level messages
# refer https://tools.ietf.org/html/rfc5424.html for more information

echo "if (\$msg contains 'DSLOG') and (\$syslogseverity <= 6) then $nvdslogfilepath" >> 11-nvds.conf

轮换和管理日志#

建议您通过定期轮换日志文件来限制其大小。logrotate 是用于此目的的常用实用程序。您可以在 cron 作业中使用它,以便定期自动存档日志文件,并在所需的时间间隔后丢弃它们。

生成日志#

您可以通过在源代码中包含 /opt/nvidia/deepstream/deepstream/sources/includes/nvds_logger.h 并链接到 libnvds_logger.so 库来实现使用 logger 的模块。以编程方式生成日志涉及三个步骤:

  1. 在写入任何日志消息之前,调用 nvds_log_open()

  2. 调用 nvds_log() 以写入日志消息。

  3. 完成后调用 nvds_log_close() 以刷新并关闭日志。

请注意,nvds_logger 是一种基于进程的日志记录机制,因此建议的步骤是从主应用程序例程而不是各个插件调用 nvds_log_open()。 同样,在主应用程序关闭应用程序退出之前,从主应用程序调用 nvds_log_close()