这是本文档旧的修订版!
并行计算
我们对计算能力的需求越来越高,随着计算机单CPU处理能力的提升遇到瓶颈,并行方案成了解决之道:从硬件层的CPU多核技术,到软件层的并行计算、分布式计算、云计算…等等。
本文尽量用平实的语言来介绍并行计算(Parallel Computing),不涉及过多的学术概念和数学内容,以便没有背景知识也能理解。但如果你对该领域有兴趣,想深入学习的话,相关基础是必须的。
Foster方法论
并行程序设计常用Ian Foster的四步方法论:partitioning、communication、agglomeration、mapping。
- Partitioning(划分)
- 实现并行的首要任务就是找到能并行化的部分,理论上越多越好。通过对计算的划分和数据的划分,我们尽可能多的找到能并行的计算任务。
- Communication(通信)
- 通常计算任务之间不是孤立的,彼此需要通信。当通信到达一定程度时,相关的开销是无法忽略的,甚至导致部分计算只能串行执行。
- Agglomeration(聚集)
- 接下来开始考虑与具体并行计算环境相结合的问题。通常并行任务的数量要远远大于计算机处理器的数量,因而需要将多个计算任务合并到一个处理器上运行。合并时,尽量让那些彼此有通信的任务在一起,这样能降低整体的通信开销,当然,也要保持整个计算方法的可扩展性。
- Mapping(映射)
- 最后就是将合并后的计算任务投到具体的处理器上运行了,该部分的一个考虑是如何提高处理器的利用率。如果你是程序员,就是着手进行并行程序的设计和实现的时候了。
样例
接下来举一个很典型的例子,来看看如何将其并行化,这个例子就是矩阵向量乘法。
串行算法
对一个(m x n)的矩阵a,与一个(n)维的向量b相乘,结果为一个(m)维的向量c。
先看看传统的串行实现方法,很简单:
for (i = 0; i < m; i++) { c[i] = 0; for (j = 0; j < n; j++) c[i] += a[i][j] * b[j]; }
并行设计
通过算法可以看到,矩阵a的每一行,与结果向量c的一个元素对应,行与行之间没有任何交互,可以独立计算。
很自然的,我们就可以想到:
- 以行为单位来划分,一行为一个计算任务。
- 这是个很理想的例子,因为计算任务之间没有通信开销。
- 通常矩阵的行数比集群的处理器数量大得多,我们就将矩阵按行分割,均匀的分配给每个处理器。
- 各个处理器用传统方法分别计算自己负责的那部分数据,算出结果向量的一部分。
- 将分散在各个处理器上的部分结果合在一块,就是最终的结果。
如果有n个处理器参与计算,理论上计算速度是原来的n倍(不考虑开销)。
同时也可以看到,每个处理器的计算方法并没有变,算法复杂度还是一样的。从学术角度看,算法没有任何改进;但通过并行的方法,我们确实提高了效率,缩短了计算时间,这就是工程上的意义。
MPI实现
接下来我们用MPI(Message Passing Interface,消息传递接口)来实现上述并行算法。
代码来自Michael Quinn编著的Parallel Programming in C with MPI and OpenMP(ISBN:0072822562),个人只做了少量调整,完整代码可点击并行代码下载。
文件列表
- Makefile
- mv1.c(主程序)
- common.c(其他函数)
- mympi.h(头文件)
- m.txt(测试矩阵数据)
- v.txt(测试向量数据)
主函数main()
初始化:
MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &id); MPI_Comm_size(MPI_COMM_WORLD, &p);
从参数文件获取矩阵数据,按行划分的方式分配给各个参与计算的进程:
mympi_read_row_striped_matrix(argv[1], (void ***)&a, (void **)&storage, MPI_TYPE, &m, &n, MPI_COMM_WORLD);
从参数文件获取向量数据,每个参与进程都有完整的一份:
mympi_read_replicated_vector(argv[2], (void **)&b, MPI_TYPE, &nprime, MPI_COMM_WORLD);
每个进程进行自己负责部分的计算:
local_rows = BLOCK_SIZE(id, p, m); for (i = 0; i < local_rows; i++) { c_block[i] = 0.0; for (j = 0; j < n; j++) c_block[i] += a[i][j] * b[j]; }
将分散在各个进程的结果数据抓取合并成完整向量:
mympi_replicate_block_vector(c_block, m, c, MPI_TYPE, MPI_COMM_WORLD);
结束:
MPI_Finalize();
这就是实现的整体框架,相关子函数的说明见后。
运行测试
代码在OpenMPI环境下测试通过。如果你要搭建自己的并行学习环境,可参见:OpenMPI并行环境的模拟。
执行结果:
$ mpirun -np 4 mv1 m.txt v.txt 2.000 1.000 3.000 4.000 0.000 5.000 -1.000 2.000 -2.000 4.000 0.000 3.000 4.000 1.000 2.000 2.000 3.000 1.000 -3.000 0.000 3.000 1.000 4.000 0.000 3.000 19.000 34.000 25.000 13.000
子函数说明
读取矩阵数据mympi_read_row_striped_matrix()
从文件获得矩阵数据,既可以由每个进程分别从文件读取数据,也可由专门的进程负责全部读取工作,然后再传递给相关进程。当数据很大时,如何实现需要根据具体的运行环境和网络状况来决定。
这里选择由专门进程读取的方式。
进程0负责读取工作。首先获取矩阵的维数,然后调用MPI_Bcast广播给所有进程:
if (!id) { if (!(fp = fopen(file, "r"))) { printf("Error: open matrix file[%s] - %s\n", file, strerror(errno)); fflush(stdout); MPI_Abort(comm, OPEN_FILE_ERROR); } if (mympi_fread(m, MPI_INT, 1, fp) != 1) { printf("Error: read row number of matrix.\n"); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } if (mympi_fread(n, MPI_INT, 1, fp) != 1) { printf("Error: read col number of matrix.\n"); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } ... } MPI_Bcast(m, 1, MPI_INT, 0, comm); MPI_Bcast(n, 1, MPI_INT, 0, comm);
进程0依次获取每行数据,然后判断该发给谁:如果是自己,则复制到本地缓冲区里;否则通过MPI_Send发给相应进程。其他进程则只管执行MPI_Recv等待接收数据:
count = 0; for (i = 0; i < *m; i++) { dest_id = BLOCK_OWNER(i, p, *m); if (!id) { if (mympi_fread(tmpbuf, dtype, *n, fp) != *n) { printf("Error: read %d row data of matrix.\n", i); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } if (dest_id == id) { memcpy((*subs)[count++], tmpbuf, (*n) * datum_size); } else { MPI_Send(tmpbuf, *n, dtype, dest_id, DATA_MSG, comm); } } else if (dest_id == id) { MPI_Recv((*subs)[count++], *n, dtype, 0, DATA_MSG, comm, &status); MPI_Get_count(&status, dtype, &status_count); if (status_count != *n) { printf("Error: recv count %d != %d.\n", status_count, *n); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } } }
读取向量数据mympi_read_replicated_vector()
同样由进程0负责读取工作,先获取向量参数,再获取向量内容。由于向量数据每个进程都用到,所以只需通过MPI_Bcast广播给所有进程即可:
MPI_Bcast(*v, *n, dtype, 0, comm);
合并结果向量mympi_replicate_block_vector()
调用MPI_Allgatherv抓取数据,使得每个进程都有一份完整的结果向量:
mympi_create_mixed_xfer_arrays(p, n, &cnt, &disp); MPI_Allgatherv(src, cnt[id], dtype, dst, cnt, disp, dtype, comm);
参数的数据结构有点复杂,请详细参阅函数manual。
显示矩阵数据mympi_print_row_striped_matrix()
理解了上述部分,该函数不难明白,故不再介绍。
显示向量数据mympi_print_replicated_vector()
略(同上)。