Pytorch 分散式模式介紹

  • 2020 年 2 月 14 日
  • 筆記

一  分散式訓練策略

數據較多或者模型較大時,為提高機器學習模型訓練效率,一般採用多GPU的分散式訓練。

按照並行方式,分散式訓練一般分為數據並行和模型並行兩種, 模型並行:分散式系統中的不同GPU負責網路模型的不同部分。例如,神經網路模型的不同網路層被分配到不同的GPU,或者同一層內部的不同參數被分配到不同GPU;

數據並行:不同的GPU有同一個模型的多個副本,每個GPU分配到不同的數據,然後將所有GPU的計算結果按照某種方式合併。

注意,上述中的不用GPU可以是同一台機上的多個GPU,也可以是不用機上的GPU。

當然也有數據並行和模型並行的混合模式

因為模型並行各個部分存在一定的依賴,規模伸縮性差(意思是不能隨意增加GPU的數量),在實際訓練中用的不多。而數據並行,則各部分獨立,規模伸縮性好,實際訓練中更為常用,提速效果也更好。

數據並行會涉及到各個GPU之間同步模型參數,一般分為同步更新和非同步更新。同步更新要等到所有GPU的梯度計算完成,再統一計算新權值,然後所有GPU同步新值後,才進行下一輪計算。非同步更新,每個GPU梯度計算完後,無需等待其他GPU的梯度計算(有時可以設置需要等待的梯度個數),可立即更新整體權值,然後同步此權值,即可進行下一輪計算。同步更新有等待,非同步更新基本沒有等待,但非同步更新涉及到梯度過時等更複雜問題。

1.模型並行

所謂模型並行指的是將模型部署到很多設備上(設備可能分布在不同機器上)運行,比如多個機器的GPUs。當神經網路模型很大時,由於顯示記憶體限制,它是難以在跑在單個GPU上,這個時候就需要模型並行。比如Google的神經機器翻譯系統,其可能採用深度LSTM模型,如下圖所示,此時模型的不同部分需要分散到許多設備上進行並行訓練。深度學習模型一般包含很多層,如果要採用模型並行策略,一般需要將不同的層運行在不同的設備上,但是實際上層與層之間的運行是存在約束的:前向運算時,後面的層需要等待前面層的輸出作為輸入,而在反向傳播時,前面的層又要受限於後面層的計算結果。所以除非模型本身很大,一般不會採用模型並行,因為模型層與層之間存在串列邏輯。但是如果模型本身存在一些可以並行的單元,那麼也是可以利用模型並行來提升訓練速度,比如GoogLeNet的Inception模組。

2.數據並行

深度學習模型最常採用的分散式訓練策略是數據並行,因為訓練費時的一個重要原因是訓練數據量很大。數據並行就是在很多設備上放置相同的模型,並且各個設備採用不同的訓練樣本對模型訓練。訓練深度學習模型常採用的是batch SGD方法,採用數據並行,可以每個設備都訓練不同的batch,然後收集這些梯度用於模型參數更新。前面所說的Facebook訓練Resnet50就是採用數據並行策略,使用256個GPUs,每個GPU讀取32個圖片進行訓練,如下圖所示,這樣相當於採用非常大的batch(256 × 32 = 8192)來訓練模型。

數據並行可以是同步的(synchronous),也可以是非同步的(asynchronous)。所謂同步指的是所有的設備都是採用相同的模型參數來訓練,等待所有設備的mini-batch訓練完成後,收集它們的梯度然後取均值,然後執行模型的一次參數更新。這相當於通過聚合很多設備上的mini-batch形成一個很大的batch來訓練模型,Facebook就是這樣做的,但是他們發現當batch大小增加時,同時線性增加學習速率會取得不錯的效果。同步訓練看起來很不錯,但是實際上需要各個設備的計算能力要均衡,而且要求集群的通訊也要均衡,類似於木桶效應,一個拖油瓶會嚴重拖慢訓練進度,所以同步訓練方式相對來說訓練速度會慢一些。非同步訓練中,各個設備完成一個mini-batch訓練之後,不需要等待其它節點,直接去更新模型的參數,這樣總體會訓練速度會快很多。但是非同步訓練的一個很嚴重的問題是梯度失效問題(stale gradients),剛開始所有設備採用相同的參數來訓練,但是非同步情況下,某個設備完成一步訓練後,可能發現模型參數其實已經被其它設備更新過了,此時這個梯度就過期了,因為現在的模型參數和訓練前採用的參數是不一樣的。由於梯度失效問題,非同步訓練雖然速度快,但是可能陷入次優解(sub-optimal training performance)。

