pytorch并行计算DistributedDataParallel

@TOC

nn.parallel.DistributedDataParallel

这部分是nn.DataParallel的后续,想看nn.DataParallel点击这里

为什么要用nn.parallel.DistributedDataParallel呢,首先我们看PyTorch官网对nn.DataParallel的一段话

WARNING

It is recommended to use DistributedDataParallel, instead of this class, to do multi-GPU training, even if there is only a single node. See: Use nn.parallel.DistributedDataParallel instead of multiprocessing or nn.DataParallel and Distributed Data Parallel.

这段话的意思是说即使在单机多卡中也建议使用DistributedDataParallel,这就不得不说二者的区别

  • nn.DataParallel:只能用于单机多卡的情况,且不能使用apex加速
  • nn.parallel.DistributedDataParallel:既可以用于单机多卡也可以用于多机多卡的情况,可以使用apex加速

apex是什么?

apex是由Nvidia维护的一个支持混合精度分布式训练的PyTorch扩展,不仅能加速收敛,还能节省显存,但由于本文是介绍并行计算,所以这里不作过多的apex介绍

好!下面开始我们的介绍

一、为什么要并行计算?

在我们训练大型数据集或者很大的模型时一块GPU很难放下,例如最初的AlexNet就是在两块GPU上计算的。并行计算一般采取两个策略:一个是模型并行,一个是数据并行。左图中是将模型的不同部分放在不同GPU上进行训练,最后汇总计算。而右图中是将数据放在不同GPU上进行训练,最后汇总计算,不仅能加快我们的计算速度,增大BatchSize,一次epoch所需要的iter降低了,还能使结果更加精确(Batch增大了)

二、基本概念

在使用nn.parallel.DistributedDataParallel时会有一些参数,这里做简要说明

多机多卡

  • world_size:代表有几台机器,可以理解为几个服务器
  • rank:第几台机器,即第几个服务器
  • local_rank:某台机器中的第几块GPU

单机多卡

  • world_size:代表机器一共有几块GPU
  • rank:第几块GPU
  • local_rank:第几块GPU,与rank相同

三、DistributedDataParallel的使用

nn.parallel.DistributedDataParallel的使用一共有两种方法

  • torch.multiprocessing: 不需要在命令行加distributed的启动命令
  • torch.distributed: 需要在终端加上python -m torch.distributed.launch --nproc_per_node=8 --xx --xx train.py

1. multiprocessing

官方的一个示例,下面介绍一些主要的步骤,最后作总结

导入必要的模块,multiprocessing导入的是以下模块

1
2
3
4
5
6
7
8
9
10
11
12
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.multiprocessing import Process

parser = argparse.ArgumentParser()
''' ...distributed params'''
parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
parser.add_argument('--syncBN', type=bool, default=True) # 是否启用SyncBatchNorm
# 开启的进程数,不用设置该参数,会根据nproc_per_node自动设置
parser.add_argument('--world-size', default=4, type=int, help='number of distributed processes')
parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
opt = parser.parse_args()

定义main函数以及对数据集的加载做变化

分布式的数据集加载不同于之前的单卡,这里需要将数据集分为N部分,N为卡的数量。数据集加载方式变为:DatasetsDistributedSamplerBatchSamplerDataLoader(BatchSampler可以省略)

DistributedSampler将数据集N等分,BatchSamper将每一等分后的数据内部进行batch的划分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
'''     数据集     '''
# 加载datasets
train_datasets = MyDataSet(xxx)
val_datasets = MyDataSet(xxx)

# 给每个rank对应的进程分配训练的样本索引,比如一共800样本8张卡
# 那么每张卡对应分配100个样本
train_sampler = torch.utils.data.distributed.DistributedSampler(train_datasets)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_datasets)

# 刚才每张卡分了100个样本,假设BatchSize=16,那么能分成100/16=6...4
# 即多出4个样本,下面的drop_last=True表示舍弃这四个样本,False将剩余4个样本为一组
train_batch_sampler = torch.utils.data.BatchSampler(train_sampler, batch_size, drop_last=True)
# pin_memory将数据加载到GPU
# 验证集没有打乱val_batch_sampler,所以直接用batch_size
train_dataloader = torch.utils.data.DataLoader(train_datasets,
batch_sampler=train_batch_sampler,
pin_memory=True,
num_workers=nw,
collate_fn=train_data_set.collate_fn)
val_dataloader = torch.utils.data.DataLoader(val_datasets,
batch_size=batch_size,
sampler=val_sampler,
pin_memory=True,
num_workers=nw,
collate_fn=val_data_set.collate_fn)

