0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Distributed Data Parallel中的分布式训练

深度学习自然语言处理 来源:深度学习自然语言处理 2023-01-06 09:20 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

实现原理

与DataParallel不同的是,Distributed Data Parallel会开设多个进程而非线程,进程数 = GPU数,每个进程都可以独立进行训练,也就是说代码的所有部分都会被每个进程同步调用,如果你某个地方print张量,你会发现device的差异

sampler会将数据按照进程数切分,

「确保不同进程的数据不同」

每个进程独立进行前向训练

每个进程利用Ring All-Reduce进行通信,将梯度信息进行聚合

每个进程同步更新模型参数,进行新一轮训练

按进程切分

如何确保数据不同呢?不妨看看DistributedSampler的源码

#判断数据集长度是否可以整除GPU数
#如果不能,选择舍弃还是补全,进而决定总数
#Ifthedatasetlengthisevenlydivisibleby#ofreplicas
#thenthereisnoneedtodropanydata,sincethedataset
#willbesplitequally.
if(self.drop_lastand
len(self.dataset)%self.num_replicas!=0):
#num_replicas=num_gpus
self.num_samples=math.ceil((len(self.dataset)-
self.num_replicas)/self.num_replicas)
else:
self.num_samples=math.ceil(len(self.dataset)/
self.num_replicas)
self.total_size=self.num_samples*self.num_replicas

#根据是否shuffle来创建indices
ifself.shuffle:
#deterministicallyshufflebasedonepochandseed
g=torch.Generator()
g.manual_seed(self.seed+self.epoch)
indices=torch.randperm(len(self.dataset),generator=g).tolist()
else:
indices=list(range(len(self.dataset)))
ifnotself.drop_last:
#addextrasamplestomakeitevenlydivisible
padding_size=self.total_size-len(indices)
ifpadding_size<= len(indices):
        # 不够就按indices顺序加
        # e.g., indices为[0, 1, 2, 3 ...],而padding_size为4
        # 加好之后的indices[..., 0, 1, 2, 3]
        indices += indices[:padding_size]
    else:
        indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
else:
    # remove tail of data to make it evenly divisible.
    indices = indices[:self.total_size]
assert len(indices) == self.total_size
# subsample
# rank代表进程id
indices = indices[self.rankself.num_replicas]
return iter(indices)

Ring All-Reduce

那么什么是「Ring All-Reduce」呢?又为啥可以降低通信成本呢?