非同步訓練和同步訓練在TensorFlow中不同點如下圖所示:

為了解決非同步訓練出現的梯度失效問題,微軟提出了一種Asynchronous Stochastic Gradient Descent方法,主要是通過梯度補償來提升訓練效果。應該還有其他類似的研究,感興趣的可以深入了解一下。

二 分散式訓練系統架構

系統架構層包括兩種架構:

Parameter Server Architecture(就是常見的PS架構,參數伺服器)

Ring-allreduce Architecture

1.Parameter server架構

在Parameter server架構(PS架構)中,集群中的節點被分為兩類:parameter server和worker。其中parameter server存放模型的參數,而worker負責計算參數的梯度。在每個迭代過程,worker從parameter sever中獲得參數,然後將計算的梯度返回給parameter server,parameter server聚合從worker傳回的梯度,然後更新參數,並將新的參數廣播給worker。

PS架構是深度學習最常採用的分散式訓練架構。採用同步SGD方式的PS架構如下圖所示:

2.Ring-allreduce架構

在Ring-allreduce架構中,各個設備都是worker,並且形成一個環,如下圖所示,沒有中心節點來聚合所有worker計算的梯度。在一個迭代過程,每個worker完成自己的mini-batch訓練,計算出梯度,並將梯度傳遞給環中的下一個worker,同時它也接收從上一個worker的梯度。對於一個包含N個worker的環,各個worker需要收到其它N-1個worker的梯度後就可以更新模型參數。其實這個過程需要兩個部分:scatter-reduce和allgather,百度的教程對這個過程給出了詳細的圖文解釋。百度開發了自己的allreduce框架,並將其用在了深度學習的分散式訓練中。

相比PS架構,Ring-allreduce架構是頻寬優化的,因為集群中每個節點的頻寬都被充分利用。此外,在深度學習訓練過程中,計算梯度採用BP演算法,其特點是後面層的梯度先被計算,而前面層的梯度慢於前面層,Ring-allreduce架構可以充分利用這個特點,在前面層梯度計算的同時進行後面層梯度的傳遞,從而進一步減少訓練時間。在百度的實驗中,他們發現訓練速度基本上線性正比於GPUs數目(worker數)。

一般的多卡gpu訓練有一個很大的缺陷,就是因為每次都需要一個gpu(cpu)從其他gpu上收集訓練的梯度,然後將新的模型分發到其他gpu上。這樣的模型最大的缺陷是gpu 0的通訊時間是隨著gpu卡數的增長而線性增長的。

所以就有了ring-allreduce,如下圖:

該演算法的基本思想是取消Reducer,讓數據在gpu形成的環內流動,整個ring-allreduce的過程分為兩大步,第一步是scatter-reduce,第二步是allgather。

先說第一步:首先我們有n塊gpu,那麼我們把每個gpu上的數據(均等的)劃分成n塊,並給每個gpu指定它的左右鄰居(圖中0號gpu的左鄰居是4號,右鄰居是1號,1號gpu的左鄰居是0號,右鄰居是2號……),然後開始執行n-1次操作,在第i次操作時,gpu j會將自己的第(j – i)%n塊數據發送給gpu j+1,並接受gpu j-1的(j – i – 1)%n塊數據。並將接受來的數據進行reduce操作,示意圖如下:

當n-1次操作完成後,ring-allreduce的第一大步scatter-reduce就已經完成了,此時,第i塊gpu的第(i + 1) % n塊數據已經收集到了所有n塊gpu的第(i + 1) % n塊數據,那麼,再進行一次allgather就可以完成演算法了。

