MPI簡談

  • 2022 年 7 月 30 日
  • 筆記

MPI簡談


MPI是分散式記憶體系統,區別於OpenMP和Pthreads的共享記憶體系統。MPI是一種基於消息傳遞的並行編程技術,是如今最為廣泛的並行程式開發方法。

MPI前世今生

MPI(Message Passing Interface,簡稱MPI)是一種編程介面標準,不是一種具體的程式語言。

  • 1992年開始起草
  • 1994年發布第一個版本MPI-1
  • 1997年發布第二個版本MPI-2
  • 成為消息傳遞並行編程標準,也是最為流行的並行編程介面

MPI實現小記

MPI是一個標準,不是語言。

  1. MPICH
  1. Intel MPI
  • Intel MPI下載
  • Intel MPI是符合MPI-2標準的MPI實現。
  • Intel MPI在通訊協議的選擇上無需進行額外設置,可自動選擇MPI進程間最快的傳輸協議。

MPI特點

聚是一團火,散如滿天星

  • 基於消息傳遞的並行程式,也就是所謂的分散式記憶體心痛,其中的每個進程之間具有自己獨立的堆棧和程式碼段,作為互不相關的多個程式執行,進程之間的資訊交互完全通過顯式的調用通訊函數來實現。

  • SPMD(Single Program Multiple Data)單程式多數據,使用一個程式來處理多個不同的數據集來達到並行的目的

  • MPMD(Multiple Program Multiple Data)多程式多數據,使用不同的程式處理多個數據集,合作求解同一個問題

SPMD

本質上就是對於不同的數據集合都是一樣的處理,在串列中,如果此時有一個數據集A,那麼就是將該操作對數據集A全部執行過去。

那麼在SPMD中,比如現在有三個獨立的進程,那麼就是將數據集A均等拆分成三份a1,a2,a3分給這三個進程(在SPMD中,每一個進程會有一個進程號rank,用來相互區別),等到每個進程處理完任務後,再通過消息傳遞的辦法來收集處理數據處理的結果。MPI標準正是為了實現最後的消息傳遞提供標準和實現方法。

MPMD

MPMD目前有三種典型的執行模型

  1. 管理者(Master)/工人(Worker)類型
  • 由一個管理者程式來控制整個程式的執行
  • 管理者程式負責將不同的任務分配給多個工人程式來完成工作
  1. 聯合數據分析類型
  • 不同的程式各自獨立的完成自己的任務,在特定的時候交換數據
  • 耦合性最少,通訊少,較為容易獲得更好的並行加速效果(相對於串列來說)
  1. 流式類型
  • 和工廠中的流水線類似,假設有三個進程,那麼對於一個任務來說,進程1完成後的輸出作為進程2的輸入,同理,進程2的輸出作為進程3的輸出,不同的執行緒之間構成的實際上是串列關係,但是當數據量足夠大的時候,我們其實可以把這種關係當作並行關係,也就是任務1,2,3近似的同時開始,同時結束(不考慮是否為同一個任務)

編寫並行程式是為了利用冗餘硬體(例如多核,多處理器或多機)提高應用性能。

MPICH誕生之旅

眾里尋他千百度。驀然回首,那人卻在,燈火闌珊處。

筆者下載安裝MPICH走了好多彎路,謹以此文紀念。

溫馨提示:接下來的教程只適用於windowslinux筆者可不負責哦。

安裝網址推薦

MPICH-2,點擊去官網安裝,或者複製下面的鏈接去//www.mpich.org/static/downloads/1.4.1p1/安裝。

進入到的介面後尋找mpich2-1.4.1p1-win-x86-64.msi,點擊下載就可以了。下載後會得到一個文件,但是注意這時候將其移動到桌面,

開始安裝

注意,不要直接點擊安裝,這裡的推薦是以管理員身份安裝。右鍵進行管理員安裝,但是win10沒有這個操作,這裡推薦是左下角windows圖標,右鍵,然後找Windows Powershell(管理員)或者就是找到管理員就可以了。進入介面按照筆者的順序輸入命令即可(注意需要保證前面的msi文件已經放在桌面了,不然就需要自己cd去找了,這裡熟悉dos系統的可以跳過)

cd..
cd..
cd users/username/desktop
msiexec /package mpich2-1.4.1p1-win-x86-64.msi

一共輸入四個命令,注意第三個命令cd users/username/desktop中的username需要替換成自己的用戶名,也就是開機進入的自己當初起的名字。第四個命令的 mpich2-1.4.1p1-win-x86-64.msi是當初下載下來的文件名字,如果不是這個名字記得自己改下名字。

進入安裝

如果前面的操作沒有問題,那麼現在已經運行了安裝程式。

注意兩點:

  • 注意不要點的太快,中間有一個process manager setup介面,那裡需要輸入自己的開機密碼(針對筆記型電腦電腦),不是默認的behappy,筆者沒有嘗試過,但是聽說直接behappy最後安裝的結果也是蠻happy的。

  • 之後會有一個just for mefor everyone的選項,默認是just for me,這裡推薦改成for everyone,這裡所有的修改,都是為了之後安裝的順利。

  • 最後一直按next就行了,如果不想要在默認的文件夾,自己修改的話,那麼注意自己找好想放的位置就可以了。這裡默認路徑為C:/program files/mpich2

繼續出發

這裡同時按住win+R,在裡面輸入cmd,按回車,喚出dos系統。

然後依次輸入以下命令(本質上就是找到那個mpich2文件)

cd/.
cd program files
cd mpich2
cd bin
smpd -status

這裡如果出現smpd running on ...的資訊,就說明smpd是運行的,安裝可以。不行的話,可能需要重新安裝qaq。

設置mpiexec

使用mpiexec我們需要先註冊賬戶

接下來在文件夾中找到moich2文件夾,點進去找到bin,然後找到wmpiregisterexe文件點擊進行註冊。當然如果之前的命令行窗口沒有關閉,也可以輸入mpiexec -register進行,是一樣的,不過這個有圖形化介面,推薦用圖形化介面,好看一點。

同樣是設置賬戶和密碼,這裡的賬戶就是鎖屏之後電腦上顯示的你的用戶名字,密碼就是鎖屏密碼。

註冊完畢之後,可以在dos窗口下輸入該命令:

mpiexec -validate

如果系統的反應是’SUCCESS’,那麼就說明你註冊成功了,否則需要重新開始註冊。

環境變數的配置

  • 找到我的電腦,右鍵找到屬性,點進去,然後找到高級系統設置,點擊進去,裡面的窗口中有環境變數點擊進去,這裡有自己的用戶變數和系統變數,我們選擇在系統變數加入。

  • 在裡面找到Path,點擊,然後新建一個路徑,將剛才的mpich2裡面的bin文件添加進去,如果安裝路徑是前面所說的,那麼就是

    C:/program files/mpich2/bin

    加入即可。

  • 注意此時需要推出dos系統重新按照之前的策略,找到bin,或者輸入下面的命令。

cd /.
cd program files
cd mpich2
cd examples

可以自行運行裡面的cpi.exe文件看看是否正常。

MPI落地

如果前面的方法都沒有用,沒有關係。我們這邊直接來一個微型mpi環境的搭建,一切為了運行自己的第一個mpi程式。

通用操作:

mpi下載

進入官網以後點擊download,將裡面的msmpisetup.exemsmpisdk.msi都下載下來。這裡默認下載下來以後一直點的是確認,沒有修改安裝地址,如果有修改,根據筆者的程式碼自行修改地址那邊的資訊。

mingw64-8.1.0

注意往下拉,找到x86_64-posix-seh,點擊下載,最好是64位的,好像32位不是很匹配。以及環境變數的配置,一樣找到我的電腦(此電腦),右鍵選擇屬性,然後找到高級系統設置,然後找到環境變數,點進去。

在用戶變數和系統變數找到path,然後剛才下載下來的x86_64-posix-seh,解壓完成之後,找到裡面的文件夾bin,比如筆者的就是C:\Program Files\mingw64\bin,也就是我將mingw64放在了C盤的program files下,然後在每個path下加入就可以了。

檢驗的過程就是仍然是同時按win+R,喚出dos系統,輸入gcc --version,如果在一大堆英文中看到8.1.0就說明安裝以及配置環境變數成功了。

vscode

vscode

一進宮

推薦去官網下載,點擊download即可,裡面的配置可以放心選擇默認。如果下載完vscode,可以喚出dos系統,然後輸入code -v,如果第一行是版本號,第三行是x64說明安裝成功了。

二進宮

這裡第一次打開vscode是英文介面,不熟悉的同學沒有關係,往右邊的懸浮窗口尋找,找到第五個擴展,或者直接Ctrl+Shift+X,在裡面輸入Chinese,然後下載應用就可以了,記得做完退出再進來就是中文介面了。

三進宮

接下來一樣的操作,仍然是在拓展窗口,這時候我們搜索code runner,下載並且應用,點確認就可以了。然後依然是退出再進去。

我不來啦

最後一次進去,點擊拓展介面,這時候應該能看到code runner在你的搜索欄下面,這時候右鍵,找到拓展設置點擊,然後在裡面找到這個Code-runner:Executor Map,如果覺得麻煩,直接Ctrl+F,然後再那個框中粘貼Code-runner:Executor Map,按個回車就能找到了,然後點擊下面的在setting.json中編輯,找到裡面的:

"c":"...."
"cpp":"...."

我們要修改的就是這個。

造宮殿

將原先的替換成下面的:

"c": "cd $dir && gcc $fileName -o $fileNameWithoutExt -fopenmp -l msmpi -L \"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Lib\\x64\" -I \"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Include\" && mpiexec -n 4 $fileNameWithoutExt",
"cpp": "cd $dir && g++ $fileName -o $fileNameWithoutExt -fopenmp -l msmpi -L \"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Lib\\x64\" -I \"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Include\" && mpiexec -n 4 $fileNameWithoutExt",

注意這裡面的MS-MPI庫的位置(\"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Lib\\x64\"\"C:\\Program Files (x86)\\Microsoft SDKs\\MPI\\Include\"),尤其注意"\之前都要加上轉義符號\,否則vscode後面運行的時候會出現no this file or directionary,這邊也可以按照自己之前下載的位置找到x64Include對應替換就可以了

運行自己的第一個mpi程式

拷貝下面程式,然後在vscode上運行,記著點擊右上角的三角,或者直接Ctrl+Alt+N,就可以測試自己的mpi環境搭建的如何了。

#include <iostream>
#include <mpi.h>
#include <stdint.h>
using namespace std;

int main(int argc, char* argv[]) {
  cout << "hello" << endl;
  int myid, numprocs;
  int namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &myid);
  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
  MPI_Get_processor_name(processor_name, &namelen);
  cout << "Hello World! Process" << myid << "of" << numprocs << " on " << processor_name << endl;
  MPI_Finalize();
  return 0;
}

如果輸出為

hello
hello
hello
hello
Hello World! Process0of4 on ...
Hello World! Process1of4 on ...
Hello World! Process2of4 on ...
Hello World! Process3of4 on ...

這裡的...每個人可能不一樣,應該是硬體的一種編號,大致類似就可以了。接下來就可以mpi的探索之旅啦。

DevCPP

又是我們夢開始的dev,不過因為目前dev的mingw64版本是5.11,編譯mpi程式的時候會有錯誤,所以我們要給自己的dev升級一下。

告別往昔

點擊Tools,在點擊Compiler Options,進入編譯器選項介面後,右上角從左到右點擊第三個Add a compiler set by folder,然後找到前面下載的mingw64文件夾,點擊確定。然後再點從左到右第四個(向右的箭頭)Rename the selected compiler set,為自己新搭建的編譯器起個名字。注意,如果以後想要換回以前的編譯環境,可以往上面的菜單欄中找到help,往下一行最右邊,就會看到剛才自己為編譯器起的名字,這時候點擊,然後選擇裡面的第一個就是以前自己的編譯器環境,以後切換可以在這邊實現。或者就是在剛才的Compiler Options裡面的第一欄點擊選擇就可以了。

浴火重生

Add the following commands when calling the compiler裡面添加下列程式碼:

-L "C:\Program Files (x86)\Microsoft SDKs\MPI\Lib\x64" -I "C:\Program Files (x86)\Microsoft SDKs\MPI\Include"

