响水凹

欢迎来到 Guang-Wen Duan (Dennis Duan) 的个人 Wiki

用户工具

站点工具


computer:parallel:parallel

并行计算

我们对计算能力的需求越来越高,随着计算机单CPU处理能力的提升遇到瓶颈,并行方案成了解决之道:从硬件层的CPU多核技术,到软件层的并行计算、分布式计算、云计算…等等。

本文尽量用平实的语言来介绍并行计算(Parallel Computing),不涉及过多的学术概念和数学内容,以便没有背景知识也能理解。但如果你对该领域有兴趣,想深入学习的话,相关基础是必须的。

Foster方法论

并行程序设计常用Ian Foster的四步方法论:partitioning、communication、agglomeration、mapping。

  1. Partitioning(划分)
    • 实现并行的首要任务就是找到能并行化的部分,理论上越多越好。通过对计算的划分和数据的划分,我们尽可能多的找到能并行的计算任务。
  2. Communication(通信)
    • 通常计算任务之间不是孤立的,彼此需要通信。当通信到达一定程度时,相关的开销是无法忽略的,甚至导致部分计算只能串行执行。
  3. Agglomeration(聚集)
    • 接下来开始考虑与具体并行计算环境相结合的问题。通常并行任务的数量要远远大于计算机处理器的数量,因而需要将多个计算任务合并到一个处理器上运行。合并时,尽量让那些彼此有通信的任务在一起,这样能降低整体的通信开销,当然,也要保持整个计算方法的可扩展性。
  4. Mapping(映射)
    • 最后就是将合并后的计算任务投到具体的处理器上运行了,该部分的一个考虑是如何提高处理器的利用率。如果你是程序员,就是着手进行并行程序的设计和实现的时候了。

样例

接下来举一个很典型的例子,来看看如何将其并行化,这个例子就是矩阵向量乘法。

串行算法

对一个(m x n)的矩阵a,与一个(n)维的向量b相乘,结果为一个(m)维的向量c。

先看看传统的串行实现方法,很简单:

        for (i = 0; i < m; i++) {
                c[i] = 0;
                for (j = 0; j < n; j++)
                        c[i] += a[i][j] * b[j];
        }

并行设计

通过算法可以看到,矩阵a的每一行,与结果向量c的一个元素对应,行与行之间没有任何交互,可以独立计算。

很自然的,我们就可以想到:

  • 以行为单位来划分,一行为一个计算任务。
  • 这是个很理想的例子,因为计算任务之间没有通信开销。
  • 通常矩阵的行数比集群的处理器数量大得多,我们就将矩阵按行分割,均匀的分配给每个处理器。
  • 各个处理器用传统方法分别计算自己负责的那部分数据,算出结果向量的一部分。
  • 将分散在各个处理器上的部分结果合在一块,就是最终的结果。

如果有n个处理器参与计算,理论上计算速度是原来的n倍(不考虑开销)。

同时也可以看到,每个处理器的计算方法并没有变,算法复杂度还是一样的。从学术角度看,算法没有任何改进;但通过并行的方法,我们确实提高了效率,缩短了计算时间,这就是工程上的意义。

MPI实现

接下来我们用MPI(Message Passing Interface,消息传递接口)来实现上述并行算法。

代码来自Michael Quinn编著的Parallel Programming in C with MPI and OpenMP(ISBN:0072822562),个人只做了少量调整,完整代码可点击并行代码下载。

文件列表

  • Makefile
  • mv1.c(主程序)
  • common.c(其他函数)
  • mympi.h(头文件)
  • m.txt(测试矩阵数据)
  • v.txt(测试向量数据)

主函数main()

初始化:

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &id);
        MPI_Comm_size(MPI_COMM_WORLD, &p);

从参数文件获取矩阵数据,按行划分的方式分配给各个参与计算的进程:

        mympi_read_row_striped_matrix(argv[1], (void ***)&a, (void **)&storage,
                                        MPI_TYPE, &m, &n, MPI_COMM_WORLD);

从参数文件获取向量数据,每个参与进程都有完整的一份:

        mympi_read_replicated_vector(argv[2], (void **)&b, MPI_TYPE,
                                        &nprime, MPI_COMM_WORLD);

每个进程进行自己负责部分的计算:

        local_rows = BLOCK_SIZE(id, p, m);
        for (i = 0; i < local_rows; i++) {
                c_block[i] = 0.0;
                for (j = 0; j < n; j++)
                        c_block[i] += a[i][j] * b[j];
        }