第二步allgather做的事情很簡單,就是通過n-1次傳遞,把第i塊gpu的第(i + 1) % n塊數據傳遞給其他gpu,同樣也是在i次傳遞時,gpu j把自己的第(j – i – 1)%n塊數據發送給右鄰居,接受左鄰居的第(j – i – 2) % n數據,但是接受來的數據不需要像第一步那樣做reduce,而是直接用接受來的數據代替自己的數據就好了。

最後每個gpu的數據就變成了這樣:

首先是第一步,scatter-reduce:

然後是allgather的例子:

為什麼需要分散式

眾所周知,深度神經網路發展到現階段,離不開GPU和數據。經過這麼多年的積累,GPU的計算能力越來越強,數據也積累的越來越多,大家會發現在現有的單機單卡或者單機多卡上很難高效地復現模型,甚至對於有些新的數據集來講,單機訓練簡直就是噩夢。

DatasetImages MS COCO115,000  Open Image dataset v4 1,740,000

為什麼單機8卡也會是噩夢呢?我們拿COCO和Google最近Release出來的Open Image dataset v4來做比較,訓練一個resnet152的檢測模型,在COCO上大概需要40個小時,而在OIDV4上大概需要40天,這還是在各種超參數正確的情況下,如果加上調試的時間,可能一個模型調完就該過年了吧。

所以這個時候我們需要分散式。 Pytorch 分散式簡介

PyTorch 1.0穩定版終於正式發布了!新版本增加了JIT編譯器、全新的分散式包、C++ 前端,以及Torch Hub等新功能,支援AWS、Google雲、微軟Azure等雲平台。

torch.distributed軟體包和torch.nn.parallel.DistributedDataParallel模組由全新的、重新設計的分散式庫提供支援。新的庫的主要亮點有:

  • 新的 torch.distributed 是性能驅動的,並且對所有後端 (Gloo,NCCL 和 MPI) 完全非同步操作
  • 顯著的分散式數據並行性能改進,尤其適用於網路較慢的主機,如基於乙太網的主機
  • 為torch.distributed  package中的所有分散式集合操作添加非同步支援
  • 在Gloo後端添加以下CPU操作:send,recv,reduce,all_gather,gather,scatter
  • 在NCCL後端添加barrier操作
  • 為NCCL後端添加new_group支援

1.0的多機多卡的計算模型並沒有採用主流的Parameter Server結構,而是直接用了Uber Horovod的形式,也是百度開源的RingAllReduce演算法。

採用PS計算模型的分散式,通常會遇到網路的問題,隨著worker數量的增加,其加速比會迅速的惡化,例如resnet50這樣的模型,目前的TF在10幾台機器的時候,加速比已經開始惡化的不可接受了。因此,經常要上RDMA、InfiniBand等技術,並且還帶來了一波網卡的升級,有些大廠直接上了100GBs的網卡,有錢任性。而Uber的Horovod,採用的RingAllReduce的計算方案,其特點是網路通訊量不隨著worker(GPU)的增加而增加,是一個恆定值。簡單看下圖理解下,GPU 集群被組織成一個邏輯環,每個GPU有一個左鄰居、一個右鄰居,每個GPU只從左鄰居接受數據、並發送數據給右鄰居。即每次梯度每個gpu只獲得部分梯度更新,等一個完整的Ring完成,每個GPU都獲得了完整的參數。

這裡引入了一個新的函數model = torch.nn.parallel.DistributedDataParallel(model)為的就是支援分散式模式

不同於原來在multiprocessing中的model = torch.nn.DataParallel(model,device_ids=[0,1,2,3]).cuda()函數,這個函數只是實現了在單機上的多GPU訓練,根據官方文檔的說法,甚至在單機多卡的模式下,新函數表現也會優於這箇舊函數。

這裡要提到兩個問題:

  • 每個進程都有自己的Optimizer同時每個迭代中都進行完整的優化步驟,雖然這可能看起來是多餘的,但由於梯度已經聚集在一起並跨進程平均,因此對於每個進程都是相同的,這意味著不需要參數廣播步驟,從而減少了在節點之間傳輸張量tensor所花費的時間。
  • 另外一個問題是Python解釋器的,每個進程都包含一個獨立的Python解釋器,消除了來自單個Python進程中的多個執行執行緒,模型副本或GPU的額外解釋器開銷和「GIL-thrashing」。 這對於大量使用Python運行時的模型尤其重要。

