我们对计算能力的需求越来越高,随着计算机单CPU处理能力的提升遇到瓶颈,并行方案成了解决之道:从硬件层的CPU多核技术,到软件层的并行计算、分布式计算、云计算…等等。
本文尽量用平实的语言来介绍并行计算(Parallel Computing),不涉及过多的学术概念和数学内容,以便没有背景知识也能理解。但如果你对该领域有兴趣,想深入学习的话,相关基础是必须的。
并行程序设计常用Ian Foster的四步方法论:partitioning、communication、agglomeration、mapping。
接下来举一个很典型的例子,来看看如何将其并行化,这个例子就是矩阵向量乘法。
对一个(m x n)的矩阵a,与一个(n)维的向量b相乘,结果为一个(m)维的向量c。
先看看传统的串行实现方法,很简单:
for (i = 0; i < m; i++) { c[i] = 0; for (j = 0; j < n; j++) c[i] += a[i][j] * b[j]; }
通过算法可以看到,矩阵a的每一行,与结果向量c的一个元素对应,行与行之间没有任何交互,可以独立计算。
很自然的,我们就可以想到:
如果有n个处理器参与计算,理论上计算速度是原来的n倍(不考虑开销)。
同时也可以看到,每个处理器的计算方法并没有变,算法复杂度还是一样的。从学术角度看,算法没有任何改进;但通过并行的方法,我们确实提高了效率,缩短了计算时间,这就是工程上的意义。
接下来我们用MPI(Message Passing Interface,消息传递接口)来实现上述并行算法。
代码来自Michael Quinn编著的Parallel Programming in C with MPI and OpenMP(ISBN:0072822562),个人只做了少量调整,完整代码可点击并行代码下载。
初始化:
MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &id); MPI_Comm_size(MPI_COMM_WORLD, &p);
从参数文件获取矩阵数据,按行划分的方式分配给各个参与计算的进程:
mympi_read_row_striped_matrix(argv[1], (void ***)&a, (void **)&storage, MPI_TYPE, &m, &n, MPI_COMM_WORLD);
从参数文件获取向量数据,每个参与进程都有完整的一份:
mympi_read_replicated_vector(argv[2], (void **)&b, MPI_TYPE, &nprime, MPI_COMM_WORLD);
每个进程进行自己负责部分的计算:
local_rows = BLOCK_SIZE(id, p, m); for (i = 0; i < local_rows; i++) { c_block[i] = 0.0; for (j = 0; j < n; j++) c_block[i] += a[i][j] * b[j]; }
将分散在各个进程的结果数据抓取合并成完整向量:
mympi_replicate_block_vector(c_block, m, c, MPI_TYPE, MPI_COMM_WORLD);
结束:
MPI_Finalize();
这就是实现的整体框架,相关子函数的说明见后。
代码在OpenMPI环境下测试通过。如果你要搭建自己的并行学习环境,可参见:OpenMPI并行环境的模拟。
执行结果:
$ mpirun -np 4 mv1 m.txt v.txt 2.000 1.000 3.000 4.000 0.000 5.000 -1.000 2.000 -2.000 4.000 0.000 3.000 4.000 1.000 2.000 2.000 3.000 1.000 -3.000 0.000 3.000 1.000 4.000 0.000 3.000 19.000 34.000 25.000 13.000
从文件获得矩阵数据,既可以由每个进程分别从文件读取数据,也可由专门的进程负责全部读取工作,然后再传递给相关进程。当数据很大时,如何实现需要根据具体的运行环境和网络状况来决定。
这里选择由专门进程读取的方式。
进程0负责读取工作。首先获取矩阵的维数,然后调用MPI_Bcast广播给所有进程:
if (!id) { if (!(fp = fopen(file, "r"))) { printf("Error: open matrix file[%s] - %s\n", file, strerror(errno)); fflush(stdout); MPI_Abort(comm, OPEN_FILE_ERROR); } if (mympi_fread(m, MPI_INT, 1, fp) != 1) { printf("Error: read row number of matrix.\n"); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } if (mympi_fread(n, MPI_INT, 1, fp) != 1) { printf("Error: read col number of matrix.\n"); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } ... } MPI_Bcast(m, 1, MPI_INT, 0, comm); MPI_Bcast(n, 1, MPI_INT, 0, comm);
进程0依次获取每行数据,然后判断该发给谁:如果是自己,则复制到本地缓冲区里;否则通过MPI_Send发给相应进程。其他进程则只管执行MPI_Recv等待接收数据:
count = 0; for (i = 0; i < *m; i++) { dest_id = BLOCK_OWNER(i, p, *m); if (!id) { if (mympi_fread(tmpbuf, dtype, *n, fp) != *n) { printf("Error: read %d row data of matrix.\n", i); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } if (dest_id == id) { memcpy((*subs)[count++], tmpbuf, (*n) * datum_size); } else { MPI_Send(tmpbuf, *n, dtype, dest_id, DATA_MSG, comm); } } else if (dest_id == id) { MPI_Recv((*subs)[count++], *n, dtype, 0, DATA_MSG, comm, &status); MPI_Get_count(&status, dtype, &status_count); if (status_count != *n) { printf("Error: recv count %d != %d.\n", status_count, *n); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } } }
同样由进程0负责读取工作,先获取向量参数,再获取向量内容。由于向量数据每个进程都用到,所以只需通过MPI_Bcast广播给所有进程即可:
MPI_Bcast(*v, *n, dtype, 0, comm);
调用MPI_Allgatherv抓取数据,使得每个进程都有一份完整的结果向量:
mympi_create_mixed_xfer_arrays(p, n, &cnt, &disp); MPI_Allgatherv(src, cnt[id], dtype, dst, cnt, disp, dtype, comm);
参数的数据结构有点复杂,请详细参阅函数manual。
理解了上述部分,该函数不难明白,故不再介绍。
略(同上)。