MPI 模型
如图MPI的各个运算节点是分布式的.每一个节点可以视为是一个“Thread”,但这里的不同之处在于这些节点没有所谓的共享内存,或者说Global Memory。所以,在后面也会看到,一般会有一个节点专门处理数据传输和分配的问题。MPI和CUDA的另一个不同之处在于MPI只有一级结构,即所有的节点都在一个全局命名空间下,不像CUDA那样有Grid/Block/Thread三级层次。MPI同样也是基于SPMD模型,所有的节点执行相同的指令,而每个节点根据自己的ID来确定指令处理的数据,产生相应的输出。
MPI API介绍
以下面这段代码来介绍API: 这段代码的功能是实现向量相加
int main(int argc, char *argv[]) {
int size = ;
int pid = -;
int np = -;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &pid);
MPI_Comm_size(MPI_COMM_WORLD, &np);
if (np < ) {
if (pid == ) printf("Need 3 or more processes.\n");
MPI_Abort(MPI_COMM_WORLD, );
return ;
}
if (pid < np - )
compute_node(size / (np - ));
else
data_server(size);
MPI_Finalize();
return ;
}
1. MPI_Init()和MPI_Finalize()用于初始化和结束MPI框架;
2. MPI_COMM_WORLD代表了所有分配到的节点的集群;
3. MPI_Comm_rank()用于获取节点在集群中的标号,相当与CUDA中的threadIdx.x;
4. MPI_Comm_size()用于获取集群节点的数量,相当于blockDim.x;
5. MPI_Abort()用于中止执行。
上面代码中,有一个节点,也就是np-1节点,来负责数据的传输和分配,而其他的节点则负责计算。
数据传输data_server(size)是如何实现的呢?
MPI 通信
void data_server(unsigned int size) {
int np;
int first = ;
unsigned int num_bytes = size * sizeof(float);
float *a = ; float *b = ; float *c = ;
MPI_Comm_size(MPI_COMM_WORLD, &np);
a = (float *) malloc(num_bytes);
b = (float *) malloc(num_bytes);
c = (float *) malloc(num_bytes);
random_data(a, size);
random_data(b, size);
float *ptr_a = a;
float *ptr_b = b; // send data
for (int i = ; i < np - ; i++) {
MPI_Send(ptr_a, size / (np - ), MPI_FLOAT, i, DATA_DISTRIBUTE, MPI_COMM_WORLD);
ptr_a += size / (np - );
MPI_Send(ptr_b,size / (np - ), MPI_FLOAT, i, DATA_DISTRIBUTE, MPI_COMM_WORLD);
ptr_b += size / (np - );
} // wait for nodes to compute
MPI_Barrier(MPI_COMM_WORLD);
// collect output data
MPI_Status status;
for (int i = ; i < np -; i++) {
MPI_Recv(c + i * size /(np - ), size / (np - ), MPI_REAL, i, DATA_COLLECT, MPI_COMM_WORLD, &status);
}
store_output(c);
free(a); free(b); free(c);
}
int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
buf: 发送buffer的地址值.
count: 发送buffer的元素个数.
datatype: 发送buffer的数据类型.
dest: 个人理解为目标处理单元的索引,比如当前这个就是发送给第i个处理单元.
tag: 信息tag
comm: 传播者,handler
int MPI_Barrier(MPI_Comm comm): 阻塞调用者直到组内所有成员都调用它. 类似于cuda中的__syncthreads();
说完MPI 通信,下面来说MPI 计算部分.
MPI Compute
若节点支持CUDA,则还可以与CUDA结合起来进一步提高运算速度。以上面的计算节点为例:
void compute_node(unsigned int vector_size ) {
int np;
unsigned int num_bytes = vector_size*sizeof(float);
float *h_a, *h_b, *h_output;
float* d_A, d_B, d_output;
MPI_Status status;
MPI_Comm_size(MPI_COMM_WORLD, &np);
int server_process = np - ;
/* Allocate memory */
cudaHostAlloc((void **)&h_a, num_bytes, cudaHostAllocDefault);
cudaHostAlloc((void **)&h_b, num_bytes, cudaHostAllocDefault);
cudaHostAlloc((void **)&h_output, num_bytes, cudaHostAllocDefault);
/* Get the input data from server process */
MPI_Recv(h_a, vector_size, MPI_FLOAT, server_process, DATA_DISTRIBUTE, MPI_COMM_WORLD, &status);
MPI_Recv(h_b, vector_size, MPI_FLOAT, server_process, DATA_DISTRIBUTE, MPI_COMM_WORLD, &status);
/* Transfer data to CUDA device */
cudaMalloc((void **) &d_A, size);
cudaMemcpy(d_A, h_A, size, cudaMemcpyHostToDevice);
cudaMalloc((void **) &d_B, size);
cudaMemcpy(d_B, h_B, size, cudaMemcpyHostToDevice);
cudaMalloc((void **) &d_output, size);
/* Compute the partial vector addition */
dim3 Db(BLOCK_SIZE);
dim3 Dg((vector_size + BLOCK_SIZE – )/BLOCK_SIZE);
vector_add_kernel<<<Dg, Db>>>(d_output, d_a, d_b, vector_size);
MPI_Barrier(d_output);
/* Send the output */
MPI_Send(output, vector_size, MPI_FLOAT, server_process, DATA_COLLECT, MPI_COMM_WORLD);
/* Release device memory */
cudaFree(d_a);
cudaFree(d_b);
cudaFree(d_output);
}
上面使用了Pinned Memory,可以提高数据传输的效率。这里所做的工作,就是将原来串行的向量.
如果节点不支持cuda,则可以像普通C语言那样写:
for(int i=0; i<vector_size; ++i) {
output[i] = input_a[i] + input_b[i]
}