Examples
The examples in this section give an overall view of how to use NCCL in various environments, combining one or more of the following techniques:
- using multiple GPUs per thread/process
- using multiple threads
- using multiple processes - the examples with multiple processes use MPI as the parallel runtime environment, but any multi-process system should be able to work in a similar way.
Make sure you always check the return codes from NCCL functions. For clarity, the following examples do not contain error checking.
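A minimal check on an individual call could look like the sketch below (the buffers, count, communicator, and stream are placeholders for whatever call you are checking); the complete examples further down wrap this same pattern in CUDACHECK/NCCLCHECK macros:
ncclResult_t res = ncclAllReduce(sendbuff, recvbuff, count, ncclFloat, ncclSum, comm, stream);
if (res != ncclSuccess) {
  printf("NCCL failure %s:%d '%s'\n", __FILE__, __LINE__, ncclGetErrorString(res));
  exit(EXIT_FAILURE);
}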
Communicator Creation and Destruction Examples
The following examples demonstrate common use cases for NCCL initialization.
Example 1: Single Process, Single Thread, Multiple Devices
In the specific case of a single process, ncclCommInitAll can be used. Here is an example creating a communicator for 4 devices, hence there are 4 communicator objects:
ncclComm_t comms[4];
int devs[4] = { 0, 1, 2, 3 };
ncclCommInitAll(comms, 4, devs);
Next, you can call NCCL collective operations using a single thread and group calls, or multiple threads, each providing a comm object (a minimal multi-threaded sketch follows below).
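The sketch below illustrates the multi-threaded option using pthreads: each thread drives exactly one of the communicators created above, so no group calls are needed. The per-device buffers, streams, and element count stored in args are assumptions made for this sketch and are not defined by the snippet above:
#include <pthread.h>

typedef struct {
  int dev;
  ncclComm_t comm;
  cudaStream_t stream;
  float *sendbuff, *recvbuff;
  size_t count;
} workerArg_t;

// each thread uses a single device and a single communicator
static void* worker(void* varg) {
  workerArg_t* a = (workerArg_t*)varg;
  cudaSetDevice(a->dev);
  ncclAllReduce(a->sendbuff, a->recvbuff, a->count, ncclFloat, ncclSum, a->comm, a->stream);
  cudaStreamSynchronize(a->stream);
  return NULL;
}

// one worker thread per communicator created by ncclCommInitAll above;
// args[i] is assumed to hold the device, buffers and stream of GPU i
pthread_t threads[4];
workerArg_t args[4];
for (int i = 0; i < 4; i++)
  pthread_create(threads + i, NULL, worker, args + i);
for (int i = 0; i < 4; i++)
  pthread_join(threads[i], NULL);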
At the end of the program, all the communicator objects are destroyed:
for (int i=0; i<4; i++)
  ncclCommDestroy(comms[i]);
The following code depicts a complete working example with a single process that manages multiple devices:
#include <stdlib.h>
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#define CUDACHECK(cmd) do {                         \
  cudaError_t err = cmd;                            \
  if (err != cudaSuccess) {                         \
    printf("Failed: Cuda error %s:%d '%s'\n",       \
        __FILE__,__LINE__,cudaGetErrorString(err)); \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)

#define NCCLCHECK(cmd) do {                         \
  ncclResult_t res = cmd;                           \
  if (res != ncclSuccess) {                         \
    printf("Failed, NCCL error %s:%d '%s'\n",       \
        __FILE__,__LINE__,ncclGetErrorString(res)); \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)
int main(int argc, char* argv[])
{
  ncclComm_t comms[4];

  //managing 4 devices
  int nDev = 4;
  int size = 32*1024*1024;
  int devs[4] = { 0, 1, 2, 3 };

  //allocating and initializing device buffers
  float** sendbuff = (float**)malloc(nDev * sizeof(float*));
  float** recvbuff = (float**)malloc(nDev * sizeof(float*));
  cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);

  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(i));
    CUDACHECK(cudaMalloc((void**)sendbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMalloc((void**)recvbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
    CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
    CUDACHECK(cudaStreamCreate(s+i));
  }

  //initializing NCCL
  NCCLCHECK(ncclCommInitAll(comms, nDev, devs));

  //calling NCCL communication API. Group API is required when using
  //multiple devices per thread
  NCCLCHECK(ncclGroupStart());
  for (int i = 0; i < nDev; ++i)
    NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
        comms[i], s[i]));
  NCCLCHECK(ncclGroupEnd());

  //synchronizing on CUDA streams to wait for completion of NCCL operation
  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(i));
    CUDACHECK(cudaStreamSynchronize(s[i]));
  }

  //free device buffers
  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(i));
    CUDACHECK(cudaFree(sendbuff[i]));
    CUDACHECK(cudaFree(recvbuff[i]));
  }

  //finalizing NCCL
  for(int i = 0; i < nDev; ++i)
    ncclCommDestroy(comms[i]);

  printf("Success \n");
  return 0;
}
Example 2: One Device per Process or Thread
When each process or host thread is responsible for at most one GPU, ncclCommInitRank can be used as a collective call to create a communicator. Each thread or process gets its own communicator object.
The following code is an example of communicator creation in the context of MPI, using one device per MPI rank.
First, we retrieve MPI information about the processes:
int myRank, nRanks;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
MPI_Comm_size(MPI_COMM_WORLD, &nRanks);
Next, a single rank creates a unique ID and sends it to all other ranks to make sure everyone has it:
ncclUniqueId id;
if (myRank == 0) ncclGetUniqueId(&id);
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
Finally, we create the communicator:
ncclComm_t comm;
ncclCommInitRank(&comm, nRanks, id, myRank);
We can now call NCCL collective operations using the communicator:
ncclAllReduce( ... , comm);
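As a concrete illustration (the sendbuff, recvbuff, count, and stream names below are placeholders assumed for this sketch, not variables defined in the snippets above), such a call could look like:
ncclAllReduce((const void*)sendbuff, (void*)recvbuff, count, ncclFloat, ncclSum, comm, stream);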
Finally, we destroy the communicator object:
ncclCommDestroy(comm);
The following code depicts a complete working example with multiple MPI processes and one device per process:
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>
#define MPICHECK(cmd) do {                          \
  int e = cmd;                                      \
  if( e != MPI_SUCCESS ) {                          \
    printf("Failed: MPI error %s:%d '%d'\n",        \
        __FILE__,__LINE__, e);                      \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)

#define CUDACHECK(cmd) do {                         \
  cudaError_t e = cmd;                              \
  if( e != cudaSuccess ) {                          \
    printf("Failed: Cuda error %s:%d '%s'\n",       \
        __FILE__,__LINE__,cudaGetErrorString(e));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)

#define NCCLCHECK(cmd) do {                         \
  ncclResult_t r = cmd;                             \
  if (r!= ncclSuccess) {                            \
    printf("Failed, NCCL error %s:%d '%s'\n",       \
        __FILE__,__LINE__,ncclGetErrorString(r));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)
static uint64_t getHostHash(const char* string) {
  // Based on DJB2a, result = result * 33 ^ char
  uint64_t result = 5381;
  for (int c = 0; string[c] != '\0'; c++){
    result = ((result << 5) + result) ^ string[c];
  }
  return result;
}

static void getHostName(char* hostname, int maxlen) {
  gethostname(hostname, maxlen);
  for (int i=0; i< maxlen; i++) {
    if (hostname[i] == '.') {
      hostname[i] = '\0';
      return;
    }
  }
}
int main(int argc, char* argv[])
{
  int size = 32*1024*1024;

  int myRank, nRanks, localRank = 0;

  //initializing MPI
  MPICHECK(MPI_Init(&argc, &argv));
  MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
  MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));

  //calculating localRank based on hostname which is used in selecting a GPU
  uint64_t hostHashs[nRanks];
  char hostname[1024];
  getHostName(hostname, 1024);
  hostHashs[myRank] = getHostHash(hostname);
  MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
  for (int p=0; p<nRanks; p++) {
    if (p == myRank) break;
    if (hostHashs[p] == hostHashs[myRank]) localRank++;
  }

  ncclUniqueId id;
  ncclComm_t comm;
  float *sendbuff, *recvbuff;
  cudaStream_t s;

  //get NCCL unique ID at rank 0 and broadcast it to all others
  if (myRank == 0) ncclGetUniqueId(&id);
  MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));

  //picking a GPU based on localRank, allocate device buffers
  CUDACHECK(cudaSetDevice(localRank));
  CUDACHECK(cudaMalloc(&sendbuff, size * sizeof(float)));
  CUDACHECK(cudaMalloc(&recvbuff, size * sizeof(float)));
  CUDACHECK(cudaStreamCreate(&s));

  //initializing NCCL
  NCCLCHECK(ncclCommInitRank(&comm, nRanks, id, myRank));

  //communicating using NCCL
  NCCLCHECK(ncclAllReduce((const void*)sendbuff, (void*)recvbuff, size, ncclFloat, ncclSum,
      comm, s));

  //completing NCCL operation by synchronizing on the CUDA stream
  CUDACHECK(cudaStreamSynchronize(s));

  //free device buffers
  CUDACHECK(cudaFree(sendbuff));
  CUDACHECK(cudaFree(recvbuff));

  //finalizing NCCL
  ncclCommDestroy(comm);

  //finalizing MPI
  MPICHECK(MPI_Finalize());

  printf("[MPI Rank %d] Success \n", myRank);
  return 0;
}
Example 3: Multiple Devices per Thread
You can combine multiple processes or threads with multiple devices per process or thread. In this case, we need to use group semantics.
The following example combines MPI and multiple devices per process (= MPI rank).
First, we retrieve MPI information about the processes:
int myRank, nRanks;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
MPI_Comm_size(MPI_COMM_WORLD, &nRanks);
Next, a single rank creates a unique ID and sends it to all other ranks to make sure everyone has it:
ncclUniqueId id;
if (myRank == 0) ncclGetUniqueId(&id);
MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);
Then, we create our ngpus communicator objects, which are part of a larger group of ngpus*nRanks:
ncclComm_t comms[ngpus];
ncclGroupStart();
for (int i=0; i<ngpus; i++) {
  cudaSetDevice(devs[i]);
  ncclCommInitRank(comms+i, ngpus*nRanks, id, myRank*ngpus+i);
}
ncclGroupEnd();
Next, we call NCCL collective operations using a single thread and group calls, or multiple threads, each providing a comm object.
At the end of the program, we destroy all the communicator objects:
for (int i=0; i<ngpus; i++)
  ncclCommDestroy(comms[i]);
The following code depicts a complete working example with multiple MPI processes and multiple devices per process:
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#define MPICHECK(cmd) do {                          \
  int e = cmd;                                      \
  if( e != MPI_SUCCESS ) {                          \
    printf("Failed: MPI error %s:%d '%d'\n",        \
        __FILE__,__LINE__, e);                      \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)

#define CUDACHECK(cmd) do {                         \
  cudaError_t e = cmd;                              \
  if( e != cudaSuccess ) {                          \
    printf("Failed: Cuda error %s:%d '%s'\n",       \
        __FILE__,__LINE__,cudaGetErrorString(e));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)

#define NCCLCHECK(cmd) do {                         \
  ncclResult_t r = cmd;                             \
  if (r!= ncclSuccess) {                            \
    printf("Failed, NCCL error %s:%d '%s'\n",       \
        __FILE__,__LINE__,ncclGetErrorString(r));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)
static uint64_t getHostHash(const char* string) {
  // Based on DJB2a, result = result * 33 ^ char
  uint64_t result = 5381;
  for (int c = 0; string[c] != '\0'; c++){
    result = ((result << 5) + result) ^ string[c];
  }
  return result;
}

static void getHostName(char* hostname, int maxlen) {
  gethostname(hostname, maxlen);
  for (int i=0; i< maxlen; i++) {
    if (hostname[i] == '.') {
      hostname[i] = '\0';
      return;
    }
  }
}
int main(int argc, char* argv[])
{
  int size = 32*1024*1024;

  int myRank, nRanks, localRank = 0;

  //initializing MPI
  MPICHECK(MPI_Init(&argc, &argv));
  MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
  MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));

  //calculating localRank which is used in selecting a GPU
  uint64_t hostHashs[nRanks];
  char hostname[1024];
  getHostName(hostname, 1024);
  hostHashs[myRank] = getHostHash(hostname);
  MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
  for (int p=0; p<nRanks; p++) {
    if (p == myRank) break;
    if (hostHashs[p] == hostHashs[myRank]) localRank++;
  }

  //each process is using two GPUs
  int nDev = 2;

  float** sendbuff = (float**)malloc(nDev * sizeof(float*));
  float** recvbuff = (float**)malloc(nDev * sizeof(float*));
  cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);

  //picking GPUs based on localRank
  for (int i = 0; i < nDev; ++i) {
    CUDACHECK(cudaSetDevice(localRank*nDev + i));
    CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));
    CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
    CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
    CUDACHECK(cudaStreamCreate(s+i));
  }

  ncclUniqueId id;
  ncclComm_t comms[nDev];

  //generating NCCL unique ID at one process and broadcasting it to all
  if (myRank == 0) ncclGetUniqueId(&id);
  MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));

  //initializing NCCL, group API is required around ncclCommInitRank as it is
  //called across multiple GPUs in each thread/process
  NCCLCHECK(ncclGroupStart());
  for (int i=0; i<nDev; i++) {
    CUDACHECK(cudaSetDevice(localRank*nDev + i));
    NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i));
  }
  NCCLCHECK(ncclGroupEnd());

  //calling NCCL communication API. Group API is required when using
  //multiple devices per thread/process
  NCCLCHECK(ncclGroupStart());
  for (int i=0; i<nDev; i++)
    NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
        comms[i], s[i]));
  NCCLCHECK(ncclGroupEnd());

  //synchronizing on CUDA stream to complete NCCL communication
  for (int i=0; i<nDev; i++)
    CUDACHECK(cudaStreamSynchronize(s[i]));

  //freeing device memory
  for (int i=0; i<nDev; i++) {
    CUDACHECK(cudaFree(sendbuff[i]));
    CUDACHECK(cudaFree(recvbuff[i]));
  }

  //finalizing NCCL
  for (int i=0; i<nDev; i++) {
    ncclCommDestroy(comms[i]);
  }

  //finalizing MPI
  MPICHECK(MPI_Finalize());

  printf("[MPI Rank %d] Success \n", myRank);
  return 0;
}
Example 4: Multiple Communicators per Device
NCCL allows users to create multiple communicators per device. The following code shows an example with multiple MPI processes, one device per process, and multiple communicators per device:
// blocking communicators
CUDACHECK(cudaSetDevice(localRank));
for (int i = 0; i < commNum; ++i) {
  if (myRank == 0) ncclGetUniqueId(&id);
  MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));
  NCCLCHECK(ncclCommInitRank(&blockingComms[i], nRanks, id, myRank));
}

// non-blocking communicators
CUDACHECK(cudaSetDevice(localRank));
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
config.blocking = 0;
for (int i = 0; i < commNum; ++i) {
  if (myRank == 0) ncclGetUniqueId(&id);
  MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));
  NCCLCHECK(ncclCommInitRankConfig(&nonblockingComms[i], nRanks, id, myRank, &config));
  do {
    NCCLCHECK(ncclCommGetAsyncError(nonblockingComms[i], &state));
  } while(state == ncclInProgress && checkTimeout() != true);
}
checkTimeout() is expected to be a user-defined function. For more non-blocking communicator usage, please check Fault Tolerance. In addition, if you would like to split communicators instead of creating new ones, please check ncclCommSplit().
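For illustration only, checkTimeout() could be as simple as the sketch below, which gives up after a fixed wall-clock budget; the 120-second limit and the startTime variable are assumptions made for this sketch, not part of the NCCL API:
#include <time.h>
#include <stdbool.h>

static time_t startTime;   // set with time(&startTime) right before starting initialization

// returns true once more than 120 seconds have elapsed since startTime
static bool checkTimeout() {
  time_t now;
  time(&now);
  return difftime(now, startTime) > 120.0;
}
Similarly, a minimal (hypothetical) use of ncclCommSplit() that derives one new communicator spanning the same ranks as an existing communicator comm could look like:
ncclComm_t newComm;
// color 0 keeps every rank in the same new communicator; myRank is used as the ordering key;
// passing NULL for the config inherits the parent communicator's configuration
NCCLCHECK(ncclCommSplit(comm, 0, myRank, &newComm, NULL));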
Communication Examples
The following examples demonstrate common patterns for executing NCCL collectives.
Example 1: One Device per Process or Thread
If you have one device per thread or process, then each thread calls the collective operation for its device, for example, an AllReduce:
ncclAllReduce(sendbuff, recvbuff, count, datatype, op, comm, stream);
After the call, the operation has been enqueued to the stream. Therefore, if you want to wait for the operation to complete, you can call cudaStreamSynchronize:
cudaStreamSynchronize(stream);
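If blocking the host thread is undesirable, the stream can also be polled for completion; the sketch below uses cudaStreamQuery and is only an illustration of that alternative, not part of the original example:
// poll the stream instead of blocking on it
cudaError_t err;
while ((err = cudaStreamQuery(stream)) == cudaErrorNotReady) {
  // other useful host-side work can be done here
}
if (err != cudaSuccess) {
  printf("Cuda error '%s'\n", cudaGetErrorString(err));
  exit(EXIT_FAILURE);
}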
For a complete working example with MPI and a single device per MPI process, see Example 2: One Device per Process or Thread.
Example 2: Multiple Devices per Thread
When a single thread manages multiple devices, you need to use group semantics to launch the operation on all the devices at once:
ncclGroupStart();
for (int i=0; i<ngpus; i++)
  ncclAllReduce(sendbuffs[i], recvbuffs[i], count, datatype, op, comms[i], streams[i]);
ncclGroupEnd();
After ncclGroupEnd, all of the operations have been enqueued to the streams. Therefore, if you want to wait for the operations to complete, you can now call cudaStreamSynchronize:
for (int i=0; i<ngpus; i++)
  cudaStreamSynchronize(streams[i]);
For a complete working example with MPI and multiple devices per MPI process, see Example 3: Multiple Devices per Thread.