PyTorch提速四倍!提高DALI利用率,创建基于CPU的Pipeline
- 2020 年 2 月 21 日
- 笔记

大数据文摘出品
来源:medium
编译:赵吉克
在过去的几年里,深度学习硬件方面取得了巨大的进步,Nvidia的最新产品Tesla V100和Geforce RTX系列包含专用的张量核,用于加速神经网络中常用的操作。
特别值得一提的是,V100有足够的能力以每秒数千张图的速度训练神经网络,这使得基于ImageNet数据集小模型在单GPU上训练只需几小时,与2012年在ImageNet上训练AlexNet模型所花费的5天时间相比简直是天壤之别!
然而,强大的GPU使数据预处理管道不堪重负。为了解决这个问题,Tensorflow发布了一个新的数据加载器:tf.data.Dataset,用C++编写,并使用基于图的方法将多个预处理操作链接在一起。
另一方面,PyTorch使用在PIL库上用Python编写的数据加载器,既方便优灵活,但在速度上有欠缺(尽管PIL-SIMD库确实稍微改善了这种情况)。
进入NVIDIA数据加载器(DALI):旨在消除数据预处理瓶颈,允许训练和推理全速运行。DALI主要用于在GPU上的预处理,但是大多数操作也在CPU上有快速实现。本文主要关注PyTorch,但是DALI也支持Tensorflow、MXNet和TensorRT,尤其是TensorRT有高度支持。它允许训练和推理步骤使用完全相同的预处理代码。需注意,不同的框架(如Tensorflow和PyTorch)通常在数据加载器之间有很小的差异,这可能会影响准确性。
本文是Medium上一位博主展示了一些技术来提高DALI的使用率并创建了一个完全基于CPU的管道。这些技术用于保持长期的内存稳定,并且与DALI包提供的CPU和GPU管道相比,可以增加50%的批处理大小。
DALI长期内存使用
第一个问题是,RAM的使用随着训练时间的增加而增加,这会导致OOM错误(即使是在拥有78GB RAM的VM上),并且尚未修正。

唯一解决方案是重新import DALI并每次重构训练和验证通道:
del self.train_loader, self.val_loader, self.train_pipe, self.val_pipe torch.cuda.synchronize() torch.cuda.empty_cache() gc.collect() importlib.reload(dali) from dali import HybridTrainPipe, HybridValPipe, DaliIteratorCPU, DaliIteratorGPU <rebuild DALI pipeline>
请注意,使用这种方法,DALI仍然需要大量RAM才能获得最好的结果。考虑到如今RAM的价格,这并不是什么大问题。从下表可以看出,DALI的最大批大小可能比TorchVision低50%:

