====== 并行计算 ====== 我们对计算能力的需求越来越高,随着计算机单CPU处理能力的提升遇到瓶颈,并行方案成了解决之道:从硬件层的CPU多核技术,到软件层的并行计算、分布式计算、云计算...等等。 本文尽量用平实的语言来介绍并行计算(Parallel Computing),不涉及过多的学术概念和数学内容,以便没有背景知识也能理解。但如果你对该领域有兴趣,想深入学习的话,相关基础是必须的。 ===== Foster方法论 ===== 并行程序设计常用Ian Foster的四步方法论:partitioning、communication、agglomeration、mapping。 - Partitioning(划分) * 实现并行的首要任务就是找到能并行化的部分,理论上越多越好。通过对计算的划分和数据的划分,我们尽可能多的找到能并行的计算任务。 - Communication(通信) * 通常计算任务之间不是孤立的,彼此需要通信。当通信到达一定程度时,相关的开销是无法忽略的,甚至导致部分计算只能串行执行。 - Agglomeration(聚集) * 接下来开始考虑与具体并行计算环境相结合的问题。通常并行任务的数量要远远大于计算机处理器的数量,因而需要将多个计算任务合并到一个处理器上运行。合并时,尽量让那些彼此有通信的任务在一起,这样能降低整体的通信开销,当然,也要保持整个计算方法的可扩展性。 - Mapping(映射) * 最后就是将合并后的计算任务投到具体的处理器上运行了,该部分的一个考虑是如何提高处理器的利用率。如果你是程序员,就是着手进行并行程序的设计和实现的时候了。 ===== 样例 ===== 接下来举一个很典型的例子,来看看如何将其并行化,这个例子就是矩阵向量乘法。 ==== 串行算法 ==== 对一个(m x n)的矩阵a,与一个(n)维的向量b相乘,结果为一个(m)维的向量c。 先看看传统的串行实现方法,很简单: for (i = 0; i < m; i++) { c[i] = 0; for (j = 0; j < n; j++) c[i] += a[i][j] * b[j]; } ==== 并行设计 ==== 通过算法可以看到,矩阵a的每一行,与结果向量c的一个元素对应,行与行之间没有任何交互,可以独立计算。 很自然的,我们就可以想到: * 以行为单位来划分,一行为一个计算任务。 * 这是个很理想的例子,因为计算任务之间没有通信开销。 * 通常矩阵的行数比集群的处理器数量大得多,我们就将矩阵按行分割,均匀的分配给每个处理器。 * 各个处理器用传统方法分别计算自己负责的那部分数据,算出结果向量的一部分。 * 将分散在各个处理器上的部分结果合在一块,就是最终的结果。 如果有n个处理器参与计算,理论上计算速度是原来的n倍(不考虑开销)。 同时也可以看到,每个处理器的计算方法并没有变,算法复杂度还是一样的。从学术角度看,算法没有任何改进;但通过并行的方法,我们确实提高了效率,缩短了计算时间,这就是工程上的意义。 ==== MPI实现 ==== 接下来我们用MPI(Message Passing Interface,消息传递接口)来实现上述并行算法。 代码来自Michael Quinn编著的//Parallel Programming in C with MPI and OpenMP//(ISBN:0072822562),个人只做了少量调整,完整代码可点击[[ftp://gwduan.com/pub/wiki/parallel/|并行代码]]下载。 === 文件列表 === * Makefile * mv1.c(主程序) * common.c(其他函数) * mympi.h(头文件) * m.txt(测试矩阵数据) * v.txt(测试向量数据) === 主函数main() === 初始化: MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &id); MPI_Comm_size(MPI_COMM_WORLD, &p); 从参数文件获取矩阵数据,按行划分的方式分配给各个参与计算的进程: mympi_read_row_striped_matrix(argv[1], (void ***)&a, (void **)&storage, MPI_TYPE, &m, &n, MPI_COMM_WORLD); 从参数文件获取向量数据,每个参与进程都有完整的一份: mympi_read_replicated_vector(argv[2], (void **)&b, MPI_TYPE, &nprime, MPI_COMM_WORLD); 每个进程进行自己负责部分的计算: local_rows = BLOCK_SIZE(id, p, m); for (i = 0; i < local_rows; i++) { c_block[i] = 0.0; for (j = 0; j < n; j++) c_block[i] += a[i][j] * b[j]; } 将分散在各个进程的结果数据抓取合并成完整向量: mympi_replicate_block_vector(c_block, m, c, MPI_TYPE, MPI_COMM_WORLD); 结束: MPI_Finalize(); 这就是实现的整体框架,相关子函数的说明见后。 ==== 运行测试 ==== 代码在OpenMPI环境下测试通过。如果你要搭建自己的并行学习环境,可参见:[[..:env:openmpi|OpenMPI并行环境的模拟]]。 执行结果: $ mpirun -np 4 mv1 m.txt v.txt 2.000 1.000 3.000 4.000 0.000 5.000 -1.000 2.000 -2.000 4.000 0.000 3.000 4.000 1.000 2.000 2.000 3.000 1.000 -3.000 0.000 3.000 1.000 4.000 0.000 3.000 19.000 34.000 25.000 13.000 ==== 子函数说明 ==== === 读取矩阵数据mympi_read_row_striped_matrix() === 从文件获得矩阵数据,既可以由每个进程分别从文件读取数据,也可由专门的进程负责全部读取工作,然后再传递给相关进程。当数据很大时,如何实现需要根据具体的运行环境和网络状况来决定。 这里选择由专门进程读取的方式。 进程0负责读取工作。首先获取矩阵的维数,然后调用[[http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Bcast.html|MPI_Bcast]]广播给所有进程: if (!id) { if (!(fp = fopen(file, "r"))) { printf("Error: open matrix file[%s] - %s\n", file, strerror(errno)); fflush(stdout); MPI_Abort(comm, OPEN_FILE_ERROR); } if (mympi_fread(m, MPI_INT, 1, fp) != 1) { printf("Error: read row number of matrix.\n"); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } if (mympi_fread(n, MPI_INT, 1, fp) != 1) { printf("Error: read col number of matrix.\n"); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } ... } MPI_Bcast(m, 1, MPI_INT, 0, comm); MPI_Bcast(n, 1, MPI_INT, 0, comm); 进程0依次获取每行数据,然后判断该发给谁:如果是自己,则复制到本地缓冲区里;否则通过[[http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Send.html|MPI_Send]]发给相应进程。其他进程则只管执行[[http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Recv.html|MPI_Recv]]等待接收数据: count = 0; for (i = 0; i < *m; i++) { dest_id = BLOCK_OWNER(i, p, *m); if (!id) { if (mympi_fread(tmpbuf, dtype, *n, fp) != *n) { printf("Error: read %d row data of matrix.\n", i); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } if (dest_id == id) { memcpy((*subs)[count++], tmpbuf, (*n) * datum_size); } else { MPI_Send(tmpbuf, *n, dtype, dest_id, DATA_MSG, comm); } } else if (dest_id == id) { MPI_Recv((*subs)[count++], *n, dtype, 0, DATA_MSG, comm, &status); MPI_Get_count(&status, dtype, &status_count); if (status_count != *n) { printf("Error: recv count %d != %d.\n", status_count, *n); fflush(stdout); MPI_Abort(comm, DATA_ERROR); } } } === 读取向量数据mympi_read_replicated_vector() === 同样由进程0负责读取工作,先获取向量参数,再获取向量内容。由于向量数据每个进程都用到,所以只需通过MPI_Bcast广播给所有进程即可: MPI_Bcast(*v, *n, dtype, 0, comm); === 合并结果向量mympi_replicate_block_vector() === 调用[[http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Allgatherv.html|MPI_Allgatherv]]抓取数据,使得每个进程都有一份完整的结果向量: mympi_create_mixed_xfer_arrays(p, n, &cnt, &disp); MPI_Allgatherv(src, cnt[id], dtype, dst, cnt, disp, dtype, comm); 参数的数据结构有点复杂,请详细参阅函数manual。 === 显示矩阵数据mympi_print_row_striped_matrix() === 理解了上述部分,该函数不难明白,故不再介绍。 === 显示向量数据mympi_print_replicated_vector() === 略(同上)。 {{tag>并行计算}}