mp.spawn函数,这个函数是调用GPU并行的,看一下函数的参数

torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')

  • fn:这个就是我们要分布式运行的函数,一般来说是main函数,main(rank, *args),其中rank为必须,单机多卡中可以理解为第几个GPU,args为函数传入的参数,类型tuple,在spawn(…args)的args参数中定义
  • args:传入fn的参数,tuple
  • nprocs:进程数,即几张卡
  • join: 默认为True即可
  • daemon: 默认为False即可
1
2
3
4
mp.spawn(main,
args=(opt, ),
nprocs=opt.world_size,
join=True)

下面给一个完整的训练代码供调试~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 单机多卡并行计算示例
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "6, 7"

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
# create default process group
dist.init_process_group("gloo", init_method='tcp://127.0.0.1:6666', rank=rank, world_size=world_size)
# create local model
model = nn.Linear(10, 10).to(rank)
# construct DDP model
ddp_model = DDP(model, device_ids=[rank])
# define loss function and optimizer
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

# forward pass
outputs = ddp_model(torch.randn(20, 10).to(rank))
labels = torch.randn(20, 10).to(rank)
# backward pass
loss_fn(outputs, labels).backward()
# update parameters
optimizer.step()
print("finished rank: {}".format(rank))

def main():
world_size = torch.cuda.device_count()
mp.spawn(example,
args=(world_size,),
nprocs=world_size,
join=True)

if __name__=="__main__":
main()

2. distributed

这个方法我在调试时有两个问题:

  • 有时会出现终止程序时显存未释放的情况,记得nvidia-smi看一下显存是否释放,如果没有释放使用kill -9 PID命令进行释放。如果kill也无法释放显存,直接将terminal关闭重新开一个即可
  • 经常第一次调试后进行第二次调试时会提示我xx端口被占用了,这里最快的解决方法时将当前terminal关闭,然后重新开一个即可

第一步,导入必要的模块,其中distributed中导入的是以下模块

1
import torch.distributed as dist

第二步,需要用argparse写自己的一些参数

  • 注意这里如果使用了argparse方法的话,必须传入local_rank参数,系统会自动给他进行赋值,如果不传入会报错!!!
1
2
3
4
5
6
7
8
9
10
11
parser = argparse.ArgumentParser()

''' ...your params '''

''' ...distributed params'''
parser.add_argument('--syncBN', type=bool, default=True) # 是否启用SyncBatchNorm
# 开启的进程数,不用设置该参数,会根据nproc_per_node自动设置
parser.add_argument('--world-size', default=4, type=int, help='number of distributed processes')
parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
parser.add_argument('--local_rank', type=int, help='rank of distributed processes')
opt = parser.parse_args()

第三步,初始化distributed

这里的os.environ["RANK"]等变量是没有值的,运行时在命令行输入

1
python -m torch.distributed.launch --nproc_per_node=4 --use_env train.py

会自动给其赋值,比如在单机多卡中--nproc_per_node=8会自动给os.environ["WORLD_SIZE"]赋值为8,经过实验,如果在argparse中忘记定义local_rank变量了,此处就要加上use_env参数

下面代码是具体的初始化过程,其中有一个dist.barrier()函数,这个函数在文末有详细说明,这里简单理解为:假设我们的world_size=8,那么我们有8张GPU初始化,初始化有快有慢,快的GPU初始化会在dist.barrier()处停下来等待,当所有的GPU都到达这个函数时,才会继续运行之后的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
if torch.cuda.is_available() is False:
raise EnvironmentError("not find GPU device for training.")
# 初始化各进程环境
if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
args.rank = int(os.environ["RANK"])
args.world_size = int(os.environ['WORLD_SIZE'])
args.gpu = int(os.environ['LOCAL_RANK'])
"""
print arguments about GPUs

- world_size
- rank
- local_rank
"""
print("os.environ[\"WORLD_SIZE\"]: ", os.environ["WORLD_SIZE"])
print("os.environ[\"RANK\"]: ", os.environ["RANK"])
print("os.environ[\"LOCAL_RANK\"]: ", os.environ["LOCAL_RANK"])
else:
print('Not using distributed mode')
args.distributed = False
return

