NVIPC 集成#

配置#

NVIPC 支持使用 YAML 文件来存储配置。建议 L2 合作伙伴使用 YAML 文件。以下是在 Aerial L1 中使用的配置示例。主进程和辅助进程都应具有相同的 ring_len 和 mempool_size 配置。

transport:

  type: shm

  shm_config:

    primary: 1

    prefix: nvipc # Note: prefix string length should < 32

    cuda_device_id: 0

    ring_len: 8192

    mempool_size:

      cpu_msg:

        buf_size: 8192

        pool_len: 4096

      cpu_data:

        buf_size: 576000

        pool_len: 1024

      cpu_large:

        buf_size: 4096000

        pool_len: 64

      cuda_data:

        buf_size: 307200

        pool_len: 0

      gpu_data:

        buf_size: 576000

        pool_len: 0

  app_config:

    grpc_forward: 0

    debug_timing: 0

    pcap_enable: 0

    pcap_cpu_core: 17 # CPU core of background pcap log save thread

    pcap_cache_size_bits: 29 # 2^29 = 512MB, size of /dev/shm/${prefix}_pcap

    pcap_file_size_bits: 31 # 2^31 = 2GB, max size of /var/log/aerial/${prefix}_pcap. Requires pcap_file_size_bits > pcap_cache_size_bits.

    pcap_max_data_size: 8000 # Max DL/UL FAPI data size to capture reduce pcap size.

初始化#

以下是初始化的参考代码。NVIPC 主进程负责创建和初始化 SHM 池、环形队列。NVIPC 辅助进程查找已创建的池和队列。在 Aerial L1 中是主进程,L2 应配置为辅助进程。

// Create configuration

nv_ipc_config_t config;

// Select module_type for primary or secondary process

nv_ipc_module_t module_type = NV_IPC_MODULE_PRIMARY/SECONDARY;

// Recommended initialization: load nvipc configurations from yaml file

load_nv_ipc_yaml_config(&config, yaml_path, module_type);

// Optional: set default configs and overwrite what need change

config.ipc_transport = *NV_IPC_TRANSPORT_SHM*;

**if**\ (set_nv_ipc_default_config(&config, module_type) < 0) {

NVLOGE(TAG, "%s: set configuration failed\\n", \__func\_\_);

**return** -1;

}

// Override the default configurations

config.transport_config.shm.cuda_device_id = test_cuda_device_id;

// Create IPC interface: nv_ipc_t ipc

nv_ipc_t\* ipc;

**if**\ ((ipc = create_nv_ipc_interface(&config)) == *NULL*) {

NVLOGE(TAG, "%s: create IPC interface failed\\n", \__func\_\_);

**return** -1;

}

成功创建 IPC 接口后,可以在 /dev/shm/ 文件夹中看到一些共享内存文件。例如,如果 <prefix>=”nvipc”

ls -al /dev/shm/nvipc\*

nvipc_shm

nvipc_cpu_msg

nvipc_cpu_data

nvipc_cpu_large

反初始化#

以下示例代码用于反初始化。

**if**\ (ipc->ipc_destroy(ipc) < 0) {

NVLOGE(TAG, "%s close IPC interface failed\\n", \__func\_\_);

}

发送#

发送过程如下

allocate buffers –> fill content –> send.

当填充内容时,对于 CUDA 内存,data_buf 是 CUDA 内存指针,无法在 CPU 内存空间中直接访问。NVIPC API 提供了基本的 memcpy 函数,用于在 CPU 内存和 CUDA 内存之间复制。对于更多 CUDA 操作,用户可以使用 CUDA API 直接访问 GPU 内存缓冲区。

nv_ipc_msg_t send_msg,

send_msg.msg_id = fapi_msg_id; // Optional: FAPI message ID

send_msg.msg_len = fapi_msg_len; // Max length is the MSG buffer size,
configurable

send_msg.data_len = fapi_data_len; // Max length is the MSG buffer size,
configurable

send_msg.data_pool = *NV_IPC_MEMPOOL_CPU_DATA*; // Options: CPU_MSG,
CPU_DATA, CUDA_DATA

// Allocate buffer for TX message

**if**\ (ipc->tx_allocate(ipc, &send_msg, 0) != 0)

{

NVLOGE(TAG, "%s error: allocate buffer failed\\n", \__func\_\_);

**return** -1;

}

// Fill the MSG content

int8_t fapi_msg[SHM_MSG_BUF_SIZE];

memcpy(send_msg.msg_buf, fapi_msg, fapi_len);

// Fill the DATA content if data exist.

int8_t fapi_data[SHM_MSG_DATA_SIZE];