初始化

torch.distributed.init_process_group(backend, init_method='env://', **kwargs)

參數說明:

  • backend(str): 後端選擇,包括上面那幾種 gloo,nccl,mpi
  • init_method(str,optional): 用來初始化包的URL, 我理解是一個用來做並發控制的共享方式
  • world_size(int, optional): 參與這個工作的進程數
  • rank(int,optional): 當前進程的rank
  • group_name(str,optional): 用來標記這組進程名的

Backends

Backends that come with PyTorch

PyTorch distributed currently only supports Linux. By default, the Gloo and NCCL backends are built and included in PyTorch distributed (NCCL only when building with CUDA). MPI is an optional backend that can only be included if you build PyTorch from source. (e.g. building PyTorch on a host that has MPI installed.)

Which backend to use?

In the past, we were often asked: 「which backend should I use?」.

  • Rule of thumb
  • Use the NCCL backend for distributed GPU training.
  • Use the Gloo backend for distributed CPU training.

init_method分析

『init_method』支援三種方式:

1. TCP initialization

tcp:// IP組播(要求所有進程都在同一個網路中)比較好理解,   以TCP協議的方式進行不同分散式進程之間的數據交流,需要設置一個埠,不同進程之間公用這一個埠,並且設置host的級別和host的數量。設計兩個參數rank和world_size。其中rank為host的編號,默認0為主機,埠應該位於該主機上。world_size為分散式主機的個數。

用該方式,運行上面的程式碼可以使用如下指令:

在主機01上:

python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 0 --world-size 2

在主機02上:

python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 1 --world-size 2

這裡沒有設置backend參數,所以默認是gloo。22225是埠號,用一個沒有沒佔用的就行。這兩句指令的先後順序沒有要求,只有兩條指令都輸入,程式才會運行起來。

2. Shared file-system initialization

file:// 共享文件系統(要求所有進程可以訪問單個文件系統)有共享文件系統可以選擇

提供的第二種方式是文件共享,機器有共享的文件系統,故可以採用這種方式,也避免了基於TCP的網路傳輸。這裡使用方式是使用絕對路徑在指定一個共享文件系統下不存在的文件。

在主機01上:

python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2

在主機02上:

python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2

這裡相比於TCP的方式麻煩一點的是運行完一次必須更換共享的文件名,或者刪除之前的共享文件,不然第二次運行會報錯。

3. Environment variable initialization

env:// 環境變數(需要你手動分配等級並知道所有進程可訪問節點的地址)默認是這個

MASTER_PORT - required; has to be a free port on machine with rank 0  MASTER_ADDR - required (except for rank 0); address of rank 0 node  WORLD_SIZE - required; can be set either here, or in a call to init function  RANK - required; can be set either here, or in a call to init function

但是前兩個並沒有在init_process_group的參數里。這裡有一個官方文檔中的用例:

Node 1: (IP: 192.168.1.1, and has a free port: 1234)    >>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE             --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"             --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3             and all other arguments of your training script)  Node 2:    >>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE             --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"             --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3             and all other arguments of your training script)

需要注意的點

  • 一定要加rank, world_size參數
  • train_dataset最好不要用自己寫的sampler,否則還需要再實現一遍分散式的數據劃分方式

Dataloader中的參數

如果你的選項剛好是最壞情況,優化這個有可能達到2倍左右的性能提升,解釋一下DataLoader中其中兩個參數:

  • num_worker:數據集載入的時候,控制用於同時載入數據的執行緒數(默認為0,即在主執行緒讀取) 存在最優值,你會看到運行的時候pytorch會新建恰等於這個值的數據讀取執行緒,我猜,執行緒多於必要的時候,數據讀取執行緒返回到主執行緒反而會因為執行緒間通訊減慢數據。因此大了不好小了也不好。建議把模型,loss,優化器全注釋了只跑一下數據流速度,確定最優值
  • pin_memory:是否提前申請CUDA記憶體(默認為False,但有說法除非數據集很小,否則在N卡上推薦總是打開)在MNIST這樣的小數據集上好像是關閉比較好,到底多小算小說不清楚,建議自己試一下。

