0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

基于PyTorch的模型并行分布式训练Megatron解析

jf_pmFSk4VX 来源: 罗西的思考 2023-10-23 11:01 次阅读

0x00 摘要

NVIDIA Megatron 是一个基于 PyTorch 的分布式训练框架,用来训练超大Transformer语言模型,其通过综合应用了数据并行,Tensor并行和Pipeline并行来复现 GPT3,值得我们深入分析其背后机理。

本系列大概有6~7篇文章,通过论文和源码和大家一起学习研究。本文将对 Megatron 的基本架构做一下梳理。

0x01 启动

1.1 分布式启动

启动脚本在 examples/pretrain_bert_distributed.sh,其利用了 torch.distributed.launch 来启动多个进程。具体业务代码是 pretrain_bert.py。

因为 GPUS_PER_NODE 是8,所以 nproc_per_node 是8,这样,在本机上就启动了8个进程,每个进程之中含有模型的一部分。进程的 rank 是被 torch.distributed.launch 调用 elastic 自动分配的。

#!/bin/bash

GPUS_PER_NODE=8
#Changeformultinodeconfig
MASTER_ADDR=localhost
MASTER_PORT=6000
NNODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))

DATA_PATH=_text_sentence
CHECKPOINT_PATH=

DISTRIBUTED_ARGS="--nproc_per_node$GPUS_PER_NODE--nnodes$NNODES--node_rank$NODE_RANK--master_addr$MASTER_ADDR--master_port$MASTER_PORT"

python-mtorch.distributed.launch$DISTRIBUTED_ARGS
pretrain_bert.py
--num-layers24
--hidden-size1024
--num-attention-heads16
--micro-batch-size4
--global-batch-size32
--seq-length512
--max-position-embeddings512
--train-iters1000000
--save$CHECKPOINT_PATH
--load$CHECKPOINT_PATH
--data-path$DATA_PATH
--vocab-filebert-vocab.txt
--data-implmmap
--split949,50,1
--distributed-backendnccl
--lr0.0001
--lr-decay-stylelinear
--min-lr1.0e-5
--lr-decay-iters990000
--weight-decay1e-2
--clip-grad1.0
--lr-warmup-fraction.01
--log-interval100
--save-interval10000
--eval-interval1000
--eval-iters10
--fp16

1.2 构造基础

pretrain_bert.py 会调用 pretrain 进行预训练。

if__name__=="__main__":

pretrain(train_valid_test_datasets_provider,model_provider,
ModelType.encoder_or_decoder,
forward_step,args_defaults={'tokenizer_type':'BertWordPieceLowerCase'})

1.2.1 获取模型

model_provider返回模型普通版本(vanilla version)。所谓vanilla,我们指的是一个简单的cpu模型,没有 fp16或 ddp,但是已经被 Megatron 改造为并行的版本。

defmodel_provider(pre_process=True,post_process=True):
"""Buildthemodel."""

print_rank_0('buildingBERTmodel...')

args=get_args()
num_tokentypes=2ifargs.bert_binary_headelse0
model=BertModel(
num_tokentypes=num_tokentypes,
add_binary_head=args.bert_binary_head,
parallel_output=True,
pre_process=pre_process,
post_process=post_process)

returnmodel

1.2.2 获取数据集

train_valid_test_datasets_provider 会接受train/valid/test数据集的大小,并返回 “train,valid,test” 数据集。

deftrain_valid_test_datasets_provider(train_val_test_num_samples):
"""Buildtrain,valid,andtestdatasets."""
args=get_args()

print_rank_0('>buildingtrain,validation,andtestdatasets'
'forBERT...')
train_ds,valid_ds,test_ds=build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
train_valid_test_num_samples=train_val_test_num_samples,
max_seq_length=args.seq_length,
masked_lm_prob=args.mask_prob,
short_seq_prob=args.short_seq_prob,
seed=args.seed,
skip_warmup=(notargs.mmap_warmup),
binary_head=args.bert_binary_head)
print_rank_0(">finishedcreatingBERTdatasets...")

returntrain_ds,valid_ds,test_ds

1.2.3 步进函数

forward_step函数接受一个“数据迭代器”和“模型”,并返回一个“loss”标量,该标量带有一个字典,其中key:value是希望在训练期间监视的信息,例如“lm loss:value”。还要求此函数将“batch generator”添加到timers类中。

defforward_step(data_iterator,model):
"""Forwardstep."""
args=get_args()

#Getthebatch.
tokens,types,sentence_order,loss_mask,lm_labels,padding_mask=get_batch(
data_iterator)

ifnotargs.bert_binary_head:
types=None

#Forwardpassthroughthemodel.
output_tensor=model(tokens,padding_mask,tokentype_ids=types,
lm_labels=lm_labels)

returnoutput_tensor,partial(loss_func,loss_mask,sentence_order)

1.2.3.1 广播数据

forward_step 会调用 get_batch 获取batch 数据,其内部会从迭代器获取数据,然后使用broadcast_data函数把输入数据从 rank 0 广播到所有tensor-model-parallel 其他 ranks之上。

注意,数据并行是把不同数据加载到不同的rank之上,而 Tensor模型并行组之中每个rank都加载同样数据。

defget_batch(data_iterator):
"""Buildthebatch."""

#Itemsandtheirtype.
keys=['text','types','labels','is_random','loss_mask','padding_mask']
datatype=torch.int64

#Broadcastdata.
ifdata_iteratorisnotNone:
data=next(data_iterator)#获取数据
else:
data=None
data_b=mpu.broadcast_data(keys,data,datatype)#把数据广播到各个GPU

#Unpack.
tokens=data_b['text'].long()
types=data_b['types'].long()
sentence_order=data_b['is_random'].long()
loss_mask=data_b['loss_mask'].float()
lm_labels=data_b['labels'].long()
padding_mask=data_b['padding_mask'].long()

returntokens,types,sentence_order,loss_mask,lm_labels,padding_mask

broadcast_data 在每个model parallel group之上,把数据从rank 0发送到同组其他成员。

defbroadcast_data(keys,data,datatype):
"""Broadcastdatafromrankzeroofeachmodelparallelgrouptothe
membersofthesamemodelparallelgroup.

Arguments:
keys:listofkeysinthedatadisctionarytobebroadcasted
data:datadictionaryofstringkeysandcputensorvalues.
datatype:torchdatatypeofalltensorsindataassociated
withkeys.
"""
#Build(key,size)and(key,numberofelements)dictionariesalong
#withthetotalnumberofelementsonallranks.
key_size,key_numel,total_numel=_build_key_size_numel_dictionaries(keys,
data)

#Packonrankzero.
ifget_tensor_model_parallel_rank()==0:#rank0才压缩
#Checkthatallkeyshavethesamedatatype.
_check_data_types(keys,data,datatype)
#Flattenthedataassociatedwiththekeys
flatten_data=torch.cat(
[data[key].contiguous().view(-1)forkeyinkeys],dim=0).cuda()
else:
flatten_data=torch.empty(total_numel,
device=torch.cuda.current_device(),
dtype=datatype)

#Broadcast
torch.distributed.broadcast(flatten_data,get_tensor_model_parallel_src_rank(),
group=get_tensor_model_parallel_group())

#Unpack
output={}
offset=0
forkeyinkeys:
size=key_size[key]
numel=key_numel[key]
output[key]=flatten_data.narrow(0,offset,numel).view(size)
offset+=numel

returnoutput

get_tensor_model_parallel_src_rank 计算与张量模型并行组中第一个local rank对应的全局rank。

