Please enable Javascript to view the contents

MPI 通信原语及 Python 编程使用

 ·  ☕ 5 分钟

1. 什么是 MPI

MPI,Message Passing Interface 消息传递接口,是一种用于并行计算的通信协议。

MPI 提供了一组标准化的接口,用于在不同的计算节点之间传输数据,广泛应用于科学计算、机器学习、深度学习等领域。

MPI 有多个实现,常用实现有 MPICH 和 OpenMPI。MPICH 是由 Argonne 实验室主导开发的,是各种商业定制版本的基础,因此不能简单认为 MPICH 是一个产品,而是包含一系列衍生的版本,比如 MVAPICH、Intel MPI 等。OpenMPI 是一个由多个研究机构(包括 UTK、IU、Cisco、NVIDIA 等)联合开发的的版本。

具体选择可以参考:

  • Open MPI

适合大多数 Linux 集群,尤其是需要高性能网络支持(如 InfiniBand、ROCE)和 GPU 支持的场景。常用于 HPC 和深度学习集群。

  • MPICH

通用的 Linux 系统高兼容性选项,适用于标准 MPI 应用。若系统希望在不同 MPI 实现之间迁移,MPICH 是稳妥的选择。

  • MVAPICH

基于 MPICH,专为高性能网络(如 InfiniBand 和 RDMA)优化,适合有高网络带宽需求的科学计算任务和 GPU 任务。

  • Intel MPI

专门针对 Intel 硬件进行了优化,适合 Intel 处理器及 Intel Omni-Path 网络,支持主流的 Linux 系统。

2. MPI 通信原语

2.1 点对点 P2P

一个进程与另一个指定进程的通信方式。

  • send
1
2
3
4
5
6
7
MPI_Send(
    void* data,
    int count,
    MPI_Datatype datatype,
    int destination,
    int tag,
    MPI_Comm communicator)

向指定的进程发送指定数量的数据。

  • receive
1
2
3
4
5
6
7
8
MPI_Recv(
    void* data,
    int count,
    MPI_Datatype datatype,
    int source,
    int tag,
    MPI_Comm communicator,
    MPI_Status* status)

从指定的进程接收指定数量的数据。

2.2 集合通信 CC

一个进程和所有进程的通信方式。

  • barrier

等待所有进程到达某个点。

1
MPI_Barrier(MPI_Comm communicator)

进程 0 在时间点 T1 调用了 MPI_Barrier 之后,需要等待全部进程到达 MPI_Barrier 调用的位置,才能够同时继续执行。

  • broadcast
1
2
3
4
5
6
MPI_Bcast(
    void* data,
    int count,
    MPI_Datatype datatype,
    int root,
    MPI_Comm communicator)

root 进程 0 将向所有进程发送一份数据。

  • scatter
1
2
3
4
5
6
7
8
9
MPI_Scatter(
    void* send_data,
    int send_count,
    MPI_Datatype send_datatype,
    void* recv_data,
    int recv_count,
    MPI_Datatype recv_datatype,
    int root,
    MPI_Comm communicator)

不同于 boradcast 将数据完整副本发送给所有进程,scatter 将数据分割成多份,发送给不同的进程。

  • gather
1
2
3
4
5
6
7
8
9
MPI_Gather(
    void* send_data,
    int send_count,
    MPI_Datatype send_datatype,
    void* recv_data,
    int recv_count,
    MPI_Datatype recv_datatype,
    int root,
    MPI_Comm communicator)

与 scatter 相反,gather 将多份数据接收到一个进程。

  • allgather
1
2
3
4
5
6
7
8
MPI_Allgather(
    void* send_data,
    int send_count,
    MPI_Datatype send_datatype,
    void* recv_data,
    int recv_count,
    MPI_Datatype recv_datatype,
    MPI_Comm communicator)

allgather 不需要指定 root,全部的进程会收到其他所有进程的数据。

  • reduce
1
2
3
4
5
6
7
8
MPI_Reduce(
    void* send_data,
    void* recv_data,
    int count,
    MPI_Datatype datatype,
    MPI_Op op,
    int root,
    MPI_Comm communicator)

在进行 reduce 操作时,需要指定一个 op 操作称之为归约。上图中的归约操作就是求和。

  • allreduce
1
2
3
4
5
6
7
MPI_Allreduce(
    void* send_data,
    void* recv_data,
    int count,
    MPI_Datatype datatype,
    MPI_Op op,
    MPI_Comm communicator)

不同于 reduce,allreduce 不需要指定 root,所有进程都会收到归约后的结果。

3. 安装 MPICH\OpenMPI

安装 MPICH 就执行

1
apt-get install mpich -y

安装 OpenMPI 就执行

1
apt-get install openmpi-bin -y

会得到编译命令和运行命令,无论是安装 MPICH 还是 OpenMPI,都会得到类似的命令行工具。

3.1 MPI 编译命令

  • mpicc

使用 C 编译器编译 MPI 程序,确保编译出的程序可以使用 MPI 通信库。

  • mpic++、mpiCC、mpicxx

使用 C++ 编译器编译 MPI 程序。

  • mpif77、mpif90、mpifort

使用不同 Fortran 编译器编译 MPI 程序。

3.2 MPI 运行命令

  • mpirun

运行 MPI 程序,管理 MPI 进程的启动。

  • mpiexec

