Pytorch多GPU的計算和Sync BatchNorm

  • 2020 年 2 月 20 日
  • 筆記

nn.DataParallel

pytorch中使用GPU非常方便和簡單:

import torch  import torch.nn as nn    input_size = 5  output_size = 2    class Model(nn.Module):        def __init__(self, input_size, output_size):          super(Model, self).__init__()          self.fc = nn.Linear(input_size, output_size)        def forward(self, input):          output = self.fc(input)          print("[In Model]: device",torch.cuda.current_device() ," input size", input.size()," output size", output.size())          return output      device = torch.device('cuda:0')    model = Model(input_size, output_size)  model.to(device)    x = torch.Tensor(2,5)  x = x.to(device)  y = model(x)

這裡需要注意的是,僅僅調用Tensor.to()只會在GPU上返回一個新的copy,並不會對原來的引用造成變化,因此需要通過賦值rewrite。

上述只是對單個GPU的使用方法,對於多個GPU,pytorch也提供了封裝好的介面——DataParallel,只需要將model 對象放入容器中即可:

model = Model(input_size, output_size)    print("Let's use", torch.cuda.device_count(), "GPUs!n")  model = nn.DataParallel(model)  model.to(device)    print(model)    # output  Let's use 2 GPUs!    DataParallel(    (module): Model(      (fc): Linear(in_features=5, out_features=2, bias=True)    )  )

看到這次輸出的model外面還有一層DataParallel,但這裡並沒有體現出存在多個GPU。

接下來構造一個Dummy DataSet,來跑一下模型:

from torch.utils.data import Dataset, DataLoader    batch_size = 30  data_size = 100    class RandomDataset(Dataset):        def __init__(self, size, length):          self.len = length          self.data = torch.randn(length, size)  # 有length個樣本,每個樣本是size長度的向量        def __getitem__(self, index):          return self.data[index]        def __len__(self):          return self.len    rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),                           batch_size=batch_size, shuffle=True)    for data in rand_loader:      input = data.to(device)      output = model(input)      print("[Outside]: input size", input.size(),            "output_size", output.size())    # output  [In Model]: device 0  input size torch.Size([15, 5])  output size torch.Size([15, 2])  [In Model]: device 1  input size torch.Size([15, 5])  output size torch.Size([15, 2])  [Outside]: input size torch.Size([30, 5]) output_size torch.Size([30, 2])  [In Model]: device 0  input size torch.Size([15, 5])  output size torch.Size([15, 2])  [In Model]: device 1  input size torch.Size([15, 5])  output size torch.Size([15, 2])  [Outside]: input size torch.Size([30, 5]) output_size torch.Size([30, 2])  [In Model]: device 0  input size torch.Size([15, 5])  output size torch.Size([15, 2])  [In Model]: device 1  input size torch.Size([15, 5])  output size torch.Size([15, 2])  [Outside]: input size torch.Size([30, 5]) output_size torch.Size([30, 2])  [In Model]: device 0  input size torch.Size([5, 5])  output size torch.Size([5, 2])  [In Model]: device 1  input size torch.Size([5, 5])  output size torch.Size([5, 2])  [Outside]: input size torch.Size([10, 5]) output_size torch.Size([10, 2])

可以看到這裡每次從data loader中取數據後,在兩個GPU上執行了forward,並且每個GPU上的batch size都只有原來的一半,所以DataParallel將輸入數據平分到了每個GPU上,從而實現並行計算。

進一步了解 DataParallel

上述文字來自官方文檔,在forward階段,當前GPU上的module會被複制到其他GPU上,輸入數據則會被切分,分別傳到不同的GPU上進行計算;在backward階段,每個GPU上的梯度會被求和並傳回當前GPU上,並更新參數。也就是複製module -> forward -> 計算loss -> backward -> 匯總gradients -> 更新參數 -> 複製module -> ...的不斷重複執行,示意圖如下:

因為數據會被均分到不同的GPU上,所以要求batch_size大於GPU的數量。下面對DataParallel的forward函數做一個簡單的解釋:

class DataParallel(Module):        def __init__(self, module, device_ids=None, output_device=None, dim=0):          super(DataParallel, self).__init__()            if not torch.cuda.is_available():              self.module = module              self.device_ids = []              return            if device_ids is None:              device_ids = list(range(torch.cuda.device_count()))          if output_device is None:              output_device = device_ids[0]            self.dim = dim          self.module = module   # 待並行計算的模型          self.device_ids = list(map(lambda x: _get_device_index(x, True), device_ids))          self.output_device = _get_device_index(output_device, True)          self.src_device_obj = torch.device("cuda:{}".format(self.device_ids[0]))            _check_balance(self.device_ids)            if len(self.device_ids) == 1:              self.module.cuda(device_ids[0])        def forward(self, *inputs, **kwargs):          if not self.device_ids:              return self.module(*inputs, **kwargs)            for t in chain(self.module.parameters(), self.module.buffers()):              if t.device != self.src_device_obj:                  raise RuntimeError("module must have its parameters and buffers "                                     "on device {} (device_ids[0]) but found one of "                                     "them on device: {}".format(self.src_device_obj, t.device))            inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)          if len(self.device_ids) == 1:              return self.module(*inputs[0], **kwargs[0])          replicas = self.replicate(self.module, self.device_ids[:len(inputs)])          outputs = self.parallel_apply(replicas, inputs, kwargs)          return self.gather(outputs, self.output_device)        def replicate(self, module, device_ids):          '''replicate對輸入模型的parameters、buffers、modules都一一進行copy,並返回copy的list,          因為modules最終是以類似鏈表的形式存儲的,所以list中只包含第一個module'''          return replicate(module, device_ids)        def scatter(self, inputs, kwargs, device_ids):          '''scatter_kwargs內部調用名為scatter的函數,作用是將Tensor對象均分,以及複製其他類型對象的引用'''          return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)        def parallel_apply(self, replicas, inputs, kwargs):          '''內部調用python的Thread將分割好的input分配到不同的GPU上計算,並返回result dict'''          return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])        def gather(self, outputs, output_device):          '''從不同GPU上取回結果'''          return gather(outputs, output_device, dim=self.dim)

parallel_apply()之前都不能確定input數據會被分配到哪個GPU上,因此在forward之前的Tensor.to()或者Tensor.cuda()都會導致錯誤。

GatherScatter的進一步觀察會發現(如下),兩者在backward時,只會傳遞梯度資訊。因此所有在forward期間的update都會被忽略(比如counter什麼的),除非是在device[0]上。

class Gather(Function):        @staticmethod      def forward(ctx, target_device, dim, *inputs):          ...        @staticmethod      def backward(ctx, grad_output):          scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)          if ctx.unsqueezed_scalar:              scattered_grads = tuple(g[0] for g in scattered_grads)          return (None, None) + scattered_grads      class Scatter(Function):        @staticmethod      def forward(ctx, target_gpus, chunk_sizes, dim, input):          ...        @staticmethod      def backward(ctx, *grad_output):          return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)

GPU之間除了在scatter和gather時有交集,除此之外不會交換任何資訊,這會阻礙一些功能的實現,比如Batch Normalization,如果只是模型加入torch.nn.BatchNorm2d(),那麼在並行計算時,它只會統計當前GPU上這一部分數據的資訊而不是所有的輸入數據,有可能會使統計得到的均值和標準差出現偏差。