pin_memory就是鎖頁記憶體,創建DataLoader時,設置pin_memory=True,則意味著生成的Tensor數據最開始是屬於記憶體中的鎖頁記憶體,這樣將記憶體的Tensor轉義到GPU的顯示記憶體就會更快一些。主機中的記憶體,有兩種存在方式,一是鎖頁,二是不鎖頁,鎖頁記憶體存放的內容在任何情況下都不會與主機的虛擬記憶體進行交換(註:虛擬記憶體就是硬碟),而不鎖頁記憶體在主機記憶體不足時,數據會存放在虛擬記憶體中。顯示卡中的顯示記憶體全部是鎖頁記憶體,當電腦的記憶體充足的時候,可以設置pin_memory=True。當系統卡住,或者交換記憶體使用過多的時候,設置pin_memory=False。因為pin_memory與電腦硬體性能有關,pytorch開發者不能確保每一個煉丹玩家都有高端設備,因此pin_memory默認為False。

如果機子的記憶體比較大,建議開啟pin_memory=Ture,如果開啟後發現有卡頓現象或者記憶體佔用過高。

總之官方的默認值很有可能不是最好的。在MNIST這樣的小數據集上,pin_memory關閉比較好。而且,num_workers需要調節,除了默認情況外,最快和最慢是有一定差距的,建議在自己的程式碼上只跑數據讀取這一塊,確定這兩個參數的最優值。

分散式 Hello World

啟動輔助工具 Launch utility

torch.distributed.launch 例子

torch.distributed包提供了一個啟動實用程式torch.distributed.launch,此幫助程式可用於為每個節點啟動多個進程以進行分散式訓練,它在每個訓練節點上產生多個分散式訓練進程。

這個工具可以用作CPU或者GPU,如果被用於GPU,每個GPU產生一個進程Process

該工具既可以用來做單節點多GPU訓練,也可用於多節點多GPU訓練。如果是單節點多GPU,將會在單個GPU上運行一個分散式進程,據稱可以非常好地改進單節點訓練性能。如果用於多節點分散式訓練,則通過在每個節點上產生多個進程來獲得更好的多節點分散式訓練性能。如果有Infiniband介面則加速比會更高。

在單節點分散式訓練或多節點分散式訓練的兩種情況下,該工具將為每個節點啟動給定數量的進程(–nproc_per_node)。如果用於GPU訓練,則此數字需要小於或等於當前系統上的GPU數量(nproc_per_node),並且每個進程將在從GPU 0到GPU(nproc_per_node – 1)的單個GPU上運行。

1、Single-Node multi-process distributed training

python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE         YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other         arguments of your training script)

2、Multi-Node multi-process distributed training: (e.g. two nodes)

Node 1: (IP: 192.168.1.1, and has a free port: 1234)

python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE         --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"         --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3         and all other arguments of your training script)

Node 2:

python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE         --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"         --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3         and all other arguments of your training script)

需要注意的地方:

  • 後端最好用「NCCL」,才能獲取最好的分散式性能
  • 訓練程式碼必須從命令行解析–local_rank=LOCAL_PROCESS_RANK
import argparse  parser = argparse.ArgumentParser()  parser.add_argument("--local_rank", type=int)  args = parser.parse_args()    torch.cuda.set_device(arg.local_rank)

torch.distributed初始化方式:

torch.distributed.init_process_group(backend='nccl',init_method='env://')

model

model = torch.nn.parallel.DistributedDataParallel(model,device_ids=[arg.local_rank],output_device=arg.local_rank)

其他地方一般就不用修改了,

我們的訓練程式碼中這樣寫:

import torch.distributed as dist  # 這個參數是torch.distributed.launch傳遞過來的,我們設置位置參數來接受,local_rank代表當前程式進程使用的GPU標號  parser.add_argument("--local_rank", type=int, default=0)    def synchronize():      """      Helper function to synchronize (barrier) among all processes when      using distributed training      """      if not dist.is_available():         return      if not dist.is_initialized():         return      world_size = dist.get_world_size()      if world_size == 1:         return      dist.barrier()      ## WORLD_SIZE 由torch.distributed.launch.py產生 具體數值為 nproc_per_node*node(主機數,這裡為1)  num_gpus = int(os.environ["WORLD_SIZE"]) if "WORLD_SIZE" in os.environ else 1    is_distributed = num_gpus > 1    if is_distributed:     torch.cuda.set_device(args.local_rank)  # 這裡設定每一個進程使用的GPU是一定的     torch.distributed.init_process_group(     backend="nccl", init_method="env://"      )  synchronize()    # 將模型移至到DistributedDataParallel中,此時就可以進行訓練了  if is_distributed:  model = torch.nn.parallel.DistributedDataParallel(          model, device_ids=[args.local_rank], output_device=args.local_rank,          # this should be removed if we update BatchNorm stats          broadcast_buffers=False)    # 注意,在測試的時候需要執行 model = model.module

WRITING DISTRIBUTED APPLICATIONS WITH PYTORCH

https://github.com/pytorch/examples/tree/master/imagenet 這裡,常規的操作就不多敘述了,主要講一下和分散式相關的程式碼部分。

parser.add_argument('--world-size', default=2, type=int, help='number of distributed processes')  parser.add_argument('--dist-url', default='tcp://172.16.1.186:2222', type=str, help='url used to set up distributed training')  parser.add_argument('--dist-backend', default='gloo', type=str, help='distributed backend')  parser.add_argument('--dist-rank', default=0, type=int, help='rank of distributed processes')

這幾個是必要的參數設置,其中最後一個是官網沒有的

if args.distributed:     dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,world_size=args.world_size,rank=args.dist_rank)

這個是分散式的初始化,同樣,最後添加一個rank

model.cuda()  model = torch.nn.parallel.DistributedDataParallel(model)

這裡,把我們平時使用的單機多卡,數據並行的API

model = torch.nn.DataParallel(model).cuda()

換掉即可。

if args.distributed:              train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

最後使用這個官方給的劃分方法,把數據集劃分即可。

MNIST 分散式例子

MNIST 分散式 (2台 GPU機器,每台一張GPU):

  • GPU1:   python mnist_dist.py –init-method=file:///home/workspace/share/1 –rank=0 –world-size=2
  • GPU2:   python mnist_dist.py –init-method=file:///home/workspace/share/1 –rank=1 –world-size=2