接下来的部分涉及降低GPU占用率的方法。
构建一个完全基于CPU的Pipeline
让我们首先看看示例CPU管道。当不考虑峰值吞吐量时,基于CPU的管道非常有用。CPU训练管道只在CPU上执行解码和调整大小的操作,而CropMirrorNormalize操作则在GPU上运行。由于仅仅是传输输出到GPU与DALI就使用了大量的GPU内存,为了避免这种情况,我们修改了示例CPU管道,使其完全运行在CPU上:
class HybridTrainPipe(Pipeline): def __init__(self, batch_size, num_threads, device_id, data_dir, crop, mean, std, local_rank=0, world_size=1, dali_cpu=False, shuffle=True, fp16=False, min_crop_size=0.08): # As we're recreating the Pipeline at every epoch, the seed must be -1 (random seed) super(HybridTrainPipe, self).__init__(batch_size, num_threads, device_id, seed=-1) # Enabling read_ahead slowed down processing ~40% self.input = ops.FileReader(file_root=data_dir, shard_id=local_rank, num_shards=world_size, random_shuffle=shuffle) # Let user decide which pipeline works best with the chosen model if dali_cpu: decode_device = "cpu" self.dali_device = "cpu" self.flip = ops.Flip(device=self.dali_device) else: decode_device = "mixed" self.dali_device = "gpu" output_dtype = types.FLOAT if self.dali_device == "gpu" and fp16: output_dtype = types.FLOAT16 self.cmn = ops.CropMirrorNormalize(device="gpu", output_dtype=output_dtype, output_layout=types.NCHW, crop=(crop, crop), image_type=types.RGB, mean=mean, std=std,) # To be able to handle all images from full-sized ImageNet, this padding sets the size of the internal nvJPEG buffers without additional reallocations device_memory_padding = 211025920 if decode_device == 'mixed' else 0 host_memory_padding = 140544512 if decode_device == 'mixed' else 0 self.decode = ops.ImageDecoderRandomCrop(device=decode_device, output_type=types.RGB, device_memory_padding=device_memory_padding, host_memory_padding=host_memory_padding, random_aspect_ratio=[0.8, 1.25], random_area=[min_crop_size, 1.0], num_attempts=100) # Resize as desired. To match torchvision data loader, use triangular interpolation. self.res = ops.Resize(device=self.dali_device, resize_x=crop, resize_y=crop, interp_type=types.INTERP_TRIANGULAR) self.coin = ops.CoinFlip(probability=0.5) print('DALI "{0}" variant'.format(self.dali_device)) def define_graph(self): rng = self.coin() self.jpegs, self.labels = self.input(name="Reader") # Combined decode & random crop images = self.decode(self.jpegs) # Resize as desired images = self.res(images) if self.dali_device == "gpu": output = self.cmn(images, mirror=rng) else: # CPU backend uses torch to apply mean & std output = self.flip(images, horizontal=rng) self.labels = self.labels.gpu() return [output, self.labels]
DALI管道现在在CPU上输出一个8位张量。我们需要使用PyTorch来完成CPU-> GPU的传输、浮点数的转换和归一化。这最后两个操作是在GPU上完成的,快速并且减少了CPU -> GPU内存带宽需求。在转移到GPU之前,尝试过固定这个张量,但是没有得到任何性能提升,把它和一个预存器放在一起:
def _preproc_worker(dali_iterator, cuda_stream, fp16, mean, std, output_queue, proc_next_input, done_event, pin_memory): """ Worker function to parse DALI output & apply final preprocessing steps """ while not done_event.is_set(): # Wait until main thread signals to proc_next_input -- normally class="ql-long-20347411" style="line-height: 1.7;margin-bottom: 0pt;margin-top: 0pt;font-size: 11pt;color: #494949;"> proc_next_input.wait() proc_next_input.clear() if done_event.is_set(): print('Shutting down preproc thread') break try: data = next(dali_iterator) # Decode the data output input_orig = data[0]['data'] target = data[0]['label'].squeeze().long() # DALI should already output target style="line-height: 1.7;margin-bottom: 0pt;margin-top: 0pt;font-size: 11pt;color: #494949;"> # Copy to GPU and apply final processing in separate CUDA stream with torch.cuda.stream(cuda_stream): input = input_orig if pin_memory: input = input.pin_memory() del input_orig # Save memory input = input.cuda(non_blocking=True) input = input.permute(0, 3, 1, 2) # Input tensor is kept as 8-bit integer for transfer to GPU, to save bandwidth if fp16: input = input.half() else: input = input.float() input = input.sub_(mean).div_(std) # Put the result class="ql-long-20347411" style="line-height: 1.7;margin-bottom: 0pt;margin-top: 0pt;font-size: 11pt;color: #494949;"> output_queue.put((input, target)) except StopIteration: print('Resetting DALI loader') dali_iterator.reset() output_queue.put(None) class DaliIteratorCPU(DaliIterator): """ Wrapper class to decode the DALI iterator output & provide iterator that functions in the same way as TorchVision. Note that permutation to channels first, converting from 8-bit integer to float & normalization are all performed style="line-height: 1.7;margin-bottom: 0pt;margin-top: 0pt;font-size: 11pt;color: #494949;"> pipelines (Pipeline): DALI pipelines size (int): Number of examples in set fp16 (bool): Use fp16 as output format, f32 otherwise mean (tuple): Image mean value for each channel std (tuple): Image standard deviation value for each channel pin_memory (bool): Transfer input tensor to pinned memory, before moving to GPU """ def __init__(self, fp16=False, mean=(0., 0., 0.), std=(1., 1., 1.), pin_memory=True, **kwargs): super().__init__(**kwargs) print('Using DALI CPU iterator') self.stream = torch.cuda.Stream() self.fp16 = fp16 self.mean = torch.tensor(mean).cuda().view(1, 3, 1, 1) self.std = torch.tensor(std).cuda().view(1, 3, 1, 1) self.pin_memory = pin_memory if self.fp16: self.mean = self.mean.half() self.std = self.std.half() self.proc_next_input = Event() self.done_event = Event() self.output_queue = queue.Queue(maxsize=5) self.preproc_thread = threading.Thread( target=_preproc_worker, kwargs={'dali_iterator': self._dali_iterator, 'cuda_stream': self.stream, 'fp16': self.fp16, 'mean': self.mean, 'std': self.std, 'proc_next_input': self.proc_next_input, 'done_event': self.done_event, 'output_queue': self.output_queue, 'pin_memory': self.pin_memory}) self.preproc_thread.daemon = True self.preproc_thread.start() self.proc_next_input.set() def __next__(self): torch.cuda.current_stream().wait_stream(self.stream) data = self.output_queue.get() self.proc_next_input.set() if data is None: raise StopIteration return data def __del__(self): self.done_event.set() self.proc_next_input.set() torch.cuda.current_stream().wait_stream(self.stream) self.preproc_thread.join()
基于GPU的Pipeline
测试中,在类似最大批处理大小下,上述CPU管道的速度大约是TorchVision数据加载器的两倍。CPU管道可以很好地与像ResNet50这样的大型模型一起工作;然而,当使用像AlexNet或ResNet18这样的小模型时,CPU更好。GPU管道的问题是最大批处理大小减少了近50%,限制了吞吐量。
一种显著减少GPU内存使用的方法是将验证管道与GPU隔离直到最后再调用。这很容易做到,因为我们已经重新导入DALI,并在每个epoch中重新创建数据加载器。
更多小提示
在验证时,将数据集均分的批处理大小效果最好,这避免了在验证数据集结束时还需要进行不完整的批处理。
与Tensorflow和PyTorch数据加载器类似,TorchVision和DALI管道不会产生相同的输出—您将看到验证精度略有不同。我发现这是由于不同的JPEG图像解码器。另一方面,DALI支持TensorRT,允许使用完全相同的预处理来进行训练和推理。
对于峰值吞吐量,尝试将数据加载器的数量设置为number_of_virtual_CPU核心,2个虚拟核对应1个物理核。
如果你想要绝对最好的性能并且不需要有类似TorchVision的输出,尝试关闭DALI三角形插值。
不要忘记磁盘IO。确保您有足够的内存来缓存数据集和/或一个非常快的SSD。DALI读取高达400Mb/s !
合并
为了方便地集成这些修改,我创建了一个data loader类,其中包含这里描述的所有修改,包括DALI和TorchVision后端。使用很简单。实例化数据加载程序:
dataset = Dataset(data_dir, batch_size, val_batch_size workers, use_dali, dali_cpu, fp16)
然后得到训练和验证数据装载器:
train_loader = dataset.get_train_loader() val_loader = dataset.get_val_loader()
在每个训练周期结束时重置数据加载器:
dataset.reset()
或者,验证管道可以在模型验证之前在GPU上重新创建:
dataset.prep_for_val()
基准
以下是使用ResNet18的最大批量大小:

因此,通过应用这些修改,DALI可以在CPU和GPU模式下使用的最大批处理大小增加了约50%!
这里是一些使用Shufflenet V2 0.5和批量大小512的吞吐量图:

这里是一些使用DALI GPU管道训练各种网络,包括在TorchVision:

所有测试都在谷歌Cloud V100实例上运行,该实例有12个vcpu(6个物理核心),78GB RAM,并使用Apex FP16培训。要重现这些结果,请使用以下参数:
— fp16 — batch-size 512 — workers 10 — arch “shufflenet_v2_x0_5 or resnet18” — prof — use-dali
所以,DALI使得单核特斯拉V100可以达到接近4000张/秒的图像处理速度!这达到了Nvidia DGX-1的一半多一点(它有8个V100 gpu),尽管我们使用了小模型。对我来说,能够在几个小时内在一个GPU上运行ImageNet是生产力进步。
本文中提供的代码如下:
https://github.com/yaysummeriscoming/DALI_pytorch_demo
相关报道:
https://towardsdatascience.com/nvidia-dali-speeding-up-pytorch-876c80182440