将分散在各个进程的结果数据抓取合并成完整向量:

        mympi_replicate_block_vector(c_block, m, c, MPI_TYPE, MPI_COMM_WORLD);

结束:

        MPI_Finalize();

这就是实现的整体框架,相关子函数的说明见后。

运行测试

代码在OpenMPI环境下测试通过。如果你要搭建自己的并行学习环境,可参见:OpenMPI并行环境的模拟

执行结果:

$ mpirun -np 4 mv1 m.txt v.txt
 2.000  1.000  3.000  4.000  0.000 
 5.000 -1.000  2.000 -2.000  4.000 
 0.000  3.000  4.000  1.000  2.000 
 2.000  3.000  1.000 -3.000  0.000 

 3.000  1.000  4.000  0.000  3.000 

19.000 34.000 25.000 13.000

子函数说明

读取矩阵数据mympi_read_row_striped_matrix()

从文件获得矩阵数据,既可以由每个进程分别从文件读取数据,也可由专门的进程负责全部读取工作,然后再传递给相关进程。当数据很大时,如何实现需要根据具体的运行环境和网络状况来决定。

这里选择由专门进程读取的方式。

进程0负责读取工作。首先获取矩阵的维数,然后调用MPI_Bcast广播给所有进程:

        if (!id) {
                if (!(fp = fopen(file, "r"))) {
                        printf("Error: open matrix file[%s] - %s\n",
                                        file, strerror(errno));
                        fflush(stdout);
                        MPI_Abort(comm, OPEN_FILE_ERROR);
                }
                if (mympi_fread(m, MPI_INT, 1, fp) != 1) {
                        printf("Error: read row number of matrix.\n");
                        fflush(stdout);
                        MPI_Abort(comm, DATA_ERROR);
                }
                if (mympi_fread(n, MPI_INT, 1, fp) != 1) {
                        printf("Error: read col number of matrix.\n");
                        fflush(stdout);
                        MPI_Abort(comm, DATA_ERROR);
                }
                ...
        }
        MPI_Bcast(m, 1, MPI_INT, 0, comm);
        MPI_Bcast(n, 1, MPI_INT, 0, comm);

进程0依次获取每行数据,然后判断该发给谁:如果是自己,则复制到本地缓冲区里;否则通过MPI_Send发给相应进程。其他进程则只管执行MPI_Recv等待接收数据:

        count = 0;
        for (i = 0; i < *m; i++) {
                dest_id = BLOCK_OWNER(i, p, *m);
 
                if (!id) {
                        if (mympi_fread(tmpbuf, dtype, *n, fp) != *n) {
                                printf("Error: read %d row data of matrix.\n",
                                        i);
                                fflush(stdout);
                                MPI_Abort(comm, DATA_ERROR);
                        }
                        if (dest_id == id) {
                                memcpy((*subs)[count++], tmpbuf,
                                        (*n) * datum_size);
                        } else {
                                MPI_Send(tmpbuf, *n, dtype, dest_id,
                                                DATA_MSG, comm);
                        }
                } else if (dest_id == id) {
                        MPI_Recv((*subs)[count++], *n, dtype, 0, DATA_MSG,
                                                        comm, &status);
                        MPI_Get_count(&status, dtype, &status_count);
                        if (status_count != *n) {
                                printf("Error: recv count %d != %d.\n",
                                                status_count, *n);
                                fflush(stdout);
                                MPI_Abort(comm, DATA_ERROR);
                        }
                }
        }

读取向量数据mympi_read_replicated_vector()

同样由进程0负责读取工作,先获取向量参数,再获取向量内容。由于向量数据每个进程都用到,所以只需通过MPI_Bcast广播给所有进程即可:

        MPI_Bcast(*v, *n, dtype, 0, comm);

合并结果向量mympi_replicate_block_vector()

调用MPI_Allgatherv抓取数据,使得每个进程都有一份完整的结果向量:

        mympi_create_mixed_xfer_arrays(p, n, &cnt, &disp);
        MPI_Allgatherv(src, cnt[id], dtype, dst, cnt, disp, dtype, comm);

参数的数据结构有点复杂,请详细参阅函数manual。

显示矩阵数据mympi_print_row_striped_matrix()

理解了上述部分,该函数不难明白,故不再介绍。

显示向量数据mympi_print_replicated_vector()

略(同上)。

computer/parallel/parallel.txt · 最后更改: 2014/11/01 02:02 由 127.0.0.1