与 mpirun 类似,但是更加符合 MPI 标准。

  • mpiexec.xxx、mpirun.xxx

针对特定环境、使用特定工具的 MPI 程序管理器。

4. MPI 程序示例

4.1 编写 MPI Python 程序

1
vim /data/mpi.py

保存如下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from mpi4py import MPI
import numpy as np
import socket

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# 获取当前进程所在主机的主机名
hostname = socket.gethostname()

# 每个进程生成一个随机数并包装成 numpy 数组
local_data = np.random.random(1)  # 包装成一个 numpy 数组
print(f"Process {rank} on host {hostname} has local data: {local_data[0]}")

# 使用 gather 收集所有进程的数据
all_data = comm.gather(local_data[0], root=0)

# 创建用于 Allreduce 的缓冲区
global_data = np.zeros(1, dtype='d')  # 初始化全局数据为零,类型为 double
comm.Allreduce(local_data, global_data, op=MPI.SUM)

# 只在 rank 0 上输出结果
if rank == 0:
    print(f"All process data: {all_data}")
    print(f"The sum of all data is: {global_data[0]}")

在一些集合通信的场景下,有时会针对某个进程进行一些特殊处理,就会使用 rank 来判断当前进程的编号符合条件时,执行操作。

4.2 配置免密

  • 配置 /etc/hosts

编辑 /etc/hosts 文件,添加主机名和 IP 地址的映射。

1
2
3
x.x.x.x host-1
x.x.x.x host-2
x.x.x.x host-3
  • 生成 SSH 秘钥
1
ssh-keygen -t rsa
  • 配置主机之间的相互免密
1
2
3
scp /root/.ssh/id_rsa /root/.ssh/id_rsa.pub host-1:/root/.ssh/
scp /root/.ssh/id_rsa /root/.ssh/id_rsa.pub host-2:/root/.ssh/
scp /root/.ssh/id_rsa /root/.ssh/id_rsa.pub host-3:/root/.ssh/

4.3 安装依赖

  • 安装 MPI

全部主机应该安装相同的 MPI 实现和版本。

1
opscli shell -i host-1,host-2,host-3 --content "apt-get install openmpi-bin libopenmpi-dev  -y"

这里需要额外安装一个依赖包 libopenmpi-dev,用于编译安装 mpi4py。

  • 安装 mpi4py、numpy 依赖
1
opscli shell -i host-1,host-2,host-3 --content "pip install mpi4py numpy"
  • 将 mpi.py 文件拷贝到每个主机
1
2
scp /data/mpi.py host-2:/data/
scp /data/mpi.py host-3:/data/

4.4 创建 Hostfile 文件

1
2
3
4
5
cat > hostfile <<EOF
host-1 slots=1
host-2 slots=1
host-3 slots=1
EOF

这里的 slots 表示每个主机能启动的最大进程数。一般情况下,slots 为 CPU 核心数。

除了使用 hostfile 文件指定主机,还可以使用 -host 参数指定主机及 slots 值。

1
2
-host host-1 -host host-2 -host host-3
-host host-1:1 -host host-2:1

host-1:1 表示 host-1 只能启动一个进程,冒号后面是最大进程数。

4.5 运行 MPI 程序

  • 多机多进程
1
2
3
4
5
6
7
mpirun --allow-run-as-root -np 3 -hostfile hostfile python3 /data/mpi.py

Process 2 on host host-3 has local data: 0.8237177424830892
Process 0 on host host-1 has local data: 0.4049368708804896
Process 1 on host host-2 has local data: 0.055557219335660935
All process data: [np.float64(0.4049368708804896), np.float64(0.055557219335660935), np.float64(0.8237177424830892)]
The sum of all data is: 1.2842118326992398
  • 单机多进程
1
2
3
4
5
6
7
8
mpirun --allow-run-as-root -np 4 -host host-2:4 python3 /data/mpi.py

Process 1 on host host-2 has local data: 0.57806331374296
Process 0 on host host-2 has local data: 0.10289882095170111
Process 2 on host host-2 has local data: 0.7618555892517183
Process 3 on host host-2 has local data: 0.810980561742528
All process data: [0.10289882095170111, 0.57806331374296, 0.7618555892517183, 0.810980561742528]
The sum of all data is: 2.2537982856889074
  • mpi 会按照 host 顺序,依次给每个 host 分配 slots 数量的进程

让可用的 slots 数量远大于进程数,观察 MPI 对进程的分配策略。

1
2
3
4
5
6
7
8
mpirun --allow-run-as-root -np 4 -host host-1:2 -host host-2:4 -host host-3:8 python3 /data/mpi.py

Process 3 on host host-2 has local data: 0.2001460877733756
Process 0 on host host-1 has local data: 0.6559694967423054
Process 2 on host host-2 has local data: 0.43367558731572886
Process 1 on host host-1 has local data: 0.8121368124082778
All process data: [np.float64(0.6559694967423054), np.float64(0.8121368124082778), np.float64(0.43367558731572886), np.float64(0.2001460877733756)]
The sum of all data is: 2.101927984239688

实验了很多次,结果都是 host-1 分配 2 个进程,host-2 分配 2 个进程,host-3 没有分配进程。这说明,MPI 是按照提供的主机顺序,依次充分给 host 分配进程。

5. 参考


微信公众号
作者
微信公众号