class DataParallel(Module):

    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()
        '''1. Check whether a GPU is available or only the CPU.
        _get_available_device_type() returns "cuda" when torch.cuda.is_available(),
        otherwise None. On a CPU-only machine, store the module and return early;
        if a GPU is available, continue.
        '''
        device_type = _get_available_device_type()
        if device_type is None:
            self.module = module
            self.device_ids = []
            return
        '''2. If device_ids is not given, default to all visible GPUs;
        if output_device is not given, default to device_ids[0].
        _get_all_device_indices() returns the GPU indices, e.g. [0, 1].
        '''
        if device_ids is None:
            device_ids = _get_all_device_indices()

        if output_device is None:
            output_device = device_ids[0]
        '''3. Store the resolved settings.
        _get_device_index() turns each entry into a plain integer index.
        With device_ids = [0, 1] and output_device = 0, the source device
        src_device_obj is the first GPU, i.e. cuda:0.
        '''
        self.dim = dim
        self.module = module
        self.device_ids = [_get_device_index(x, True) for x in device_ids]
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])
        '''4. Check the balance between the GPUs.
        _check_balance compares device properties across the GPUs and issues a
        warning in two cases:
        (1) the smallest total memory among the GPUs is less than 0.75 of the
            largest;
        (2) the smallest multiprocessor count is less than 0.75 of the largest.
        '''
        _check_balance(self.device_ids)

        '''5. If there is only one GPU, simply move the module onto it.
        '''
        if len(self.device_ids) == 1:
            self.module.to(self.src_device_obj)
    def forward(self, *inputs, **kwargs):
        with torch.autograd.profiler.record_function("DataParallel.forward"):
            if not self.device_ids:
                return self.module(*inputs, **kwargs)
            '''6. Iterate over module.parameters() and module.buffers() and check
            that every tensor already lives on the source device (device_ids[0]).
            If one of them is found elsewhere, e.g. on the CPU, raise a RuntimeError.
            '''
            for t in chain(self.module.parameters(), self.module.buffers()):
                if t.device != self.src_device_obj:
                    raise RuntimeError("module must have its parameters and buffers "
                                       "on device {} (device_ids[0]) but found one of "
                                       "them on device: {}".format(self.src_device_obj, t.device))
            '''7. The remaining steps are annotated in data_parallel() below.
            '''
            inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
            if not inputs and not kwargs:
                inputs = ((),)
                kwargs = ({},)
            if len(self.device_ids) == 1:
                return self.module(*inputs[0], **kwargs[0])
            replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
            outputs = self.parallel_apply(replicas, inputs, kwargs)
            return self.gather(outputs, self.output_device)
    def replicate(self, module, device_ids):
        return replicate(module, device_ids, not torch.is_grad_enabled())

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)
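For orientation, here is a minimal usage sketch of the class above. The toy model, tensor sizes, and device ids are illustrative, and it assumes a machine with at least two visible GPUs:

import torch
import torch.nn as nn

model = nn.Linear(10, 5)                     # any nn.Module works

if torch.cuda.device_count() > 1:
    # Replicas go to device_ids; the gathered output lands on output_device.
    model = nn.DataParallel(model, device_ids=[0, 1], output_device=0)

# Parameters must already sit on device_ids[0] before forward() is called,
# otherwise the device check in step 6 raises a RuntimeError.
model = model.to("cuda:0")

x = torch.randn(64, 10, device="cuda:0")     # the batch of 64 is split across GPUs
y = model(x)                                 # outputs gathered back onto cuda:0
print(y.shape)                               # torch.Size([64, 5])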
def data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None):
    r"""Evaluates module(input) in parallel across the GPUs given in device_ids.

    This is the functional version of the DataParallel module.
    Args:
        module (Module): the module to evaluate in parallel
        inputs (Tensor): inputs to the module
        device_ids (list of int or torch.device): GPU ids on which to replicate module
        output_device (list of int or torch.device): GPU location of the output
            Use -1 to indicate the CPU. (default: device_ids[0])

    Returns:
        a Tensor containing the result of module(input) located on output_device
    """
    '''1. Normalize the inputs (our batch): a single tensor is wrapped into a
    tuple; if inputs is None it becomes the empty tuple ().
    '''
    if not isinstance(inputs, tuple):
        inputs = (inputs,) if inputs is not None else ()
    '''2. Determine the current device type, usually "cuda".
    '''
    device_type = _get_available_device_type()
    '''3. Check whether input and output GPUs were specified.
    Defaults: replicate onto all visible GPUs; place the output on GPU 0,
    i.e. the first entry of device_ids.
    '''
    if device_ids is None:
        device_ids = _get_all_device_indices()

    if output_device is None:
        output_device = device_ids[0]
    '''4. Resolve the input and output device indices, and derive the source
    device src_device_obj, the GPU the final result is aggregated on, by
    default GPU 0.
    '''
    device_ids = [_get_device_index(x, True) for x in device_ids]
    output_device = _get_device_index(output_device, True)
    src_device_obj = torch.device(device_type, device_ids[0])
    '''5. Check that the module's parameters and buffers all live on the source
    device (device_ids[0]), rather than, say, the CPU or another GPU.
    '''
    for t in chain(module.parameters(), module.buffers()):
        if t.device != src_device_obj:
            raise RuntimeError("module must have its parameters and buffers "
                               "on device {} (device_ids[0]) but found one of "
                               "them on device: {}".format(src_device_obj, t.device))
    '''6. scatter_kwargs splits the inputs along dim into one chunk per GPU,
    roughly batch_size / num_gpus each, and returns a tuple of inputs with
    one entry per GPU.
    '''
    inputs, module_kwargs = scatter_kwargs(inputs, module_kwargs, device_ids, dim)
    if not inputs and not module_kwargs:
        inputs = ((),)
        module_kwargs = ({},)
    if len(device_ids) == 1:
        return module(*inputs[0], **module_kwargs[0])
    used_device_ids = device_ids[:len(inputs)]

    '''7. replicate copies the model once per used GPU and loads each replica
    onto its device; parallel_apply then runs every replica on its own input
    chunk, so outputs lines up with inputs, one result per GPU.
    '''
    replicas = replicate(module, used_device_ids)
    outputs = parallel_apply(replicas, inputs, module_kwargs, used_device_ids)

    '''8. gather merges the per-GPU outputs into a single result. With 2 GPUs,
    each holds its own partial output; gather collects them onto
    output_device, GPU 0 by default.
    '''
    return gather(outputs, output_device, dim)
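And a short sketch of calling the functional form directly, under the same illustrative assumptions (two GPUs, toy model):

import torch
import torch.nn as nn
from torch.nn.parallel import data_parallel

module = nn.Linear(10, 5).to("cuda:0")       # must live on device_ids[0], see step 5
x = torch.randn(64, 10, device="cuda:0")

# One-shot parallel evaluation: scatter -> replicate -> parallel_apply -> gather.
y = data_parallel(module, x, device_ids=[0, 1], output_device=0)
print(y.shape)                               # torch.Size([64, 5])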
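Finally, to make the four stages concrete, the same forward pass can be reproduced by hand with the public helpers exported by torch.nn.parallel. A minimal sketch, again assuming two GPUs and a toy model:

import torch
import torch.nn as nn
from torch.nn.parallel import replicate, scatter, parallel_apply, gather

device_ids = [0, 1]
module = nn.Linear(10, 5).to("cuda:0")
x = torch.randn(64, 10, device="cuda:0")

inputs = scatter(x, device_ids)                         # two chunks of 32 rows, one per GPU
replicas = replicate(module, device_ids[:len(inputs)])  # one module copy per GPU
outputs = parallel_apply(replicas, inputs, None,        # run each replica on its own chunk
                         device_ids[:len(inputs)])
y = gather(outputs, 0)                                  # concatenate on cuda:0 along dim 0
print(y.shape)                                          # torch.Size([64, 5])

scatter splits along dim 0 and gather concatenates along the same dim, which is why DataParallel expects the batch dimension at dim=0 by default.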