首先将每块GPU上的梯度拆分成四个部分,比如,如下图(此部分原理致谢下王老师,讲的很清晰[1]:

e48eda16-8d5b-11ed-bfe3-dac502259ad0.png

所有GPU的传播都是「同步」进行的,传播的规律有两条:

只与自己下一个位置的GPU进行通信,比如0 > 1,3 > 0

四个部分,哪块GPU上占的多,就由该块GPU往它下一个传,初始从主节点传播,即GPU0,你可以想象跟接力一样,a传b,b负责传给c

第一次传播如下:

e49c7b58-8d5b-11ed-bfe3-dac502259ad0.png

那么结果就是:

e4aba146-8d5b-11ed-bfe3-dac502259ad0.png

那么,按照谁多谁往下传的原则,此时应该是GPU1往GPU2传a0和a1,GPU2往GPU3传b1和b2,以此类推

e4bb30f2-8d5b-11ed-bfe3-dac502259ad0.png

接下来再传播就会有GPU3 a的部分全有,GPU0上b的部分全有等,就再往下传

e4c82640-8d5b-11ed-bfe3-dac502259ad0.png

再来几遍便可以使得每块GPU上都获得了来自其他GPU的梯度啦

e4d8c05e-8d5b-11ed-bfe3-dac502259ad0.png

代码使用

基础概念

第一个是后端的选择,即数据传输协议,从下表可以看出[2],当使用CPU时可以选择gloo而GPU则可以是nccl

「Backend」 「gloo」 「mpi」 「nccl」
Device CPU GPU CPU GPU CPU GPU
send ?
recv ?
broadcast ?
all_reduce ?
reduce ?
all_gather ?
gather ?
scatter ?
reduce_scatter
all_to_all ?
barrier ?

接下来是一些参数的解释[3]:

Arg Meaning
group 一次发起的所有进程构成一个group,除非想更精细通信,创建new_group
world_size 一个group中进程数目,即为GPU的数量
rank 进程id,主节点rank=0,其他的在0和world_size-1之间
local_rank 进程在本地节点/机器的id

举个例子,假如你有两台服务器(又被称为node),每台服务器有4张GPU,那么,world_size即为8,rank=[0, 1, 2, 3, 4, 5, 6, 7], 每个服务器上的进程的local_rank为[0, 1, 2, 3]

然后是「初始化方法」的选择,有TCP和共享文件两种,一般指定rank=0为master节点

TCP显而易见是通过网络进行传输,需要指定主节点的ip(可以为主节点实际IP,或者是localhost)和空闲的端口

importtorch.distributedasdist

dist.init_process_group(backend,init_method='tcp://ip:port',
rank=rank,world_size=world_size)

共享文件的话需要手动删除上次启动时残留的文件,加上官方有一堆警告,还是建议使用TCP

dist.init_process_group(backend,init_method='file://Path',
rank=rank,world_size=world_size)

launch方法

「初始化」

这里先讲用launch的方法,关于torch.multiprocessing留到后面讲

在启动后,rank和world_size都会自动被DDP写入环境中,可以提前准备好参数类,如argparse这种

args.rank=int(os.environ['RANK'])
args.world_size=int(os.environ['WORLD_SIZE'])
args.local_rank=int(os.environ['LOCAL_RANK'])

首先,在使用distributed包的任何其他函数之前,按照tcp方法进行初始化,需要注意的是需要手动指定一共可用的设备CUDA_VISIBLE_DEVICES

defdist_setup_launch(args):
#tellDDPavailabledevices[NECESSARY]
os.environ['CUDA_VISIBLE_DEVICES']=args.devices
args.rank=int(os.environ['RANK'])
args.world_size=int(os.environ['WORLD_SIZE'])
args.local_rank=int(os.environ['LOCAL_RANK'])

dist.init_process_group(args.backend,
args.init_method,
rank=args.rank,
world_size=args.world_size)
#thisisoptional,otherwiseyoumayneedtospecifythe
#devicewhenyoumovesomethinge.g.,model.cuda(1)
#ormodel.to(args.rank)
#Settingdevicemakesthingseasy:model.cuda()
torch.cuda.set_device(args.rank)
print('TheCurrentRankis%d|TheTotalRanksare%d'
%(args.rank,args.world_size))

「DistributedSampler」

接下来创建DistributedSampler,是否pin_memory,根据你本机的内存决定。pin_memory的意思是提前在内存中申请一部分专门存放Tensor。假如说你内存比较小,就会跟虚拟内存,即硬盘进行交换,这样转义到GPU上会比内存直接到GPU耗时。

因而,如果你的内存比较大,可以设置为True;然而,如果开了导致卡顿的情况,建议关闭

fromtorch.utils.dataimportDataLoader,DistributedSampler

train_sampler=DistributedSampler(train_dataset,seed=args.seed)
train_dataloader=DataLoader(train_dataset,
pin_memory=True,
shuffle=(train_samplerisNone),
batch_size=args.per_gpu_train_bs,
num_workers=args.num_workers,
sampler=train_sampler)

eval_sampler=DistributedSampler(eval_dataset,seed=args.seed)
eval_dataloader=DataLoader(eval_dataset,
pin_memory=True,
batch_size=args.per_gpu_eval_bs,
num_workers=args.num_workers,
sampler=eval_sampler)

「加载模型」

然后加载模型,跟DataParallel不同的是需要提前放置到cuda上,还记得上面关于设置cuda_device的语句嘛,因为设置好之后每个进程只能看见一个GPU,所以直接model.cuda(),不需要指定device

同时,我们必须给DDP提示目前是哪个rank

fromtorch.nn.parallelimportDistributedDataParallelasDDP
model=model.cuda()
#tellDDPwhichrank
model=DDP(model,find_unused_parameters=True,device_ids=[rank])

注意,当模型带有Batch Norm时:

ifargs.syncBN:
nn.SyncBatchNorm.convert_sync_batchnorm(model).cuda()

「训练相关」

每个epoch开始训练的时候,记得用sampler的set_epoch,这样使得每个epoch打乱顺序是不一致的

关于梯度回传和参数更新,跟正常情况无异

forepochinrange(epochs):
#recordepochs
train_dataloader.sampler.set_epoch(epoch)
outputs=model(inputs)
loss=loss_fct(outputs,labels)
loss.backward()
optimizer.step()
optimizer.zero_grad()

这里有一点需要小心,这个loss是各个进程的loss之和,如果想要存储每个step平均损失,可以进行all_reduce操作,进行平均,不妨看官方的小例子来理解下:

>>>#Alltensorsbelowareoftorch.int64type.
>>>#Wehave2processgroups,2ranks.
>>>tensor=torch.arange(2,dtype=torch.int64)+1+2*rank
>>>tensor
tensor([1,2])#Rank0
tensor([3,4])#Rank1
>>>dist.all_reduce(tensor,op=ReduceOp.SUM)
>>>tensor
tensor([4,6])#Rank0
tensor([4,6])#Rank1
@torch.no_grad()
defreduce_value(value,average=True):
world_size=get_world_size()
ifworld_size< 2:  # 单GPU的情况
        return value
    dist.all_reduce(value)
    if average:
     value /= world_size
    return value

看到这,肯定有小伙伴要问,那这样我们是不是得先求平均损失再回传梯度啊,不用,因为,当我们回传loss后,DDP会自动对所有梯度进行平均[4],也就是说回传后我们更新的梯度和DP或者单卡同样batch训练都是一致的

loss=loss_fct(...)
loss.backward()
#注意在backward后面
loss=reduce_value(loss,world_size)
mean_loss=(step*mean_loss+loss.item())/(step+1)

还有个注意点就是学习率的变化,这个是和batch size息息相关的,如果batch扩充了几倍,也就是说step比之前少了很多,还采用同一个学习率,肯定会出问题的,这里,我们进行线性增大[5]

N=world_size
lr=args.lr*N

肯定有人说,诶,你线性增大肯定不能保证梯度的variance一致了,正确的应该是正比于,关于这个的讨论不妨参考[6]

「evaluate相关」

接下来,细心的同学肯定好奇了,如果验证集也切分了,metric怎么计算呢?此时就需要咱们把每个进程得到的预测情况集合起来,t就是一个我们需要gather的张量,最后将每个进程中的t按照第一维度拼接,先看官方小例子来理解all_gather

>>>#Alltensorsbelowareoftorch.int64dtype.
>>>#Wehave2processgroups,2ranks.
>>>tensor_list=[torch.zeros(2,dtype=torch.int64)for_inrange(2)]
>>>tensor_list
[tensor([0,0]),tensor([0,0])]#Rank0and1
>>>tensor=torch.arange(2,dtype=torch.int64)+1+2*rank
>>>tensor
tensor([1,2])#Rank0
tensor([3,4])#Rank1
>>>dist.all_gather(tensor_list,tensor)
>>>tensor_list
[tensor([1,2]),tensor([3,4])]#Rank0
[tensor([1,2]),tensor([3,4])]#Rank1
defsync_across_gpus(t,world_size):
gather_t_tensor=[torch.zeros_like(t)for_in
range(world_size)]
dist.all_gather(gather_t_tensor,t)
returntorch.cat(gather_t_tensor,dim=0)

可以简单参考我前面提供的源码的evaluate部分,我们首先将预测和标签比对,把结果为bool的张量存储下来,最终gather求和取平均。

这里还有个有趣的地方,tensor默认的类型可能是int,bool型的res拼接后自动转为0和1了,另外bool型的张量是不支持gather的

defeval(...)
results=torch.tensor([]).cuda()
forstep,(inputs,labels)inenumerate(dataloader):
outputs=model(inputs)
res=(outputs.argmax(-1)==labels)
results=torch.cat([results,res],dim=0)

results=sync_across_gpus(results,world_size)
mean_acc=(results.sum()/len(results)).item()
returnmean_acc

「模型保存与加载」

模型保存,参考部分官方教程[7],我们只需要在主进程保存模型即可,注意,这里是被DDP包裹后的,DDP并没有state_dict,这里barrier的目的是为了让其他进程等待主进程保存模型,以防不同步

defsave_checkpoint(rank,model,path):
ifis_main_process(rank):
#Allprocessesshouldseesameparametersastheyall
#startfromsamerandomparametersandgradientsare
#synchronizedinbackwardpasses.
#Therefore,savingitinoneprocessissufficient.
torch.save(model.module.state_dict(),path)

#Useabarrier()tokeepprocess1waitingforprocess0
dist.barrier()

加载的时候别忘了map_location,我们一开始会保存模型至主进程,这样就会导致cuda:0显存被占据,我们需要将模型remap到其他设备

defload_checkpoint(rank,model,path):
#remapthemodelfromcuda:0tootherdevices
map_location={'cuda:%d'%0:'cuda:%d'%rank}
model.module.load_state_dict(
torch.load(path,map_location=map_location)
)

进程销毁

运行结束后记得销毁进程:

defcleanup():
dist.destroy_process_group()

cleanup()

如何启动

在终端输入下列命令【单机多卡】

python-mtorch.distributed.launch--nproc_per_node=NUM_GPUS
main.py(--arg1--arg2--arg3andallother
argumentsofyourtrainingscript)

目前torch 1.10以后更推荐用run

torch.distributed.launch->torch.distributed.run/torchrun

多机多卡是这样的:

#第一个节点启动
python-mtorch.distributed.launch
--nproc_per_node=NUM_GPUS
--nnodes=2
--node_rank=0
--master_addr="192.168.1.1"
--master_port=1234main.py

#第二个节点启动
python-mtorch.distributed.launch
--nproc_per_node=NUM_GPUS
--nnodes=2
--node_rank=1
--master_addr="192.168.1.1"
--master_port=1234main.py

mp方法

第二个方法就是利用torch的多线程包

importtorch.multiprocessingasmp
#rankmp会自动填入
defmain(rank,arg1,...):
pass

if__name__=='__main__':
mp.spawn(main,nprocs=TOTAL_GPUS,args=(arg1,...))

这种运行的时候就跟正常的python文件一致:

pythonmain.py

优缺点

「优点」:相比于DP而言,不需要反复创建和销毁线程;Ring-AllReduce算法高通信效率;模型同步方便

「缺点」:操作起来可能有些复杂,一般可满足需求的可先试试看DataParallel。





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • gpu
    gpu
    +关注

    关注

    28

    文章

    5099

    浏览量

    134420
  • PIN管
    +关注

    关注

    0

    文章

    36

    浏览量

    6766
  • TCP通信
    +关注

    关注

    0

    文章

    146

    浏览量

    4769

原文标题:深入理解Pytorch中的分布式训练

文章出处:【微信号:zenRRan,微信公众号:深度学习自然语言处理】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    如何解决分布式光伏计量难题?

    分布式光伏成增长主力 据《2025-2030年分布式光伏行业市场前景预测及未来发展趋势研究报告》显示,2024年分布式光伏新增装机1
    的头像 发表于 11-07 14:55 147次阅读
    如何解决<b class='flag-5'>分布式</b>光伏计量难题?

    【节能学院】Acrel-1000DP分布式光伏监控系统在奉贤平高食品 4.4MW 分布式光伏应用

    摘要:在“双碳”和新型电力系统建设背景下,分布式光伏接入比例不断提高,对配电网电压、调度运行及调峰等环节造成强烈冲击。本文设计包含平台层、设备层二层架构体系的分布式光伏管控平台,以及小容量工商业
    的头像 发表于 08-23 08:04 3300次阅读
    【节能学院】Acrel-1000DP<b class='flag-5'>分布式</b>光伏监控系统在奉贤平高食品 4.4MW <b class='flag-5'>分布式</b>光伏<b class='flag-5'>中</b>应用

    分布式光伏发电监测系统技术方案

    分布式光伏发电监测系统技术方案 柏峰【BF-GFQX】一、系统目标 :分布式光伏发电监测系统旨在通过智能化的监测手段,实现对分布式光伏电站的全方位、高精度、实时化管理。该系统能
    的头像 发表于 08-22 10:51 2878次阅读
    <b class='flag-5'>分布式</b>光伏发电监测系统技术方案

    一键部署无损网络:EasyRoCE助力分布式存储效能革命

    分布式存储的性能瓶颈往往在于网络。如何构建一个高带宽、超低时延、零丢包的无损网络,是释放分布式存储全部潜力、赋能企业关键业务(如实时数据库、AI训练、高性能计算)的关键挑战。
    的头像 发表于 08-04 11:34 1334次阅读
    一键部署无损网络:EasyRoCE助力<b class='flag-5'>分布式</b>存储效能革命

    曙光存储领跑中国分布式存储市场

    近日,赛迪顾问发布《中国分布式存储市场研究报告(2025)》,指出2024 年中国分布式存储市场首次超过集中式存储,规模达 198.2 亿元,增速 43.7%。
    的头像 发表于 05-19 16:50 1005次阅读

    多通道电源管理芯片在分布式能源系统的优化策略

    摘要: 随着分布式能源系统的广泛应用,对电源管理芯片的性能要求日益提升。本文深入探讨了多通道电源管理芯片在分布式能源系统的优化策略,以国科安芯的ASP4644芯片为例,从电气特性、工作模式、热管
    的头像 发表于 05-16 15:22 613次阅读

    使用VirtualLab Fusion中分布式计算的AR波导测试图像模拟

    总计算时间超过31小时。通过使用一个由8个多核PC组成的网络,提供35个客户端分布式计算,将模拟时间减少到1小时5分钟。基本模拟任务基本任务集合:FOV使用分布式计算的集合模拟概述模拟时间节省96%的计算时间!!!
    发表于 04-10 08:48

    分布式光纤das-tool涉及哪些领域

    一、分布式光纤DAS(声波传感系统) 1. 定义与技术原理 DAS(Distributed Acoustic Sensing) 是一种基于光纤传感技术的分布式声波监测系统。 原理:利用光纤作为传感
    的头像 发表于 04-02 10:26 924次阅读

    VirtualLab Fusion应用:基于分布式计算的AR光波导测试图像的仿真

    (10201次模拟):大约43小时。 模拟结果:不同视场角的辐射通量*。 *注: 21个×21个方向的结果存储在参数连续变化的光栅的查找表。 使用分布式计算 参数运行用于改变当前视场模式的角度,这
    发表于 02-19 08:51

    分布式云化数据库有哪些类型

    分布式云化数据库有哪些类型?分布式云化数据库主要类型包括:关系型分布式数据库、非关系型分布式数据库、新SQL分布式数据库、以列方式存储数据、
    的头像 发表于 01-15 09:43 863次阅读

    大模型训练框架(五)之Accelerate

    轻松切换不同的并行策略,同时它还支持混合精度训练,可以进一步提升训练效率。 1. 导入 Accelerate只需添加四行代码,即可在任何分布式配置运行相同的 PyTorch 代码!让
    的头像 发表于 01-14 14:24 1765次阅读

    基于ptp的分布式系统设计

    在现代分布式系统,精确的时间同步对于确保数据一致性、系统稳定性和性能至关重要。PTP(Precision Time Protocol)是一种网络协议,用于在分布式系统实现高精度的时
    的头像 发表于 12-29 10:09 961次阅读

    HarmonyOS Next 应用元服务开发-分布式数据对象迁移数据文件资产迁移

    填充到分布式数据对象数据。 调用genSessionId()接口生成数据对象组网id,并使用该id调用setSessionId()加入组网,激活分布式数据对象。 使用save()接口将已激活的
    发表于 12-24 10:11

    HarmonyOS Next 应用元服务开发-分布式数据对象迁移数据权限与基础数据

    填充到分布式数据对象数据。 调用genSessionId()接口生成数据对象组网id,并使用该id调用setSessionId()加入组网,激活分布式数据对象。 使用save()接口将已激活的
    发表于 12-24 09:40

    安科瑞Acrel-1000DP分布式光伏监控系统在8.3MWp分布式光伏发电的应用

    安科瑞分布式光伏监控系统在上海汽车变速器有限公司 8.3MWp分布式光伏发电项目中的应用
    发表于 12-16 15:03 0次下载