**if** (send_msg.data_pool == *NV_IPC_MEMPOOL_CPU_DATA*) { // CPU_DATA
case

memcpy(send_msg.data_buf, fapi_data, send_msg.data_len);

} **else** **if** (send_msg.data_pool == *NV_IPC_MEMPOOL_CUDA_DATA*) {
// CUDA_DATA case

**if**\ (ipc->cuda_memcpy_to_device(ipc, send_msg.data_buf, fapi_data,
send_msg.data_len) < 0){

NVLOGE(TAG, "%s CUDA copy failed\\n", \__func\_\_);

}

} **else** { // NO_DATA case

// NO data, do nothing

}

// Send the message

**if**\ (ipc->tx_send_msg(ipc, &send_msg) < 0){

NVLOGE(TAG, "%s error: send message failed\\n", \__func\_\_);

// May need future retry or release the send_msg buffers

// If fail, check configuration: ring queue length > memory pool length

}

接收#

发送过程如下

receive –> handle message –> release buffers
nv_ipc_msg_t recv_ms\ *g*

**if**\ (ipc->rx_recv_msg(ipc, &recv_msg) < 0)

{

LOGV(TAG, "%s: no more message available\\n", \__func\_\_);

**return** -1;

}

// Example: Handle MSG part

int8_t fapi_msg[SHM_MSG_BUF_SIZE];

memcpy(fapi_msg, recv_msg.msg_buf, recv_msg.msg_len);

// Example: Handle DATA part

int8_t fapi_data[SHM_MSG_BUF_SIZE];

**if** (recv_msg.data_pool == *NV_IPC_MEMPOOL_CPU_DATA*) { // CPU_DATA
case

memcpy(fapi_data, recv_msg.data_buf, &recv_msg.data_len);

} **else** **if** (recv_msg.data_pool == *NV_IPC_MEMPOOL_CUDA_DATA*) {
// CUDA_DATA case

**if**\ (ipc->cuda_memcpy_to_host(ipc, fapi_data, recv_msg.data_buf,
recv_msg.data_len) < 0){

LOGE(TAG, "%s CUDA copy failed\\n", \__func\_\_);

}

} **else** { // NO_DATA case

// NO data, do nothing

}

**if**\ (ipc->rx_release(ipc, &recv_msg) < 0){

LOGW(TAG, "%s: release error\\n", \__func\_\_);

}

通知#

提供了两种类型的通知 API:信号量样式和 event_fd 样式。每个 NVIPC 进程都可以选择任何类型,无论对等进程选择什么类型,但在一个进程中保持使用一种类型。

在 SHM IPC 库的底层实现了 event_fdevent_fd 实现的包装器是信号量 API 接口。

通过 create_nv_ipc_interface() 成功创建 IPC 接口后,API 即可使用。

对于信号量类型,它很容易使用

  • 接收器

    ipc->tx_tti_sem_wait(ipc);
    
  • 发送器

    ipc->tx_tti_sem_post(ipc);
    

对于 event_fd 样式,用户应获取 fd 并使用 epoll 函数来监听 I/O 事件。

  • 接收器

    **struct** epoll_event ev, events[MAX_EVENTS];
    
    **int** epoll_fd = epoll_create1(0);
    
    **if**\ (epoll_fd == -1)
    
    {
    
    NVLOGE(TAG, "%s epoll_create failed\\n", \__func\_\_);
    
    }
    
    **int** ipc_rx_event_fd = ipc->get_fd(ipc); // IPC notification API:
    get_fd()
    
    ev.\ *events* = *EPOLLIN*;
    
    ev.\ *data*.\ *fd* = ipc_rx_event_fd;
    
    **if**\ (epoll_ctl(epoll_fd, *EPOLL_CTL_ADD*, ev.\ *data*.\ *fd*, &ev)
    == -1)
    
    {
    
    NVLOGE(TAG, "%s epoll_ctl failed\\n", \__func\_\_);
    
    }
    
    **while**\ (1)
    
    {
    
    **int** nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
    
    **if**\ (nfds == -1)
    
    {
    
    NVLOGE(TAG, "epoll_wait notified: *nfds*\ =%d\\n", nfds);
    
    }
    
    **for**\ (**int** n = 0; n < nfds; ++n)
    
    {
    
    **if**\ (events[n].\ *data*.\ *fd* == ipc_rx_event_fd)
    
    {
    
    ipc->get_value(ipc); // IPC notification API: get_value()
    
    // Receive incoming message here
    
    }
    
    }
    
    }
    
    close(epoll_fd);
    
  • 发送器

    ipc->notify(ipc, 1); // IPC notification API: notify()