from __future__ import print_function  import argparse  import time    import torch  import torch.nn as nn  import torch.nn.functional as F  import torch.optim as optim  from torchvision import datasets, transforms    import torch.distributed as dist  import torch.utils.data  import torch.utils.data.distributed      class Net(nn.Module):      def __init__(self):          super(Net, self).__init__()          self.conv1 = nn.Conv2d(1, 20, 5, 1)          self.conv2 = nn.Conv2d(20, 50, 5, 1)          self.fc1 = nn.Linear(4 * 4 * 50, 500)          self.fc2 = nn.Linear(500, 10)        def forward(self, x):          x = F.relu(self.conv1(x))          x = F.max_pool2d(x, 2, 2)          x = F.relu(self.conv2(x))          x = F.max_pool2d(x, 2, 2)          x = x.view(-1, 4 * 4 * 50)          x = F.relu(self.fc1(x))          x = self.fc2(x)          return F.log_softmax(x)      def train(args, model, device, train_loader, optimizer, epoch):      model.train()      for batch_idx, (data, target) in enumerate(train_loader):          data, target = data.to(device), target.to(device)          optimizer.zero_grad()          output = model(data)          loss = F.nll_loss(output, target)          loss.backward()          optimizer.step()          if batch_idx % args.log_interval == 0:              print('Train Epoch: {} [{}/{} ({:.0f}%)]tLoss: {:.6f}'.format(                  epoch, batch_idx * len(data), len(train_loader.dataset),                         100. * batch_idx / len(train_loader), loss.item()))      def test(args, model, device, test_loader):      model.eval()      test_loss = 0      correct = 0      for data, target in test_loader:          data, target = data.to(device), target.to(device)          output = model(data)          test_loss += F.nll_loss(output, target, size_average=False).item()          # sum up batch loss          pred = output.data.max(1, keepdim=True)[1]          # get the index of the max log-probability          correct += pred.eq(target.data.view_as(pred)).cpu().sum()        test_loss /= len(test_loader.dataset)        print('nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)n'.format(              test_loss, correct, len(test_loader.dataset),              100. * correct / len(test_loader.dataset)))      def main():      # Training settings      parser = argparse.ArgumentParser(description='PyTorch MNIST Example')      parser.add_argument('--batch-size', type=int, default=64, metavar='N',                              help='input batch size for training (default: 64)')      parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',                              help='input batch size for testing (default: 1000)')      parser.add_argument('--epochs', type=int, default=10, metavar='N',                              help='number of epochs to train (default: 10)')      parser.add_argument('--lr', type=float, default=0.01, metavar='LR',                              help='learning rate (default: 0.01)')      parser.add_argument('--momentum', type=float, default=0.5, metavar='M',                              help='SGD momentum (default: 0.5)')      parser.add_argument('--no-cuda', action='store_true',                              help='disables CUDA training')      parser.add_argument('--seed', type=int, default=1, metavar='S',                              help='random seed (default: 1)')      parser.add_argument('--log-interval', type=int, default=500, metavar='N',                              help='how many batches to wait before logging training status')        parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456')      parser.add_argument('--rank', type=int)      parser.add_argument('--world-size', type=int)        args = parser.parse_args()      use_cuda = not args.no_cuda and torch.cuda.is_available()      print(args)        # 初始化      dist.init_process_group(init_method=args.init_method, backend="gloo", world_size=args.world_size, rank=args.rank,                                  group_name="pytorch_test")        torch.manual_seed(args.seed)      if use_cuda:         torch.cuda.manual_seed(args.seed)        train_dataset = datasets.MNIST('./data', train=True, download=False,                                         transform=transforms.Compose([                                             transforms.ToTensor(),                                             transforms.Normalize((0.1307,), (0.3081,))                                         ]))      # 分發數據      train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)        kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}        train_loader = torch.utils.data.DataLoader(train_dataset,batch_size=args.batch_size,   shuffle=True, **kwargs)        test_loader = torch.utils.data.DataLoader(              datasets.MNIST('data', train=False, transform=transforms.Compose([                  transforms.ToTensor(),                  transforms.Normalize((0.1307,), (0.3081,))              ])),              batch_size=args.test_batch_size, shuffle=True, **kwargs)         device = torch.device("cuda" if use_cuda else "cpu")       print(device)       model = Net().to(device)       if use_cuda:          model = torch.nn.parallel.DistributedDataParallel(                  model) if use_cuda else torch.nn.parallel.DistributedDataParallelCPU(model)         optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)         total_time = 0       for epoch in range(1, args.epochs + 1):           # 設置epoch位置,這應該是個為了同步所做的工作           train_sampler.set_epoch(epoch)             start_cpu_secs = time.time()           train(args, model, device, train_loader, optimizer, epoch)           end_cpu_secs = time.time()             print("Epoch {} of {} took {:.3f}s".format(                  epoch, args.epochs, end_cpu_secs - start_cpu_secs))           total_time += end_cpu_secs - start_cpu_secs             test(args, model, device, test_loader)         print("Total time= {:.3f}s".format(total_time))      if __name__ == '__main__':     main()

單機例子(一台GPU機器一張GPU卡)

python mnist_no_dist.py