print("args.local_rank: ", args.local_rank)
print("args.rank: ", args.rank)

args.distributed = True
torch.cuda.set_device(args.gpu)
args.dist_backend = 'nccl' # 通信后端,nvidia GPU推荐使用NCCL
print('| distributed init (rank {}): {}'.format(args.rank, args.dist_url), flush=True)
# 进行初始化,确定端口等
dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank)
dist.barrier() # 等待所有进程都初始化完毕,即所有GPU都要运行到这一步以后在继续

'''
os.environ["WORLD_SIZE"]: 4
os.environ["RANK"]: 1
os.environ["LOCAL_RANK"]: 1
args.local_rank: 1
args.rank: 1
os.environ["WORLD_SIZE"]: 4
os.environ["RANK"]: 2
os.environ["LOCAL_RANK"]: 2
args.local_rank: 2
args.rank: 2
| distributed init (rank 1): env://
| distributed init (rank 2): env://
os.environ["WORLD_SIZE"]: 4
os.environ["RANK"]: 0
os.environ["LOCAL_RANK"]: 0
args.local_rank: 0
args.rank: 0
| distributed init (rank 0): env://
os.environ["WORLD_SIZE"]: 4
os.environ["RANK"]: 3
os.environ["LOCAL_RANK"]: 3
args.local_rank: 3
args.rank: 3
| distributed init (rank 3): env://
'''

从上面我们print的输出可以得到以下几点信息:

  • 不同GPUs初始化的速度是不同的,比如本次实验初始化顺序为1,2,0,3
  • local_rank是被默认赋值的,在单机多卡中他和rank的值相同
  • 在命令行传入python -m torch.distributed.launch --nproc_per_node=4 --use_env train.py之后,环境变量被赋予了相应的值

第四步,和之前一样,加载数据和模型,然后训练即可

二者区别

二者的区别仅在于dist.init_process_group()函数之前,第一个不需要命令行指定参数,第二个需要,但是第二个代码更为简洁。之后的操作就完全一致了

一些BUG和问题

1. runtimeerror: address already in use

这种情况是端口被占用了,可能是由于你上次调试之后端口依旧占用的缘故,假设88889端口被占用了,用以下命令查询其PID,然后杀掉即可。第二种方法是将当前终端关闭,重新开一个他会自动解除占用

1
2
3
4
lsof -i:88889
或者
netstat -tunlp|grep 88889
kill -9 PID

2. 如何理解dist.barrier()函数?

详细参考StackOverflow

单机多卡环境下使用分布式训练具有更快的速度。PyTorch在分布式训练过程中,对于数据的读取是采用主进程预读取并缓存,然后其它进程从缓存中读取,不同进程之间的数据同步具体通过torch.distributed.barrier()实现

举个例子(来自StackOverflow)

1
2
3
4
5
6
7
if args.local_rank not in [-1, 0]:
torch.distributed.barrier() # Make sure only the first process in distributed training will download model & vocab

... (loads the model and the vocabulary)

if args.local_rank == 0:
torch.distributed.barrier() # Make sure only the first process in distributed training will download model & vocab

假设我们有4张卡[0, 1, 2, 3],其中[0]卡是first process或者base process,有些操作不需要所有的卡同时进行,比如在预处理的时候只用base process即可。在上述代码中,第一个if是说除了主卡之外的卡运行到此处会被barrier,也就是说运行到这里就停止了,而base process不会停止会继续运行,执行预加载模型等操作,当主卡运行到第二个if时,他也会进入到barrier,就是说他已经预加载完了,现在他也需要被barrier了。==此时所有的卡都进入到了barrier==,意味着所有的卡可以继续运行(主卡已经加载完了,这个数据所有的卡都可以使用),可以理解为小弟在等大哥发号施令(小弟都在barrier),当大哥准备好了以后(进入到barrier),就告诉小弟可以出发了(所有的卡从barrier撤出)

a process is blocked by a barrier until all processes have encountered a barrier, upon which the barrier is lifted for all processes