@TOC
nn.parallel.DistributedDataParallel
这部分是nn.DataParallel的后续,想看nn.DataParallel的点击这里
为什么要用nn.parallel.DistributedDataParallel呢,首先我们看PyTorch官网对nn.DataParallel的一段话
WARNING
It is recommended to use DistributedDataParallel , instead of this class, to do multi-GPU training, even if there is only a single node. See: Use nn.parallel.DistributedDataParallel instead of multiprocessing or nn.DataParallel and Distributed Data Parallel .
这段话的意思是说即使在单机多卡 中也建议使用DistributedDataParallel,这就不得不说二者的区别
nn.DataParallel:只能用于单机多卡 的情况,且不能使用apex 加速
nn.parallel.DistributedDataParallel:既可以用于单机多卡 也可以用于多机多卡 的情况,可以使用apex 加速
apex是什么?
apex是由Nvidia维护的一个支持混合精度分布式训练的PyTorch扩展,不仅能加速收敛,还能节省显存,但由于本文是介绍并行计算,所以这里不作过多的apex介绍
好!下面开始我们的介绍
一、为什么要并行计算?
在我们训练大型数据集或者很大的模型时一块GPU很难放下,例如最初的AlexNet就是在两块GPU上计算的。并行计算一般采取两个策略:一个是模型并行,一个是数据并行。左图中是将模型的不同部分放在不同GPU上进行训练,最后汇总计算。而右图中是将数据放在不同GPU上进行训练,最后汇总计算,不仅能加快我们的计算速度,增大BatchSize,一次epoch所需要的iter降低了,还能使结果更加精确(Batch增大了)
二、基本概念
在使用nn.parallel.DistributedDataParallel时会有一些参数,这里做简要说明
多机多卡
world_size:代表有几台机器,可以理解为几个服务器
rank:第几台机器,即第几个服务器
local_rank:某台机器中的第几块GPU
单机多卡
world_size:代表机器一共有几块GPU
rank:第几块GPU
local_rank:第几块GPU,与rank相同
三、DistributedDataParallel的使用
nn.parallel.DistributedDataParallel的使用一共有两种方法
torch.multiprocessing: 不需要在命令行加distributed的启动命令
torch.distributed: 需要在终端加上python -m torch.distributed.launch --nproc_per_node=8 --xx --xx train.py
1. multiprocessing
官方的一个示例 ,下面介绍一些主要的步骤,最后作总结
导入必要的模块,multiprocessing导入的是以下模块
1 2 3 4 5 6 7 8 9 10 11 12 import torch.distributed as distimport torch.multiprocessing as mpfrom torch.multiprocessing import Processparser = argparse.ArgumentParser() ''' ...distributed params''' parser.add_argument('--device' , default='cuda' , help ='device id (i.e. 0 or 0,1 or cpu)' ) parser.add_argument('--syncBN' , type =bool , default=True ) parser.add_argument('--world-size' , default=4 , type =int , help ='number of distributed processes' ) parser.add_argument('--dist-url' , default='env://' , help ='url used to set up distributed training' ) opt = parser.parse_args()
定义main函数以及对数据集的加载做变化
分布式的数据集加载不同于之前的单卡,这里需要将数据集分为N部分,N为卡的数量。数据集加载方式变为:Datasets→DistributedSampler→BatchSampler→DataLoader(BatchSampler可以省略)
DistributedSampler将数据集N等分,BatchSamper将每一等分后的数据内部进行batch的划分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ''' 数据集 ''' train_datasets = MyDataSet(xxx) val_datasets = MyDataSet(xxx) train_sampler = torch.utils.data.distributed.DistributedSampler(train_datasets) val_sampler = torch.utils.data.distributed.DistributedSampler(val_datasets) train_batch_sampler = torch.utils.data.BatchSampler(train_sampler, batch_size, drop_last=True ) train_dataloader = torch.utils.data.DataLoader(train_datasets, batch_sampler=train_batch_sampler, pin_memory=True , num_workers=nw, collate_fn=train_data_set.collate_fn) val_dataloader = torch.utils.data.DataLoader(val_datasets, batch_size=batch_size, sampler=val_sampler, pin_memory=True , num_workers=nw, collate_fn=val_data_set.collate_fn)
mp.spawn函数,这个函数是调用GPU并行的,看一下函数的参数
torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')
fn:这个就是我们要分布式运行的函数,一般来说是main函数,main(rank, *args),其中rank为必须,单机多卡中可以理解为第几个GPU,args为函数传入的参数,类型tuple,在spawn(…args)的args参数中定义
args:传入fn的参数,tuple
nprocs:进程数,即几张卡
join: 默认为True即可
daemon: 默认为False即可
1 2 3 4 mp.spawn(main, args=(opt, ), nprocs=opt.world_size, join=True )
下面给一个完整的训练代码供调试~
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import osos.environ["CUDA_VISIBLE_DEVICES" ] = "6, 7" import torchimport torch.distributed as distimport torch.multiprocessing as mpimport torch.nn as nnimport torch.optim as optimfrom torch.nn.parallel import DistributedDataParallel as DDPdef example (rank, world_size ): dist.init_process_group("gloo" , init_method='tcp://127.0.0.1:6666' , rank=rank, world_size=world_size) model = nn.Linear(10 , 10 ).to(rank) ddp_model = DDP(model, device_ids=[rank]) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001 ) outputs = ddp_model(torch.randn(20 , 10 ).to(rank)) labels = torch.randn(20 , 10 ).to(rank) loss_fn(outputs, labels).backward() optimizer.step() print ("finished rank: {}" .format (rank)) def main (): world_size = torch.cuda.device_count() mp.spawn(example, args=(world_size,), nprocs=world_size, join=True ) if __name__=="__main__" : main()
2. distributed
这个方法我在调试时有两个问题:
有时会出现终止程序时显存未释放的情况,记得nvidia-smi看一下显存是否释放,如果没有释放使用kill -9 PID命令进行释放。如果kill也无法释放显存,直接将terminal关闭重新开一个即可
经常第一次调试后进行第二次调试时会提示我xx端口被占用了,这里最快的解决方法时将当前terminal关闭,然后重新开一个即可
第一步,导入必要的模块,其中distributed中导入的是以下模块
1 import torch.distributed as dist
第二步,需要用argparse写自己的一些参数
注意这里如果使用了argparse方法的话,必须传入local_rank参数,系统会自动给他进行赋值,如果不传入会报错!!!
1 2 3 4 5 6 7 8 9 10 11 parser = argparse.ArgumentParser() ''' ...your params ''' ''' ...distributed params''' parser.add_argument('--syncBN' , type =bool , default=True ) parser.add_argument('--world-size' , default=4 , type =int , help ='number of distributed processes' ) parser.add_argument('--dist-url' , default='env://' , help ='url used to set up distributed training' ) parser.add_argument('--local_rank' , type =int , help ='rank of distributed processes' ) opt = parser.parse_args()
第三步,初始化distributed
这里的os.environ["RANK"]等变量是没有值的,运行时在命令行输入
1 python -m torch.distributed.launch --nproc_per_node=4 --use_env train.py
会自动给其赋值,比如在单机多卡中--nproc_per_node=8会自动给os.environ["WORLD_SIZE"]赋值为8,经过实验,如果在argparse中忘记定义local_rank变量了,此处就要加上use_env参数
下面代码是具体的初始化过程,其中有一个dist.barrier()函数,这个函数在文末有详细说明,这里简单理解为:假设我们的world_size=8,那么我们有8张GPU初始化,初始化有快有慢,快的GPU初始化会在dist.barrier()处停下来等待,当所有的GPU都到达这个函数时,才会继续运行之后的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 if torch.cuda.is_available() is False : raise EnvironmentError("not find GPU device for training." ) if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ: args.rank = int (os.environ["RANK" ]) args.world_size = int (os.environ['WORLD_SIZE' ]) args.gpu = int (os.environ['LOCAL_RANK' ]) """ print arguments about GPUs - world_size - rank - local_rank """ print ("os.environ[\"WORLD_SIZE\"]: " , os.environ["WORLD_SIZE" ]) print ("os.environ[\"RANK\"]: " , os.environ["RANK" ]) print ("os.environ[\"LOCAL_RANK\"]: " , os.environ["LOCAL_RANK" ]) else : print ('Not using distributed mode' ) args.distributed = False return print ("args.local_rank: " , args.local_rank)print ("args.rank: " , args.rank)args.distributed = True torch.cuda.set_device(args.gpu) args.dist_backend = 'nccl' print ('| distributed init (rank {}): {}' .format (args.rank, args.dist_url), flush=True )dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank) dist.barrier() ''' os.environ["WORLD_SIZE"]: 4 os.environ["RANK"]: 1 os.environ["LOCAL_RANK"]: 1 args.local_rank: 1 args.rank: 1 os.environ["WORLD_SIZE"]: 4 os.environ["RANK"]: 2 os.environ["LOCAL_RANK"]: 2 args.local_rank: 2 args.rank: 2 | distributed init (rank 1): env:// | distributed init (rank 2): env:// os.environ["WORLD_SIZE"]: 4 os.environ["RANK"]: 0 os.environ["LOCAL_RANK"]: 0 args.local_rank: 0 args.rank: 0 | distributed init (rank 0): env:// os.environ["WORLD_SIZE"]: 4 os.environ["RANK"]: 3 os.environ["LOCAL_RANK"]: 3 args.local_rank: 3 args.rank: 3 | distributed init (rank 3): env:// '''
从上面我们print的输出可以得到以下几点信息:
不同GPUs初始化的速度是不同的,比如本次实验初始化顺序为1,2,0,3
local_rank是被默认赋值的,在单机多卡中他和rank的值相同
在命令行传入python -m torch.distributed.launch --nproc_per_node=4 --use_env train.py之后,环境变量被赋予了相应的值
第四步,和之前一样,加载数据和模型,然后训练即可
二者区别
二者的区别仅在于dist.init_process_group()函数之前,第一个不需要命令行指定参数,第二个需要,但是第二个代码更为简洁。之后的操作就完全一致了
一些BUG和问题
1. runtimeerror: address already in use
这种情况是端口被占用了,可能是由于你上次调试之后端口依旧占用的缘故,假设88889端口被占用了,用以下命令查询其PID,然后杀掉即可。第二种方法是将当前终端关闭,重新开一个他会自动解除占用
1 2 3 4 lsof -i:88889 或者 netstat -tunlp|grep 88889 kill -9 PID
2. 如何理解dist.barrier()函数?
详细参考StackOverflow
单机多卡环境下使用分布式训练具有更快的速度。PyTorch在分布式训练过程中,对于数据的读取是采用主进程预读取并缓存,然后其它进程从缓存中读取,不同进程之间的数据同步具体通过torch.distributed.barrier()实现
举个例子(来自StackOverflow)
1 2 3 4 5 6 7 if args.local_rank not in [-1 , 0 ]: torch.distributed.barrier() ... (loads the model and the vocabulary) if args.local_rank == 0 : torch.distributed.barrier()
假设我们有4张卡[0, 1, 2, 3],其中[0]卡是first process或者base process,有些操作不需要所有的卡同时进行,比如在预处理的时候只用base process即可。在上述代码中,第一个if是说除了主卡之外的卡运行到此处会被barrier,也就是说运行到这里就停止了,而base process不会停止会继续运行,执行预加载模型等操作,当主卡运行到第二个if时,他也会进入到barrier,就是说他已经预加载完了,现在他也需要被barrier了。==此时所有的卡都进入到了barrier==,意味着所有的卡可以继续运行(主卡已经加载完了,这个数据所有的卡都可以使用),可以理解为小弟在等大哥发号施令(小弟都在barrier),当大哥准备好了以后(进入到barrier),就告诉小弟可以出发了(所有的卡从barrier撤出)
a process is blocked by a barrier until all processes have encountered a barrier, upon which the barrier is lifted for all processes