注意這裡面的MS-MPI庫的位置("C:\Program Files (x86)\Microsoft SDKs\MPI\Lib\x64"和`”C:\Program Files ,這邊也可以按照自己之前下載的位置找到x64Include對應替換就可以了

注意修改之前先在之前打勾,不然沒法選擇。

接著就是在下面的Add the following commands when calling the linker中改成下面的語句:

-static-libgcc -fopenmp -l msmpi

這裡的-fopenmp是為了運行omp,而-l msmpi是為了運行mpi,根據自己的需要自行選擇。

偷懶是一輩子的事情

這時候理論上可以開始運行自己的第一個mpi程式了,但是還不夠方便,因為此時編譯運行是沒有並行的感覺,所以我們還要進行一步操作。

點擊Tools,再點擊Configure Tools,點擊裡面的Add,下面的內容copy即可

標題 需要填寫的內容
Title MPI RUN FOR 4
Program C:\Windows\System32\cmd.exe
Working Directory C:\Windows\System32\
Parameters /c cd/d <PROJECTPATH> & mpiexec -n 4 <EXENAME> & "<EXECPATH>ConsolePauser.exe"

這邊的parameters裡面的數字4就是我們同時運行的執行緒有多少,這裡可以根據自己的需要自行修改

運行自己的第一個mpi程式

拷貝下面程式,然後在dev運行,注意先編譯,然後點擊Tools,然後選擇裡面的Package Manager,點擊自己剛剛搭建的MPI RUN FOR 4,就可以測試自己的mpi環境搭建的如何了。

#include <iostream>
#include <mpi.h>
#include <stdint.h>
using namespace std;

int main(int argc, char* argv[]) {
  cout << "hello" << endl;
  int myid, numprocs;
  int namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &myid);
  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
  MPI_Get_processor_name(processor_name, &namelen);
  cout << "Hello World! Process" << myid << "of" << numprocs << " on " << processor_name << endl;
  MPI_Finalize();
  return 0;
}

如果輸出為

hello
hello
hello
hello
Hello World! Process0of4 on ...
Hello World! Process1of4 on ...
Hello World! Process2of4 on ...
Hello World! Process3of4 on ...

這裡的...每個人可能不一樣,應該是硬體的一種編號,大致類似就可以了。接下來就可以mpi的探索之旅啦。

MPI編程基礎

開始MPI語法的學習苦旅

關於int main(int argc, char* argv[])的解釋

注意main函數本質上只是一個程式執行的入口而已,平常我們使用scanf函數之類的,都是在執行的時候傳入參數,那麼有沒有方法在程式啟動的時候就傳遞參數呢,這裡我們就要用到int main(int argc, char* argv)

argc參數和argv參數

#include<stdio.h>

int main(int argc, char* argv[]) {
  printf("argc = %d\n", argc);
  printf("%s\n", *argv);
}

運行上面的程式,我們會發現這邊的結果為

argc = 1
C:/.../Untitled1.exe

argc代表了我們的命令行有1個字元串,而這個字元串就是C:/.../Untitled1.exe

所有我們就可以通過argc和argv這樣的關係來進行命令行的輸入

傳遞參數的方法

通過下面的格式傳遞:

程式名.exe 字元串1 字元串2 ...

#include<stdio.h>
#include<string.h>

int main(int argc, char* argv[]) {
  printf("argc = %d\n", argc);

  argv++;
  while(*argv) {
  	if(strcmp(*argv, "a") == 0) {
  		argv++;
  		printf("a\n");
	}else{
	    argv++;
	    printf("wrong\n");
	}
  }
  return 0;
}

輸入以上程式,編譯,假設此時產生的可執行文件叫做a.exe,那麼注意此時是喚出cmd窗口,找到這個文件所在的位置,然後輸入

a.exe a A w

就會得到一下的輸出

argc = 4
a
wrong
wrong

所以如果需要程式帶參數地啟動的時候,就是用int main(int argc, char* argv[]),僅此而已。

MPI程式引入

仍然是Hello World

#include<stdio.h>
#include "mpi.h"

int main(int argc, char* argv[]) {
  int rank;
  int size;
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  printf("Hello World from process %d of %d\n", rank, size);
  MPI_Finalize();
  return 0;
}

這裡不使用argc,argv也是可以的

#include<stdio.h>
#include "mpi.h"

int main() {
  int rank;
  int size;
  MPI_Init(NULL, NULL);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  printf("Hello World from process %d of %d\n", rank, size);
  MPI_Finalize();
  return 0;
}

這裡運行的結果是:

Hello World from process 2 of 4
Hello World from process 1 of 4
Hello World from process 0 of 4
Hello World from process 3 of 4

這裡筆者是4個進程,分別列印他們各自的編號,注意這邊的順序有很多種,並沒有固定的順序,因為他們是並行的,誰快,誰就先佔用列印設備,僅此而已。

MPI四大護法

首先,想要運行mpi,很明顯
#include"mpi.h"是顯然必要的。

  • MPI_Init和MPI_Finalize

函數 作用
MPI_Init 用來初始化MPI執行環境,建立多個MPI之間的聯繫,為後續通訊做準備
MPI_Finalize 結束MPI執行環境

MPI_Init和MPI_Finalize配套使用,用來定義mpi程式的並行區。一般只有在這兩個定義的區域之內調用mpi函數,同時配套使用。

如果在並行區域之外有其他的行為執行,那麼不同於OpenMP,大部分MPI實現 會在各個並行進程之間獨立地執行相應地程式碼。

#include"mpi.h"
#include<stdio.h>

int main() {
  MPI_Init(NULL, NULL);
  MPI_Finalize();
  printf("Hello World\n");
  return 0;
}

/*
output:
Hello World
Hello World
Hello World
Hello World
*/
  • C語言中的MPI_Init需要提供argc和argv參數,如果沒有,寫成NULL就可以了,二MPI_Finalize函數不需要提供參數。二者的返回值都是int類型,標識函數是否調用成功。

  • 總的來說就是一下的調用形式

    • MPI_Init(&argc, &argv);
    • MPI_Init(NULL, NULL);
    • MPI_Finalize();
  • MPI_Comm_rank

MPI_Comm_rank就是表示各個MPI進程的,使用的時候需要提供兩個函數參數:

  • MPI_Comm類型的通訊域,標識參與計算的MPI進程組。MPI_COMM_WORLD是MPI實現預先定義好的進程組,指的是所有MPI進程所在的進程組,如果想要申請自己的進程組,則需要通過MPI_Comm定義並通過其他MPI函數生成。
  • 整型指針,返回進程在相應進程組中的進程號。即需要將rank存放的地址了,本質上可以認為同scanf的參數類似
  • MPI還會預先定義一個進程組MPI_COMM_SELF,只包含自己的進程組,因此裡面的編號都是0
#include<stdio.h>
#include"mpi.h"

int main() {
  int r1, r2;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &r1);
    MPI_Comm_rank(MPI_COMM_SELF, &r2);
    printf("%d %d\n", r1, r2);
  MPI_Finalize();
  return 0;
}
  • MPI_Comm_size

本函數表示相應進程組之間有多少個進程。其返回的也是整型值,同樣需要兩個參數:

  • MPI_Comm類型的通訊域,標識參與計算的MPI進程組,與上面類似,這裡就是MPI_COMM_WORLD

  • 整型指針,返回相應進程組中的進程數

#include<stdio.h>
#include"mpi.h"

int main() {
  int r1, r2, s1, s2;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &r1);
    MPI_Comm_rank(MPI_COMM_SELF, &r2);
    MPI_Comm_size(MPI_COMM_WORLD, &s1);
    MPI_Comm_size(MPI_COMM_SELF, &s2);
    printf("world %d of %d, self %d of %d\n", r1, s1, r2, s2);
  MPI_Finalize();
  return 0;
}

MPI的點對點通訊

點對點通訊時MPI編程的基礎。接下來將引入兩個重要的MPI函數MPI_SendMPI_Recv

先給程式碼,注意這邊的如果格式化(printf)的%d %s之類的漏掉的話,會發生通訊錯誤。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main(int argc, char* argv[]) {
  int myid, numprocs, next, namelen;
  char buffer[BUFLEN], processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Status status;
  MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(processor_name, &namelen);

    printf("Process %d on %s\n", myid, processor_name);
    printf("Process %d of %d\n", myid, numprocs);
    memset(buffer, 0, BUFLEN*sizeof(char));
    if(myid == numprocs-1)
      next = 0;
    else
      next = myid+1;
    if(myid == 0)
    {
        strcpy(buffer, "hello there");
        printf("%d sending '%s'\n", myid, buffer);
        fflush(stdout);
        MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
        printf("%d reveiving\n", myid);
        fflush(stdout);
        MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
        printf("%d received '%s'\n", myid, buffer);
        fflush(stdout);
    }else{
        printf("%d receiving\n", myid);
        fflush(stdout);
        MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
        printf("%d received '%s'\n", myid, buffer);
        fflush(stdout);
        MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
        printf("%d sent '%s'\n", myid, buffer);
        fflush(stdout);
    }
    MPI_Finalize();
    return 0;
}

這裡筆者的輸出為

Process 2 on ...
Process 2 of 4
2 receiving
Process 0 on ...
Process 0 of 4
0 sending 'hello there'
Process 1 on ...
Process 1 of 4
1 receiving
Process 3 on ...
Process 3 of 4
3 receiving
0 reveiving
1 received 'hello there'
1 sent 'hello there'
2 received 'hello there'
2 sent 'hello there'
3 received 'hello there'
3 sent 'hello there'
0 received 'hello there'

接下來逐步拆解上面的程式

四劍客

MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Get_processor_name(processor_name, &namelen);

這四個語句所執行的都是初始化操作,其中一個新成員MPI_Get_processor_name是用來取得運行本進程的機器名稱,該名稱放在processor_name中,其長度為namelen,同時MPI_MAX_PROCESSOR_NAME是記錄機器名的最大長度的。

MPI_Get_processor_name
  • 注意MPI_Get_processor_name的用法。

  • MPI_Get_processor_name(processor_name, &namelen)

這裡後面的程式碼

if(myid == numprocs - 1)
    next = 0;
else
    next = myid + 1;

目的是為了告訴進程號他們下一個進程號是多少,注意這是一個循環,最後一個進程號的下一個進程號是0。所以這裡的程式碼也可以是next = (myid + 1) % numprocs;,至於寫哪一種就看各自的選擇了。

fflush

如今windows下的stdout變成及時輸出,所以一般來說適用不適用fflush也看不出太大的區別了。

注意,平時使用的printf函數並不是直接列印到螢幕上,而是先發送到stdout(此時的stdout類似緩衝區)中,再由stdout發送到螢幕上。

那麼假設現在stdout直到遇到\n才會進行列印輸出,那麼假設進程1發送hello給到stdout,然後這時候切換到進程2,進程2發送hello world\n給stdout,此時列印到螢幕上的就是

hellohello world

很明顯第一個明明是進程1的,但是在我們看來是執行進程2列印出來的,為了解決這個問題,我們就要使用fflush(stdout),它的作用就是立即將所有內容發送到指定輸出設備上(清空緩衝區)。一般在多執行緒的輸出中使用。

接下來主角登場

MPI_Send

  • MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
  • MPI_Send函數的標準形式是
    int MPI_SEND(buf, count, datatype, dest, tag, comm)

其中,輸入參數包括:

輸入參數 作用
buf 發送緩衝區的起始地址,可以是各種數組或結構的指針
count 整型,發送的數據個數,應為非負整數(感覺類似指針的偏移量)
datatype 發送數據的數據類型
dest 應該為整數,表示目的進程號,即destination
tag 應該為整數,消息標誌
comm MPI進程組所在的通訊域(應該是發送的哪個進程號所在的通訊域)
  • 該函數的作用就是向通訊域comm中的dest進程發送數據。消息數據存放在buf中,類型是datatype,個數是count個。這個消息的標誌是tag,用以和本進程向同意目的進程發送的其他消息區別開來。

對於具體的MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD)的解釋

在通訊域MPI_COMM_WORLD內,向進程號next發送資訊。發送的是buffer裡面的所有數據,數據類型就是MPI_CHAR(因為buffer存儲的是char類型的數據,MPI_CHAR是MPI的預定義數據類型,和char一一對應),MPI_Send的參數都是輸入參數,沒有輸出參數

MPI_Recv

  • MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
  • MPI_Recv的標準形式就是:int MPI_Recv(buf, count, datatype, source, tag, comm, status);
  • MPI_Recv中的buffer和status是輸出參數,其他的都是輸入參數

其中的參數包括:

參數類型 作用
buf 接收緩衝區的起始地址,可以是各種數組或結構的指針,為輸出參數
status MPI_Status結構指針,返回狀態資訊,為輸出參數
count 整數,最多可接收的數據個數
datatype 接收數據的數據類型
source 整型,接受數據的來源即發送數據進程號
tag 整數,消息標識,應與相應的發soon給操作消息標識相同。
comm 本進程(消息接收進程)和消息發送進程所在的通訊域

對於MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);的解釋:

在通訊域MPI_COMM_WORLD中,0號進程(假設是0)從任意進程(MPI_ANY_SOURCE表示接受任意進程發來的消息),接收的標籤號是99,而且不超過512個MPI_CHAR類型數據,保存到buffer中。

注意緩衝區buf的大小,不能小於發送過來的有效消息長度,否則可能由於數組越界導致程式錯誤(段錯誤)

MPI_Status
  • MPI_Status是MPI中一個特殊的,也是比較有用的結構。MPI_Status的結構定義如下:
typedef struct MPI_Status {
  int count;
  int cancelled;
  int MPI_SOURCE;
  int MPI_TAG;
  int MPI_ERROR;
} MPI_Status;
  • status主要顯示接收函數的各種錯誤狀態,我們通過訪問status.MPI_SOURCE,status.MPI_TAG和status.MPI_ERROR就可以得到發送數據進程號,發送數據使用的tag以及本接收操作返回的錯誤程式碼。當然如果想要獲取數據項數,筆者嘗試了一下,好像通過status.count無法獲取,需要通過MPI函數MPI_Get_count獲得。

MPI_Get_count

其標準定義為:

int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count);

其中前兩個參數為輸入參數,status是MPI_Recv返回的狀態結構的指針,datatype指定數據類型,最後一個參數是輸出參數,是實際接收到的給頂數據類型的數據項數。

筆者測試的程式如下,確實獲得了實際收到的個數。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define MAXLEN 512

int main(int argc, char* argv[]) {
  int myid, namelen, numprocs;
  char buffer[MAXLEN], pro_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Status status;
  MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(pro_name, &namelen);
    printf("myid %d of %d running on %s\n", myid, numprocs, pro_name);
    if(myid == 0) {
      strcpy(buffer, "hello world");
      printf("processor 0 sending message: %s\n", buffer);
      fflush(stdout);
      MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, 3, 110, MPI_COMM_WORLD);
      printf("send %d data\n", strlen(buffer)+1);
      fflush(stdout);
    }
    if(myid == 3) {
      MPI_Recv(buffer, MAXLEN, MPI_CHAR, 0, 110, MPI_COMM_WORLD, &status);
      printf("processor 3 received message: %s\n", buffer);
      fflush(stdout);
      int count;
      MPI_Get_count(&status, MPI_CHAR, &count);
      printf("the data num is %d\n", count);
    }
  MPI_Finalize();
  return 0;
}

這裡的count其實本質上是需要根據數據類型變化的,MPI_DOUBLE,MPI_INT,MPI_CHAR對於同一長度的數據所能存儲的數據個數是不一樣的,這與C是一樣的。

上面的點對點通訊的例子,對應上面MPMD中的流式模型,即進程i等待進程i-1傳遞過來的字元串,並將其傳遞給進程i+1,直到最後一個進程傳遞給進程0。

消息管理7要素

mpi最重要的功能就是消息傳遞,MPI_Send和MPI_Recv負責在兩個進程之間接收資訊和發送資訊。主要由以下7個參數構成。

  • 發送或者接收緩衝區buf
  • 數據數量count
  • 數據類型datatype
  • 目標進程或者源進程destination/source
  • 消息標籤tag
  • 通訊域comm
  • 消息狀態status,只在接收的函數中出現

消息信封
MPI程式中的消息傳遞和我們日常的郵件發送和傳遞有類似之處,其中buf,coutn,datatype是信件的內容,而source/destination,tag,comm是信件的信封,因此我們稱之為消息信封。

消息數據類型

消息數據類型,就是之前所說的datatype

作用

  • 方便將非連續記憶體中的數據,以及具有不同數據類型的內容組成消息
  • 其類型匹配非常嚴格,一是宿主語言(如C)數據類型和通訊操作的數據類型匹配,同時發送方和接收方的數據類型匹配

基本數據類型

以下給出了MPI預定義數據類型與C數據類型的對應關係

MPI預定義數據類型 相應的C數據類型
MPI_CHAR signed char
MPI_SHORT signed short int
MPI_INT signed int
MPI_LONG signed long int
MPI_UNSIGNED_CHAR unsigned char
MPI_UNSIGNED_SHORT unsigned short int
MPI_UNSIGNED unsigned int
MPI_UNSIGNED_LONG unsigned long int
MPI_FLOAT float
MPI_DOUBLE double
MPI_LONG_DOUBLE long double
MPI_BYTE 無對應類型
MPI_PACKED 無對應類型

基本上就是MPI+datatype的結構

一開始的時候建議儘可能地保證發送和接收地數據類型完全一致。

這裡面的多出來的MPI_BYTEMPI_PACKED,可以與任意以位元組為單位的消息相匹配。MPI_BYTE是將消息不加修改的通過二進位位元組流來傳遞的一種方式,而MPI_PACKED是為了將非連續的數據進行打包發送而提出的。經常與函數MPI_Pack_sizeMPI_Pack聯合使用。

下面是MPI_PACKED的使用程式碼:

#include"mpi.h"
#include<stdio.h>
#include<string.h>
#include<stdlib.h>

#define MAXLEN 512

int main() {
  int myid, namelen;
  MPI_Status status;
  char name[MPI_MAX_PROCESSOR_NAME], buf[MAXLEN];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(name, &namelen);
    printf("processor %d is started on %s\n", myid, name);
    if(myid == 0) {
        double A[100];
        int buffersize;
        MPI_Pack_size(50, MPI_DOUBLE, MPI_COMM_WORLD, &buffersize);
        void* tempbuffer = malloc(buffersize);
        int j = sizeof(MPI_DOUBLE);
        int position = 0;
        for(int i = 0; i < 100; i++) A[i] = i * 1.1;
        printf("position : %d\n", position);
        for(int i = 0; i < 50; i++)
          MPI_Pack(A+i*2, 1, MPI_DOUBLE, tempbuffer, buffersize, &position, MPI_COMM_WORLD);
        MPI_Send(tempbuffer, position, MPI_PACKED, 1, 101, MPI_COMM_WORLD);
        free(tempbuffer);
    }
    if(myid == 1) {
      void* B = malloc(MAXLEN);
      MPI_Recv(B, MAXLEN, MPI_PACKED, 0, 101, MPI_COMM_WORLD,&status);
      int num;
      MPI_Get_count(&status, MPI_PACKED, &num);
      printf("%d\n", num);
      double* C = (double*)B;
      for(int i = 0; i < 50; i++) {
        printf("%lf\n", C[i]);
      }
      free(B);
    }
  MPI_Finalize();
  return 0;
}
  • MPI_Pack_size

    • 決定需要多大的緩衝區來存放數據
    • MPI_Pack_size(num, datatype, comm, buffersize)
    • 這裡是通過MPI_Pack_size來計算num個datatype數據所需要的記憶體,其結果存放在buffersize,注意buffersize給的是整型指針,comm就是通訊域
  • MPI_Pack

    • MPI_Pack(buf, sum, datatype, tempbuffer, buffersize, &position, comm)
    • buf是所要打包的數據的起始位置(指針or地址),第二個參數是打包幾個數據,第三個參數是說這回的數據的種類,第四個參數tempbuffer是要打包的地方,buffersize是緩衝區大小,第五個參數用於跟蹤已經有多少個數據被打包(同時也作為地址偏移量,本質上也是第一個數據開始存放的地方),第六個就是通訊域
導出數據類型

MPI還允許通過導出數據類型,將不連續的,甚至是不同類型的數據元素組合在一起形成新的數據類型。我們稱這種由用戶定義的數據類型為到此處數據類型。這需要由MPI提供的構造函數來構造。

總之類型匹配規則如下:

  • 有類型數據的通訊,發送方和接收方均使用相同的數據類型
  • 無類型數據的通訊,發送方和接收方均以MPI_BYTE作為數據類型
  • 打包數據的通訊,發送方和接收方均使用MPI_PACKED

消息標籤TAG

TAG是消息信封中的一項,是程式在同一接收者的情況下,用於標識不同類型消息的一個整數。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define MAXN 512

int main() {
  int myid, namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Status status;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("processor %d running on %s\n", myid, processor_name);
    if(myid == 0) {
      char message[MAXN];
      strcpy(message, "hello, I'm processor 0\n");
      printf("processor 0 sending message: %s", message);
      fflush(stdout);
      MPI_Send(message, strlen(message)+1, MPI_CHAR, 2, 101, MPI_COMM_WORLD);
      strcpy(message, "goodbye, I'm processor 0\n");
      printf("processor 0 sending message: %s", message);
      fflush(stdout);
      MPI_Send(message, strlen(message)+1, MPI_CHAR, 2, 110, MPI_COMM_WORLD);
    }
    if(myid == 2) {
      char message[MAXN];
      MPI_Recv(message, MAXN, MPI_CHAR, 0, 101, MPI_COMM_WORLD, &status);
      printf("processor 2 received message: %s", message);
      fflush(stdout);
      MPI_Recv(message, MAXN, MPI_CHAR, 0, 110, MPI_COMM_WORLD, &status);
      printf("processor 2 received message: %s", message);
      fflush(stdout);
    }
  MPI_Finalize();
  return 0;
}

如果上述的例子假設沒有標籤的化,那麼有可能進程0發送的第二個資訊如果比第一個資訊塊,那麼進程2接收的就是第二個資訊,如果此時存儲的地方不一樣,就會導致消息溝通的錯誤,所以我們需要消息標籤來進行區別。

通訊域

消息的發送和接收必須使用相同的消息標籤才能實施通訊。維護TAG來匹配消息是比較繁瑣的事情,因此我們同時提出了另一項通訊域。

一個通訊域包含一個進程組及其上下文。進程組是進程的有限有序集。有限是說進程的數量是有限的,有序是編號是從0~n-1。

通訊域限定了消息傳遞的進程範圍。

一個進程在一個通訊組中,用它的編號進行標識,組的大小和進程號可以用前面所說的MPI_Comm_sizeMPI_Comm_rank獲得。

MPI預先定義了兩個進程組:MPI_COMM_SELF(只包含自己的通訊域)和MPI_COMM_WORLD(包含所有MPI進程的進程組),同時,MPI對於通訊子(通訊組)提供了各種管理函數。

  • int MPI_Comm_compare(comm1, comm2, result)

其中result是整型指針的傳遞,這裡比較comm1和comm2,如果comm1和comm2是相同的句柄,則result為MPI_Ident(感覺上是一個整型,但是實測的時候沒法列印,反正該函數通過result值得不同來表示結果),如果僅僅是個進程組得成員和序列號都相同,則result為MPI_Congruent,如果兩者得組成員相同但序列號不同則結果為MPI_Similar,否則結果就為MPI_Unequal

  • int MPI_Comm_dup(comm, newcomm)

對comm進行複製得到新的通訊域newcomm,注意這邊得newcomm是通過指針傳遞的,類型為MPI_Comm*

  • int MPI_Comm_solit(comm, color, key, newcomm)

通訊域分裂,本函數要求comm進程組中的每個進程都要執行,每個進程指定一個color(整型),如果具有相同的color值的進程形成一個新的進程組,新產生的通訊域與這些進程組一一對應。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define MAXN 512

int main() {
  MPI_Comm a;
  MPI_Status status;
  int myid, numprocs;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    printf("MPI_COMM_WORLD:%d\n", myid);
    MPI_Comm_split(MPI_COMM_WORLD, myid%2, myid, &a);
    MPI_Comm_size(a, &numprocs);
    printf("%d\n", numprocs);
    MPI_Comm_rank(a, &myid);
    printf("a:%d\n", myid);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    if(myid == 0) {
       char buf[MAXN];
       strcpy(buf, "hello world from 0\n");
       printf("processor 0 sending : %s", buf);
       MPI_Send(buf, strlen(buf)+1, MPI_CHAR, 1, 110, a);
    }
    if(myid == 2) { // 這裡發現0和2是一組,0和3不是一組
      char buf[MAXN];
      MPI_Recv(buf, MAXN, MPI_CHAR, 0, 110, a, &status);
      printf("%s", buf);
    }
  MPI_Finalize();
  return 0;
}

注意新產生的通訊域包含舊的所有進程,只是不同的進程可能在不同的組別之中。新的進程組中,各個進程的順序編號根據key(整型)的大小決定,如果key越小,則相應進程在新通訊域中的順序編號也越小,如果key值相同,則根據這兩個進程在原來通訊域中順序號決定新的進程號。一個進程可能提供color值為MPI_Undefined,此時,newcomm返回MPI_COMM_NULL(分裂失敗)

  • int MPI_Comm_free(comm)

釋放給定的通訊域,注意這裡傳遞的是指針

狀態字(status)

狀態字的主要功能就是保存接收到的消息的狀態。

while(true) {
  MPI_Recv(..., ..., ..., MPI_ANY_SOURCE, MPI_ANY_TAG,...,...);
  switch(status.MPI_TAG) {
    case 0: ...;
    case 1: ...;
    case 2: ...;
  }
}

這裡的MPI_Recv沒有指定從哪裡接收資訊,可以接收任意來源的資訊,任意標籤的資訊(MPI_ANY_TAG),我們可以通過檢查status中的MPI_TAG可以有效把消息區分開來。當一個接收者能從不同進程接收不同大小和標籤的消息時,比如伺服器進程,查閱狀態資訊就會很有用。我們可以利用狀態字的標籤可以進行更多的有意思的操作。

通訊匹配聖經

  • 通訊數據類型匹配
  • 消息標籤,通訊域匹配
  • 發送進程與接收進程號對應
  • 接收消息的緩衝區大於發送過來的消息的大小

現在考慮如果當初的資訊大家都是先接收然後再發送,程式會怎麼樣呢?運行後會發現,程式進入了停滯狀態,此時0,1,2,3都是在receiving狀態,而這時候沒有進程可以發送消息來結束這個狀態,這種大家都在等待的狀態,稱為「死鎖」,死鎖現象在多進程,多執行緒編程中是經常發生的現象。 因為MPI_Send或MPI_Recv正確返回的前提是該通訊操作已經完成。對於發送操作來說就是緩衝區可以被其他的操作更新,對於接收操作來說就是該緩衝區中的數據已經可以被完整的使用。我們稱這樣的形式為阻塞通訊,如果沒有完成之前,其不會結束該次通訊操作。當然反過來,先發送再接收是可以執行下去的,因為發送操作不需要等待其他的先行操作,因此阻塞可以是有限的。阻塞通訊中點對點消息的匹配也對正確通訊有著至關重要的影響。

統計時間

編寫並行程式的目的是為了提高程式運行性能。為了檢驗並行化的效果,我們經常會用到統計時間的函數。MPI提供兩個時間函數MPI_WtimeMPI_Wtick

  • MPI_Wtime返回一個雙精度數,標識從過去的某點時間到當前時間所消耗的時間秒數

  • MPI_Wtick返回MPI_Wtime結果的精度

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main(int argc, char* argv[]) {
  int myid, numprocs, next, namelen;
  char buffer[BUFLEN], processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Status status;
  double t1, t2, t3, tick;

  MPI_Init(&argc, &argv);
  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
  MPI_Comm_rank(MPI_COMM_WORLD, &myid);
  MPI_Get_processor_name(processor_name, &namelen);

  t1 = MPI_Wtime();

  printf("Processor %d on %s\n", myid, processor_name);
  printf("Processor %d of %d\n", myid, numprocs);
  memset(buffer, 0, BUFLEN*sizeof(char));
  if(myid == numprocs-1)
    next = 0;
  else
    next = myid + 1;

  if(myid == 0) {
    strcpy(buffer, "hello there");
    printf("%d sending '%s'\n", myid, buffer); fflush(stdout);
    MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
    printf("%d receiving\n", myid); fflush(stdout);
    MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
    printf("%d received '%s'\n", myid, buffer); fflush(stdout);
  }else{
    printf("%d receiving\n", myid); fflush(stdout);
    MPI_Recv(buffer, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, 99, MPI_COMM_WORLD, &status);
    printf("%d received '%s'\n", myid, buffer); fflush(stdout);
    MPI_Send(buffer, strlen(buffer)+1, MPI_CHAR, next, 99, MPI_COMM_WORLD);
    printf("%d sent '%s'\n", myid, buffer); fflush(stdout);
  }

  t2 = MPI_Wtime();
  t3 = t2 - t1;
  tick = MPI_Wtick();
  printf("%d process time is '%.10f'\n", myid, t3);
  printf("%d process tick is '%.10f'\n", myid, tick);
  MPI_Finalize();
  return 0;
}

其實本質上和前面的時鐘打點函數的用法差不多,這裡MPI_Wtime就是獲得程式當前運行了多少時間,而MPI_Wtick就是獲得計時的精度。

錯誤管理

  • 通過status.MPI_ERROR來獲取錯誤碼
#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid;
  MPI_Status status;
  char buf[BUFLEN];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    printf("processor %d running\n", myid);
    if(myid == 0) {
      strcpy(buf, "hello, processor 1 from processor 0");
      printf("processor %d sending %s\n", myid, buf); fflush(stdout);
      MPI_Send(buf, strlen(buf)+1, MPI_CHAR, 1, 101, MPI_COMM_WORLD);
    }
    if(myid == 1) {
      MPI_Recv(buf, BUFLEN, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      printf("processor %d received %s\n", myid, buf); fflush(stdout);
      printf("tag %d source %d\n", status.MPI_TAG, status.MPI_SOURCE);
      printf("error code %d\n", status.MPI_ERROR);
    }
  MPI_Finalize();
  return 0;
}
  • MPI終止MPI程式執行的函數MPI_Abort

int MPI_Abort(MPI_Comm, int errorcode)

該函數的作用使通訊域comm的所有進程退出,返回errorcode給調用的環境。通訊域comm中的任意進程調用此函數都能使該通訊域內所有的進程結束運行。這裡只要執行到這個程式碼,那麼所有的進程都會結束,類似於拋出異常的處理機制。

接下來進入本章的最後一個環節啦,加油。

MPI群集通訊

除了之前介紹的點對點通訊,MPI還有群集通訊。群集通訊,說白了就是包含一對多,多對一,多對多的進程通訊模式(就是不帶一對一玩,但其實本質上就是多對多,因為一對多和多對一不過是多對多的特例)。此時的通訊方式變成了多個進程參與通訊。

同步

int MPI_Barrier(MPI_Comm comm)

如下面這段程式碼,如果沒有MPI_Barrier,那麼進程運行快的會直接執行下面的程式碼,而有的進程還沒有執行第一行的輸出。

#include"mpi.h"
#include<stdio.h>

int main() {
  int myid;
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    printf("processor %d running\n", myid); fflush(stdout);
    MPI_Barrier(MPI_COMM_WORLD);
    printf("hello world %d\n", myid); fflush(stdout);
  MPI_Finalize();
  return 0;
}

這個函數就像是一道路障。使得通訊子comm中的所有進程相互同步,知道所有的進程都執行了他們各自的MPI_Barrier函數,然後各自開始執行後面的程式碼。同步函數是並行程式中控制執行順序的常用手段。(本質上就是強迫所有在通訊子comm中的進程,重新在Barrier那一行一起進行,讓某些執行緒達到同步,此時有點串列的味道)

廣播

廣播就是一對多的傳送消息,從一個root進程向組內所有其他的進程發送一條消息。

int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int root,MPI_Comm)

相比於之前的MPI_Send,MPI_Bcast就是少了目標進程,此時的目標進程擴大為組內的所有進程。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char buf[BUFLEN], Buf[BUFLEN], name[MPI_MAX_PROCESSOR_NAME];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(name, &namelen);
    printf("%d of %d running on %s\n", myid, numprocs, name); fflush(stdout);
    memset(buf, 0, sizeof(buf));
    memset(Buf, 0, sizeof(Buf));
    if(myid == 0) {
      strcpy(buf, "hello, I\'m processor 0\n");
    }
    printf("processor %d\'s buf : %s", myid, buf); fflush(stdout);
    printf("\nMPI_Bcast is started\n"); fflush(stdout);
    if(myid == 0) MPI_Bcast(buf, strlen(buf)+1, MPI_CHAR, 0, MPI_COMM_WORLD);
    MPI_Bcast(Buf, BUFLEN, MPI_CHAR, 0, MPI_COMM_WORLD);
    printf("processor %d\'s now buf : %s", myid, Buf); fflush(stdout);
  MPI_Finalize();
  return 0;
}

用法如上,本質上和Recv和Send很相似,不過沒有了tag,同時MPI_Bcast廣播本身可以做發送和接收,如果當前進程號等於root,那就是發送,否則就是接收。

聚集

int MPI_Gather(void* sendbuf, int sendcnt, MPI_Datatype sendtype, void* recvbuf, int recvcnt, MPI_Datatype recvtype, int root, MPI_Comm comm)

該函數的作用就是root進程接收該通訊組每一個成員進程(包括root自己)發送的資訊。這n個消息的連接按進程號排列存放在root進程的接收緩衝中。每個緩衝由三元組(sendbuf, sendcnt, sendtype)標識。所有非root進程忽略接收緩衝。跟多的是接收的作用,只不過此時接收的是其他進程中發送過來的資訊。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char name[MPI_MAX_PROCESSOR_NAME], buf[BUFLEN], BUF[BUFLEN];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(name, &namelen);
    printf("%d of %d running on %s\n", myid, numprocs, name); fflush(stdout);
    sprintf(buf, "hello, I\'m processor %d.", myid);
    printf("%s\n", buf); fflush(stdout);
    int len = strlen(buf);
    MPI_Gather(buf, len, MPI_CHAR, BUF, len, MPI_CHAR, 1, MPI_COMM_WORLD);
    //MPI_Barrier(MPI_COMM_WORLD);
    printf("processor %d\'BUF is %s\n", myid, BUF); fflush(stdout);
  MPI_Finalize();
  return 0;
}

MPI_Gather注意這邊的函數sendcnt和recvcnt要匹配。如果不相等可能會造成通訊錯誤,其實質就是運行這些函數的進程開始相互通訊。注意該函數自帶有barrier的功能。

播撒

int MPI_Scatter(void* sendbuf, int sendcnt, MPI_Datatype sendtype, void* recvbuf, int recvcnt, MPI_Datatype recvtype, int root, MPI_Comm comm)

MPI_scatter是一對多傳遞消息。和廣播不同的是,root進程向各個進程傳遞的消息可以是不同的。Scatter實際上執行的是與Gather相反的操作。

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME], buf[BUFLEN], BUF[BUFLEN];
  MPI_Status status;
  MPI_Init(NULL, NULL);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("%d of %d running on %s\n", myid, numprocs, processor_name);
    memset(buf, 0, sizeof(buf));
    if(myid == 0) strcpy(buf, "hello, I\'m processor 0");
    printf("processor %d buf %s\n", myid, buf); fflush(stdout);
    int len = strlen(buf), next = (myid + 1) % numprocs;
    MPI_Barrier(MPI_COMM_WORLD);
    if(myid == 0) {
      MPI_Send(&len, 1, MPI_INT, next, 101,  MPI_COMM_WORLD);
      MPI_Recv(&len, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    }else{
      MPI_Recv(&len, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      MPI_Send(&len, 1, MPI_INT, next, 101,  MPI_COMM_WORLD);
    }
    MPI_Barrier(MPI_COMM_WORLD);
    printf("processor %d len %d\n", myid, len); fflush(stdout);
    MPI_Scatter(buf, len/4, MPI_CHAR, BUF, len/4, MPI_CHAR, 0, MPI_COMM_WORLD);
    printf("processor %d BUF %s\n", myid, BUF); fflush(stdout);
  MPI_Finalize();
  return 0;
}

注意方便起見,建議這裡的sendcnt和recvcnt保持一直,同時注意這裡的recvcnt是表示每個進程接收的數量,而不是發送的總數量,注意這個區別,類似於一種分配塊中任務的數量。當然root進程可以給自己發送資訊。

擴展的聚集和播撒操作

MPI_Allgather的作用是每一個進程都收集到其他所有進程的消息,它相當於每一個進程都執行了MPI_Gather執行完了MPI_Gather之後,所有的進程的接收緩衝區的內容都是相同的,也就是說每個進程給所有進程都發送了一個相同的消息,所以名為allgather。本函數的介面是:

int MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char buf[BUFLEN], BUF[BUFLEN], name[MPI_MAX_PROCESSOR_NAME];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(name, &namelen);
    printf("processor %d of %d running on %s\n", myid, numprocs, name);
    memset(buf, 0, sizeof(buf));
    memset(BUF, 0, sizeof(BUF));
    sprintf(buf, "hello, I'm processor %d", myid);
    MPI_Allgather(buf, strlen(buf), MPI_CHAR, BUF, strlen(buf), MPI_CHAR, MPI_COMM_WORLD);
    printf("processor %d get message : %s\n", myid, BUF); fflush(stdout);
  MPI_Finalize();
  return 0;
}

全局交換

MPI_Allgather每個進程發送一個相同的消息給所有的進程,而MPI_Alltoall散發給不同進程的消息是不同的。因此,它的發送緩衝區也是一個數組。MPI_Alltoall的每個進程可以向每個接收者發送數目不同的數據,第i個進程發送的第j塊數據將被第j 個進程接收並存放在其他消息緩衝區recvbuf的第i塊,每個進程的sendcount和sendtype的類型必須和所有其他進程的recvcount和recvtype相同,這也意味著在每個進程和根進程之間發送的數據量必須和接收的數據量相等。函數介面為:

int MPI_Alltoall(void* sendbug, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype, MPI_Comm comm)

#include"mpi.h"
#include<stdio.h>
#include<string.h>

#define BUFLEN 512

int main() {
  int myid, numprocs, namelen;
  char processor_name[MPI_MAX_PROCESSOR_NAME], buf[BUFLEN], BUF[BUFLEN];
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("%d of %d running on %s\n", myid, numprocs, processor_name); fflush(stdout);
    sprintf(buf, "I\'m processor %d, hello!", myid);
    printf("processor %d : %s\n", myid, buf); fflush(stdout);
    memset(BUF, 0, sizeof(BUF));
    int len = strlen(buf);
    MPI_Alltoall(buf, len/numprocs, MPI_CHAR, BUF, len/numprocs, MPI_CHAR, MPI_COMM_WORLD);
    printf("processor %d get message: %s\n", myid, BUF);
  MPI_Finalize();
  return 0;
}

規約與掃描

MPI提供了兩種類型的聚合操作

規約

int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm)

這裡的每個進程的待處理數據存放在sendbuf中,可以是標量也可以是向量。所有進程將這些值通過輸入的操作子op計算為最終結果並將它存入root進程的recvbuf中。具體的規約操作包括:

操作子 功能
MPI_MAX 求最大值
MPI_MIN 求最小值
MPI_SUM 求和
MPI_PROD 求積
MPI_LAND 邏輯與
MPI_BAND 按位與
MPI_LOR 邏輯或
MPI_BOR 按位或
MPI_LXOR 邏輯異或
MPI_BXOR 按位異或
MPI_MAXLOC 最大值且對應的位置
MPI_MINLOC 最小值且相應的位置

規約操作的數據類型與C中的整數類型對應。

#include"mpi.h"
#include<stdio.h>
#include<time.h>
#include<stdlib.h>
#include<string.h>

#define LEN 10
#define BASE 1000

int main() {
  int myid, numprocs, num[LEN], out[LEN];
  srand(time(NULL));
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    for(int i = 0; i < LEN; i++) num[i] = 10*myid + i;
    printf("processor %d array: ", myid);
    for(int i = 0; i < LEN; i++) printf("%d ", num[i]);
    printf("\n");fflush(stdout);
    memset(out, 0, sizeof(out));
    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Reduce(&num, &out, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    printf("processor %d array:", myid);
    if(out[0] == 0) printf("no\n");
    else{
      for(int i = 0; i < 10; i++) printf("%d ", out[i]);
      printf("\n");
    }
    fflush(stdout);
  MPI_Finalize();
  return 0;
}

注意這裡的數據量count指的是幾個數據參加,而這邊的操作其實是對所有執行緒的第i個數據進行的,所以傳遞的時候接收的就是經歷這些操作過後留下來的數據。化多執行緒為一個執行緒上的數據,歸一。

掃描

int MPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype, MPI_Op op, MPI_Comm comm)

MPI_Scan常用於對分布於族中的數據做前置規約操作。此操作將序列號為0,…,i(包括i)的進程發送緩衝區的規約結果存入序列號為i的進程接收消息緩衝區中。這種操作支援的數據類型,操作以及對發送及接收緩衝區的限制和規約相同。與規約相比,掃描操作設過去了root域,因為掃描是將部分值組合成n個最終值,並存放在n個進程的recvbuf中。具體的掃描操作有Op域定義。

MPI的規約和掃描操作允許每個進程貢獻向量值,而不只是標量值。向量的長度由Count定義。

#include"mpi.h"
#include<stdio.h>
#include<time.h>
#include<stdlib.h>
#include<string.h>

#define LEN 10
#define BASE 1000

int main() {
  int myid, numprocs, num[LEN], out[LEN];
  srand(time(NULL));
  MPI_Init(NULL, NULL);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    for(int i = 0; i < LEN; i++) num[i] = 10*myid + i+1;
    printf("processor %d array: ", myid);
    for(int i = 0; i < LEN; i++) printf("%d ", num[i]);
    printf("\n");fflush(stdout);
    memset(out, 0, sizeof(out));
    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Scan(&num, &out, 10, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    printf("processor %d array:", myid);
    if(out[0] == 0) printf("no\n");
    else{
      for(int i = 0; i < 10; i++) printf("%d ", out[i]);
      printf("\n");
    }
    fflush(stdout);
  MPI_Finalize();
  return 0;
}

與規約是類似的不過就是這裡的最終結果一定存放在最後一個進程中,同時注意隨著進程號的迭代,裡面的進程中的最後一個存放當前的結果。比如對於四進程來說,第二個進程存放一二進程中op操作子過後的值。

簡單示例

相關程式碼如下:

#include"mpi.h"
#include<stdio.h>
#include<math.h>

double f(double);

double f(double a) {
    return (4.0 / (1.0 + a*a));
}

int main(int argc, char* argv[]) {
    int n, myid, numprocs, i;
    double PI25DT = 3.141592653589793238462643;
    double mypi, pi, h, sum, x;
    double starttime = 0.0, endwtime;
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Get_processor_name(processor_name, &namelen);

    fprintf(stdout, "Process %d of %d is on %s\n", myid, numprocs, processor_name);
    fflush(stdout);
    n = 10000;
    if(myid == 0)
      starttime = MPI_Wtime();
    MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
    h = 1.0/(double)n;
    sum = 0.0;
    for(i = myid+1; i <= n; i += numprocs){
      x = h * ((double)i - 0.5);
      sum += f(x);
    }
    mypi = h * sum;
    MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    if(myid == 0) {
      endwtime = MPI_Wtime();
      printf("pi is approximately %.16f, Error is %.16f\n", pi, fabs(pi-PI25DT));
      printf("wall clock time = %f\n", endwtime-starttime);
      fflush(stdout);
    }
    MPI_Finalize();
    return 0;
}

這裡本質上利用的是積分求pi,1/(1+x^2)的積分是arctanx,通過這種方式來實現。最後通過規約操作中的求和將四個執行緒的內容相加就可以了,也就是將其中的操作基本上分成四等分,然後來求解。

小結

  • 通訊子中的所有進程必須調用群集通訊歷程。如果有意個進程沒有調用,會產生奇奇怪怪的錯誤。
  • 一個進程一旦結束了群集操作就從群集常式中返回。
  • 每個群集歷程,也就是前面的群集函數都有阻塞的功能

MPI入門到此ending。完結撒花,感謝陪伴。

何當共剪西窗燭,卻話巴山夜雨時。

江湖再會,哈哈哈。