写点什么

MPI 高性能计算和集合通信编程

作者:王玉川
  • 2024-10-15
    上海
  • 本文字数:13134 字

    阅读完需:约 43 分钟

MPI高性能计算和集合通信编程

MPI (Message Passing Interface),是高性能计算(High Performance Computing - HPC)领域中广泛使用的并行编程模式。


本来 MPI 跟我的工作没啥紧密的关系,不过随着近年来大模型和分布式训练的发展,各家大大小小的芯片公司纷纷模仿 NVIDIA 的 NCCL 集合通信库,推出了各种各样的 xCCL。而各种集合通信库的实现里面,都需要实现或定制 MPI 的集合通信原语。


因此,在了解了一些集合通信库的实现后,特意回头来看看这项技术的源头 MPI。理解了这些基础的通信原语,有利于更加深入的理解各种各样的集合通信库。


本文将基于 Ubuntu 和 OpenMP,来展示如何进行 MPI 的编程;并逐个调用集合通信原语,展示这些原语如何使用、能得到什么样的结果。

环境准备

为了进行 MPI 的编程,需要先安装这些软件:


sudo apt install mpich libopenmpi-dev
复制代码


mpich 是 MPI 代码的编译器;libopenmpi-dev 是 OpenMP 的开发包。

Hello MPI

既然学习一个新语言的时候,大家都用 Hello World 作为起点,那么我们也用 Hello MPI 来作为 MPI 学习的起点吧。


先创建一个 hello_mpi.cpp,输入以下代码、编译、执行。看看效果之后,再来介绍 MPI 编程的基础概念。


代码如下:


#include <iostream>#include <mpi.h>
void MpiWithRank(int current_rank, int world_size){ std::cout << "Hello MPI from rank: " << current_rank << "/" << world_size << std::endl;}
int main(int argc, char **argv){ // Begin // Initialize MPI MPI_Init(&argc, &argv);
int current_rank = 0; int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
MpiWithRank(current_rank, world_size);
// End MPI_Finalize();
return 0;}
复制代码


然后,编译这个文件:


mpic++ hello_mpi.cpp -o hello_mpi
复制代码


编译成功之后,执行命令:


mpirun -np 4 hello_mpi
复制代码


一切正常的话,将会得到类似下面的输出:


Hello MPI from rank: 3/4Hello MPI from rank: 0/4Hello MPI from rank: 1/4Hello MPI from rank: 2/4
复制代码


现在,我们来介绍一下这些代码和步骤都做了些啥。


MPI 编程里面,有几个最基础的函数:


int MPI_Init(&argc, &argv):它作为 MPI 程序的第一个函数调用,负责初始化 MPI 并行环境。


int MPI_Finalize():它作为 MPI 程序的最后一个函数调用,退出 MPI 并行环境。


int MPI_Comm_size(MPI_COMM_WORLD, &world_size):获得默认通信域内的进程数目 world_size。也可以理解为,在默认的进程组里面,存在多少个进程在并行工作。


int MPI_Comm_rank(MPI_COMM_WORLD, &current_rank):获得本进程在默认通信域的编号,该编号从 0 开始,一直增加到 world_size-1。比如我们前面的例子里,运行了四个进程做并行工作,那么 world_size 就是 4,而每个进程的 rank 分别是 0、1、2、3。


两个命令行里面,mpic++用来编译 MPI + C++代码,并得到可执行文件。如果编译 MPI + C 代码的话,则换成 mpicc 即可。


mpirun 则用来运行编译好的可执行文件,-np 4 代表需要启动 4 个进程,这四个进程构成通信域/进程组,一起并行工作。


4 个进程启动后,它们在各自的 MpiWithRank 函数里,分别输出了各自的 rank 序号和通信域的 world_size。


这个就是 MPI 程序的最基础代码了。后面所有的例子,都是基于这个框架和步骤,逐渐增加功能的。

Send/Recv

Send 和 Recv 是基础的通信操作,在 MPI 的概念里,意味着从某个 rank/进程发出一些数据,然后在另一个 rank/进程里接收这些数据。


在大模型分布式训练中,执行流水线并行时,不同的 stage 之间,就可以通过 Send/Recv 来收发前向和反向的数据。


涉及到的函数如下所示:


int MPI_Send(void *buff, int count, MPI_Datatype datatype, int recipient, int tag, MPI_Comm communicator):buff 是打算发送的数据;count 是数据的个数;datatype 是数据的类型;recipient 表示数据要发送到哪个 rank;tag 是收发双方用来校验消息的,本文中没用它;communicator 是使用哪个通信域/进程组。


int MPI_Recv(void *buff, int count, MPI_Datatype datatype, int sender, int tag, MPI_Comm communicator, MPI_Status *status):参数和 MPI_Send 很类似,主要的区别在于,buff 用来保存收到的数据;sender 表示数据是从哪个 rank 发送过来的。


接下来,新建一个 send_recv.cpp,并输入以下内容:


#include <iostream>#include <mpi.h>
void SendRecv(int current_rank, int world_size){ // Recv data from previous rank int src_rank = current_rank - 1; if (src_rank < 0) { // For rank 0, recv from the last rank src_rank = world_size - 1; }
int dst_rank = current_rank + 1; if (dst_rank >= world_size) { // For last rank, send to the first rank dst_rank = 0; }
int data_send = 11; int tag = 121; MPI_Status status; if (current_rank == 0) { // Send to next rank std::cout << "Rank " << current_rank << ": Send data " << data_send << " to rank " << dst_rank << std::endl; MPI_Send(&data_send, 1, MPI_INT, dst_rank, tag, MPI_COMM_WORLD);
// Recv from last rank MPI_Recv(&data_send, 1, MPI_INT, src_rank, tag, MPI_COMM_WORLD, &status); std::cout << "Rank " << current_rank << ": Recv data " << data_send << " from rank " << src_rank << std::endl; } else { // Recv from previous rank MPI_Recv(&data_send, 1, MPI_INT, src_rank, tag, MPI_COMM_WORLD, &status); std::cout << "Rank " << current_rank << ": Recv data " << data_send << " from rank " << src_rank << std::endl;
// Modify the data before sending to next rank data_send += 1;
// Send to next rank std::cout << "Rank " << current_rank << ": Send data " << data_send << " to rank " << dst_rank << std::endl; MPI_Send(&data_send, 1, MPI_INT, dst_rank, tag, MPI_COMM_WORLD); }}
int main(int argc, char **argv){ // Begin // Initialize MPI MPI_Init(&argc, &argv);
int current_rank = 0; int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
SendRecv(current_rank, world_size);
// End MPI_Finalize();
return 0;}
复制代码


编译:


mpic++ send_recv.cpp -o send_recv
复制代码


运行:


mpirun -np 4 send_recv
复制代码


可以得到输出:


Rank 0: Send data 11 to rank 1Rank 1: Recv data 11 from rank 0Rank 1: Send data 12 to rank 2Rank 2: Recv data 12 from rank 1Rank 2: Send data 13 to rank 3Rank 3: Recv data 13 from rank 2Rank 3: Send data 14 to rank 0Rank 0: Recv data 14 from rank 3
复制代码


用下面这张图来解释一下这段代码做了些什么:



一共运行了 4 个进程并行工作,进程 rank 0 把 11 发送给下一个进程 rank 1;rank 1 收到后,将数据增加 1 之后,按顺序发给下一个进程 rank 2;以此类推,最后一个进程再把最终的数据发送回 rank 0。由此构成了一个环形的 Send/Recv 操作。


了解完 Send/Recv 之后,我们接下来看第一个集合通信操作:广播。

Broadcast

广播操作执行的时候,会把指定的数据,从源 rank 复制到通信域里面的其他所有的 ranks。


在大模型的分布式训练中,执行数据并行时,需要把模型/参数从一个 GPU 复制到 DP 域内的其它 GPU 时,用的就是广播操作。


对应的函数是:


int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int emitter_rank, MPI_Comm communicator):具体参数的含义和前面的 Send/Recv 基本类似,不一样的是这里用 emitter_rank 来表示从哪个 rank 广播数据到其他所有的 ranks。


在调用 MPI_Bcast 的时候,虽然接收方的那些 ranks 不执行广播操作,但一样需要调用这个函数。具体 MPI_Bcast 的实现我还没去看过,但可以猜到的是,如果不调用的话,这些被广播的进程们,它们怎么知道要接收数据、要把收到的数据放到什么地方呢?其他的集合通信操作也是同样的道理。


看看具体怎么用的。


新建一个 broadcast.cpp 文件:


#include <iostream>#include <mpi.h>
void Broadcast(int current_rank, int world_size){ int data_bcast = 0; int root_rank = 0; if (current_rank == root_rank) { // Root rank to broadcast data data_bcast = 10; }
std::cout << "Rank " << current_rank << ": original data is " << data_bcast << std::endl;
MPI_Barrier(MPI_COMM_WORLD); MPI_Bcast(&data_bcast, 1, MPI_INT, root_rank, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD); std::cout << "Rank " << current_rank << ": new data is " << data_bcast << std::endl;}
int main(int argc, char **argv){ MPI_Init(&argc, &argv);
int current_rank = 0; int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
Broadcast(current_rank, world_size);
MPI_Finalize();}
复制代码


然后编译这个文件:


mpic++ ./broadcast.cpp -o broadcast
复制代码


接下来执行它:


mpirun -np 4 broadcast
复制代码


可以得到如下的输出:


Rank 0: original data is 10Rank 1: original data is 0Rank 2: original data is 0Rank 3: original data is 0Rank 1: new data is 10Rank 0: new data is 10Rank 3: new data is 10Rank 2: new data is 10
复制代码


这个代码执行的操作如下图所示:



rank 0 把数据 10,广播到了通信域内的其他所有 rank。


另外,例子里为了数据打印整齐,用了 MPI_Barrier 让各个进程进行同步,这里不详述。


接下来,看看收集操作。

Gather

收集操作会把通信域内所有进程/ranks 的数据,收集到指定的 rank 中。收集后的数据顺序,跟各个 rank 的序号一致。


对应的函数是:


int MPI_Gather(const void* buffer_send, int count_send, MPI_Datatype datatype_send, void* buffer_recv, int count_recv, MPI_Datatype datatype_recv, int root, MPI_Comm communicator):从参数可以看出,执行这个函数时,会把各个 rank 上面的 buffer_send,发送到 root rank 上面的 buffer_recv;并且,这些发送来的数据,会按照 rank 的次序,在 root rank 依次排列。


下面通过代码来看看怎么使用这个函数。


新建 gather.cpp 文件:


#include <iostream>#include <sstream>#include <mpi.h>
void PrintArray(int* array, int len){ for (int i = 0; i < len; i++) { std::cout << array[i] << " "; } std::cout << std::endl;}
void GatherNumbers(int current_rank, int world_size){ int root_rank = 0; int data_sent = (current_rank + 1) * 100; std::cout << "Rank " << current_rank << ": Send data " << data_sent << " to root for gathering" << std::endl;
if (current_rank == root_rank) { // Alloc buffer to gather data int buffer[world_size]; for (int i = 0; i < world_size; i++) { buffer[i] = 0; } std::cout << "Before gather, the data in root rank is: " << std::endl; PrintArray(buffer, world_size);
// Gather data MPI_Gather(&data_sent, 1, MPI_INT, buffer, 1, MPI_INT, root_rank, MPI_COMM_WORLD);
std::cout << "After gather, the data in root rank is: " << std::endl; PrintArray(buffer, world_size); } else { // Send data to root MPI_Gather(&data_sent, 1, MPI_INT, NULL, 0, MPI_INT, root_rank, MPI_COMM_WORLD); }}
int main(int argc, char **argv){ MPI_Init(&argc, &argv);
int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); int current_rank = 0; MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
GatherNumbers(current_rank, world_size); //GatherString(current_rank, world_size);
MPI_Finalize();}
复制代码


编译并执行这个文件:


mpic++ ./gather.cpp -o gathermpirun -np 4 gatherRank 1: Send data 200 to root for gatheringRank 2: Send data 300 to root for gatheringRank 3: Send data 400 to root for gatheringRank 0: Send data 100 to root for gatheringBefore gather, the data in root rank is:0 0 0 0After gather, the data in root rank is:100 200 300 400
复制代码


可以看到 rank 0 把所有 ranks 的数据,按照次序收集到了数组里。


具体过程如下图所示:



MPI_Gather 收集数据时,是做了一个假设:每个 rank 所发过来的的数据所占空间一致。针对每个 rank 的数据长度不一致的情况,可以使用 MPI_Gatherv 函数。


接下来看看收集的反向操作:分散。

Scatter

分散操作与收集操作相反,它会把指定 rank 上的数据,按照各个 rank 的序号,逐个分给通信域内的所有 ranks。


对应的函数是:


int MPI_Scatter(const void* buffer_send, int count_send, MPI_Datatype datatype_send, void* buffer_recv, int count_recv, MPI_Datatype datatype_recv, int root, MPI_Comm communicator):函数很长,看起来有点吓人。它的意思是,把 root rank 上的 buffer_send 数据,依次分散到 communicator 通信域内各个 rank 的 buffer_recv 上。


这里还有些细节没展开,比如原始数据的长度不够、太长、不能被 rank 个数整除等等,这些特殊情况下怎么处理。


看看怎么使用这个函数来分散数据。


新建 scatter.cpp 文件:


#include <iostream>#include <mpi.h>
void PrintArray(int* array, int len){ for (int i = 0; i < len; i++) { std::cout << array[i] << " "; } std::cout << std::endl;}
void ScatterNumbers(int current_rank, int world_size){ int root_rank = 0; int data_recv = 0; if (current_rank == root_rank) { // Create an array whose size is the same as the world size int buffer[world_size]; // Set the values of array for (int i = 0; i < world_size; i++) { // buffer containing all values buffer[i] = (i + 1) * 100; }
// Before scatter, the data is std::cout << "Rank 0: Array values to be scattered: " << std::endl; PrintArray(buffer, world_size);
// Dispatch buffer values to all the processes in the same communicator // If the array length is bigger than world size, it will only is the previous world_size items // If it is smaller than world size, the extra ranks will receive random number MPI_Scatter(buffer, 1, MPI_INT, &data_recv, 1, MPI_INT, root_rank, MPI_COMM_WORLD);
std::cout << "Rank " << current_rank << ": scatter value " << data_recv << std::endl; } else { // Receive the dispatched data MPI_Scatter(NULL, 1, MPI_INT, &data_recv, 1, MPI_INT, root_rank, MPI_COMM_WORLD);
// After scatter std::cout << "Rank " << current_rank << ": scatter value " << data_recv << std::endl; }}
int main(int argc, char **argv){ MPI_Init(&argc, &argv);
int current_rank = 0; int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
ScatterNumbers(current_rank, world_size); //ScatterString(current_rank, world_size);
MPI_Finalize();}
复制代码


编译并执行该文件,可以得到如下结果:


mpic++ ./scatter.cpp -o scattermpirun -np 4 ./scatterRank 0: Array values to be scattered:100 200 300 400Rank 0: scatter value 100Rank 1: scatter value 200Rank 2: scatter value 300Rank 3: scatter value 400
复制代码


该代码将 rank 0 上的数组,按照各个 rank 的次序,依次分散到不同的 rank 中。在此过程中,rank 0 上的原始数组是不会发生变化的。


具体过程如下图所示:



和 MPI_Gatherv 类似,如果需要给每个 rank 分散不同长度的数据,可以使用 MPI_Scatterv 函数。


下一个操作是归约。

Reduce

归约操作在大模型训练和大数据里面是很重要的操作。比如说,通过 Reduce 操作,可以把通信域内所有 ranks 的数据,发送到指定的 root rank;然后在 root rank 上,对所有的数据进行计算,比如求和、求最大值、求最小值、求乘积等等。


对应的函数如下:


int MPI_Reduce(const void* send_buffer, void* receive_buffer, int count, MPI_Datatype datatype, MPI_Op operation, int root, MPI_Comm communicator):又是一个很长的函数,它的意思是,把 communicator 通信域内各个 rank 上的 send_buffer,发送到 root rank 的 receive_buffer,然后由 root 进行 operation 类型的操作。


看看具体怎么使用这个函数。


新建文件 reduce.cpp:


#include <iostream>#include <sstream>#include <mpi.h>
void PrintArray(int* array, int len){ for (int i = 0; i < len; i++) { std::cout << array[i] << " "; } std::cout << std::endl;}
void ReduceAverage(int current_rank, int world_size){ int root_rank = 0; int global_data = 0;
// Assign data for each rank int local_data = (current_rank + 1) * 100; std::cout << "Rank " << current_rank << ": Send " << local_data << " to root rank for reduce average" << std::endl;
MPI_Reduce(&local_data, &global_data, 1, MPI_INT, MPI_SUM, root_rank, MPI_COMM_WORLD);
if (current_rank == root_rank) { // Show the reduce result at root rank std::cout << "Rank " << current_rank << ": Reduce sum " << global_data << std::endl; std::cout << "Rank " << current_rank << ": Reduce average " << (float)global_data/world_size << std::endl; }}
int main(int argc, char **argv){ MPI_Init(&argc, &argv);
int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); int current_rank = 0; MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
ReduceAverage(current_rank, world_size);
MPI_Finalize();}
复制代码


编译并执行该文件,可以得到如下结果:


mpic++ ./reduce.cpp -o reducempirun -np 4 reduceRank 1: Send 200 to root rank for reduce averageRank 2: Send 300 to root rank for reduce averageRank 3: Send 400 to root rank for reduce averageRank 0: Send 100 to root rank for reduce averageRank 0: Reduce sum 1000Rank 0: Reduce average 250
复制代码


该代码执行的操作如下图所示:



每个 rank 将各自的数据发送到 rank 0,然后 rank 0 进行求和操作 (代码里面还带了求平均值的操作,MPI 本身不带求均值的操作符)。


Reduce 和 Gather 看起来有点像,都是各个 rank 把数据发到 rank root。主要的区别在于 root 收到后怎么处理,Gather 做的是把各个数据按照 rank 次序,拼接成更长的数据;而 Reduce 做的是对各个数据进行数学操作,得到另一个数据。


接下来,看看一个比较复杂的操作。

Reduce-Scatter

顾名思义,这个操作是 Reduce 和 Scatter 的组合。


通过这个操作,可以把通信域内的所有数据先做一次归约 reduce,再把归约结果按照 rank 次序,分散 scatter 到各个 rank 去。


对应的函数为:


int MPI_Reduce_scatter(const void* send_buffer, void* receive_buffer, const int* counts, MPI_Datatype datatype, MPI_Op operation, MPI_Comm communicator):它的意思是,将 communicator 通信域里面每个进程的 send_buffer 数据做归约,然后再把归约后的数据,按照进程的 rank 次序,依次发到每个进程的 receive_buffer 里面。counts 是个数组,用来约定每个进程分到几个数据。


在下面的例子,我们在每个进程里新建一个数组,数组的长度和 world_size 一致。然后执行 MPI_Reduce_scatter,就可以将这些数组先归约再分散了。最后,每个进程会收到一个数字。如果需要不同的进程收到不一样个数的数字,可以通过调整 scatter_cnts 实现。


新建如下的 reduce_scatter.cpp 文件:


#include <iostream>#include <sstream>#include <mpi.h>
void PrintArray(int* array, int len){ for (int i = 0; i < len; i++) { std::cout << array[i] << " "; } std::cout << std::endl;}
void RecuceScatter(int current_rank, int world_size){ // Each rank has an array whose size is world_size // Then reduce the array, and scatter the results to each rank
// Create array for each rank int buffer[world_size]; for (int i = 0; i < world_size; i++) { buffer[i] = current_rank + i + 1; } std::cout << "Rank " << current_rank << ": Original array is: " << std::endl; PrintArray(buffer, world_size);
// Create the count to scatter to each rank int scatter_cnts[world_size]; // Each rank will receive one element for (int i = 0; i < world_size; i++) { scatter_cnts[i] = 1; }
int results = 0; MPI_Reduce_scatter(buffer, &results, scatter_cnts, MPI_INT, MPI_SUM, MPI_COMM_WORLD); std::cout << "Rank " << current_rank << ": Receive data: " << results << std::endl;}
int main(int argc, char **argv){ MPI_Init(&argc, &argv);
int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); int current_rank = 0; MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
RecuceScatter(current_rank, world_size);
MPI_Finalize();}
复制代码


编译并执行该文件,可以得到如下结果:


mpic++ ./reduce_scatter.cpp -o reduce_scatter
mpirun -np 4 reduce_scatter
Rank 0: Original array is:1 2 3 4Rank 1: Original array is:2 3 4 5Rank 2: Original array is:3 4 5 6Rank 3: Original array is:4 5 6 7Rank 0: Receive data: 10Rank 1: Receive data: 14Rank 2: Receive data: 18Rank 3: Receive data: 22
复制代码


具体过程如下图所示,不同进程里,相同颜色的数字进行加法操作(Reduce)。然后每个进程收到一种颜色的结果(Scatter)。



下一个操作是 All Gather。

All Gather

这个操作和之前的 Gather 操作之间,唯一的区别在于,在完成 Gather 操作之后,会把收集到的结果复制到所有的 ranks。可以用 Gather + Broadcast 来理解这个操作。


对应的函数为:


int MPI_Allgather(const void* buffer_send, int count_send, MPI_Datatype datatype_send, void* buffer_recv, int count_recv, MPI_Datatype datatype_recv, MPI_Comm communicator)


对比一下之前的 Gather 函数:


int MPI_Gather(const void* buffer_send, int count_send, MPI_Datatype datatype_send, void* buffer_recv, int count_recv, MPI_Datatype datatype_recv, int root, MPI_Comm communicator)


可以看到 All Gather 少了个 root 参数,其它的全部一样。功能类似,在此不赘述,直接看例子。


新建一个 allgather.cpp 文件:


#include <iostream>#include <sstream>#include <mpi.h>
void PrintArray(int* array, int len){ for (int i = 0; i < len; i++) { std::cout << array[i] << " "; } std::cout << std::endl;}
void AllGatherNumbers(int current_rank, int world_size){ int root_rank = 0; int data_sent = (current_rank + 1) * 100; std::cout << "Rank " << current_rank << ": Send data " << data_sent << " for allgather" << std::endl;
// Alloc buffer to gather data int buffer[world_size]; MPI_Allgather(&data_sent, 1, MPI_INT, buffer, 1, MPI_INT, MPI_COMM_WORLD);
std::cout << "After allgather, the data in rank " << current_rank << std::endl; PrintArray(buffer, world_size);}
int main(int argc, char **argv){ MPI_Init(&argc, &argv);
int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); int current_rank = 0; MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
AllGatherNumbers(current_rank, world_size); //AllGatherString(current_rank, world_size);
MPI_Finalize();}
复制代码


编译并执行该文件,可以得到如下结果:


mpic++ ./allgather.cpp -o allgather
mpirun -np 4 allgather
Rank 0: Send data 100 for allgatherRank 1: Send data 200 for allgatherRank 3: Send data 400 for allgatherRank 2: Send data 300 for allgatherAfter allgather, the data in rank 0100 200 300 400After allgather, the data in rank 3100 200 300 400After allgather, the data in rank 1100 200 300 400After allgather, the data in rank 2100 200 300 400
复制代码


具体过程如下图所示:



胜利在望,只剩下两个通信原语了。


我们将 All Reduce 放到最后,先讲 All to All 操作。

All to All

All to All 是通信域内所有的进程全交换。执行的操作可以用线性代数里面学的矩阵转置来理解。在大模型的分布式训练中,专家并行 MoE 就使用了 All to All 的操作。


对应的函数为:


int MPI_Alltoall(const void* buffer_send, int count_send, MPI_Datatype datatype_send, void* buffer_recv, int count_recv, MPI_Datatype datatype_recv, MPI_Comm communicator):它的意思是,communicator 通信域内的每个进程,都把自己的 buffer_send 发送到其它所有的进程,同时,从其它的所有进程接收数据并存放到 buffer_recv 里面。


看一个具体的例子来加深理解。


新建 alltoall.cpp 文件:


#include <iostream>#include <sstream>#include <mpi.h>
void PrintArray(int* array, int len){ for (int i = 0; i < len; i++) { std::cout << array[i] << " "; } std::cout << std::endl;}
void All2AllTranpose(int current_rank, int world_size){ // Initialize send buffer for each rank int send_buffer[world_size]; for (int i = 0; i < world_size; i++) { // Process rank sends data like: 0 sends {0,1,2,3}, 1 sends {10,11,12,13}, etc. send_buffer[i] = current_rank * 10 + i; } std::cout << "Rank " << current_rank << ": Send data:" << std::endl; PrintArray(send_buffer, world_size);
// Buffer to receive data from all other ranks int recv_buffer[world_size]; // All-to-all communication MPI_Alltoall(send_buffer, 1, MPI_INT, recv_buffer, 1, MPI_INT, MPI_COMM_WORLD);
// Print the received data std::cout << "Rank " << current_rank << ": Recv data:" << std::endl; PrintArray(recv_buffer, world_size);}
int main(int argc, char **argv){ MPI_Init(&argc, &argv);
int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); int current_rank = 0; MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
All2AllTranpose(current_rank, world_size);
MPI_Finalize();}
复制代码


编译并执行该文件,可以得到如下结果:


mpic++ ./alltoall.cpp -o alltoall
mpirun -np 4 alltoall
Rank 0: Send data:0 1 2 3Rank 1: Send data:10 11 12 13Rank 2: Send data:20 21 22 23Rank 3: Send data:30 31 32 33Rank 0: Recv data:0 10 20 30Rank 1: Recv data:1 11 21 31Rank 2: Recv data:2 12 22 32Rank 3: Recv data:3 13 23 33
复制代码


如果将每个进程的原始数组看作矩阵不同的行、数组里的每个元素看作不同的列的话,可以看到,经过 All to All 之后,矩阵的行与列发生了交换,即执行了数学里面的矩阵转置。


具体过程如下图所示:



最后我们来看看 All Reduce 操作。

All Reduce

这个应该是目前变种最多的一个集合通信原语了。在大模型的分布式训练中,这个操作广泛应用于数据并行、张量并行。为了提升 All Reduce 的效率,各大公司、高校提出了很多不同的做法。我们这里只介绍最基础的原理。


复习一下前面讲的 Reduce 归约操作,它是把通信域内所有进程的数据,都发到指定 root rank 上面进行计算,比如求和、求最大值、求最小值、求乘积等等。


All Reduce 也是做了类似的事情,可以理解为,执行完 Reduce 之后,root rank 把结果广播到所有的进程;或者这样理解,依次指定通信域内的每个 rank 作为 root,然后分别执行一次 Reduce 操作。


对应的函数为:


int MPI_Allreduce(const void* send_buffer, void* receive_buffer, int count, MPI_Datatype datatype, MPI_Op operation, MPI_Comm communicator)


对比一下之前 Reduce 函数:


int MPI_Reduce(const void* send_buffer, void* receive_buffer, int count, MPI_Datatype datatype, MPI_Op operation, int root, MPI_Comm communicator)


可以看到,All Reduce 少了个 root 参数,它不用指定最后由哪个 rank 来保存归约结果,而是每个 rank 都会得到归约结果。


我们通过一个例子来看看怎么用。


新建 allreduce.cpp 文件:


#include <iostream>#include <sstream>#include <mpi.h>
void AllReduceAverage(int current_rank, int world_size){ int root_rank = 0; int global_data = 0;
// Assign data for each rank int local_data = (current_rank + 1) * 100; std::cout << "Rank " << current_rank << ": Send " << local_data << " for allreduce average" << std::endl;
MPI_Allreduce(&local_data, &global_data, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
// Show the reduce result at each rank std::cout << "Rank " << current_rank << ": Reduce sum " << global_data << std::endl; std::cout << "Rank " << current_rank << ": Reduce average " << (float)global_data/world_size << std::endl;}
int main(int argc, char **argv){ MPI_Init(&argc, &argv);
int world_size = 0; MPI_Comm_size(MPI_COMM_WORLD, &world_size); int current_rank = 0; MPI_Comm_rank(MPI_COMM_WORLD, &current_rank);
AllReduceAverage(current_rank, world_size);
MPI_Finalize();}
复制代码


编译并执行该文件,可以得到如下结果:


mpic++ ./allreduce.cpp -o allreduce
mpirun -np 4 allreduce
Rank 0: Send 100 for allreduce averageRank 3: Send 400 for allreduce averageRank 2: Send 300 for allreduce averageRank 1: Send 200 for allreduce averageRank 2: Reduce sum 1000Rank 2: Reduce average 250Rank 3: Reduce sum 1000Rank 3: Reduce average 250Rank 0: Reduce sum 1000Rank 0: Reduce average 250Rank 1: Reduce sum 1000Rank 1: Reduce average 250
复制代码


具体过程如下图所示:



通过这个操作,每个 rank 都得到了所有 ranks 规约后的累积和、平均值。




至此,MPI 的编程框架,以及集合通信的 10 个原语:MPI_Send、MPI_Recv、MPI_Bcast、MPI_Gather、MPI_Scatter、MPI_Reduce、MPI_Reduce_scatter、MPI_Allgather、MPI_Alltoall、MPI_Allreduce 都讲完了。


洋洋洒洒写了这么长的文字和示意图,成功的把自己的知识梳理了一遍,也希望对读者有所帮助。


发布于: 刚刚阅读数: 7
用户头像

王玉川

关注

https://yuchuanwang.github.io/ 2018-11-13 加入

https://www.linkedin.com/in/yuchuan-wang/

评论

发布
暂无评论
MPI高性能计算和集合通信编程_HPC_王玉川_InfoQ写作社区