====== 并行计算 ======
我们对计算能力的需求越来越高,随着计算机单CPU处理能力的提升遇到瓶颈,并行方案成了解决之道:从硬件层的CPU多核技术,到软件层的并行计算、分布式计算、云计算...等等。
本文尽量用平实的语言来介绍并行计算(Parallel Computing),不涉及过多的学术概念和数学内容,以便没有背景知识也能理解。但如果你对该领域有兴趣,想深入学习的话,相关基础是必须的。
===== Foster方法论 =====
并行程序设计常用Ian Foster的四步方法论:partitioning、communication、agglomeration、mapping。
- Partitioning(划分)
* 实现并行的首要任务就是找到能并行化的部分,理论上越多越好。通过对计算的划分和数据的划分,我们尽可能多的找到能并行的计算任务。
- Communication(通信)
* 通常计算任务之间不是孤立的,彼此需要通信。当通信到达一定程度时,相关的开销是无法忽略的,甚至导致部分计算只能串行执行。
- Agglomeration(聚集)
* 接下来开始考虑与具体并行计算环境相结合的问题。通常并行任务的数量要远远大于计算机处理器的数量,因而需要将多个计算任务合并到一个处理器上运行。合并时,尽量让那些彼此有通信的任务在一起,这样能降低整体的通信开销,当然,也要保持整个计算方法的可扩展性。
- Mapping(映射)
* 最后就是将合并后的计算任务投到具体的处理器上运行了,该部分的一个考虑是如何提高处理器的利用率。如果你是程序员,就是着手进行并行程序的设计和实现的时候了。
===== 样例 =====
接下来举一个很典型的例子,来看看如何将其并行化,这个例子就是矩阵向量乘法。
==== 串行算法 ====
对一个(m x n)的矩阵a,与一个(n)维的向量b相乘,结果为一个(m)维的向量c。
先看看传统的串行实现方法,很简单:
for (i = 0; i < m; i++) {
c[i] = 0;
for (j = 0; j < n; j++)
c[i] += a[i][j] * b[j];
}
==== 并行设计 ====
通过算法可以看到,矩阵a的每一行,与结果向量c的一个元素对应,行与行之间没有任何交互,可以独立计算。
很自然的,我们就可以想到:
* 以行为单位来划分,一行为一个计算任务。
* 这是个很理想的例子,因为计算任务之间没有通信开销。
* 通常矩阵的行数比集群的处理器数量大得多,我们就将矩阵按行分割,均匀的分配给每个处理器。
* 各个处理器用传统方法分别计算自己负责的那部分数据,算出结果向量的一部分。
* 将分散在各个处理器上的部分结果合在一块,就是最终的结果。
如果有n个处理器参与计算,理论上计算速度是原来的n倍(不考虑开销)。
同时也可以看到,每个处理器的计算方法并没有变,算法复杂度还是一样的。从学术角度看,算法没有任何改进;但通过并行的方法,我们确实提高了效率,缩短了计算时间,这就是工程上的意义。
==== MPI实现 ====
接下来我们用MPI(Message Passing Interface,消息传递接口)来实现上述并行算法。
代码来自Michael Quinn编著的//Parallel Programming in C with MPI and OpenMP//(ISBN:0072822562),个人只做了少量调整,完整代码可点击[[ftp://gwduan.com/pub/wiki/parallel/|并行代码]]下载。
=== 文件列表 ===
* Makefile
* mv1.c(主程序)
* common.c(其他函数)
* mympi.h(头文件)
* m.txt(测试矩阵数据)
* v.txt(测试向量数据)
=== 主函数main() ===
初始化:
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &id);
MPI_Comm_size(MPI_COMM_WORLD, &p);
从参数文件获取矩阵数据,按行划分的方式分配给各个参与计算的进程:
mympi_read_row_striped_matrix(argv[1], (void ***)&a, (void **)&storage,
MPI_TYPE, &m, &n, MPI_COMM_WORLD);
从参数文件获取向量数据,每个参与进程都有完整的一份:
mympi_read_replicated_vector(argv[2], (void **)&b, MPI_TYPE,
&nprime, MPI_COMM_WORLD);
每个进程进行自己负责部分的计算:
local_rows = BLOCK_SIZE(id, p, m);
for (i = 0; i < local_rows; i++) {
c_block[i] = 0.0;
for (j = 0; j < n; j++)
c_block[i] += a[i][j] * b[j];
}
将分散在各个进程的结果数据抓取合并成完整向量:
mympi_replicate_block_vector(c_block, m, c, MPI_TYPE, MPI_COMM_WORLD);
结束:
MPI_Finalize();
这就是实现的整体框架,相关子函数的说明见后。
==== 运行测试 ====
代码在OpenMPI环境下测试通过。如果你要搭建自己的并行学习环境,可参见:[[..:env:openmpi|OpenMPI并行环境的模拟]]。
执行结果:
$ mpirun -np 4 mv1 m.txt v.txt
2.000 1.000 3.000 4.000 0.000
5.000 -1.000 2.000 -2.000 4.000
0.000 3.000 4.000 1.000 2.000
2.000 3.000 1.000 -3.000 0.000
3.000 1.000 4.000 0.000 3.000
19.000 34.000 25.000 13.000
==== 子函数说明 ====
=== 读取矩阵数据mympi_read_row_striped_matrix() ===
从文件获得矩阵数据,既可以由每个进程分别从文件读取数据,也可由专门的进程负责全部读取工作,然后再传递给相关进程。当数据很大时,如何实现需要根据具体的运行环境和网络状况来决定。
这里选择由专门进程读取的方式。
进程0负责读取工作。首先获取矩阵的维数,然后调用[[http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Bcast.html|MPI_Bcast]]广播给所有进程:
if (!id) {
if (!(fp = fopen(file, "r"))) {
printf("Error: open matrix file[%s] - %s\n",
file, strerror(errno));
fflush(stdout);
MPI_Abort(comm, OPEN_FILE_ERROR);
}
if (mympi_fread(m, MPI_INT, 1, fp) != 1) {
printf("Error: read row number of matrix.\n");
fflush(stdout);
MPI_Abort(comm, DATA_ERROR);
}
if (mympi_fread(n, MPI_INT, 1, fp) != 1) {
printf("Error: read col number of matrix.\n");
fflush(stdout);
MPI_Abort(comm, DATA_ERROR);
}
...
}
MPI_Bcast(m, 1, MPI_INT, 0, comm);
MPI_Bcast(n, 1, MPI_INT, 0, comm);
进程0依次获取每行数据,然后判断该发给谁:如果是自己,则复制到本地缓冲区里;否则通过[[http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Send.html|MPI_Send]]发给相应进程。其他进程则只管执行[[http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Recv.html|MPI_Recv]]等待接收数据:
count = 0;
for (i = 0; i < *m; i++) {
dest_id = BLOCK_OWNER(i, p, *m);
if (!id) {
if (mympi_fread(tmpbuf, dtype, *n, fp) != *n) {
printf("Error: read %d row data of matrix.\n",
i);
fflush(stdout);
MPI_Abort(comm, DATA_ERROR);
}
if (dest_id == id) {
memcpy((*subs)[count++], tmpbuf,
(*n) * datum_size);
} else {
MPI_Send(tmpbuf, *n, dtype, dest_id,
DATA_MSG, comm);
}
} else if (dest_id == id) {
MPI_Recv((*subs)[count++], *n, dtype, 0, DATA_MSG,
comm, &status);
MPI_Get_count(&status, dtype, &status_count);
if (status_count != *n) {
printf("Error: recv count %d != %d.\n",
status_count, *n);
fflush(stdout);
MPI_Abort(comm, DATA_ERROR);
}
}
}
=== 读取向量数据mympi_read_replicated_vector() ===
同样由进程0负责读取工作,先获取向量参数,再获取向量内容。由于向量数据每个进程都用到,所以只需通过MPI_Bcast广播给所有进程即可:
MPI_Bcast(*v, *n, dtype, 0, comm);
=== 合并结果向量mympi_replicate_block_vector() ===
调用[[http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Allgatherv.html|MPI_Allgatherv]]抓取数据,使得每个进程都有一份完整的结果向量:
mympi_create_mixed_xfer_arrays(p, n, &cnt, &disp);
MPI_Allgatherv(src, cnt[id], dtype, dst, cnt, disp, dtype, comm);
参数的数据结构有点复杂,请详细参阅函数manual。
=== 显示矩阵数据mympi_print_row_striped_matrix() ===
理解了上述部分,该函数不难明白,故不再介绍。
=== 显示向量数据mympi_print_replicated_vector() ===
略(同上)。
{{tag>并行计算}}