示例¶
本节中描述的示例的源代码可在 NVSHMEM 包的 examples 文件夹中找到。
基于属性的初始化示例¶
以下代码显示了 简单移位程序 的 MPI 版本,该程序在 NVSHMEM 编程模型中进行了解释。 它展示了基于属性的 NVSHMEM 初始化 API 的使用,其中 MPI 通信器可用于设置 NVSHMEM。
#include <stdio.h>
#include "mpi.h"
#include "nvshmem.h"
#include "nvshmemx.h"
/* Evaluates a CUDA runtime expression exactly once. If the result is not
 * cudaSuccess, prints the source file, line and cudaGetErrorString() text to
 * stderr and terminates the process with exit(-1). The do { ... } while (0)
 * wrapper makes the macro usable as a single statement. */
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] CUDA failed with %s \n", \
__FILE__, __LINE__, cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
/* Ring shift: the calling PE stores its own PE number into `destination` on
 * the next PE, where "next" wraps from the last PE back to PE 0.
 *
 * destination: address passed unchanged to nvshmem_int_p as the remote target. */
__global__ void simple_shift(int *destination) {
    const int self = nvshmem_my_pe();
    const int total = nvshmem_n_pes();
    const int right_neighbor = (self + 1) % total;

    nvshmem_int_p(destination, self, right_neighbor);
}
/* Initializes NVSHMEM on top of MPI_COMM_WORLD, runs the ring-shift kernel on
 * a CUDA stream, and prints the value this PE received from the previous PE. */
int main (int argc, char *argv[]) {
    int mype_node, msg;
    cudaStream_t stream;
    MPI_Comm mpi_comm = MPI_COMM_WORLD;
    nvshmemx_init_attr_t attr = NVSHMEMX_INIT_ATTR_INITIALIZER;

    MPI_Init(&argc, &argv);

    /* attr only stores a pointer to the communicator handle. */
    attr.mpi_comm = &mpi_comm;
    nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);

    /* One GPU per PE on a node: the node-local PE index selects the device. */
    mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
    CUDA_CHECK(cudaSetDevice(mype_node));
    CUDA_CHECK(cudaStreamCreate(&stream));

    int *destination = (int *) nvshmem_malloc (sizeof(int));

    simple_shift<<<1, 1, 0, stream>>>(destination);
    /* A launch does not return an error by itself; query it explicitly. */
    CUDA_CHECK(cudaGetLastError());

    /* Stream-ordered barrier: the copy below is enqueued after it. */
    nvshmemx_barrier_all_on_stream(stream);
    CUDA_CHECK(cudaMemcpyAsync(&msg, destination, sizeof(int),
                               cudaMemcpyDeviceToHost, stream));
    CUDA_CHECK(cudaStreamSynchronize(stream));

    printf("%d: received message %d\n", nvshmem_my_pe(), msg);

    /* Release the stream as well as the symmetric allocation. */
    CUDA_CHECK(cudaStreamDestroy(stream));
    nvshmem_free(destination);
    nvshmem_finalize();
    MPI_Finalize();
    return 0;
}
以下代码显示了 简单移位程序 的唯一 ID 版本,该程序在 NVSHMEM 编程模型中进行了解释。 它展示了基于属性的 NVSHMEM 初始化 API 的使用,其中唯一 ID 参数可用于设置 NVSHMEM。
#include <stdio.h>
#include "mpi.h"
#include "nvshmem.h"
#include "nvshmemx.h"
/* Evaluates a CUDA runtime expression exactly once. If the result is not
 * cudaSuccess, prints the source file, line and cudaGetErrorString() text to
 * stderr and terminates the process with exit(-1). The do { ... } while (0)
 * wrapper makes the macro usable as a single statement. */
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] CUDA failed with %s \n", \
__FILE__, __LINE__, cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
/* Ring shift: the calling PE stores its own PE number into `destination` on
 * the next PE, where "next" wraps from the last PE back to PE 0.
 *
 * destination: address passed unchanged to nvshmem_int_p as the remote target. */
__global__ void simple_shift(int *destination) {
    const int self = nvshmem_my_pe();
    const int total = nvshmem_n_pes();
    const int right_neighbor = (self + 1) % total;

    nvshmem_int_p(destination, self, right_neighbor);
}
/* Initializes NVSHMEM from a unique ID that rank 0 creates and MPI broadcasts,
 * runs the ring-shift kernel on a CUDA stream, and prints the value this PE
 * received from the previous PE. */
int main (int argc, char *argv[]) {
    int mype_node, msg;
    cudaStream_t stream;
    int rank, nranks;
    nvshmemx_init_attr_t attr = NVSHMEMX_INIT_ATTR_INITIALIZER;
    nvshmemx_uniqueid_t id = NVSHMEMX_UNIQUEID_INITIALIZER;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);

    // PE 0 queries the unique ID
    if (rank == 0) {
        nvshmemx_get_uniqueid(&id);
    }

    // PE 0 broadcasts the unique ID to all peers (sent as raw bytes)
    MPI_Bcast(&id, sizeof(nvshmemx_uniqueid_t), MPI_UINT8_T, 0, MPI_COMM_WORLD);
    nvshmemx_set_attr_uniqueid_args(rank, nranks, &id, &attr);
    nvshmemx_init_attr(NVSHMEMX_INIT_WITH_UNIQUEID, &attr);

    /* One GPU per PE on a node: the node-local PE index selects the device. */
    mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
    CUDA_CHECK(cudaSetDevice(mype_node));
    CUDA_CHECK(cudaStreamCreate(&stream));

    int *destination = (int *) nvshmem_malloc (sizeof(int));

    simple_shift<<<1, 1, 0, stream>>>(destination);
    /* A launch does not return an error by itself; query it explicitly. */
    CUDA_CHECK(cudaGetLastError());

    /* Stream-ordered barrier: the copy below is enqueued after it. */
    nvshmemx_barrier_all_on_stream(stream);
    CUDA_CHECK(cudaMemcpyAsync(&msg, destination, sizeof(int),
                               cudaMemcpyDeviceToHost, stream));
    CUDA_CHECK(cudaStreamSynchronize(stream));

    printf("%d: received message %d\n", nvshmem_my_pe(), msg);

    /* Release the stream as well as the symmetric allocation. */
    CUDA_CHECK(cudaStreamDestroy(stream));
    nvshmem_free(destination);
    nvshmem_finalize();
    MPI_Finalize();
    return 0;
}
集合启动示例¶
以下代码显示了基于单环归约的示例实现,其中代码的多次迭代(包括计算、通信和同步)表示为单个内核。
此示例还演示了 NVSHMEM 集合启动的使用,当从 CUDA 内核内部使用 NVSHMEM 同步 API 时,这是必需的。
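此示例不强制依赖 MPI:仅当编译时定义了 NVSHMEM_MPI_SUPPORT 且环境变量 NVSHMEMTEST_USE_MPI_LAUNCHER 为非零值时,才会通过 MPI 通信器初始化 NVSHMEM,否则直接调用 nvshmem_init()。 NVSHMEM 可用于移植现有的 MPI 应用程序和开发新的应用程序。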
#include <stdio.h>
#include "nvshmem.h"
#include "nvshmemx.h"
#ifdef NVSHMEM_MPI_SUPPORT
#include "mpi.h"
#endif
/* Drops any earlier CUDA_CHECK definition, then defines a checker that
 * evaluates a CUDA runtime expression exactly once. If the result is not
 * cudaSuccess, it prints the source file, line and cudaGetErrorString() text
 * to stderr and terminates the process with exit(-1). */
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
/* Evaluates an expression yielding an int status exactly once. If the status
 * differs from NVSHMEMX_SUCCESS, prints the source file, line and numeric
 * status to stderr and terminates the process with exit(-1). */
#define NVSHMEM_CHECK(stmt) \
do { \
int result = (stmt); \
if (NVSHMEMX_SUCCESS != result) { \
fprintf(stderr, "[%s:%d] nvshmem failed with error %d \n", __FILE__, __LINE__, \
result); \
exit(-1); \
} \
} while (0)
/* Ring accumulation run entirely inside one kernel.
 *
 * For npes - 1 rounds, each PE writes its running value into `target` on the
 * next PE, waits at a barrier, adds its own PE number to the value it received,
 * and waits at a second barrier before the next round overwrites `target`.
 *
 * target: address handed to nvshmem_int_p as the remote destination and read
 *         locally after the first barrier of each round.
 * mype:   PE number of the caller.
 * npes:   total number of PEs.
 */
__global__ void reduce_ring(int *target, int mype, int npes) {
    const int right_neighbor = (mype + 1) % npes;
    int running = mype;

    int round = 1;
    while (round < npes) {
        nvshmem_int_p(target, running, right_neighbor);
        /* Barrier 1: the value from the previous PE is in place before reading. */
        nvshmem_barrier_all();

        running = mype + *target;
        /* Barrier 2: everyone has read `target` before the next round writes it. */
        nvshmem_barrier_all();

        ++round;
    }
}
/* Initializes NVSHMEM (through MPI when built with NVSHMEM_MPI_SUPPORT and
 * NVSHMEMTEST_USE_MPI_LAUNCHER is non-zero, otherwise with nvshmem_init),
 * runs reduce_ring through the collective launch API, and prints the value
 * left in the symmetric buffer on this PE. */
int main(int c, char *v[]) {
    int mype, npes, mype_node;

#ifdef NVSHMEM_MPI_SUPPORT
    bool use_mpi = false;
    char *value = getenv("NVSHMEMTEST_USE_MPI_LAUNCHER");
    if (value) use_mpi = (atoi(value) != 0);

    if (use_mpi) {
        MPI_Init(&c, &v);

        MPI_Comm mpi_comm = MPI_COMM_WORLD;
        /* Start from the library initializer so no attribute field is left
           indeterminate before nvshmemx_init_attr reads the structure. */
        nvshmemx_init_attr_t attr = NVSHMEMX_INIT_ATTR_INITIALIZER;
        attr.mpi_comm = &mpi_comm;
        nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);
    } else {
        nvshmem_init();
    }
#else
    nvshmem_init();
#endif

    mype = nvshmem_my_pe();
    npes = nvshmem_n_pes();
    mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);

    // application picks the device each PE will use
    CUDA_CHECK(cudaSetDevice(mype_node));

    int *u = (int *)nvshmem_calloc(1, sizeof(int));
    int *h = (int *)calloc(1, sizeof(int));
    if (u == NULL || h == NULL) {
        fprintf(stderr, "[%s:%d] allocation failed \n", __FILE__, __LINE__);
        exit(-1);
    }

    /* Kernel arguments are passed by address, in parameter order. */
    void *args[] = {&u, &mype, &npes};
    dim3 dimBlock(1);
    dim3 dimGrid(1);

    NVSHMEM_CHECK(
        nvshmemx_collective_launch((const void *)reduce_ring, dimGrid, dimBlock, args, 0, 0));
    CUDA_CHECK(cudaDeviceSynchronize());

    CUDA_CHECK(cudaMemcpy(h, u, sizeof(int), cudaMemcpyDeviceToHost));
    printf("results on device [%d] is %d \n",mype, h[0]);

    nvshmem_free(u);
    free(h);
    nvshmem_finalize();

#ifdef NVSHMEM_MPI_SUPPORT
    if (use_mpi) MPI_Finalize();
#endif
    return 0;
}
流上示例¶
以下示例展示了如何使用 nvshmemx_*_on_stream
函数将 SHMEM 操作排队到 CUDA 流上,以便按流顺序执行。 具体来说,该示例展示了以下内容
- 如何使集合 SHMEM 归约操作等待流中前面的内核。
- 如何使内核等待来自先前集合 SHMEM 归约操作的通信结果。
该示例展示了减轻 CPU 对 GPU 计算和通信控制的一种用例。
#include <stdio.h>
#include "nvshmem.h"
#include "nvshmemx.h"
#ifdef NVSHMEM_MPI_SUPPORT
#include "mpi.h"
#endif
/* correct_accumulate adjusts the input only when *full_sum exceeds this value. */
#define THRESHOLD 42
/* Amount correct_accumulate subtracts from each input element in that case. */
#define CORRECTION 7
/* Drops any earlier CUDA_CHECK definition, then defines a checker that
 * evaluates a CUDA runtime expression exactly once. If the result is not
 * cudaSuccess, it prints the source file, line and cudaGetErrorString() text
 * to stderr and terminates the process with exit(-1). */
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
/* Sums input[0 .. blockDim.x) into *partial_sum.
 *
 * Indexing uses threadIdx.x only, so every launched block works on the same
 * elements. Thread 0 clears the accumulator, the block synchronizes, then
 * each thread atomically adds its own element.
 */
__global__ void accumulate(int *input, int *partial_sum) {
    const int tid = threadIdx.x;

    if (tid == 0) {
        *partial_sum = 0;
    }
    /* The reset must be visible before any thread starts adding. */
    __syncthreads();

    atomicAdd(partial_sum, input[tid]);
}
/* Re-accumulates the input after an optional correction.
 *
 * When *full_sum is greater than THRESHOLD, each thread first subtracts
 * CORRECTION from its own input element. The block then recomputes
 * *partial_sum exactly as accumulate does: thread 0 clears it, the block
 * synchronizes, and each thread atomically adds its element.
 */
__global__ void correct_accumulate(int *input, int *partial_sum, int *full_sum) {
    const int tid = threadIdx.x;

    const bool needs_correction = (*full_sum > THRESHOLD);
    if (needs_correction) {
        input[tid] -= CORRECTION;
    }

    if (tid == 0) {
        *partial_sum = 0;
    }
    /* The reset must be visible before any thread starts adding. */
    __syncthreads();

    atomicAdd(partial_sum, input[tid]);
}
/* Enqueues, on a single CUDA stream: a local accumulation kernel, an
 * on-stream sum reduction across all PEs, and a second kernel that consumes
 * the reduced value. The host only synchronizes once at the end. */
int main(int c, char *v[]) {
    int mype, npes, mype_node;
    int *input;
    int *partial_sum;
    int *full_sum;
    int input_nelems = 512;
    int to_all_nelems = 1;
    cudaStream_t stream;

#ifdef NVSHMEM_MPI_SUPPORT
    bool use_mpi = false;
    char *value = getenv("NVSHMEMTEST_USE_MPI_LAUNCHER");
    if (value) use_mpi = (atoi(value) != 0);

    if (use_mpi) {
        MPI_Init(&c, &v);

        MPI_Comm mpi_comm = MPI_COMM_WORLD;
        /* Start from the library initializer so no attribute field is left
           indeterminate before nvshmemx_init_attr reads the structure. */
        nvshmemx_init_attr_t attr = NVSHMEMX_INIT_ATTR_INITIALIZER;
        attr.mpi_comm = &mpi_comm;
        nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);
    } else {
        nvshmem_init();
    }
#else
    nvshmem_init();
#endif

    mype = nvshmem_my_pe();
    npes = nvshmem_n_pes();
    mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);

    CUDA_CHECK(cudaSetDevice(mype_node));
    CUDA_CHECK(cudaStreamCreate(&stream));

    input = (int *)nvshmem_malloc(sizeof(int) * input_nelems);
    partial_sum = (int *)nvshmem_malloc(sizeof(int));
    full_sum = (int *)nvshmem_malloc(sizeof(int));

    /* Both kernels read every element of `input` and nothing else in this
       program writes it, so give it defined contents (all ones) first. */
    int *host_input = (int *)malloc(sizeof(int) * input_nelems);
    if (input == NULL || partial_sum == NULL || full_sum == NULL || host_input == NULL) {
        fprintf(stderr, "[%s:%d] allocation failed \n", __FILE__, __LINE__);
        exit(-1);
    }
    for (int i = 0; i < input_nelems; i++) host_input[i] = 1;
    CUDA_CHECK(cudaMemcpyAsync(input, host_input, sizeof(int) * input_nelems,
                               cudaMemcpyHostToDevice, stream));

    /* One block with one thread per input element. */
    accumulate<<<1, input_nelems, 0, stream>>>(input, partial_sum);
    CUDA_CHECK(cudaGetLastError());

    /* Stream order makes the reduction wait for accumulate, and makes
       correct_accumulate wait for the reduction. */
    nvshmemx_int_sum_reduce_on_stream(NVSHMEM_TEAM_WORLD, full_sum, partial_sum, to_all_nelems, stream);

    correct_accumulate<<<1, input_nelems, 0, stream>>>(input, partial_sum, full_sum);
    CUDA_CHECK(cudaGetLastError());

    CUDA_CHECK(cudaStreamSynchronize(stream));
    /* The asynchronous copy has finished, so the staging buffer can go. */
    free(host_input);

    printf("[%d of %d] run complete \n", mype, npes);

    CUDA_CHECK(cudaStreamDestroy(stream));

    nvshmem_free(input);
    nvshmem_free(partial_sum);
    nvshmem_free(full_sum);

    nvshmem_finalize();

#ifdef NVSHMEM_MPI_SUPPORT
    if (use_mpi) MPI_Finalize();
#endif
    return 0;
}
线程组示例¶
本节中的示例展示了当块中的所有线程都依赖于先前的通信操作的结果时,如何使用 nvshmemx_int_fcollect_block
让线程块中的全部线程共同参与,从而加速 SHMEM 收集操作。 在此示例中,每个 PE 先计算自己那一部分的向量和,再通过 SHMEM 收集操作把所有 PE 的部分结果汇集起来,得到跨 PE 的完整结果。
#include <stdio.h>
#include "nvshmem.h"
#include "nvshmemx.h"
#ifdef NVSHMEM_MPI_SUPPORT
#include "mpi.h"
#endif
/* Threads per block for distributed_vector_sum; main also uses it as the
 * per-PE element count when sizing x, y and partial_sum. */
#define NTHREADS 512
/* Drops any earlier CUDA_CHECK definition, then defines a checker that
 * evaluates a CUDA runtime expression exactly once. If the result is not
 * cudaSuccess, it prints the source file, line and cudaGetErrorString() text
 * to stderr and terminates the process with exit(-1). */
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
/* Computes partial_sum = x + y element-wise on this PE, then gathers every
 * PE's partial_sum into `sum` with an fcollect over NVSHMEM_TEAM_WORLD.
 *
 * Launch layout: indexing uses threadIdx.x only and blockDim.x is used as the
 * element count, so the kernel is written for a single block with one thread
 * per element.
 *
 * x, y:            input vectors, blockDim.x elements each.
 * partial_sum:     blockDim.x elements, written here and used as the fcollect source.
 * sum:             fcollect destination.
 * use_threadgroup: non-zero -> whole block calls the _block variant;
 *                  zero     -> thread 0 alone calls the per-thread variant.
 * mype, npes:      not used by the kernel body; kept for the launch interface.
 */
__global__ void distributed_vector_sum(int *x, int *y, int *partial_sum, int *sum,
                                       int use_threadgroup, int mype, int npes) {
    int index = threadIdx.x;
    int nelems = blockDim.x;

    partial_sum[index] = x[index] + y[index];

    /* Each thread wrote only its own element, but the collective below reads
       the whole vector. Without this barrier thread 0 (or an early warp) could
       send elements that other threads have not stored yet. */
    __syncthreads();

    if (use_threadgroup) {
        /* all threads realize the entire fcollect operation */
        nvshmemx_int_fcollect_block(NVSHMEM_TEAM_WORLD, sum, partial_sum, nelems);
    } else {
        /* thread 0 realizes the entire fcollect operation */
        if (0 == index) {
            nvshmem_int_fcollect(NVSHMEM_TEAM_WORLD, sum, partial_sum, nelems);
        }
    }
}
/* Allocates the symmetric vectors, launches distributed_vector_sum on one
 * block of NTHREADS threads through the collective launch API, and waits for
 * it to finish. */
int main(int c, char *v[]) {
    int mype, npes, mype_node;
    int *x;
    int *y;
    int *partial_sum;
    int *sum;
    int use_threadgroup = 1;
    int nthreads = NTHREADS;

#ifdef NVSHMEM_MPI_SUPPORT
    bool use_mpi = false;
    char *value = getenv("NVSHMEMTEST_USE_MPI_LAUNCHER");
    if (value) use_mpi = (atoi(value) != 0);

    if (use_mpi) {
        MPI_Init(&c, &v);

        MPI_Comm mpi_comm = MPI_COMM_WORLD;
        /* Start from the library initializer so no attribute field is left
           indeterminate before nvshmemx_init_attr reads the structure. */
        nvshmemx_init_attr_t attr = NVSHMEMX_INIT_ATTR_INITIALIZER;
        attr.mpi_comm = &mpi_comm;
        nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);
    } else {
        nvshmem_init();
    }
#else
    nvshmem_init();
#endif

    npes = nvshmem_n_pes();
    mype = nvshmem_my_pe();
    mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);

    CUDA_CHECK(cudaSetDevice(mype_node));

    /* The kernel reads x and y and nothing in this program writes them, so
       allocate them zero-filled instead of leaving their contents undefined. */
    x = (int *)nvshmem_calloc(nthreads, sizeof(int));
    y = (int *)nvshmem_calloc(nthreads, sizeof(int));
    partial_sum = (int *)nvshmem_malloc(sizeof(int) * nthreads);
    /* Destination holds one partial_sum vector per PE. */
    sum = (int *)nvshmem_malloc(sizeof(int) * nthreads * npes);
    if (x == NULL || y == NULL || partial_sum == NULL || sum == NULL) {
        fprintf(stderr, "[%s:%d] allocation failed \n", __FILE__, __LINE__);
        exit(-1);
    }

    /* Kernel arguments are passed by address, in parameter order. */
    void *args[] = {&x, &y, &partial_sum, &sum, &use_threadgroup, &mype, &npes};
    dim3 dimBlock(nthreads);
    dim3 dimGrid(1);

    int status = nvshmemx_collective_launch((const void *)distributed_vector_sum, dimGrid,
                                            dimBlock, args, 0, 0);
    if (status != NVSHMEMX_SUCCESS) {
        fprintf(stderr, "[%s:%d] nvshmem failed with error %d \n", __FILE__, __LINE__, status);
        exit(-1);
    }
    CUDA_CHECK(cudaDeviceSynchronize());

    printf("[%d of %d] run complete \n", mype, npes);

    nvshmem_free(x);
    nvshmem_free(y);
    nvshmem_free(partial_sum);
    nvshmem_free(sum);

    nvshmem_finalize();

#ifdef NVSHMEM_MPI_SUPPORT
    if (use_mpi) MPI_Finalize();
#endif
    return 0;
}
块放置示例¶
在以下示例中,块 0 中的每个线程都调用 nvshmemx_float_put_block
。 或者,每个线程都可以调用 nvshmem_float_p
,但是 nvshmem_float_p
有一个缺点,即当目标 GPU 通过 InfiniBand 连接时,每个元素都有一个 RMA 消息,这可能不利于性能。
在这种情况下使用 nvshmem_float_put
的缺点是,当目标 GPU 通过 P2P 连接时,单个线程会将整个数据复制到目标 GPU。 而 nvshmemx_float_put_block
可以利用块中的所有线程将数据并行复制到目标 GPU。
#include <stdio.h>
#include <assert.h>
#include "nvshmem.h"
#include "nvshmemx.h"
/* Drops any earlier CUDA_CHECK definition, then defines a checker that
 * evaluates a CUDA runtime expression exactly once. If the result is not
 * cudaSuccess, it prints the source file, line and cudaGetErrorString() text
 * to stderr and terminates the process with exit(-1). */
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
/* Block size used to launch set_and_shift_kernel; main requires num_elems to
 * be a multiple of it. */
#define THREADS_PER_BLOCK 1024
/* Fills send_data with this PE's number and copies it into recv_data on the
 * next PE in the ring, one block-sized slice per thread block.
 *
 * Launch layout: 1-D grid of 1-D blocks; block b handles elements
 * [b * blockDim.x, min((b + 1) * blockDim.x, num_elems)).
 *
 * send_data, recv_data: buffers of at least num_elems floats.
 * num_elems:            number of elements to set and transfer.
 * mype, npes:           PE number of the caller and total number of PEs.
 */
__global__ void set_and_shift_kernel(float *send_data, float *recv_data, int num_elems, int mype,
                                     int npes) {
    int thread_idx = blockIdx.x * blockDim.x + threadIdx.x;

    /* set the corresponding element of send_data */
    if (thread_idx < num_elems) send_data[thread_idx] = mype;

    int peer = (mype + 1) % npes;
    int block_offset = blockIdx.x * blockDim.x;
    int remaining = num_elems - block_offset;

    /* The cooperative copy reads elements stored by other threads of this
       block, so make those stores visible explicitly instead of depending on
       the put routine to synchronize on entry. */
    __syncthreads();

    /* A block that starts past the end has nothing to send. The condition is
       uniform across the block, so all of its threads leave together. */
    if (remaining <= 0) return;

    /* Every thread in the block calls nvshmemx_float_put_block. Alternatively,
       every thread can call nvshmem_float_p, but nvshmem_float_p has a disadvantage
       that when the destination GPU is connected via IB, there will be one rma
       message for every single element which can be detrimental to performance.
       And the disadvantage with nvshmem_float_put is that when the destination GPU is p2p
       connected, it cannot leverage multiple threads to copy the data to the destination
       GPU. */
    nvshmemx_float_put_block(recv_data + block_offset, send_data + block_offset,
                             min((int)blockDim.x, remaining),
                             peer); /* All threads in a block call the API
                                       with the same arguments */
}
/* Runs set_and_shift_kernel and verifies that recv_data on this PE holds the
 * PE number of the previous PE in the ring. */
int main(int c, char *v[]) {
    int mype, npes, mype_node;
    float *send_data, *recv_data;
    int num_elems = 8192;
    int num_blocks;

    nvshmem_init();

    mype = nvshmem_my_pe();
    npes = nvshmem_n_pes();
    mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);

    // application picks the device each PE will use
    CUDA_CHECK(cudaSetDevice(mype_node));
    send_data = (float *)nvshmem_malloc(sizeof(float) * num_elems);
    recv_data = (float *)nvshmem_malloc(sizeof(float) * num_elems);
    assert(send_data != NULL && recv_data != NULL);

    assert(num_elems % THREADS_PER_BLOCK == 0); /* for simplicity */
    num_blocks = num_elems / THREADS_PER_BLOCK;

    set_and_shift_kernel<<<num_blocks, THREADS_PER_BLOCK>>>(send_data, recv_data, num_elems, mype,
                                                            npes);
    CUDA_CHECK(cudaGetLastError());
    CUDA_CHECK(cudaDeviceSynchronize());

    /* cudaDeviceSynchronize only waits for this PE's own kernel. recv_data is
       filled by the kernel running on the previous PE, so wait for all PEs
       (and for their outstanding puts) before reading it. */
    nvshmem_barrier_all();

    /* Do data validation */
    float *host = new float[num_elems];
    CUDA_CHECK(cudaMemcpy(host, recv_data, num_elems * sizeof(float), cudaMemcpyDefault));
    int ref = (mype - 1 + npes) % npes;
    bool success = true;
    for (int i = 0; i < num_elems; ++i) {
        if (host[i] != ref) {
            printf("Error at %d of rank %d: %f\n", i, mype, host[i]);
            success = false;
            break;
        }
    }
    delete[] host;

    if (success) {
        printf("[%d of %d] run complete \n", mype, npes);
    } else {
        printf("[%d of %d] run failure \n", mype, npes);
    }

    nvshmem_free(send_data);
    nvshmem_free(recv_data);

    nvshmem_finalize();

    return 0;
}
环形广播示例¶
在以下示例中,PE 0 通过将消息发送到 PE 1 来广播消息,PE 1 将消息发送到 PE 2,依此类推。 此示例演示了多个 NVSHMEM API,包括使用 nvshmem_fence
来排序通信,以及使用 nvshmem_signal_wait_until
和 nvshmemx_signal_op
进行点对点同步。
#include <stdio.h>
#include <stdint.h>
#include <cuda.h>
#include <nvshmem.h>
#include <nvshmemx.h>
/* Broadcasts `data` from PE `root` around the ring root -> root+1 -> ... ,
 * wrapping at npes, until the PE just before the root has received it.
 *
 * Each PE waits for its own signal word to become non-zero, forwards the data
 * to the next PE, and then sets that PE's signal word. The root starts the
 * chain by setting its own signal word.
 *
 * data:  buffer of nelem ints; source on this PE and destination on the peer.
 * nelem: number of ints to forward.
 * root:  PE whose data is broadcast.
 * psync: signal word; must be 0 on every PE on entry and is 0 again on exit.
 */
__global__ void ring_bcast(int *data, size_t nelem, int root, uint64_t *psync) {
    int mype = nvshmem_my_pe();
    int npes = nvshmem_n_pes();
    int peer = (mype + 1) % npes;

    if (mype == root)
        *psync = 1;

    nvshmem_signal_wait_until(psync, NVSHMEM_CMP_NE, 0);

    /* The ring ends at the PE whose successor is the root. Testing against
       npes - 1 instead is only right for root == 0; for any other root the
       PEs below the root would wait forever. */
    if (peer == root) {
        *psync = 0; /* leave the signal word clean on the last PE too */
        return;
    }

    nvshmem_int_put(data, data, nelem, peer);
    /* Order the data put ahead of the signal update to the same peer. */
    nvshmem_fence();
    nvshmemx_signal_op(psync, 1, NVSHMEM_SIGNAL_SET, peer);

    *psync = 0;
}
/* Terminates with a message when a CUDA runtime call did not succeed. */
static void check_cuda(cudaError_t status, const char *what) {
    if (status != cudaSuccess) {
        fprintf(stderr, "%s failed with %s\n", what, cudaGetErrorString(status));
        exit(-1);
    }
}

/* Fills a per-PE buffer, broadcasts PE 0's contents around the ring with
 * ring_bcast, and reports every element that does not match what PE 0 sent. */
int main(void) {
    size_t data_len = 32;
    cudaStream_t stream;

    nvshmem_init();

    int mype = nvshmem_my_pe();
    int mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);

    check_cuda(cudaSetDevice(mype_node), "cudaSetDevice");
    check_cuda(cudaStreamCreate(&stream), "cudaStreamCreate");

    int *data = (int *) nvshmem_malloc(sizeof(int) * data_len);
    int *data_h = (int *) malloc(sizeof(int) * data_len);
    uint64_t *psync = (uint64_t *) nvshmem_calloc(1, sizeof(uint64_t));
    if (data == NULL || data_h == NULL || psync == NULL) {
        fprintf(stderr, "PE %d: allocation failed\n", mype);
        exit(-1);
    }

    /* PE 0 holds 0, 1, 2, ...; every other PE starts with different values,
       so a missed broadcast shows up in the check below. */
    for (size_t i = 0; i < data_len; i++)
        data_h[i] = mype + (int) i;

    check_cuda(cudaMemcpyAsync(data, data_h, sizeof(int) * data_len, cudaMemcpyHostToDevice,
                               stream),
               "cudaMemcpyAsync (host to device)");

    int root = 0;
    dim3 gridDim(1), blockDim(1);
    void *args[] = { &data, &data_len, &root, &psync };

    /* Barriers on the stream bracket the kernel: all PEs have their input in
       place before it starts and have finished before the result is read. */
    nvshmemx_barrier_all_on_stream(stream);
    int status = nvshmemx_collective_launch((const void *)ring_bcast, gridDim, blockDim, args, 0,
                                            stream);
    if (status != NVSHMEMX_SUCCESS) {
        fprintf(stderr, "PE %d: collective launch failed with error %d\n", mype, status);
        exit(-1);
    }
    nvshmemx_barrier_all_on_stream(stream);

    check_cuda(cudaMemcpyAsync(data_h, data, sizeof(int) * data_len, cudaMemcpyDeviceToHost,
                               stream),
               "cudaMemcpyAsync (device to host)");
    check_cuda(cudaStreamSynchronize(stream), "cudaStreamSynchronize");

    for (size_t i = 0; i < data_len; i++) {
        /* Compare as int on both sides rather than int against size_t. */
        if (data_h[i] != (int) i)
            printf("PE %d error, data[%zu] = %d expected data[%zu] = %d\n",
                   mype, i, data_h[i], i, (int) i);
    }

    check_cuda(cudaStreamDestroy(stream), "cudaStreamDestroy");
    nvshmem_free(data);
    nvshmem_free(psync);
    free(data_h);

    nvshmem_finalize();

    return 0;
}
环形 Allreduce 示例¶
在以下示例中,PE0 从其左邻居接收消息块,执行本地归约,并将结果块发送到其右邻居 (PE1),依此类推。 最后,除最后一个 PE 外,每个 PE 都将其结果块转发给右邻居。 此示例演示了多个 NVSHMEM API,包括使用 nvshmem_int_put_signal_nbi
和 nvshmem_signal_wait_until
进行点对点通信和同步。
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*
* NVIDIA CORPORATION and its licensors retain all intellectual property
* and proprietary rights in and to this software, related documentation
* and any modifications thereto. Any use, reproduction, disclosure or
* distribution of this software and related documentation without an express
* license agreement from NVIDIA CORPORATION is strictly prohibited.
*
* See COPYRIGHT.txt for license information
*/
/* This example performs an allreduce operation using a ring algorithm when
GPUs are connected via a remote interconnect like IB/RoCE/EFA, etc.
It does a ring reduce followed by a ring broadcast. We use the single-threaded put_signal API
as a single thread is sufficient for remote transfers. The example is expected
to be performant only when GPUs are connected via a remote interconnect. */
#include <assert.h>
#include <ctype.h>
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <cuda.h>
#include <nvshmem.h>
#include <nvshmemx.h>
/* Drops any earlier CUDA_CHECK definition, then defines a checker that
 * evaluates a CUDA runtime expression exactly once. If the result is not
 * cudaSuccess, it prints the source file, line and cudaGetErrorString() text
 * to stderr and terminates the process with exit(-1). */
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
/* atol() + optional scaled suffix recognition: 1K, 2M, 3G, 1T
 *
 * Parses a non-negative decimal number (fractions allowed, e.g. "1.5K")
 * optionally followed by one binary-scale suffix: k/K = 2^10, m/M = 2^20,
 * g/G = 2^30, t/T = 2^40. The scaled value is rounded up to a whole number.
 *
 * str: text to parse.
 * out: receives the result on success; left untouched on failure.
 *
 * Returns 0 on success. Returns 1 when either pointer is NULL, no number can
 * be read, the number is negative or not finite, the character following the
 * number is not a known suffix, or the result does not fit in size_t.
 */
static inline int atol_scaled(const char *str, size_t *out) {
    int scale = 0;
    double p = -1.0;
    char f = '\0';

    if (str == NULL || out == NULL) return 1;

    int n = sscanf(str, "%lf%c", &p, &f);

    /* Written as !(p >= 0) so that NaN is rejected along with negatives. The
       check applies with or without a suffix: converting a negative double
       to size_t is undefined. */
    if (n < 1 || !(p >= 0.0)) return 1;

    if (n == 2) {
        switch (f) {
            case 'k':
            case 'K':
                scale = 10;
                break;
            case 'm':
            case 'M':
                scale = 20;
                break;
            case 'g':
            case 'G':
                scale = 30;
                break;
            case 't':
            case 'T':
                scale = 40;
                break;
            default:
                return 1;
        }
    }

    /* 64-bit constant: a shift by 40 is undefined where unsigned long has
       only 32 bits. */
    double scaled = ceil(p * (double)(1ULL << scale));

    /* (double)SIZE_MAX rounds up to the first value that no longer fits;
       this also rejects infinity. */
    if (!(scaled < (double)SIZE_MAX)) return 1;

    *out = (size_t)scaled;
    return 0;
}
/* Benchmark parameters; each can be overridden by the command-line option
 * named in its comment (see the getopt loop in main). */
/* -b: smallest message size, in bytes (main divides it by sizeof(int)). */
size_t min_size = 1024 * 1024 * 32;
/* -e: largest message size, in bytes; also the size of every buffer allocated in main. */
size_t max_size = min_size * 16;
/* -c: grid size used to launch ring_reduce. */
size_t num_blocks = 32;
/* -t: block size used to launch ring_reduce. */
size_t threads_per_block = 512;
/* -n: number of timed launches per message size. */
size_t iters = 4;
/* -w: number of untimed launches before timing starts. */
size_t warmup_iters = 1;
/* -f: factor by which the message size is multiplied between runs. */
size_t step_factor = 2;
/* -m: chunk size in bytes (ring_reduce divides it by sizeof(int)). */
size_t chunk_size = 262144;
// perform Allreduce using ring
//
// Each thread block works on its own contiguous slice of nreduce / gridDim.x
// elements and uses its own signal word (signal + blockIdx.x). A slice is
// processed in chunks of chunk_size / sizeof(int) elements.
//
// dst:        result buffer; also the remote target of every put.
// src:        this PE's input.
// nreduce:    total number of elements across all blocks.
// signal:     array with one signal word per block.
// chunk_size: chunk size in bytes.
//
// Both divisions are integer divisions, so elements beyond
// gridDim.x * (nreduce / gridDim.x), and elements of a slice beyond
// num_chunks * chunk_elems, are not touched by this kernel.
__global__ void ring_reduce(int *dst, const int *src, size_t nreduce, uint64_t *signal,
size_t chunk_size) {
int mype = nvshmem_my_pe();
int npes = nvshmem_n_pes();
int peer = (mype + 1) % npes;
int thread_id = threadIdx.x;
int num_threads = blockDim.x;
int num_blocks = gridDim.x;
int block_idx = blockIdx.x;
size_t elems_per_block = nreduce / num_blocks;
// Change src, dst, nreduce, signal to what this block is going to process
// Each CTA will work independently
// NOTE(review): with elems_per_block = nreduce / num_blocks this condition
// looks like it can never be true - confirm whether the guard is needed.
if (elems_per_block * (blockIdx.x + 1) > nreduce) return;
src = src + block_idx * elems_per_block;
dst = dst + block_idx * elems_per_block;
nreduce = elems_per_block;
signal = signal + block_idx;
size_t chunk_elems = chunk_size / sizeof(int);
size_t num_chunks = nreduce / chunk_elems;
// reduce phase
// PE 0 sends its src chunk to the peer's dst. Every other PE waits until its
// signal word reaches chunk + 1, adds src into dst for that chunk with all
// threads of the block, and then thread 0 forwards the dst chunk to the peer.
// Each put adds 1 to the peer's signal word.
for (size_t chunk = 0; chunk < num_chunks; chunk++) {
if (mype != 0) {
// Only thread 0 waits on the signal; the barrier releases the other threads.
if (thread_id == 0) nvshmem_signal_wait_until(signal, NVSHMEM_CMP_GE, chunk + 1);
__syncthreads();
for (size_t i = thread_id; i < chunk_elems; i += num_threads) {
dst[i] = dst[i] + src[i];
}
// All additions for this chunk are done before thread 0 sends it.
__syncthreads();
}
if (thread_id == 0)
nvshmem_int_put_signal_nbi(dst, (mype == 0) ? src : dst, chunk_elems, signal, 1,
NVSHMEM_SIGNAL_ADD, peer);
src = src + chunk_elems;
dst = dst + chunk_elems;
}
// Broadcast phase
// Rewind dst to the start of this block's slice.
dst = dst - num_chunks * chunk_elems;
// Only thread 0 takes part in this phase.
if (thread_id == 0) {
for (size_t chunk = 0; chunk < num_chunks; chunk++) {
if (mype < npes - 1) { // Last pe already has the final result
// PE 0 received no signals during the reduce phase, so it counts from
// chunk + 1; the other PEs already counted num_chunks signals there.
nvshmem_signal_wait_until(signal, NVSHMEM_CMP_GE,
(mype == 0) ? chunk + 1 : num_chunks + chunk + 1);
}
// PEs 0 .. npes - 3 forward the chunk to their peer.
if (mype < npes - 2)
nvshmem_int_put_signal_nbi(dst, dst, chunk_elems, signal, 1, NVSHMEM_SIGNAL_ADD,
peer);
dst = dst + chunk_elems;
}
*signal = 0; // reset for next iteration
}
}
/* Launches ring_reduce collectively and enqueues a barrier behind it on the
 * same stream. Exits when the launch is rejected. */
static void launch_ring_reduce(dim3 grid, dim3 block, void **args, cudaStream_t stream) {
    int status =
        nvshmemx_collective_launch((const void *)ring_reduce, grid, block, args, 0, stream);
    if (status != NVSHMEMX_SUCCESS) {
        fprintf(stderr, "[%s:%d] nvshmem failed with error %d \n", __FILE__, __LINE__, status);
        exit(-1);
    }
    nvshmemx_barrier_all_on_stream(stream);
}

/* Ring allreduce benchmark driver.
 *
 * Options (all accept the K/M/G/T suffixes understood by atol_scaled):
 *   -b min bytes   -e max bytes   -f size step factor   -n timed iterations
 *   -w warmup iterations   -c thread blocks   -t threads per block
 *   -m chunk bytes
 *
 * For every message size from min to max it runs the warmup launches, times
 * the measured launches with CUDA events, prints the average on PE 0, and
 * checks the result against i * npes.
 */
int main(int argc, char **argv) {
    int c;
    while ((c = getopt(argc, argv, "b:e:f:n:w:c:t:m:")) != -1) {
        size_t *target = NULL;
        switch (c) {
            case 'b':
                target = &min_size;
                break;
            case 'e':
                target = &max_size;
                break;
            case 'f':
                target = &step_factor;
                break;
            case 'n':
                target = &iters;
                break;
            case 'w':
                target = &warmup_iters;
                break;
            case 'c':
                target = &num_blocks;
                break;
            case 't':
                target = &threads_per_block;
                break;
            case 'm':
                target = &chunk_size;
                break;
            case '?':
                if (optopt == 'c')
                    fprintf(stderr, "Option -%c requires an argument.\n", optopt);
                else if (isprint(optopt))
                    fprintf(stderr, "Unknown option `-%c'.\n", optopt);
                else
                    fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
                return 1;
            default:
                abort();
        }
        /* A value that does not parse used to be ignored silently, leaving
           the default in place. */
        if (atol_scaled(optarg, target) != 0) {
            fprintf(stderr, "Invalid value `%s' for option -%c.\n", optarg, c);
            return 1;
        }
    }

    /* Reject settings that would hang or fault later: a zero minimum size or
       a step factor below 2 never ends the size loop, and a zero grid or
       block size cannot be launched. */
    if (min_size == 0 || step_factor < 2 || iters == 0 || num_blocks == 0 ||
        threads_per_block == 0) {
        fprintf(stderr,
                "Sizes, iteration count, blocks and threads must be non-zero and the step "
                "factor must be at least 2.\n");
        return 1;
    }

    /* The kernel divides by the chunk length in ints. */
    size_t chunk_ints = chunk_size / sizeof(int);
    if (chunk_ints == 0) {
        fprintf(stderr, "Chunk size (-m) must be at least %zu bytes.\n", sizeof(int));
        return 1;
    }

    size_t min_ints = min_size / sizeof(int);
    assert(min_ints % num_blocks == 0);

    /* The kernel only reduces whole chunks of each block's slice. */
    if ((min_ints / num_blocks) % chunk_ints != 0) {
        fprintf(stderr, "Per-block element count must be a multiple of the chunk length.\n");
        return 1;
    }

    nvshmem_init();

    int mype = nvshmem_my_pe();
    int npes = nvshmem_n_pes();
    int mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
    cudaStream_t stream;
    cudaEvent_t start, stop;

    /* Select the device first so that the stream and the events recorded on
       it are all created on the same device. */
    CUDA_CHECK(cudaSetDevice(mype_node));
    CUDA_CHECK(cudaStreamCreate(&stream));
    CUDA_CHECK(cudaEventCreate(&start));
    CUDA_CHECK(cudaEventCreate(&stop));

    size_t max_ints = max_size / sizeof(int);
    int *dst = (int *)nvshmem_malloc(max_size);
    int *src = (int *)nvshmem_malloc(max_size);
    int *data_h = (int *)malloc(max_size);
    uint64_t *signal = (uint64_t *)nvshmem_calloc(num_blocks, sizeof(uint64_t));
    if (dst == NULL || src == NULL || data_h == NULL || signal == NULL) {
        fprintf(stderr, "[%s:%d] allocation failed \n", __FILE__, __LINE__);
        exit(-1);
    }
    dim3 gridDim(num_blocks), blockDim(threads_per_block);

    /* Every PE contributes src[i] = i, so the reduced value is i * npes. */
    for (size_t i = 0; i < max_ints; i++) data_h[i] = (int)i;

    CUDA_CHECK(cudaMemcpyAsync(src, data_h, max_size, cudaMemcpyHostToDevice, stream));
    nvshmemx_barrier_all_on_stream(stream);

    for (size_t size = min_size; size <= max_size; size *= step_factor) {
        size_t num_ints = size / sizeof(int);
        void *args[] = {&dst, &src, &num_ints, &signal, &chunk_size};

        // do warmup
        for (size_t i = 0; i < warmup_iters; i++) {
            launch_ring_reduce(gridDim, blockDim, args, stream);
        }
        CUDA_CHECK(cudaStreamSynchronize(stream));

        // main loop
        CUDA_CHECK(cudaEventRecord(start, stream));
        for (size_t i = 0; i < iters; i++) {
            launch_ring_reduce(gridDim, blockDim, args, stream);
        }
        CUDA_CHECK(cudaEventRecord(stop, stream));

        CUDA_CHECK(cudaStreamSynchronize(stream));
        if (!mype) {
            float ms;
            CUDA_CHECK(cudaEventElapsedTime(&ms, start, stop));
            printf("%zuB \t %fms\n", size, ms / iters);
        }

        // validate output
        CUDA_CHECK(cudaMemcpy(data_h, dst, size, cudaMemcpyDeviceToHost));
        for (size_t i = 0; i < num_ints; i++) {
            if (data_h[i] != (int)i * npes)
                printf("PE %d error, data[%zu] = %d expected data[%zu] = %d\n", mype, i, data_h[i],
                       i, (int)i * npes);
        }
    }

    CUDA_CHECK(cudaEventDestroy(start));
    CUDA_CHECK(cudaEventDestroy(stop));
    CUDA_CHECK(cudaStreamDestroy(stream));

    nvshmem_free(dst);
    nvshmem_free(src);
    nvshmem_free(signal);
    free(data_h);

    nvshmem_finalize();
    return 0;
}