nn.DataParallel

@TOC

PyTorch并行计算

nn.DataParallel

1. 官网实例

PyTorch官网的例子:DATA PARALLELISM

PyTorch官网的手册:torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

2. 使用方法

具体使用也比较简单,如下所示,其余的不需要变化

1
2
3
4
5
6
7
8
9
10
# 当不限制GPUs的个数时,默认使用全部的GPU
model = Model()
model = nn.DataParallel(model)
model.cuda()

# 当限制GPUs的个数时
os.environ['CUDA_VISIBLE_DEVICES'] = '0, 1'
model = Model()
model = nn.DataParallel(model)
model.cuda()

3. 运行过程

  • 首先把模型分别加载到各个GPU上,同时将数据平均分配到GPU上
  • 每个GPU分别前向传播,将最后的结果汇总到第一块GPU上
  • 计算Loss反向传播

这里就有一个问题,当所有的batch进行汇总到第一个GPU时,第一块GPU的内存会明显高于其他的,造成不均衡,同时计算Loss的时候可能会出现问题,而且如果分别计算Loss是不是会更快呢,这个问题之后写解决方案,这个问题在pytorch上有人提出

4. 源代码解读

这里我将源代码copy了出来进行调试,加了注释,很方便理解

所在电脑有两块GPU,即device_ids = [0, 1]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class DataParallel(Module):
def __init__(self, module, device_ids=None, output_device=None, dim=0):
super(DataParallel, self).__init__()

'''1.
查看用GPU还是CPU,如果是CPU直接返回,如果是GPU继续
torch.cuda.is_available()
return "cuda"/None
'''
device_type = _get_available_device_type() # cuda()
if device_type is None:
self.module = module
self.device_ids = []
return


'''2.
如果没有指定device_ids,默认使用所有GPU
如果没有指定output_device,默认devices=0
torch._utils._get_all_device_indices() 返回GPUid,例如[0, 1]
'''
if device_ids is None:
device_ids = _get_all_device_indices()

if output_device is None:
output_device = device_ids[0]


'''3.
获取变量
torch._utils._get_all_device_indices() 返回GPUid,例如[0, 1]
torch._utils._get_device_index() 默认返回第一个GPU
device_ids = [0, 1]
output_device = 0 默认第一块GPU,即0
'''
self.dim = dim
self.module = module
self.device_ids = [_get_device_index(x, True) for x in device_ids]
self.output_device = _get_device_index(output_device, True)
self.src_device_obj = torch.device(device_type, self.device_ids[0])


'''4.
检查GPU之间的性能
_check_balance会检查各个GPU之间的性能差异,有两种情况会报错:
(1)剩余内存最小的GPU与最大的GPU比值小于0.75,不论是GPU本身的内存还是
被其他人占用后剩余的内存只要小于0.75就会报错
(2)线程最小的GPU与最大的GPU比值小于0.75
'''
_check_balance(self.device_ids)


'''5.
如果只有一块GPU,将module载入即可
'''
if len(self.device_ids) == 1:
self.module.to(self.src_device_obj)

def forward(self, *inputs, **kwargs):
with torch.autograd.profiler.record_function("DataParallel.forward"):
if not self.device_ids:
return self.module(*inputs, **kwargs)


'''6.
将module.parameters()以及module.buffers()进行迭代,看看是否在GPU上
没有的话继续,有的话返回错误信息cpu
'''
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(self.src_device_obj, t.device))


'''7.
后面这部分在下面的data_parallel中看注释
'''
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# for forward function without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not kwargs:
inputs = ((),)
kwargs = ({},)

if len(self.device_ids) == 1:
return self.module(*inputs[0], **kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
outputs = self.parallel_apply(replicas, inputs, kwargs)
return self.gather(outputs, self.output_device)

def replicate(self, module, device_ids):
return replicate(module, device_ids, not torch.is_grad_enabled())

def scatter(self, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

def parallel_apply(self, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

def gather(self, outputs, output_device):
return gather(outputs, output_device, dim=self.dim)



def data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None):
r"""Evaluates module(input) in parallel across the GPUs given in device_ids.

This is the functional version of the DataParallel module.

Args:
module (Module): the module to evaluate in parallel
inputs (Tensor): inputs to the module
device_ids (list of int or torch.device): GPU ids on which to replicate module
output_device (list of int or torch.device): GPU location of the output Use -1 to indicate the CPU.
(default: device_ids[0])
Returns:
a Tensor containing the result of module(input) located on
output_device
"""


'''1.
检查是否有输入,即我们的batchsize_input
如果有,不改变输入;如果没有变为空()
'''
if not isinstance(inputs, tuple):
inputs = (inputs,) if inputs is not None else ()


'''2.
检查当前设备类型:一般为cuda()
'''
device_type = _get_available_device_type()


'''3.
检查当前是否规定了输入和输出的GPU
默认:输入设备为所有GPU;输出为GPU:0,即第一块GPU(理解为所有输入GPU中的第一块)
'''
if device_ids is None:
device_ids = _get_all_device_indices()

if output_device is None:
output_device = device_ids[0]


'''4.
确定输入输出GPU,同时确定源GPU,可能是最终计算用来汇总的GPU:0
'''
device_ids = [_get_device_index(x, True) for x in device_ids]
output_device = _get_device_index(output_device, True)
src_device_obj = torch.device(device_type, device_ids[0])


'''5.
检查输入以及模型参数是否都在同一个设备上,比如GPU或者CPU
'''
for t in chain(module.parameters(), module.buffers()):
if t.device != src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(src_device_obj, t.device))


'''6.
scatter_kwargs将输入分成m份,m=batch_size/GPUs
返回tuple(inputs),有几个GPU,inputs有几份
'''
inputs, module_kwargs = scatter_kwargs(inputs, module_kwargs, device_ids, dim)
# for module without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not module_kwargs:
inputs = ((),)
module_kwargs = ({},)

if len(device_ids) == 1:
return module(*inputs[0], **module_kwargs[0])
used_device_ids = device_ids[:len(inputs)]


'''7.
replicate将模型复制m份,m为GPUs数目,并加载到每个GPU上
outputs与inputs对应,为每个GPUs的输出结果
'''
replicas = replicate(module, used_device_ids)
outputs = parallel_apply(replicas, inputs, module_kwargs, used_device_ids)


'''8.
gather将outputs整合为一个,比如有2个GPU,outputs分别在每个GPU上各有一份
gather就将每个GPU上的outputs整合起来默认放到GPU:0上
'''
return gather(outputs, output_device, dim)