defget_tensor_model_parallel_src_rank():
"""Calculatetheglobalrankcorrespondingtothefirstlocalrank
inthetensormodelparallelgroup."""
global_rank=torch.distributed.get_rank()
local_world_size=get_tensor_model_parallel_world_size()
return(global_rank//local_world_size)*local_world_size

逻辑图具体如下,三个不同的函数分别为预训练提供不同的功能输入,做到了解耦。

ccdc1b94-70af-11ee-939d-92fbcf53809c.jpg

0x02 Pretrain

BERT训练主要分为两步:

Pre-train:pre-train是迁移学习的基础,是训练token-level的语义理解。

Fine-tuning:在已经训练好的语言模型基础之上,加入特定领域(比如金融医疗)的参数来重新训练,比如对于分类问题就可以在pre-train模型基础之上加上一个softmax,再使用语料 fine-tune。

Pre-train 主要如下:

初始化Megatron。

使用model_provider设置模型、优化器和lr计划。

调用train_val_test_data_provider以获取train/val/test数据集。

使用forward_step_func训练模型。

具体代码如下:

defpretrain(train_valid_test_dataset_provider,
model_provider,
model_type,
forward_step_func,
extra_args_provider=None,
args_defaults={}):
"""Maintrainingprogram.

Thisfunctionwillrunthefollowingsintheorderprovided:
1)initializeMegatron.
2)setupmodel,optimizerandlrscheduleusingthemodel_provider.
3)calltrain_val_test_data_providertogettrain/val/testdatasets.
4)trainthemodleusingtheforward_step_func.
"""

#Initalizeandgetarguments,timers,andTensorboardwriter.
initialize_megatron(extra_args_provider=extra_args_provider,
args_defaults=args_defaults)

#Adjustthestartuptimesoitreflectsthelargestvalue.
#Thiswillbeclosertowhatschedulerwillsee(outsideof
#image...launches.
global_TRAIN_START_TIME
start_time_tensor=torch.cuda.DoubleTensor([_TRAIN_START_TIME])
torch.distributed.all_reduce(start_time_tensor,
op=torch.distributed.ReduceOp.MIN)
_TRAIN_START_TIME=start_time_tensor.item()

args=get_args()
timers=get_timers()

#Model,optimizer,andlearningrate.使用model_provider设置模型、优化器和lr计划
model,optimizer,lr_scheduler=setup_model_and_optimizer(model_provider,
model_type)

#Datastuff.调用train_val_test_data_provider以获取train/val/测试数据集
ifargs.virtual_pipeline_model_parallel_sizeisnotNone:
all_data_iterators=[
build_train_valid_test_data_iterators(train_valid_test_dataset_provider)
for_inrange(len(model))
]
train_data_iterator=[data_iterators[0]fordata_iteratorsinall_data_iterators]
valid_data_iterator=[data_iterators[1]fordata_iteratorsinall_data_iterators]
test_data_iterator=[data_iterators[2]fordata_iteratorsinall_data_iterators]
else:
train_data_iterator,valid_data_iterator,test_data_iterator
=build_train_valid_test_data_iterators(
train_valid_test_dataset_provider)

iteration=0
ifargs.do_trainandargs.train_iters>0:
iteration=train(forward_step_func,#训练模型
model,optimizer,lr_scheduler,
train_data_iterator,valid_data_iterator)

ifargs.do_valid:
prefix='theendoftrainingforvaldata'
evaluate_and_print_results(prefix,forward_step_func,
valid_data_iterator,model,
iteration,False)

ifargs.saveanditeration!=0:
save_checkpoint(iteration,model,optimizer,lr_scheduler)

ifargs.do_test:
#Runontestdata.
prefix='theendoftrainingfortestdata'
evaluate_and_print_results(prefix,forward_step_func,
test_data_iterator,model,
0,True)

对于我们分析来说,initialize_megatron 是重点,这里初始化了 megatron。

0x03 初始化

3.1 initialize_megatron

initialize_megatron 方法会设置全局变量,初始化分布式环境等等。

definitialize_megatron(extra_args_provider=None,args_defaults={},
ignore_unknown_args=False,allow_no_cuda=False):
"""Setglobalvariables,initializedistributed,and
setautoresumeandrandomseeds.
`allow_no_cuda`shouldnotbesetunlessusingmegatronforcpuonly
dataprocessing.Ingeneralthisargshouldnotbesetunlessyouknow
whatyouaredoing.
Returnsafunctiontofinalizedistributedenvinitialization
(optionally,onlywhenargs.lazy_mpu_init==True)
"""
ifnotallow_no_cuda:
#Makesurecudaisavailable.
asserttorch.cuda.is_available(),'MegatronrequiresCUDA.'

#Parseargs,buildtokenizer,andsetadlr-autoresume,
#tensorboard-writer,andtimers.
set_global_variables(extra_args_provider=extra_args_provider,#设置全局变量
args_defaults=args_defaults,
ignore_unknown_args=ignore_unknown_args)

#torch.distributedinitialization
deffinish_mpu_init():
args=get_args()
#Pytorchdistributed.
_initialize_distributed()#设置分布式

#Randomseedsforreproducibility.
ifargs.rank==0:
print('>settingrandomseedsto{}...'.format(args.seed))
_set_random_seed(args.seed)

#SetpytorchJITlayerfusionoptions.
_set_jit_fusion_options()

args=get_args()
ifargs.lazy_mpu_init:
args.use_cpu_initialization=True
#delayedinitializationofDDP-relatedstuff
#WeonlysetbasicDDPglobals
set_tensor_model_parallel_world_size(args.tensor_model_parallel_size)
#andreturnfunctionforexternalDDPmanager
#tocallwhenithasDDPinitialized
set_tensor_model_parallel_rank(args.rank)
returnfinish_mpu_init
else:
#Megatron'sMPUisthemaster.Completeinitializationrightaway.
finish_mpu_init()

#Autoresume.
_init_autoresume()

#Compiledependencies.
_compile_dependencies()

#Nocontinuationfunction
returnNone

3.2 初始化分布式环境

_initialize_distributed 代码位于 megatron/initialize.py,此方法会:

调用 torch.distributed.init_process_group 初始化分布式环境。

调用 mpu.initialize_model_parallel 来设置模型并行,数据并行等各种进程组,我们下文会重点讨论。

创建完worker进程之后,程序需要知道哪些进程在训练同一个模型,torch.distributed.init_process_group 就实现了这个功能。torch.distributed.init_process_group 会生成一个进程组,同组内进程训练同一个模型,也能确定用什么方式进行通信。进程组会给组内每个进程一个序号,就是gloabl rank,如果是多机并行,每个机器创建的进程之间也有一个序号,就是 local rank。如果是单机多卡并行,local rank 和 global rank是一致的。

def_initialize_distributed():
"""Initializetorch.distributedandmpu."""
args=get_args()

device_count=torch.cuda.device_count()
iftorch.distributed.is_initialized():
args.rank=torch.distributed.get_rank()
args.world_size=torch.distributed.get_world_size()
else:
#Manuallysetthedeviceids.
ifdevice_count>0:
device=args.rank%device_count
ifargs.local_rankisnotNone:
assertargs.local_rank==device,
'expectedlocal-ranktobethesameasrank%device-count.'
else:
args.local_rank=device
torch.cuda.set_device(device)
#Calltheinitprocess
torch.distributed.init_process_group(#初始化PyTorch分布式环境
backend=args.distributed_backend,
world_size=args.world_size,rank=args.rank,
timeout=timedelta(minutes=10))

#Setthetensormodel-parallel,pipelinemodel-parallel,and
#data-parallelcommunicators.
ifdevice_count>0:
ifmpu.model_parallel_is_initialized():
print('modelparallelisalreadyinitialized')
else:
#初始化模型并行,比如设置各种进程组
mpu.initialize_model_parallel(args.tensor_model_parallel_size,
args.pipeline_model_parallel_size,
args.virtual_pipeline_model_parallel_size,
args.pipeline_model_parallel_split_rank)

3.3 初始化进程组全局变量

因为调用了 mpu.initialize_model_parallel 来设置模型并行,数据并行等各种进程组,所以我们假定目前进程组都已经设置成功,所以每个 rank 对应的进程都有自己的全局变量。假定目前有16个GPU,属于两个node,rank 0 ~7 属于第一个节点,rank 8 ~ 15 属于第二个节点。下面的 gi 指的是第 i 个 GPU。

_TENSOR_MODEL_PARALLEL_GROUP :当前 rank 所属于的Intra-layer model parallel group,就是tensor 并行进程组。

假如每一层分为两个tensor,则 _TENSOR_MODEL_PARALLEL_GROUP 例子为:[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]。

_PIPELINE_MODEL_PARALLEL_GROUP :当前 rank 所属于的Intra-layer model parallel group,就是流水线进程组。

假如流水线深度为4,则例子为 [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]。

_MODEL_PARALLEL_GROUP :当前 rank 所属于的模型并行进程组,包括了以上两组。

针对我们例子,就是完整模型被复制了两份,两份分别对应的 GPU 具体是[0, 1, 4, 5, 8, 9, 12, 13],[2, 3, 6, 7, 10, 11, 14, 15]

_EMBEDDING_GROUP :嵌入对应的进程组。

_DATA_PARALLEL_GROUP :当前 rank 所属于的Data parallel group。

假如数据并行度数为2,则例子为[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]。

#Intra-layermodelparallelgroupthatthecurrentrankbelongsto.
_TENSOR_MODEL_PARALLEL_GROUP=None
#Inter-layermodelparallelgroupthatthecurrentrankbelongsto.
_PIPELINE_MODEL_PARALLEL_GROUP=None
#Modelparallelgroup(bothintra-andpipeline)thatthecurrentrankbelongsto.
_MODEL_PARALLEL_GROUP=None
#Embeddinggroup.
_EMBEDDING_GROUP=None
#Dataparallelgroupthatthecurrentrankbelongsto.
_DATA_PARALLEL_GROUP=None

0x04 设置模型

在 Pretrain 之中,会调用如下来设置模型,优化器等等。

#Model,optimizer,andlearningrate.使用model_provider设置模型、优化器和lr计划
model,optimizer,lr_scheduler=setup_model_and_optimizer(model_provider,
model_type)

4.1 setup_model_and_optimizer

setup_model_and_optimizer 方法会设置模型和优化器,其中重点是get_model。

defsetup_model_and_optimizer(model_provider_func,model_type):
"""Setupmodelandoptimizer."""
args=get_args()
model=get_model(model_provider_func,model_type)
unwrapped_model=unwrap_model(model,
(torchDDP,LocalDDP,Float16Module))
optimizer=get_megatron_optimizer(unwrapped_model)
lr_scheduler=get_learning_rate_scheduler(optimizer)

ifargs.loadisnotNone:
timers=get_timers()
#Extrabarrierisaddedtomakesureallranksreportthe
#maxtime.
torch.distributed.barrier()
args.iteration=load_checkpoint(model,optimizer,lr_scheduler)
torch.distributed.barrier()
else:
args.iteration=0

#WeonlysupportlocalDDPwithmultiplemicro-batches.
iflen(model)>1ormpu.get_pipeline_model_parallel_world_size()>1:
assertargs.DDP_impl=='local'

#getmodelwithoutFP16and/orTorchDDPwrappers
ifargs.iteration==0andlen(unwrapped_model)==1
andhasattr(unwrapped_model[0],'init_state_dict_from_bert'):
unwrapped_model[0].init_state_dict_from_bert()
ifargs.fp16:
optimizer.reload_model_params()

returnmodel,optimizer,lr_scheduler

4.2 模型

4.2.1 BertModel

我们首先看看 BertModel 的初始化函数,略过其他功能函数。其主要调用了 get_language_model。

classBertModel(MegatronModule):
"""BertLanguagemodel."""

def__init__(self,
num_tokentypes=2,
add_binary_head=True,
parallel_output=True,
pre_process=True,
post_process=True):
super(BertModel,self).__init__()
args=get_args()

self.fp16_lm_cross_entropy=args.fp16_lm_cross_entropy
self.add_binary_head=add_binary_head
self.parallel_output=parallel_output
self.pre_process=pre_process
self.post_process=post_process

init_method=init_method_normal(args.init_method_std)
scaled_init_method=scaled_init_method_normal(args.init_method_std,
args.num_layers)

#获取语言模型
self.language_model,self._language_model_key=get_language_model(
num_tokentypes=num_tokentypes,
add_pooler=self.add_binary_head,
encoder_attn_mask_type=AttnMaskType.padding,
init_method=init_method,
scaled_init_method=scaled_init_method,
pre_process=self.pre_process,
post_process=self.post_process)

self.initialize_word_embeddings(init_method_normal)
ifself.post_process:#如果是最后一层,会特殊处理
self.lm_head=BertLMHead(
self.word_embeddings_weight().size(0),
args.hidden_size,init_method,args.layernorm_epsilon,parallel_output)
self._lm_head_key='lm_head'
self.binary_head=None
ifself.add_binary_head:
self.binary_head=get_linear_layer(args.hidden_size,2,
init_method)
self._binary_head_key='binary_head'

4.2.2 语言模型

get_language_model 会获取一个 TransformerLanguageModel。

defget_language_model(num_tokentypes,add_pooler,
encoder_attn_mask_type,init_method=None,
scaled_init_method=None,add_encoder=True,
add_decoder=False,
decoder_attn_mask_type=AttnMaskType.causal,
pre_process=True,post_process=True):
"""Buildlanguagemodelandreturnalongwiththekeytosave."""
args=get_args()

ifinit_methodisNone:
init_method=init_method_normal(args.init_method_std)

ifscaled_init_methodisNone:
scaled_init_method=scaled_init_method_normal(args.init_method_std,
args.num_layers)

#Languagemodel.
language_model=TransformerLanguageModel(
init_method,
scaled_init_method,
encoder_attn_mask_type,
num_tokentypes=num_tokentypes,
add_encoder=add_encoder,
add_decoder=add_decoder,
decoder_attn_mask_type=decoder_attn_mask_type,
add_pooler=add_pooler,
pre_process=pre_process,
post_process=post_process
)
#keyusedforcheckpoints.
language_model_key='language_model'

returnlanguage_model,language_model_key

TransformerLanguageModel 就是具体的语言模型,其中重要的是 ParallelTransformer。这里会依据传入的配置来进行生成。

如果是第一层,即有 pre_process,则会加入 embedding layer。

如果是中间层,则会根据 encoder 还是 decoder 来生成对应的 ParallelTransformer。

如果是最后一层,即有 post_process,则会加入 Pooler,在外层 BertModel 也会有对应处理。

classTransformerLanguageModel(MegatronModule):
"""Transformerlanguagemodel.

Arguments:
transformer_hparams:transformerhyperparameters
vocab_size:vocabularysize
max_sequence_length:maximumsizeofsequence.This
isusedforpositionalembedding
embedding_dropout_prob:dropoutprobabilityforembeddings
num_tokentypes:sizeofthetoken-typeembeddings.0value
willignorethisembedding
"""

def__init__(self,
init_method,
output_layer_init_method,
encoder_attn_mask_type,
num_tokentypes=0,
add_encoder=True,
add_decoder=False,
decoder_attn_mask_type=AttnMaskType.causal,
add_pooler=False,
pre_process=True,
post_process=True):
super(TransformerLanguageModel,self).__init__()
args=get_args()

self.pre_process=pre_process
self.post_process=post_process
self.hidden_size=args.hidden_size
self.num_tokentypes=num_tokentypes
self.init_method=init_method
self.add_encoder=add_encoder
self.encoder_attn_mask_type=encoder_attn_mask_type
self.add_decoder=add_decoder
self.decoder_attn_mask_type=decoder_attn_mask_type
self.add_pooler=add_pooler
self.encoder_hidden_state=None

#Embeddings.
ifself.pre_process:
self.embedding=Embedding(self.hidden_size,
args.padded_vocab_size,
args.max_position_embeddings,
args.hidden_dropout,
self.init_method,
self.num_tokentypes)
self._embedding_key='embedding'

#Transformer.
#Encoder(usuallysettoTrue,Falseifpartofanencoder-decoder
#architectureandinencoder-onlystage).
ifself.add_encoder:
self.encoder=ParallelTransformer(
self.init_method,
output_layer_init_method,
self_attn_mask_type=self.encoder_attn_mask_type,
pre_process=self.pre_process,
post_process=self.post_process
)
self._encoder_key='encoder'
else:
self.encoder=None

#Decoder(usuallysettoFalse,Trueifpartofanencoder-decoder
#architectureandindecoder-onlystage).
ifself.add_decoder:
#Temporaryassertionuntilweverifycorrectnessofpipelineparallelism
#implementationofT5.
self.decoder=ParallelTransformer(
self.init_method,
output_layer_init_method,
layer_type=LayerType.decoder,
self_attn_mask_type=self.decoder_attn_mask_type,
pre_process=self.pre_process,
post_process=self.post_process)
self._decoder_key='decoder'
else:
self.decoder=None

ifself.post_process:
#Pooler.
ifself.add_pooler:
self.pooler=Pooler(self.hidden_size,self.init_method)
self._pooler_key='pooler'

4.2.3 ParallelTransformer

这里会调用 ParallelTransformerLayer 生成具体的 Transformer层,我们会在后文中进行分析。

即,ParallelTransformer 包括多个 Transformer,其中每层 Transformer 是一个 ParallelTransformerLayer。

classParallelTransformer(MegatronModule):
"""Transformerclass."""

def__init__(self,init_method,output_layer_init_method,
layer_type=LayerType.encoder,
self_attn_mask_type=AttnMaskType.padding,
pre_process=True,post_process=True):
super(ParallelTransformer,self).__init__()
args=get_args()

self.bf16=args.bf16
self.fp32_residual_connection=args.fp32_residual_connection
self.pre_process=pre_process
self.post_process=post_process
self.input_tensor=None

#Storeactivationcheckpoitingflag.
self.activations_checkpoint_method=args.activations_checkpoint_method
self.activations_checkpoint_num_layers=args.activations_checkpoint_num_layers
self.distribute_checkpointed_activations=args.distribute_checkpointed_activations

#Numberoflayers.
self.num_layers=mpu.get_num_layers(#获得本Transformer的具体层数
args,args.model_type==ModelType.encoder_and_decoder)

#Transformerlayers.
defbuild_layer(layer_number):
returnParallelTransformerLayer(#返回一层Transformmer
init_method,
output_layer_init_method,
layer_number,
layer_type=layer_type,
self_attn_mask_type=self_attn_mask_type)
ifargs.virtual_pipeline_model_parallel_sizeisnotNone:
#Numberoflayersineachmodelchunkisthenumberoflayersinthestage,
#dividedbythenumberofmodelchunksinastage.
self.num_layers=self.num_layers//args.virtual_pipeline_model_parallel_size
#With8layers,2stages,and4modelchunks,wewantanassignmentof
#layerstostageslike(eachlistisamodelchunk):
#Stage0:[0][2][4][6]
#Stage1:[1][3][5][7]
#With8layers,2stages,and2virtualstages,wewantanassignmentof
#layerstostageslike(eachlistisamodelchunk):
#Stage0:[0,1][4,5]
#Stage1:[2,3][6,7]
offset=mpu.get_virtual_pipeline_model_parallel_rank()*(
args.num_layers//args.virtual_pipeline_model_parallel_size)+
(mpu.get_pipeline_model_parallel_rank()*self.num_layers)
else:
#Eachstagegetsacontiguoussetoflayers.
offset=mpu.get_pipeline_model_parallel_rank()*self.num_layers

self.layers=torch.nn.ModuleList(#生成num_layers个Transformer
[build_layer(i+1+offset)foriinrange(self.num_layers)])

ifself.post_process:
#Finallayernormbeforeoutput.
self.final_layernorm=LayerNorm(
args.hidden_size,
eps=args.layernorm_epsilon,
no_persist_layer_norm=args.no_persist_layer_norm)

目前逻辑如下,我们假定有两个 transformer:

ccf0ac8a-70af-11ee-939d-92fbcf53809c.jpg

4.2.3.1 获取层数

这里一个重点就是获取层数,即获取本模型在并行处理状况下,应该拥有多少层。如果模型一共64层,流水线深度为16,则并行每个阶段有4层,则本子模型拥有4层。

defget_num_layers(args,is_encoder_and_decoder_model):
"""Computethenumberoftransformerlayersresidentonthecurrentrank."""
ifget_pipeline_model_parallel_world_size()>1:
ifis_encoder_and_decoder_model:
assertargs.pipeline_model_parallel_split_rankisnotNone
num_ranks_in_encoder=args.pipeline_model_parallel_split_rank
num_ranks_in_decoder=get_pipeline_model_parallel_world_size()-num_ranks_in_encoder
ifis_pipeline_stage_before_split():
num_layers=args.num_layers//num_ranks_in_encoder
else:
num_layers=args.num_layers//num_ranks_in_decoder
else:
num_layers=args.num_layers//get_pipeline_model_parallel_world_size()
else:
num_layers=args.num_layers
returnnum_layers

get_pipeline_model_parallel_world_size 获取本流水线组world size数目,就是流水线深度。

defget_pipeline_model_parallel_world_size():
"""Returnworldsizeforthepipelinemodelparallelgroup."""
global_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
if_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZEisnotNone:
return_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
returntorch.distributed.get_world_size(group=get_pipeline_model_parallel_group())

_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE 的意思是流水线深度 p,就是纵向切 p-1刀。比如一共 12 层,纵向切 5 刀,则有 6 个stage,每个 stage 有 2 层。

4.2.3.2 前向传播

我们接着看看其前向传播函数,这里主要就是调用内部 ParallelTransformerLayer 的 forward 方法,如果是第一层或者最后一层,则做特殊处理。

defforward(self,hidden_states,attention_mask,
encoder_output=None,enc_dec_attn_mask=None,
inference_params=None):

ifself.pre_process:
#Dataformatchangetoavoidexplicittranposes:[bsh]-->[sbh].
#Iftheinputflagforfp32residualconnectionisset,convertforfloat.
ifself.fp32_residual_connection:
hidden_states=hidden_states.transpose(0,1).contiguous().float()
#Otherwise,leaveitasis.
else:
hidden_states=hidden_states.transpose(0,1).contiguous()
else:
#Seeset_input_tensor()
hidden_states=self.input_tensor

ifencoder_outputisnotNone:
encoder_output=encoder_output.transpose(0,1).contiguous()

ifself.activations_checkpoint_methodisnotNone:
hidden_states=self._checkpointed_forward(hidden_states,
attention_mask,
encoder_output,
enc_dec_attn_mask)
else:
forindexinrange(self.num_layers):
layer=self._get_layer(index)
hidden_states=layer(#调用ParallelTransformerLayer的forward函数
hidden_states,
attention_mask,
encoder_output=encoder_output,
enc_dec_attn_mask=enc_dec_attn_mask,
inference_params=inference_params)


#Finallayernorm.
ifself.post_process:
#Revertingdataformatchange[sbh]-->[bsh].
hidden_states=hidden_states.transpose(0,1).contiguous()
output=self.final_layernorm(hidden_states)
else:
output=hidden_states

returnoutput

4.3 get_model

现在让我们回到 get_model,把生成模型的流程整理出来。

BERT之中含有多个transformer,所以直接按照层数切分,每一层是一模一样的transformer layer。前面提到了,在我们样例之中启动了8个进程,每个进程里面有一个子模型,即原始BERT模型的部分层。但是怎么知道每个子模型包含了多少层?答案是:因为已经建立了各种进程组,所以 get_model 方法会依据目前进程组情况进行处理。单个进程内模型获取如下:

如果是有 virtual 设置,则会遍历 virtual size,生成对应数目的模型(BertModel)。

否则如果是 encoder_and_decoder,则针对split进行配置。

设置 tensor model parallel 属性。

把本模型放置到GPU之上。

如果需要数据并行,则配置DDP。

具体代码如下:

defget_model(model_provider_func,model_type=ModelType.encoder_or_decoder,wrap_with_ddp=True):
"""Buildthemodel."""
args=get_args()
args.model_type=model_type

#Buildmodel.
ifmpu.get_pipeline_model_parallel_world_size()>1and
args.virtual_pipeline_model_parallel_sizeisnotNone:#有virtual设置,后续会提到
model=[]
foriinrange(args.virtual_pipeline_model_parallel_size):#遍历virtual
#设置rank,主要是为了看是不是第一层,最后一层
mpu.set_virtual_pipeline_model_parallel_rank(i)
#Setpre_processandpost_processonlyaftervirtualrankisset.
pre_process=mpu.is_pipeline_first_stage()
post_process=mpu.is_pipeline_last_stage()
this_model=model_provider_func(#获取原始模型BertModel
pre_process=pre_process,
post_process=post_process
)
this_model.model_type=model_type
model.append(this_model)#模型列表之中添加一个新的BertModel
else:
pre_process=mpu.is_pipeline_first_stage()#是不是第一层
post_process=mpu.is_pipeline_last_stage()#是不是最后一层
add_encoder=True
add_decoder=True
ifmodel_type==ModelType.encoder_and_decoder:
ifmpu.get_pipeline_model_parallel_world_size()>1:
rank=mpu.get_pipeline_model_parallel_rank()
split_rank=args.pipeline_model_parallel_split_rank
world_size=mpu.get_pipeline_model_parallel_world_size()
pre_process=rank==0orrank==split_rank#是不是第一层
post_process=(rank==(split_rank-1))or(#是不是最后一层
rank==(world_size-1))
add_encoder=mpu.is_pipeline_stage_before_split()
add_decoder=mpu.is_pipeline_stage_after_split()
model=model_provider_func(#获取原始模型
pre_process=pre_process,
post_process=post_process,
add_encoder=add_encoder,
add_decoder=add_decoder)
else:
model=model_provider_func(#获取原始模型
pre_process=pre_process,
post_process=post_process
)
model.model_type=model_type

ifnotisinstance(model,list):
model=[model]

#Settensormodelparallelattributesifnotset.
#Onlyparametersthatarealreadytensormodelparallelhavethese
#attributessetforthem.Weshouldmakesurethedefaultattributes
#aresetforallparamssotheoptimizercanusethem.
formodel_moduleinmodel:
forparaminmodel_module.parameters():
mpu.set_defaults_if_not_set_tensor_model_parallel_attributes(param)

#GPUallocation.
formodel_moduleinmodel:#把本模型放置到GPU之上
model_module.cuda(torch.cuda.current_device())

#Fp16conversion.
ifargs.fp16orargs.bf16:
model=[Float16Module(model_module,args)formodel_moduleinmodel]

ifwrap_with_ddp:#如果需要数据并行,则配置DDP
ifargs.DDP_impl=='torch':
i=torch.cuda.current_device()
model=[torchDDP(model_module,device_ids=[i],output_device=i,
process_group=mpu.get_data_parallel_group())
formodel_moduleinmodel]

elifargs.DDP_impl=='local':
model=[LocalDDP(model_module,
args.accumulate_allreduce_grads_in_fp32,
args.use_contiguous_buffers_in_local_ddp)
formodel_moduleinmodel]

else:
raiseNotImplementedError('UnknownDDPimplementationspecified:'
'{}.Exiting.'.format(args.DDP_impl))

returnmodel

单个进程内的逻辑大致如下,这里 torchDDP 的意思是把 BertModel 之中的 module 用 torchDDP 来封装。

ccfb70ca-70af-11ee-939d-92fbcf53809c.jpg

0x05 数据并行

5.1 设置数据

build_train_valid_test_data_iterators 方法会对数据进行处理,提供了 train,valid,test 三种不同的数据集。

defbuild_train_valid_test_data_iterators(
build_train_valid_test_datasets_provider):
"""XXX"""
args=get_args()
(train_dataloader,valid_dataloader,test_dataloader)=(None,None,None)


#Backwardcompatibility,assumefixedbatchsize.
ifargs.iteration>0andargs.consumed_train_samples==0:
args.consumed_train_samples=args.iteration*args.global_batch_size
ifargs.iteration>0andargs.consumed_valid_samples==0:
ifargs.train_samplesisNone:
args.consumed_valid_samples=(args.iteration//args.eval_interval)*
args.eval_iters*args.global_batch_size

#Dataloaderonlyonrank0ofeachmodelparallelgroup.
ifmpu.get_tensor_model_parallel_rank()==0:

#Numberoftrain/valid/testsamples.
ifargs.train_samples:
train_samples=args.train_samples
else:
train_samples=args.train_iters*args.global_batch_size
eval_iters=(args.train_iters//args.eval_interval+1)*
args.eval_iters
test_iters=args.eval_iters
train_val_test_num_samples=[train_samples,
eval_iters*args.global_batch_size,
test_iters*args.global_batch_size]

#Buildthedatasets.
train_ds,valid_ds,test_ds=build_train_valid_test_datasets_provider(
train_val_test_num_samples)

#Builddataloders.
train_dataloader=build_pretraining_data_loader(
train_ds,args.consumed_train_samples)
valid_dataloader=build_pretraining_data_loader(
valid_ds,args.consumed_valid_samples)
test_dataloader=build_pretraining_data_loader(test_ds,0)

#Flagstoknowifweneedtodotraining/validation/testing.
do_train=train_dataloaderisnotNoneandargs.train_iters>0
do_valid=valid_dataloaderisnotNoneandargs.eval_iters>0
do_test=test_dataloaderisnotNoneandargs.eval_iters>0
#Needtobroadcastnum_tokensandnum_type_tokens.
flags=torch.cuda.LongTensor(
[int(do_train),int(do_valid),int(do_test)])
else:
flags=torch.cuda.LongTensor([0,0,0])

#Broadcastnumtokens.
torch.distributed.broadcast(flags,
mpu.get_tensor_model_parallel_src_rank(),
group=mpu.get_tensor_model_parallel_group())
args.do_train=flags[0].item()
args.do_valid=flags[1].item()
args.do_test=flags[2].item()

#Builditerators.
dl_type=args.dataloader_type

iftrain_dataloaderisnotNone:
train_data_iterator=iter(train_dataloader)ifdl_type=='single'
elseiter(cyclic_iter(train_dataloader))
else:
train_data_iterator=None

ifvalid_dataloaderisnotNone:
valid_data_iterator=iter(valid_dataloader)ifdl_type=='single'
elseiter(cyclic_iter(valid_dataloader))
else:
valid_data_iterator=None

iftest_dataloaderisnotNone:
test_data_iterator=iter(test_dataloader)ifdl_type=='single'
elseiter(cyclic_iter(test_dataloader))
else:
test_data_iterator=None

returntrain_data_iterator,valid_data_iterator,test_data_iterator

5.2 DDP

在 get_model 之中,有如下代码使用 DDP。

frommegatron.modelimportDistributedDataParallelasLocalDDP
fromtorch.nn.parallel.distributedimportDistributedDataParallelastorchDDP

ifwrap_with_ddp:
ifargs.DDP_impl=='torch':
i=torch.cuda.current_device()
model=[torchDDP(model_module,device_ids=[i],output_device=i,
process_group=mpu.get_data_parallel_group())
formodel_moduleinmodel]

elifargs.DDP_impl=='local':
model=[LocalDDP(model_module,
args.accumulate_allreduce_grads_in_fp32,
args.use_contiguous_buffers_in_local_ddp)
formodel_moduleinmodel]

else:
raiseNotImplementedError('UnknownDDPimplementationspecified:'
'{}.Exiting.'.format(args.DDP_impl))

所以我们看看 megatron 自己的 DDP实现。

5.2.1 定义

定义只有注释可以看看,使用连续的(contiguous)内存来存储和累积梯度,每一种类型的张量属于一个统一的内存,可以统一做 allreduce。

classDistributedDataParallel(DistributedDataParallelBase):
"""DDPwithcontiguousbuffersoptionstostorreandaccumulategradients.
Thisclass:
-hasthepotentialtoreducememoryfragmentation.
-providestheoptiontodothegradientaccumulation
inatypeotherthantheparamstype(forexamplefp32)

Arguments:
module:inputmodel.
accumulate_allreduce_grads_in_fp32:iftruedothegradientaccumulation
andthegradientall-reduceallininfloat32.Ifthisoptionis
true,werequire`use_contiguous_buffers`tobetruetoo.
use_contiguous_buffers:iftrue,useacontiguousbuffertostorethe
gradients.
"""

5.2.2 初始化

初始化方法的目的是把同类型梯度连续存储。

def__init__(self,module,
accumulate_allreduce_grads_in_fp32,
use_contiguous_buffers):

super(DistributedDataParallel,self).__init__(module)

self.accumulate_allreduce_grads_in_fp32
=accumulate_allreduce_grads_in_fp32
self.use_contiguous_buffers=use_contiguous_buffers
#Ifweareusingfp32-accumulate-allreduceexplicitly
#thismeansweneedmaingradsinacontinousbuffer.
ifself.accumulate_allreduce_grads_in_fp32:
assertself.use_contiguous_buffers

#===================================
#Restofthispartappliesonlyto
#thecaseweusecontinuousbuffers.
#===================================
self._grad_buffers=None
ifself.use_contiguous_buffers:#这里只考虑连续内存
self._grad_buffers={}#定义buffer

#Simplefunctiontodefinebuffertype.
def_get_buffer_type(param):#返回buffer类型
returntorch.floatif
self.accumulate_allreduce_grads_in_fp32elseparam.dtype

#Firstcalculatetotalnumberofelementspertype.
type_num_elements={}
forparaminself.module.parameters():#遍历模型参数
ifparam.requires_grad:#如果需要计算梯度
dtype=_get_buffer_type(param)#获取参数类型
type_num_elements[dtype]=type_num_elements.get(dtype,0)
+param.data.nelement()#该类型参数数目做相应增加

#目前type_num_elements是各种类型参数的个数
#Allocatethebuffer.
fordtype,num_elementsintype_num_elements.items():#遍历各种类型
self._grad_buffers[dtype]=MemoryBuffer(num_elements,dtype)#分配内存

#这里是假定反向传播是参数的反方向,存储每个参数梯度的起始位置
#Assumethebackproporderisreversetheparamsorder,
#storethestartindexforthegradients.
forparaminself.module.parameters():#遍历模型参数
ifparam.requires_grad:#如果需要计算梯度
dtype=_get_buffer_type(param)#获取参数类型
type_num_elements[dtype]-=param.data.nelement()#减少size
#确定该参数在MemoryBuffer的位置
param.main_grad=self._grad_buffers[dtype].get(#获取该参数对应的内存
param.data.shape,type_num_elements[dtype])

#Backwardhook.
#Accumalationfunctionforthegradients.Weneed
#tostorethemsotheydon'tgooutofscope.
self.grad_accs=[]
#Loopoveralltheparametersinthemodel.
forparaminself.module.parameters():#遍历模型参数
ifparam.requires_grad:#如果需要计算梯度
#Expandsowegetaccesstograd_fn.
param_tmp=param.expand_as(param)
#Getthegradientaccumulatorfuncttion.
grad_acc=param_tmp.grad_fn.next_functions[0][0]#得到参数对应的梯度函数
grad_acc.register_hook(self._make_param_hook(param))#注册了hook
self.grad_accs.append(grad_acc)#统一管理梯度函数,其实就是bookkeeping作用

5.2.3 内存

MemoryBuffer 是内存抽象。

classMemoryBuffer:

def__init__(self,numel,dtype):
self.numel=numel
self.dtype=dtype
self.data=torch.zeros(self.numel,#初始化内存
dtype=self.dtype,
device=torch.cuda.current_device(),
requires_grad=False)


defzero(self):
"""Resetthebuffertozero."""
self.data.zero_()


defget(self,shape,start_index):
"""Returnatensorwiththeinput`shape`asaviewintothe
1-Ddatastartingat`start_index`."""
end_index=start_index+shape.numel()#定位到该张量在内存buffer之中的位置
assertend_index<= self.numel, 
            'requested tensor is out of the buffer range.'
        buffer_tensor = self.data[start_index:end_index] # 拿到内存
        buffer_tensor = buffer_tensor.view(shape)
        return buffer_tensor # 

5.2.4 支撑函数

下面是两个支撑函数,分别是用于拷贝梯度和将buffer清零。

def_make_param_hook(self,param):
"""Createtheall-reducehookforbackprop."""
#Hookusedforback-prop.
defparam_hook(*unused):
#Addthegradienttothebuffer.
ifparam.grad.dataisnotNone:
param.main_grad.add_(param.grad.data)#把梯度拷贝到连续内存之中
#Nowwecandeallocategradmemory.
param.grad=None
returnparam_hook

defzero_grad_buffer(self):
"""Setthegradbufferdatatozero.Needstobecalledatthe
beginingofeachiteration."""
assertself._grad_buffersisnotNone,'buffersarenotinitialized.'
for_,buffer_inself._grad_buffers.items():
buffer_.zero()

我们假定模型有6个参数,3个 fp32,3 个 fp16,所以被组合成两个连续内存 MemoryBuffer。

cd060c2e-70af-11ee-939d-92fbcf53809c.jpg

5.2.5 梯度规约

allreduce_gradients 是 DDP 对外提供的 API,在后面 train step 之中会调用到。

defallreduce_gradients(self):
"""Reducegradientsacrossdataparallelranks."""
#Ifwehavebuffers,simplyreducethedatainthebuffer.
ifself._grad_buffersisnotNone:
#连续内存
for_,buffer_inself._grad_buffers.items():#遍历各种类型的buffer
buffer_.data/=mpu.get_data_parallel_world_size()
torch.distributed.all_reduce(#统一归并
buffer_.data,group=mpu.get_data_parallel_group())
else:
#Otherwise,bucketizeandall-reduce
buckets={}#否则还是用桶来归并
#Packthebuckets.
forparaminself.module.parameters():#遍历梯度
ifparam.requires_gradandparam.gradisnotNone:
tp=param.data.type()
iftpnotinbuckets:
buckets[tp]=[]
buckets[tp].append(param)#同类型的梯度放到对应类型的桶之中
param.main_grad=param.grad

#Foreachbucket,all-reduceandcopyall-reducedgrads.
fortpinbuckets:
bucket=buckets[tp]
grads=[param.grad.dataforparaminbucket]#把桶里的梯度拿出来
coalesced=_flatten_dense_tensors(grads)#打平梯度
coalesced/=mpu.get_data_parallel_world_size()
torch.distributed.all_reduce(#归并
coalesced,group=mpu.get_data_parallel_group())
forbuf,syncedinzip(grads,_unflatten_dense_tensors(
coalesced,grads)):
buf.copy_(synced)

运行时候,分别对两种类型的连续内存做 AllReduce。

cd0f3dda-70af-11ee-939d-92fbcf53809c.jpg

0x06 训练

Pretrain 之中会调用 train 来进行训练。

ifargs.do_trainandargs.train_iters>0:
iteration=train(forward_step_func,
model,optimizer,lr_scheduler,
train_data_iterator,valid_data_iterator)

6.1 训练主体

train 是常规的套路,大家基本上按照名字就可以理解。

deftrain(forward_step_func,model,optimizer,lr_scheduler,
train_data_iterator,valid_data_iterator):
"""Trainthemodelfunction."""
args=get_args()
timers=get_timers()

#Writeargstotensorboard
write_args_to_tensorboard()

#Turnontrainingmodewhichenablesdropout.
formodel_moduleinmodel:
model_module.train()#

#Trackingloss.
total_loss_dict={}

#Iterations.
iteration=args.iteration

report_memory_flag=True
whileiteration< args.train_iters:
        update_num_microbatches(args.consumed_train_samples)
        loss_dict, skipped_iter, grad_norm, num_zeros_in_grad = 
            train_step(forward_step_func, # 训练
                       train_data_iterator,
                       model,
                       optimizer,
                       lr_scheduler)
        iteration += 1
        args.consumed_train_samples += mpu.get_data_parallel_world_size() * 
                                       args.micro_batch_size * 
                                       get_num_microbatches()

        # Logging.
        loss_scale = optimizer.get_loss_scale().item()
        params_norm = None
        if args.log_params_norm:
            params_norm = calc_params_l2_norm(model)
        report_memory_flag = training_log(loss_dict, total_loss_dict,
                                          optimizer.param_groups[0]['lr'],
                                          iteration, loss_scale,
                                          report_memory_flag, skipped_iter,
                                          grad_norm, params_norm, num_zeros_in_grad)

        # Autoresume
        if args.adlr_autoresume and 
           (iteration % args.adlr_autoresume_interval == 0):
            check_adlr_autoresume_termination(iteration, model, optimizer,
                                              lr_scheduler)

        # Evaluation
        if args.eval_interval and iteration % args.eval_interval == 0 and 
           args.do_valid:
            prefix = 'iteration {}'.format(iteration)
            evaluate_and_print_results(prefix, forward_step_func,
                                       valid_data_iterator, model,
                                       iteration, False)

        # Checkpointing
        saved_checkpoint = False
        if args.exit_signal_handler:
            signal_handler = get_signal_handler()
            if any(signal_handler.signals_received()):
                save_checkpoint_and_time(iteration, model, optimizer,
                                         lr_scheduler)
                sys.exit()

        if args.save and args.save_interval and 
           iteration % args.save_interval == 0:
            save_checkpoint_and_time(iteration, model, optimizer,
                                     lr_scheduler)
            saved_checkpoint = True

        # Exiting based on duration
        if args.exit_duration_in_mins:
            train_time = (time.time() - _TRAIN_START_TIME) / 60.0
            done_cuda = torch.cuda.IntTensor(
                [train_time >args.exit_duration_in_mins])
torch.distributed.all_reduce(
done_cuda,op=torch.distributed.ReduceOp.MAX)
done=done_cuda.item()
ifdone:
ifnotsaved_checkpoint:
save_checkpoint_and_time(iteration,model,optimizer,
lr_scheduler)
sys.exit()

#Exitingbasedoniterations
ifargs.exit_intervalanditeration%args.exit_interval==0:
ifnotsaved_checkpoint:
save_checkpoint_and_time(iteration,model,optimizer,
lr_scheduler)
torch.distributed.barrier()
sys.exit()

returniteration

6.2 训练step

train_step 会获取 get_forward_backward_func 得到 schedule,因为是流水线并行,所以需要 schedule 如何具体训练。

deftrain_step(forward_step_func,data_iterator,
model,optimizer,lr_scheduler):
"""Singletrainingstep."""
args=get_args()
timers=get_timers()

#Setgradtozero.
ifargs.DDP_impl=='local'andargs.use_contiguous_buffers_in_local_ddp:
forpartitioninmodel:
partition.zero_grad_buffer()
optimizer.zero_grad()

#获取训练schedule
forward_backward_func=get_forward_backward_func()
losses_reduced=forward_backward_func(#进行训练
forward_step_func,data_iterator,model,
optimizer,timers,forward_only=False)

#Emptyunusedmemory
ifargs.empty_unused_memory_level>=1:
torch.cuda.empty_cache()

#All-reduceifneeded.
ifargs.DDP_impl=='local':
formodel_moduleinmodel:
model_module.allreduce_gradients()

#All-reduceword_embeddings'gradacrossfirstandlaststagestoensure
#thatword_embeddingsparametersstayinsync.
#Thisshouldonlyrunformodelsthatsupportpipelinedmodelparallelism
#(BERTandGPT-2).
ifmpu.is_rank_in_embedding_group(ignore_virtual=True)and
mpu.get_pipeline_model_parallel_world_size()>1:
ifmpu.is_pipeline_first_stage(ignore_virtual=True):
unwrapped_model=model[0]
elifmpu.is_pipeline_last_stage(ignore_virtual=True):
unwrapped_model=model[-1]
else:#WedonotsupporttheinterleavedscheduleforT5yet.
unwrapped_model=model[0]
unwrapped_model=unwrap_model(
unwrapped_model,(torchDDP,LocalDDP,Float16Module))

ifunwrapped_model.share_word_embeddings:
word_embeddings_weight=unwrapped_model.word_embeddings_weight()
ifargs.DDP_impl=='local':
grad=word_embeddings_weight.main_grad
else:
grad=word_embeddings_weight.grad
torch.distributed.all_reduce(grad,group=mpu.get_embedding_group())

#Updateparameters.
update_successful,grad_norm,num_zeros_in_grad=optimizer.step()

#Updatelearningrate.
ifupdate_successful:
increment=get_num_microbatches()*
args.micro_batch_size*
args.data_parallel_size
lr_scheduler.step(increment=increment)
skipped_iter=0
else:
skipped_iter=1

#Emptyunusedmemory
ifargs.empty_unused_memory_level>=2:
torch.cuda.empty_cache()

ifmpu.is_pipeline_last_stage(ignore_virtual=True):
#Averagelossacrossmicrobatches.
loss_reduced={}
forkeyinlosses_reduced[0]:
losses_reduced_for_key=[x[key]forxinlosses_reduced]
loss_reduced[key]=sum(losses_reduced_for_key)/len(losses_reduced_for_key)
returnloss_reduced,skipped_iter,grad_norm,num_zeros_in_grad
return{},skipped_iter,grad_norm,num_zeros_in_grad

6.3 获取schedule

get_forward_backward_func 获取 pipeline 的schedule,这里分为 flush 和 interleaving 两种,我们后续会分析这两种schedule。

defget_forward_backward_func():
args=get_args()
ifmpu.get_pipeline_model_parallel_world_size()>1:
ifargs.virtual_pipeline_model_parallel_sizeisnotNone:
forward_backward_func=forward_backward_pipelining_with_interleaving
else:
forward_backward_func=forward_backward_pipelining_without_interleaving
else:
forward_backward_func=forward_backward_no_pipelining
returnforward_backward_func

训练逻辑大体拓展为:

cd23ad56-70af-11ee-939d-92fbcf53809c.jpg

至此,Megatron 基本架构分析完毕,下一篇我们介绍模型并行设置。

审核编辑:汤梓红

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • NVIDIA
    +关注

    关注

    14

    文章

    4595

    浏览量

    101724
  • 源码
    +关注

    关注

    8

    文章

    574

    浏览量

    28589
  • 模型
    +关注

    关注

    1

    文章

    2704

    浏览量

    47689
  • 语言模型
    +关注

    关注

    0

    文章

    435

    浏览量

    10047
  • pytorch
    +关注

    关注

    2

    文章

    762

    浏览量

    12836

原文标题:[源码解析] 模型并行分布式训练Megatron (2) --- 整体架构

文章出处:【微信号:GiantPandaCV,微信公众号:GiantPandaCV】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    基于Transformer做大模型训练基本的并行范式

    在之前的内容中,我们已经介绍过流水线并行、数据并行(DP,DDP和ZeRO)。 今天我们将要介绍最重要,也是目前基于Transformer做大模型训练最基本的
    的头像 发表于 05-31 14:38 1711次阅读
    基于Transformer做大<b class='flag-5'>模型</b>预<b class='flag-5'>训练</b>基本的<b class='flag-5'>并行</b>范式

    分布式软件系统

    分布式软件系统分布式软件系统(Distributed Software Systems)是支持分布式处理的软件系统,是在由通信网络互联的多处理机体系结构上执行任务的系统。它包括分布式
    发表于 07-22 14:53

    《无线通信FPGA设计》分布式FIR的并行改写

    《无线通信FPGA设计》分布式FIR的并行改写,结果与matlab仿真结果基本吻合
    发表于 02-26 09:09

    Pytorch模型训练实用PDF教程【中文】

    模型部分?还是优化器?只有这样不断的通过可视化诊断你的模型,不断的对症下药,才能训练出一个较满意的模型。本教程内容及结构:本教程内容主要为在 Py
    发表于 12-21 09:18

    如何利用FPGA设计无线分布式采集系统?

    的选择无线分布式采集来进行。现有的无线分布式采集系统中,往往使用单片机、DSP等作为系统的主控控制单元。但是由于其自身工作特点,往往对于精确的定时控制以及并行处理能力上比FPGA弱。
    发表于 10-14 07:10

    分布式系统的优势是什么?

    当讨论分布式系统时,我们面临许多以下这些形容词所描述的 同类型: 分布式的、删络的、并行的、并发的和分散的。分布式处理是一个相对较新的领域,所以还没有‘致的定义。与顺序计算相比、
    发表于 03-31 09:01

    各种分布式电源的电气特性

    特性(主要包括电压V、电流I、有功P、无功Q)不同,需要的建模方式也有所不同。1.常见的分布式电源2.分布式电源建模燃料电池是电力电子变换器接口型的潮流计算模型,它在潮流计算里面可以使用pq,pq节点来进行处理。是吗?是pq吗?
    发表于 07-12 07:54

    分布式电源的相关资料推荐

    (1)含分布式电源的配电网日前两阶段优化调度模型,EI,如图 1—3matlab源代码,高水平文章,保证正确,可先发您文章看是否满足您的要求在电力市场环境下,供电公司通过对接入配电网的分布式电源
    发表于 12-29 06:33

    怎样使用PyTorch Hub去加载YOLOv5模型

    PyTorch Hub 加载预训练的 YOLOv5s 模型,model并传递图像进行推理。'yolov5s'是最轻最快的 YOLOv5 型号。有关所有可用模型的详细信息,请参阅自述文
    发表于 07-22 16:02

    超大Transformer语言模型分布式训练框架

    NVIDIA Megatron 是一个基于 PyTorch 的框架,用于训练基于 Transformer 架构的巨型语言模型。本系列文章将详细介绍M
    的头像 发表于 10-11 16:46 2275次阅读
    超大Transformer语言<b class='flag-5'>模型</b>的<b class='flag-5'>分布式</b><b class='flag-5'>训练</b>框架

    探究超大Transformer语言模型分布式训练框架

    模型的预训练计算。 上篇主要介绍了大模型训练的发展趋势、NVIDIA Megatron模型
    的头像 发表于 10-20 09:25 2130次阅读

    图解大模型系列之:Megatron源码解读1,分布式环境初始化

    使用Megatron训练gpt类大模型的项目有很多。在这个系列里,我选择了由THUDM开发的CodeGeeX项目,它是gpt在代码生成方向上的应用,对标于openAI的CodeX。github地址在此。
    的头像 发表于 06-06 15:22 4094次阅读
    图解大<b class='flag-5'>模型</b>系列之:<b class='flag-5'>Megatron</b>源码解读1,<b class='flag-5'>分布式</b>环境初始化

    图解大模型训练之:Megatron源码解读2,模型并行

    前文说过,用Megatron分布式训练的开源大模型有很多,我们选用的是THUDM开源的CodeGeeX(代码生成式大模型,类比于openA
    的头像 发表于 06-07 15:08 2613次阅读
    图解大<b class='flag-5'>模型</b><b class='flag-5'>训练</b>之:<b class='flag-5'>Megatron</b>源码解读2,<b class='flag-5'>模型</b><b class='flag-5'>并行</b>

    DeepSpeed结合Megatron-LM训练GPT2模型笔记

    本文基于DeepSpeedExamples仓库中给出的Megatron相关例子探索一下训练GPT2模型的流程。主要包含3个部分,第一个部分是基于原始的Megatron如何
    的头像 发表于 06-19 14:45 1940次阅读
    DeepSpeed结合<b class='flag-5'>Megatron</b>-LM<b class='flag-5'>训练</b>GPT2<b class='flag-5'>模型</b>笔记

    模型分布式训练并行技术(一)-概述

    数据并行是最常见的并行形式,因为它很简单。在数据并行训练中,数据集被分割成几个碎片,每个碎片被分配到一个设备上。这相当于沿批次(Batch)维度对
    的头像 发表于 08-24 15:17 664次阅读
    大<b class='flag-5'>模型</b><b class='flag-5'>分布式</b><b class='flag-5'>训练</b><b class='flag-5'>并行</b>技术(一)-概述