from __future__ import print_function  import argparse  import time    import torch  import torch.nn as nn  import torch.nn.functional as F  import torch.optim as optim  from torchvision import datasets, transforms      class Net(nn.Module):      def __init__(self):          super(Net, self).__init__()          self.conv1 = nn.Conv2d(1, 20, 5, 1)          self.conv2 = nn.Conv2d(20, 50, 5, 1)          self.fc1 = nn.Linear(4 * 4 * 50, 500)           self.fc2 = nn.Linear(500, 10)        def forward(self, x):          x = F.relu(self.conv1(x))          x = F.max_pool2d(x, 2, 2)          x = F.relu(self.conv2(x))          x = F.max_pool2d(x, 2, 2)          x = x.view(-1, 4 * 4 * 50)          x = F.relu(self.fc1(x))          x = self.fc2(x)          return F.log_softmax(x, dim=1)      def train(args, model, device, train_loader, optimizer, epoch):      model.train()      for batch_idx, (data, target) in enumerate(train_loader):          data, target = data.to(device), target.to(device)          optimizer.zero_grad()          output = model(data)          loss = F.nll_loss(output, target)          loss.backward()          optimizer.step()          if batch_idx % args.log_interval == 0:             print('Train Epoch: {} [{}/{} ({:.0f}%)]tLoss: {:.6f}'.format(                   epoch, batch_idx * len(data), len(train_loader.dataset),                   100. * batch_idx / len(train_loader), loss.item()))      def test(args, model, device, test_loader):      model.eval()      test_loss = 0      correct = 0      with torch.no_grad():          for data, target in test_loader:              data, target = data.to(device), target.to(device)              output = model(data)              test_loss += F.nll_loss(output, target, reduction='sum').item()              # sum up batch loss              pred = output.argmax(dim=1, keepdim=True)              # get the index of the max log-probability              correct += pred.eq(target.view_as(pred)).sum().item()        test_loss /= len(test_loader.dataset)        print('nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)n'.format(              test_loss, correct, len(test_loader.dataset),              100. * correct / len(test_loader.dataset)))      def main():      # Training settings      parser = argparse.ArgumentParser(description='PyTorch MNIST Example')      parser.add_argument('--batch-size', type=int, default=64, metavar='N',                              help='input batch size for training (default: 64)')      parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',                              help='input batch size for testing (default: 1000)')      parser.add_argument('--epochs', type=int, default=10, metavar='N',                              help='number of epochs to train (default: 10)')      parser.add_argument('--lr', type=float, default=0.01, metavar='LR',                              help='learning rate (default: 0.01)')      parser.add_argument('--momentum', type=float, default=0.5, metavar='M',                              help='SGD momentum (default: 0.5)')      parser.add_argument('--no-cuda', action='store_true', default=False,                              help='disables CUDA training')      parser.add_argument('--seed', type=int, default=1, metavar='S',                              help='random seed (default: 1)')      parser.add_argument('--log-interval', type=int, default=500, metavar='N',                              help='how many batches to wait before logging training status')        # parser.add_argument('--save-model', action='store_true', default=False,      #                     help='For Saving the current Model')      args = parser.parse_args()      print(args)        use_cuda = not args.no_cuda and torch.cuda.is_available()        torch.manual_seed(args.seed)        kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}      train_loader = torch.utils.data.DataLoader(          datasets.MNIST('./data', train=True, download=True,                         transform=transforms.Compose([                             transforms.ToTensor(),                             transforms.Normalize((0.1307,), (0.3081,))                          ])),          batch_size=args.batch_size, shuffle=True, **kwargs)      test_loader = torch.utils.data.DataLoader(              datasets.MNIST('./data', train=False, transform=transforms.Compose([                  transforms.ToTensor(),                  transforms.Normalize((0.1307,), (0.3081,))              ])),           batch_size=args.test_batch_size, shuffle=True, **kwargs)        device = torch.device("cuda" if use_cuda else "cpu")      print(device)      model = Net().to(device)      optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)        total_time = 0      for epoch in range(1, args.epochs + 1):          start_cpu_secs = time.time()          train(args, model, device, train_loader, optimizer, epoch)          end_cpu_secs = time.time()            print("Epoch {} of {} took {:.3f}s".format(                  epoch, args.epochs, end_cpu_secs - start_cpu_secs))          total_time += end_cpu_secs - start_cpu_secs            test(args, model, device, test_loader)        # print("Total time= {:.3f}s".format(total_time))      # if (args.save_model):      #     torch.save(model.state_dict(), "mnist_cnn.pt")      if __name__ == '__main__':     main()