0x00 Abstract
NVIDIA Megatron is a PyTorch-based distributed training framework for training very large Transformer language models. It combines data parallelism, tensor parallelism, and pipeline parallelism to reproduce GPT-3, and the machinery behind it is well worth a close look.
This series consists of roughly six to seven articles that study Megatron through its papers and source code. This installment walks through Megatron's overall architecture.
0x01 Launch
1.1 Distributed launch
The launch script is examples/pretrain_bert_distributed.sh; it uses torch.distributed.launch to start multiple processes. The actual training entry point is pretrain_bert.py.
Since GPUS_PER_NODE is 8, nproc_per_node is 8 as well, so eight processes are started on the local machine, each holding one part of the model. Each process's rank is assigned automatically by the elastic launcher that torch.distributed.launch invokes.
#!/bin/bash

GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NNODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))

DATA_PATH=_text_sentence
CHECKPOINT_PATH=

DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"

python -m torch.distributed.launch $DISTRIBUTED_ARGS \
       pretrain_bert.py \
       --num-layers 24 \
       --hidden-size 1024 \
       --num-attention-heads 16 \
       --micro-batch-size 4 \
       --global-batch-size 32 \
       --seq-length 512 \
       --max-position-embeddings 512 \
       --train-iters 1000000 \
       --save $CHECKPOINT_PATH \
       --load $CHECKPOINT_PATH \
       --data-path $DATA_PATH \
       --vocab-file bert-vocab.txt \
       --data-impl mmap \
       --split 949,50,1 \
       --distributed-backend nccl \
       --lr 0.0001 \
       --lr-decay-style linear \
       --min-lr 1.0e-5 \
       --lr-decay-iters 990000 \
       --weight-decay 1e-2 \
       --clip-grad 1.0 \
       --lr-warmup-fraction .01 \
       --log-interval 100 \
       --save-interval 10000 \
       --eval-interval 1000 \
       --eval-iters 10 \
       --fp16
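As a quick illustration of what the launcher hands to each worker (this sketch is ours, not part of pretrain_bert.py; depending on the PyTorch version the local rank may arrive as a --local_rank command-line argument instead of an environment variable):

# Illustration only: each of the 8 worker processes spawned by
# torch.distributed.launch receives its identity roughly like this.
import os

rank = int(os.environ.get('RANK', 0))              # global rank, assigned by the launcher
local_rank = int(os.environ.get('LOCAL_RANK', 0))  # rank within this node
world_size = int(os.environ.get('WORLD_SIZE', 1))  # total number of processes
master = (os.environ.get('MASTER_ADDR', 'localhost'),
          os.environ.get('MASTER_PORT', '6000'))

print(f'worker {rank}/{world_size} (local rank {local_rank}) '
      f'will rendezvous at {master[0]}:{master[1]}')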
1.2 Building blocks
pretrain_bert.py calls pretrain to run pre-training.
if __name__ == "__main__":

    pretrain(train_valid_test_datasets_provider, model_provider,
             ModelType.encoder_or_decoder,
             forward_step, args_defaults={'tokenizer_type': 'BertWordPieceLowerCase'})
1.2.1 Getting the model
model_provider returns the vanilla version of the model. By vanilla we mean a plain CPU model without fp16 or DDP wrappers, although it has already been reworked by Megatron into its parallel form.
def model_provider(pre_process=True, post_process=True):
    """Build the model."""

    print_rank_0('building BERT model ...')

    args = get_args()
    num_tokentypes = 2 if args.bert_binary_head else 0
    model = BertModel(
        num_tokentypes=num_tokentypes,
        add_binary_head=args.bert_binary_head,
        parallel_output=True,
        pre_process=pre_process,
        post_process=post_process)

    return model
1.2.2 Getting the datasets
train_valid_test_datasets_provider takes the sizes of the train/valid/test splits and returns the train, valid, and test datasets.
def train_valid_test_datasets_provider(train_val_test_num_samples):
    """Build train, valid, and test datasets."""
    args = get_args()

    print_rank_0('> building train, validation, and test datasets '
                 'for BERT ...')
    train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
        data_prefix=args.data_path,
        data_impl=args.data_impl,
        splits_string=args.split,
        train_valid_test_num_samples=train_val_test_num_samples,
        max_seq_length=args.seq_length,
        masked_lm_prob=args.mask_prob,
        short_seq_prob=args.short_seq_prob,
        seed=args.seed,
        skip_warmup=(not args.mmap_warmup),
        binary_head=args.bert_binary_head)
    print_rank_0("> finished creating BERT datasets ...")

    return train_ds, valid_ds, test_ds
1.2.3 The step function
forward_step takes a data iterator and the model and returns a loss scalar together with a dictionary whose key/value pairs are quantities to monitor during training, for example 'lm loss': value. The function is also expected to register a 'batch generator' timer with the timers object.
def forward_step(data_iterator, model):
    """Forward step."""
    args = get_args()

    # Get the batch.
    tokens, types, sentence_order, loss_mask, lm_labels, padding_mask = get_batch(
        data_iterator)

    if not args.bert_binary_head:
        types = None

    # Forward pass through the model.
    output_tensor = model(tokens, padding_mask, tokentype_ids=types,
                          lm_labels=lm_labels)

    return output_tensor, partial(loss_func, loss_mask, sentence_order)
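The loss_func that forward_step passes to partial is not shown in the excerpt above. The sketch below is a hedged approximation of its shape for the BERT case, assuming the model output packs the masked-LM loss and the sentence-order-prediction logits; the real Megatron version additionally averages the losses across the data-parallel group.

# A minimal sketch (not the verbatim Megatron implementation) of the loss
# function returned from forward_step via functools.partial.
import torch
import torch.nn.functional as F

def loss_func(loss_mask, sentence_order, output_tensor):
    lm_loss_, sop_logits = output_tensor  # assumed output packing

    # Average the masked-LM loss over the non-masked-out tokens only.
    lm_loss_ = lm_loss_.float()
    loss_mask = loss_mask.float()
    lm_loss = torch.sum(lm_loss_.view(-1) * loss_mask.view(-1)) / loss_mask.sum()

    if sop_logits is not None:
        # Sentence-order-prediction loss for the binary head.
        sop_loss = F.cross_entropy(sop_logits.view(-1, 2).float(),
                                   sentence_order.view(-1),
                                   ignore_index=-1)
        loss = lm_loss + sop_loss
        return loss, {'lm loss': lm_loss, 'sop loss': sop_loss}

    return lm_loss, {'lm loss': lm_loss}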
1.2.3.1 Broadcasting the data
forward_step calls get_batch to fetch a batch: it pulls data from the iterator and then uses broadcast_data to broadcast the inputs from rank 0 to all the other ranks in the same tensor-model-parallel group.
Note the contrast: data parallelism loads different data onto different ranks, whereas every rank within a tensor-model-parallel group loads the same data.
def get_batch(data_iterator):
    """Build the batch."""

    # Items and their type.
    keys = ['text', 'types', 'labels', 'is_random', 'loss_mask', 'padding_mask']
    datatype = torch.int64

    # Broadcast data.
    if data_iterator is not None:
        data = next(data_iterator)  # fetch the data
    else:
        data = None
    data_b = mpu.broadcast_data(keys, data, datatype)  # broadcast the data to the other GPUs

    # Unpack.
    tokens = data_b['text'].long()
    types = data_b['types'].long()
    sentence_order = data_b['is_random'].long()
    loss_mask = data_b['loss_mask'].float()
    lm_labels = data_b['labels'].long()
    padding_mask = data_b['padding_mask'].long()

    return tokens, types, sentence_order, loss_mask, lm_labels, padding_mask
broadcast_data sends the data from rank 0 of each model-parallel group to the other members of the same group.
def broadcast_data(keys, data, datatype):
    """Broadcast data from rank zero of each model parallel group to the
    members of the same model parallel group.

    Arguments:
        keys: list of keys in the data disctionary to be broadcasted
        data: data dictionary of string keys and cpu tensor values.
        datatype: torch data type of all tensors in data associated
                  with keys.
    """
    # Build (key, size) and (key, number of elements) dictionaries along
    # with the total number of elements on all ranks.
    key_size, key_numel, total_numel = _build_key_size_numel_dictionaries(keys,
                                                                          data)
    # Pack on rank zero.
    if get_tensor_model_parallel_rank() == 0:  # only rank 0 packs the data
        # Check that all keys have the same data type.
        _check_data_types(keys, data, datatype)
        # Flatten the data associated with the keys
        flatten_data = torch.cat(
            [data[key].contiguous().view(-1) for key in keys], dim=0).cuda()
    else:
        flatten_data = torch.empty(total_numel,
                                   device=torch.cuda.current_device(),
                                   dtype=datatype)

    # Broadcast
    torch.distributed.broadcast(flatten_data, get_tensor_model_parallel_src_rank(),
                                group=get_tensor_model_parallel_group())

    # Unpack
    output = {}
    offset = 0
    for key in keys:
        size = key_size[key]
        numel = key_numel[key]
        output[key] = flatten_data.narrow(0, offset, numel).view(size)
        offset += numel

    return output
get_tensor_model_parallel_src_rank computes the global rank that corresponds to the first local rank in the tensor-model-parallel group.
def get_tensor_model_parallel_src_rank():
    """Calculate the global rank corresponding to the first local rank
    in the tensor model parallel group."""
    global_rank = torch.distributed.get_rank()
    local_world_size = get_tensor_model_parallel_world_size()
    return (global_rank // local_world_size) * local_world_size
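A quick worked example (our own illustration, not part of the source): with a tensor-parallel world size of 2, ranks 4 and 5 both map to source rank (5 // 2) * 2 = 4, i.e. the first rank of their tensor-parallel group [g4, g5].

# Illustration only: reproduce the src-rank computation for a
# tensor-parallel world size of 2 (groups [g0, g1], [g2, g3], ...).
def src_rank(global_rank, tp_world_size=2):
    return (global_rank // tp_world_size) * tp_world_size

assert [src_rank(r) for r in range(8)] == [0, 0, 2, 2, 4, 4, 6, 6]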
Putting this together: the three provider functions each feed pretraining a different kind of input (the model, the datasets, and the forward step), which keeps these concerns decoupled.
0x02 Pretrain
BERT training consists of two main stages:
Pre-train: pre-training is the foundation for transfer learning; it trains token-level semantic understanding.
Fine-tuning: starting from the already trained language model, add parameters for a specific domain (say finance or healthcare) and retrain; for a classification task this typically means putting a softmax head on top of the pre-trained model and fine-tuning it on the target corpus.
pretrain itself does the following:
Initialize Megatron.
Set up the model, optimizer, and learning-rate schedule using model_provider.
Call train_val_test_data_provider to obtain the train/valid/test datasets.
Train the model with forward_step_func.
The code is as follows:
def pretrain(train_valid_test_dataset_provider,
             model_provider,
             model_type,
             forward_step_func,
             extra_args_provider=None,
             args_defaults={}):
    """Main training program.

    This function will run the followings in the order provided:
        1) initialize Megatron.
        2) setup model, optimizer and lr schedule using the model_provider.
        3) call train_val_test_data_provider to get train/val/test datasets.
        4) train the modle using the forward_step_func.
    """

    # Initalize and get arguments, timers, and Tensorboard writer.
    initialize_megatron(extra_args_provider=extra_args_provider,
                        args_defaults=args_defaults)

    # Adjust the startup time so it reflects the largest value.
    # This will be closer to what scheduler will see (outside of
    # image ... launches.
    global _TRAIN_START_TIME
    start_time_tensor = torch.cuda.DoubleTensor([_TRAIN_START_TIME])
    torch.distributed.all_reduce(start_time_tensor,
                                 op=torch.distributed.ReduceOp.MIN)
    _TRAIN_START_TIME = start_time_tensor.item()

    args = get_args()
    timers = get_timers()

    # Model, optimizer, and learning rate. Set them up with model_provider.
    model, optimizer, lr_scheduler = setup_model_and_optimizer(model_provider,
                                                               model_type)

    # Data stuff. Call the dataset provider to get the train/valid/test datasets.
    if args.virtual_pipeline_model_parallel_size is not None:
        all_data_iterators = [
            build_train_valid_test_data_iterators(train_valid_test_dataset_provider)
            for _ in range(len(model))
        ]
        train_data_iterator = [data_iterators[0] for data_iterators in all_data_iterators]
        valid_data_iterator = [data_iterators[1] for data_iterators in all_data_iterators]
        test_data_iterator = [data_iterators[2] for data_iterators in all_data_iterators]
    else:
        train_data_iterator, valid_data_iterator, test_data_iterator \
            = build_train_valid_test_data_iterators(
                train_valid_test_dataset_provider)

    iteration = 0
    if args.do_train and args.train_iters > 0:
        iteration = train(forward_step_func,  # train the model
                          model, optimizer, lr_scheduler,
                          train_data_iterator, valid_data_iterator)

    if args.do_valid:
        prefix = 'the end of training for val data'
        evaluate_and_print_results(prefix, forward_step_func,
                                   valid_data_iterator, model,
                                   iteration, False)

    if args.save and iteration != 0:
        save_checkpoint(iteration, model, optimizer, lr_scheduler)

    if args.do_test:
        # Run on test data.
        prefix = 'the end of training for test data'
        evaluate_and_print_results(prefix, forward_step_func,
                                   test_data_iterator, model,
                                   0, True)
For our analysis the key call is initialize_megatron, which initializes Megatron itself.
0x03 Initialization
3.1 initialize_megatron
initialize_megatron sets up global variables, initializes the distributed environment, and so on.
def initialize_megatron(extra_args_provider=None, args_defaults={},
                        ignore_unknown_args=False, allow_no_cuda=False):
    """Set global variables, initialize distributed, and
    set autoresume and random seeds.
    `allow_no_cuda` should not be set unless using megatron for cpu only
    data processing. In general this arg should not be set unless you know
    what you are doing.
    Returns a function to finalize distributed env initialization
    (optionally, only when args.lazy_mpu_init == True)
    """
    if not allow_no_cuda:
        # Make sure cuda is available.
        assert torch.cuda.is_available(), 'Megatron requires CUDA.'

    # Parse args, build tokenizer, and set adlr-autoresume,
    # tensorboard-writer, and timers.
    set_global_variables(extra_args_provider=extra_args_provider,  # set global variables
                         args_defaults=args_defaults,
                         ignore_unknown_args=ignore_unknown_args)

    # torch.distributed initialization
    def finish_mpu_init():
        args = get_args()
        # Pytorch distributed.
        _initialize_distributed()  # set up the distributed environment

        # Random seeds for reproducibility.
        if args.rank == 0:
            print('> setting random seeds to {} ...'.format(args.seed))
        _set_random_seed(args.seed)

    # Set pytorch JIT layer fusion options.
    _set_jit_fusion_options()

    args = get_args()
    if args.lazy_mpu_init:
        args.use_cpu_initialization = True
        # delayed initialization of DDP-related stuff
        # We only set basic DDP globals
        set_tensor_model_parallel_world_size(args.tensor_model_parallel_size)
        # and return function for external DDP manager
        # to call when it has DDP initialized
        set_tensor_model_parallel_rank(args.rank)
        return finish_mpu_init
    else:
        # Megatron's MPU is the master. Complete initialization right away.
        finish_mpu_init()

        # Autoresume.
        _init_autoresume()

        # Compile dependencies.
        _compile_dependencies()

        # No continuation function
        return None
3.2 Initializing the distributed environment
_initialize_distributed lives in megatron/initialize.py and does the following:
Call torch.distributed.init_process_group to initialize the distributed environment.
Call mpu.initialize_model_parallel to set up the process groups for model parallelism, data parallelism, and so on; we discuss this in detail below.
After the worker processes have been created, the program needs to know which processes are training the same model; torch.distributed.init_process_group provides exactly that. It creates a process group: the processes in one group train the same model and agree on how to communicate. The group gives every member process a number, its global rank; with multiple machines, the processes created on each machine also carry a per-machine number, the local rank. With a single machine and multiple GPUs, local rank and global rank coincide.
def _initialize_distributed():
    """Initialize torch.distributed and mpu."""
    args = get_args()

    device_count = torch.cuda.device_count()
    if torch.distributed.is_initialized():
        args.rank = torch.distributed.get_rank()
        args.world_size = torch.distributed.get_world_size()
    else:
        # Manually set the device ids.
        if device_count > 0:
            device = args.rank % device_count
            if args.local_rank is not None:
                assert args.local_rank == device, \
                    'expected local-rank to be the same as rank % device-count.'
            else:
                args.local_rank = device
            torch.cuda.set_device(device)

        # Call the init process
        torch.distributed.init_process_group(  # initialize the PyTorch distributed environment
            backend=args.distributed_backend,
            world_size=args.world_size, rank=args.rank,
            timeout=timedelta(minutes=10))

    # Set the tensor model-parallel, pipeline model-parallel, and
    # data-parallel communicators.
    if device_count > 0:
        if mpu.model_parallel_is_initialized():
            print('model parallel is already initialized')
        else:
            # Initialize model parallelism, e.g. set up the various process groups
            mpu.initialize_model_parallel(args.tensor_model_parallel_size,
                                          args.pipeline_model_parallel_size,
                                          args.virtual_pipeline_model_parallel_size,
                                          args.pipeline_model_parallel_split_rank)
3.3 Process-group global variables
Since mpu.initialize_model_parallel has been called to set up the model-parallel, data-parallel, and other process groups, we assume from here on that all groups exist, so the process behind each rank has its own set of global variables. Suppose there are 16 GPUs on two nodes: ranks 0-7 on the first node and ranks 8-15 on the second. Below, gi denotes GPU i. (A small sketch after the code below enumerates these groups for this example.)
_TENSOR_MODEL_PARALLEL_GROUP: the intra-layer model-parallel group the current rank belongs to, i.e. the tensor-parallel process group.
If each layer is split into two tensor shards, the groups are: [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15].
_PIPELINE_MODEL_PARALLEL_GROUP: the inter-layer model-parallel group the current rank belongs to, i.e. the pipeline process group.
With a pipeline depth of 4, the groups are [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15].
_MODEL_PARALLEL_GROUP: the model-parallel group the current rank belongs to, covering both of the above.
In our example, the full model is replicated twice; the two replicas live on GPUs [0, 1, 4, 5, 8, 9, 12, 13] and [2, 3, 6, 7, 10, 11, 14, 15] respectively.
_EMBEDDING_GROUP: the process group for the embeddings.
_DATA_PARALLEL_GROUP: the data-parallel group the current rank belongs to.
With a data-parallel degree of 2, the groups are [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15].
# Intra-layer model parallel group that the current rank belongs to.
_TENSOR_MODEL_PARALLEL_GROUP = None
# Inter-layer model parallel group that the current rank belongs to.
_PIPELINE_MODEL_PARALLEL_GROUP = None
# Model parallel group (both intra- and pipeline) that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Embedding group.
_EMBEDDING_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None
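As a sanity check, the small sketch below (our own illustration, not Megatron code) enumerates the rank lists above for 16 GPUs with tensor-parallel size 2 and pipeline-parallel size 4, following the same grouping scheme Megatron uses; the implied data-parallel size is 16 // (2 * 4) = 2.

# Illustration only: enumerate the rank groupings for world_size=16,
# tensor_model_parallel_size=2, pipeline_model_parallel_size=4.
world_size, t, p = 16, 2, 4
num_pipeline_groups = world_size // p

tensor_groups = [list(range(i * t, (i + 1) * t)) for i in range(world_size // t)]
pipeline_groups = [list(range(i, world_size, num_pipeline_groups))
                   for i in range(num_pipeline_groups)]
data_groups = []
for i in range(p):
    start, end = i * num_pipeline_groups, (i + 1) * num_pipeline_groups
    for j in range(t):
        data_groups.append(list(range(start + j, end, t)))

print(tensor_groups)    # [[0, 1], [2, 3], ..., [14, 15]]
print(pipeline_groups)  # [[0, 4, 8, 12], [1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15]]
print(data_groups)      # [[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]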
0x04 Setting up the model
Inside pretrain, the following call sets up the model, optimizer, and related state.

# Model, optimizer, and learning rate. Set them up with model_provider.
model, optimizer, lr_scheduler = setup_model_and_optimizer(model_provider,
                                                           model_type)
4.1 setup_model_and_optimizer
setup_model_and_optimizer builds the model and the optimizer; the part we care about is get_model.
def setup_model_and_optimizer(model_provider_func, model_type):
    """Setup model and optimizer."""
    args = get_args()

    model = get_model(model_provider_func, model_type)

    unwrapped_model = unwrap_model(model,
                                   (torchDDP, LocalDDP, Float16Module))
    optimizer = get_megatron_optimizer(unwrapped_model)
    lr_scheduler = get_learning_rate_scheduler(optimizer)

    if args.load is not None:
        timers = get_timers()
        # Extra barrier is added to make sure all ranks report the
        # max time.
        torch.distributed.barrier()
        args.iteration = load_checkpoint(model, optimizer, lr_scheduler)
        torch.distributed.barrier()
    else:
        args.iteration = 0

    # We only support local DDP with multiple micro-batches.
    if len(model) > 1 or mpu.get_pipeline_model_parallel_world_size() > 1:
        assert args.DDP_impl == 'local'

    # get model without FP16 and/or TorchDDP wrappers
    if args.iteration == 0 and len(unwrapped_model) == 1 \
       and hasattr(unwrapped_model[0], 'init_state_dict_from_bert'):
        unwrapped_model[0].init_state_dict_from_bert()
        if args.fp16:
            optimizer.reload_model_params()

    return model, optimizer, lr_scheduler
4.2 The model
4.2.1 BertModel
Let's look first at BertModel's constructor and skip the other methods. Its main job is to call get_language_model.
class BertModel(MegatronModule):
    """Bert Language model."""

    def __init__(self,
                 num_tokentypes=2,
                 add_binary_head=True,
                 parallel_output=True,
                 pre_process=True,
                 post_process=True):
        super(BertModel, self).__init__()
        args = get_args()

        self.fp16_lm_cross_entropy = args.fp16_lm_cross_entropy
        self.add_binary_head = add_binary_head
        self.parallel_output = parallel_output
        self.pre_process = pre_process
        self.post_process = post_process

        init_method = init_method_normal(args.init_method_std)
        scaled_init_method = scaled_init_method_normal(args.init_method_std,
                                                       args.num_layers)

        # Get the language model
        self.language_model, self._language_model_key = get_language_model(
            num_tokentypes=num_tokentypes,
            add_pooler=self.add_binary_head,
            encoder_attn_mask_type=AttnMaskType.padding,
            init_method=init_method,
            scaled_init_method=scaled_init_method,
            pre_process=self.pre_process,
            post_process=self.post_process)

        self.initialize_word_embeddings(init_method_normal)
        if self.post_process:  # the last stage gets extra treatment
            self.lm_head = BertLMHead(
                self.word_embeddings_weight().size(0),
                args.hidden_size, init_method, args.layernorm_epsilon, parallel_output)
            self._lm_head_key = 'lm_head'
            self.binary_head = None
            if self.add_binary_head:
                self.binary_head = get_linear_layer(args.hidden_size, 2,
                                                    init_method)
                self._binary_head_key = 'binary_head'
4.2.2 The language model
get_language_model builds a TransformerLanguageModel.
def get_language_model(num_tokentypes, add_pooler,
                       encoder_attn_mask_type, init_method=None,
                       scaled_init_method=None, add_encoder=True,
                       add_decoder=False,
                       decoder_attn_mask_type=AttnMaskType.causal,
                       pre_process=True, post_process=True):
    """Build language model and return along with the key to save."""
    args = get_args()

    if init_method is None:
        init_method = init_method_normal(args.init_method_std)

    if scaled_init_method is None:
        scaled_init_method = scaled_init_method_normal(args.init_method_std,
                                                       args.num_layers)

    # Language model.
    language_model = TransformerLanguageModel(
        init_method,
        scaled_init_method,
        encoder_attn_mask_type,
        num_tokentypes=num_tokentypes,
        add_encoder=add_encoder,
        add_decoder=add_decoder,
        decoder_attn_mask_type=decoder_attn_mask_type,
        add_pooler=add_pooler,
        pre_process=pre_process,
        post_process=post_process
    )
    # key used for checkpoints.
    language_model_key = 'language_model'

    return language_model, language_model_key
TransformerLanguageModel is the concrete language model; its key component is ParallelTransformer. The constructor builds pieces according to the configuration it is given:
If this is the first stage (pre_process is set), it adds the embedding layer.
For the transformer body, it builds the corresponding ParallelTransformer depending on whether an encoder or a decoder is requested.
If this is the last stage (post_process is set), it adds the Pooler; the enclosing BertModel also performs its own last-stage handling.
class TransformerLanguageModel(MegatronModule):
    """Transformer language model.

    Arguments:
        transformer_hparams: transformer hyperparameters
        vocab_size: vocabulary size
        max_sequence_length: maximum size of sequence. This
                             is used for positional embedding
        embedding_dropout_prob: dropout probability for embeddings
        num_tokentypes: size of the token-type embeddings. 0 value
                        will ignore this embedding
    """

    def __init__(self,
                 init_method,
                 output_layer_init_method,
                 encoder_attn_mask_type,
                 num_tokentypes=0,
                 add_encoder=True,
                 add_decoder=False,
                 decoder_attn_mask_type=AttnMaskType.causal,
                 add_pooler=False,
                 pre_process=True,
                 post_process=True):
        super(TransformerLanguageModel, self).__init__()
        args = get_args()

        self.pre_process = pre_process
        self.post_process = post_process
        self.hidden_size = args.hidden_size
        self.num_tokentypes = num_tokentypes
        self.init_method = init_method
        self.add_encoder = add_encoder
        self.encoder_attn_mask_type = encoder_attn_mask_type
        self.add_decoder = add_decoder
        self.decoder_attn_mask_type = decoder_attn_mask_type
        self.add_pooler = add_pooler
        self.encoder_hidden_state = None

        # Embeddings.
        if self.pre_process:
            self.embedding = Embedding(self.hidden_size,
                                       args.padded_vocab_size,
                                       args.max_position_embeddings,
                                       args.hidden_dropout,
                                       self.init_method,
                                       self.num_tokentypes)
            self._embedding_key = 'embedding'

        # Transformer.
        # Encoder (usually set to True, False if part of an encoder-decoder
        # architecture and in encoder-only stage).
        if self.add_encoder:
            self.encoder = ParallelTransformer(
                self.init_method,
                output_layer_init_method,
                self_attn_mask_type=self.encoder_attn_mask_type,
                pre_process=self.pre_process,
                post_process=self.post_process
            )
            self._encoder_key = 'encoder'
        else:
            self.encoder = None

        # Decoder (usually set to False, True if part of an encoder-decoder
        # architecture and in decoder-only stage).
        if self.add_decoder:
            # Temporary assertion until we verify correctness of pipeline parallelism
            # implementation of T5.
            self.decoder = ParallelTransformer(
                self.init_method,
                output_layer_init_method,
                layer_type=LayerType.decoder,
                self_attn_mask_type=self.decoder_attn_mask_type,
                pre_process=self.pre_process,
                post_process=self.post_process)
            self._decoder_key = 'decoder'
        else:
            self.decoder = None

        if self.post_process:
            # Pooler.
            if self.add_pooler:
                self.pooler = Pooler(self.hidden_size, self.init_method)
                self._pooler_key = 'pooler'
4.2.3 ParallelTransformer
ParallelTransformer calls ParallelTransformerLayer to create the concrete transformer layers; we analyze that class in a later article.
In other words, a ParallelTransformer contains a stack of transformer layers, and each layer is a ParallelTransformerLayer.
class ParallelTransformer(MegatronModule):
    """Transformer class."""

    def __init__(self, init_method, output_layer_init_method,
                 layer_type=LayerType.encoder,
                 self_attn_mask_type=AttnMaskType.padding,
                 pre_process=True, post_process=True):
        super(ParallelTransformer, self).__init__()
        args = get_args()

        self.bf16 = args.bf16
        self.fp32_residual_connection = args.fp32_residual_connection
        self.pre_process = pre_process
        self.post_process = post_process
        self.input_tensor = None

        # Store activation checkpoiting flag.
        self.activations_checkpoint_method = args.activations_checkpoint_method
        self.activations_checkpoint_num_layers = args.activations_checkpoint_num_layers
        self.distribute_checkpointed_activations = args.distribute_checkpointed_activations

        # Number of layers.
        self.num_layers = mpu.get_num_layers(  # how many layers this stage holds
            args, args.model_type == ModelType.encoder_and_decoder)

        # Transformer layers.
        def build_layer(layer_number):
            return ParallelTransformerLayer(  # build one transformer layer
                init_method,
                output_layer_init_method,
                layer_number,
                layer_type=layer_type,
                self_attn_mask_type=self_attn_mask_type)

        if args.virtual_pipeline_model_parallel_size is not None:
            # Number of layers in each model chunk is the number of layers in the stage,
            # divided by the number of model chunks in a stage.
            self.num_layers = self.num_layers // args.virtual_pipeline_model_parallel_size
            # With 8 layers, 2 stages, and 4 model chunks, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0]  [2]  [4]  [6]
            # Stage 1: [1]  [3]  [5]  [7]
            # With 8 layers, 2 stages, and 2 virtual stages, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0, 1]  [4, 5]
            # Stage 1: [2, 3]  [6, 7]
            offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
                args.num_layers // args.virtual_pipeline_model_parallel_size) + \
                (mpu.get_pipeline_model_parallel_rank() * self.num_layers)
        else:
            # Each stage gets a contiguous set of layers.
            offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers

        self.layers = torch.nn.ModuleList(  # build num_layers transformer layers
            [build_layer(i + 1 + offset) for i in range(self.num_layers)])

        if self.post_process:
            # Final layer norm before output.
            self.final_layernorm = LayerNorm(
                args.hidden_size,
                eps=args.layernorm_epsilon,
                no_persist_layer_norm=args.no_persist_layer_norm)
At this point the nesting is: a BertModel holds a TransformerLanguageModel, which holds a ParallelTransformer, which in turn holds its ParallelTransformerLayer instances (say, two of them).
4.2.3.1 Getting the layer count
A key step here is working out the layer count, i.e. how many layers this model slice should own under the parallel configuration. If the full model has 64 layers and the pipeline depth is 16, each pipeline stage gets 4 layers, so this sub-model holds 4 layers.
def get_num_layers(args, is_encoder_and_decoder_model):
    """Compute the number of transformer layers resident on the current rank."""
    if get_pipeline_model_parallel_world_size() > 1:
        if is_encoder_and_decoder_model:
            assert args.pipeline_model_parallel_split_rank is not None
            num_ranks_in_encoder = args.pipeline_model_parallel_split_rank
            num_ranks_in_decoder = get_pipeline_model_parallel_world_size() - num_ranks_in_encoder
            if is_pipeline_stage_before_split():
                num_layers = args.num_layers // num_ranks_in_encoder
            else:
                num_layers = args.num_layers // num_ranks_in_decoder
        else:
            num_layers = args.num_layers // get_pipeline_model_parallel_world_size()
    else:
        num_layers = args.num_layers
    return num_layers
get_pipeline_model_parallel_world_size returns the world size of the pipeline group, i.e. the pipeline depth.
def get_pipeline_model_parallel_world_size():
    """Return world size for the pipeline model parallel group."""
    global _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
    if _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE is not None:
        return _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
    return torch.distributed.get_world_size(group=get_pipeline_model_parallel_group())
_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE is the pipeline depth p: the model is cut vertically p-1 times. For example, with 12 layers cut 5 times there are 6 stages, each holding 2 layers.
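As an illustration (not Megatron source), the sketch below reproduces the per-stage layer assignment for the non-interleaved case, combining get_num_layers with the offset computation from ParallelTransformer: 12 layers over 6 pipeline stages gives 2 layers per stage.

# Illustration only: which layer numbers each pipeline stage owns,
# following the contiguous assignment used when virtual pipelining is off.
total_layers = 12
pipeline_world_size = 6  # pipeline depth p

num_layers_per_stage = total_layers // pipeline_world_size  # what get_num_layers returns
for stage_rank in range(pipeline_world_size):
    offset = stage_rank * num_layers_per_stage
    layer_numbers = [offset + i + 1 for i in range(num_layers_per_stage)]
    print(f'stage {stage_rank}: layers {layer_numbers}')
# stage 0: layers [1, 2]
# stage 1: layers [3, 4]
# ...
# stage 5: layers [11, 12]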
4.2.3.2 Forward pass
Next, the forward function: it mainly loops over the internal ParallelTransformerLayer instances and calls their forward methods, with special handling on the first and last stages.
    def forward(self, hidden_states, attention_mask,
                encoder_output=None, enc_dec_attn_mask=None,
                inference_params=None):

        if self.pre_process:
            # Data format change to avoid explicit tranposes : [b s h] --> [s b h].
            # If the input flag for fp32 residual connection is set, convert for float.
            if self.fp32_residual_connection:
                hidden_states = hidden_states.transpose(0, 1).contiguous().float()
            # Otherwise, leave it as is.
            else:
                hidden_states = hidden_states.transpose(0, 1).contiguous()
        else:
            # See set_input_tensor()
            hidden_states = self.input_tensor

        if encoder_output is not None:
            encoder_output = encoder_output.transpose(0, 1).contiguous()

        if self.activations_checkpoint_method is not None:
            hidden_states = self._checkpointed_forward(hidden_states,
                                                       attention_mask,
                                                       encoder_output,
                                                       enc_dec_attn_mask)
        else:
            for index in range(self.num_layers):
                layer = self._get_layer(index)
                hidden_states = layer(  # call ParallelTransformerLayer's forward
                    hidden_states,
                    attention_mask,
                    encoder_output=encoder_output,
                    enc_dec_attn_mask=enc_dec_attn_mask,
                    inference_params=inference_params)

        # Final layer norm.
        if self.post_process:
            # Reverting data format change [s b h] --> [b s h].
            hidden_states = hidden_states.transpose(0, 1).contiguous()
            output = self.final_layernorm(hidden_states)
        else:
            output = hidden_states

        return output
4.3 get_model
Now let's return to get_model and lay out how the model gets built.
BERT is a stack of identical transformer layers, so it is partitioned simply by layer: every layer is the same transformer layer. As noted earlier, our example launches 8 processes, each holding one sub-model, i.e. a subset of the original BERT model's layers. How does a process know how many layers its sub-model should contain? Because the process groups have already been created, get_model decides based on the current process-group configuration. Within a single process, the model is obtained as follows:
If virtual pipelining is configured, iterate over the virtual size and build that many models (BertModel instances).
Otherwise, if the model type is encoder_and_decoder, configure pre/post processing around the pipeline split rank.
Set the tensor-model-parallel attributes on the parameters.
Move the model onto the GPU.
If data parallelism is required, wrap the model with DDP.
The code is as follows:
def get_model(model_provider_func, model_type=ModelType.encoder_or_decoder, wrap_with_ddp=True):
    """Build the model."""
    args = get_args()
    args.model_type = model_type

    # Build model.
    if mpu.get_pipeline_model_parallel_world_size() > 1 and \
       args.virtual_pipeline_model_parallel_size is not None:  # virtual pipelining is configured, more on this later
        model = []
        for i in range(args.virtual_pipeline_model_parallel_size):  # iterate over the virtual chunks
            # Set the rank, mainly to determine whether this is the first or the last stage
            mpu.set_virtual_pipeline_model_parallel_rank(i)
            # Set pre_process and post_process only after virtual rank is set.
            pre_process = mpu.is_pipeline_first_stage()
            post_process = mpu.is_pipeline_last_stage()
            this_model = model_provider_func(  # get the vanilla model (BertModel)
                pre_process=pre_process,
                post_process=post_process
            )
            this_model.model_type = model_type
            model.append(this_model)  # append a new BertModel to the model list
    else:
        pre_process = mpu.is_pipeline_first_stage()  # is this the first stage
        post_process = mpu.is_pipeline_last_stage()  # is this the last stage
        add_encoder = True
        add_decoder = True
        if model_type == ModelType.encoder_and_decoder:
            if mpu.get_pipeline_model_parallel_world_size() > 1:
                rank = mpu.get_pipeline_model_parallel_rank()
                split_rank = args.pipeline_model_parallel_split_rank
                world_size = mpu.get_pipeline_model_parallel_world_size()
                pre_process = rank == 0 or rank == split_rank  # is this the first stage
                post_process = (rank == (split_rank - 1)) or (  # is this the last stage
                    rank == (world_size - 1))
                add_encoder = mpu.is_pipeline_stage_before_split()
                add_decoder = mpu.is_pipeline_stage_after_split()
            model = model_provider_func(  # get the vanilla model
                pre_process=pre_process,
                post_process=post_process,
                add_encoder=add_encoder,
                add_decoder=add_decoder)
        else:
            model = model_provider_func(  # get the vanilla model
                pre_process=pre_process,
                post_process=post_process
            )
        model.model_type = model_type

    if not isinstance(model, list):
        model = [model]

    # Set tensor model parallel attributes if not set.
    # Only parameters that are already tensor model parallel have these
    # attributes set for them. We should make sure the default attributes
    # are set for all params so the optimizer can use them.
    for model_module in model:
        for param in model_module.parameters():
            mpu.set_defaults_if_not_set_tensor_model_parallel_attributes(param)

    # GPU allocation.
    for model_module in model:  # move this model onto the GPU
        model_module.cuda(torch.cuda.current_device())

    # Fp16 conversion.
    if args.fp16 or args.bf16:
        model = [Float16Module(model_module, args) for model_module in model]

    if wrap_with_ddp:  # wrap with DDP if data parallelism is required
        if args.DDP_impl == 'torch':
            i = torch.cuda.current_device()
            model = [torchDDP(model_module, device_ids=[i], output_device=i,
                              process_group=mpu.get_data_parallel_group())
                     for model_module in model]

        elif args.DDP_impl == 'local':
            model = [LocalDDP(model_module,
                              args.accumulate_allreduce_grads_in_fp32,
                              args.use_contiguous_buffers_in_local_ddp)
                     for model_module in model]

        else:
            raise NotImplementedError('Unknown DDP implementation specified: '
                                      '{}. Exiting.'.format(args.DDP_impl))

    return model
Within a single process, the resulting structure is straightforward: torchDDP here means that the module built from BertModel ends up wrapped inside a torchDDP instance.
0x05 Data parallelism
5.1 Setting up the data
build_train_valid_test_data_iterators processes the data and provides the three datasets: train, valid, and test.
def build_train_valid_test_data_iterators(
        build_train_valid_test_datasets_provider):
    """XXX"""
    args = get_args()

    (train_dataloader, valid_dataloader, test_dataloader) = (None, None, None)

    # Backward compatibility, assume fixed batch size.
    if args.iteration > 0 and args.consumed_train_samples == 0:
        args.consumed_train_samples = args.iteration * args.global_batch_size
    if args.iteration > 0 and args.consumed_valid_samples == 0:
        if args.train_samples is None:
            args.consumed_valid_samples = (args.iteration // args.eval_interval) * \
                args.eval_iters * args.global_batch_size

    # Data loader only on rank 0 of each model parallel group.
    if mpu.get_tensor_model_parallel_rank() == 0:

        # Number of train/valid/test samples.
        if args.train_samples:
            train_samples = args.train_samples
        else:
            train_samples = args.train_iters * args.global_batch_size
        eval_iters = (args.train_iters // args.eval_interval + 1) * \
                     args.eval_iters
        test_iters = args.eval_iters
        train_val_test_num_samples = [train_samples,
                                      eval_iters * args.global_batch_size,
                                      test_iters * args.global_batch_size]

        # Build the datasets.
        train_ds, valid_ds, test_ds = build_train_valid_test_datasets_provider(
            train_val_test_num_samples)

        # Build dataloders.
        train_dataloader = build_pretraining_data_loader(
            train_ds, args.consumed_train_samples)
        valid_dataloader = build_pretraining_data_loader(
            valid_ds, args.consumed_valid_samples)
        test_dataloader = build_pretraining_data_loader(test_ds, 0)

        # Flags to know if we need to do training/validation/testing.
        do_train = train_dataloader is not None and args.train_iters > 0
        do_valid = valid_dataloader is not None and args.eval_iters > 0
        do_test = test_dataloader is not None and args.eval_iters > 0
        # Need to broadcast num_tokens and num_type_tokens.
        flags = torch.cuda.LongTensor(
            [int(do_train), int(do_valid), int(do_test)])
    else:
        flags = torch.cuda.LongTensor([0, 0, 0])

    # Broadcast num tokens.
    torch.distributed.broadcast(flags,
                                mpu.get_tensor_model_parallel_src_rank(),
                                group=mpu.get_tensor_model_parallel_group())
    args.do_train = flags[0].item()
    args.do_valid = flags[1].item()
    args.do_test = flags[2].item()

    # Build iterators.
    dl_type = args.dataloader_type

    if train_dataloader is not None:
        train_data_iterator = iter(train_dataloader) if dl_type == 'single' \
                              else iter(cyclic_iter(train_dataloader))
    else:
        train_data_iterator = None

    if valid_dataloader is not None:
        valid_data_iterator = iter(valid_dataloader) if dl_type == 'single' \
                              else iter(cyclic_iter(valid_dataloader))
    else:
        valid_data_iterator = None

    if test_dataloader is not None:
        test_data_iterator = iter(test_dataloader) if dl_type == 'single' \
                             else iter(cyclic_iter(test_dataloader))
    else:
        test_data_iterator = None

    return train_data_iterator, valid_data_iterator, test_data_iterator
5.2 DDP
Inside get_model, the following code applies DDP.
from megatron.model import DistributedDataParallel as LocalDDP
from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP

if wrap_with_ddp:
    if args.DDP_impl == 'torch':
        i = torch.cuda.current_device()
        model = [torchDDP(model_module, device_ids=[i], output_device=i,
                          process_group=mpu.get_data_parallel_group())
                 for model_module in model]

    elif args.DDP_impl == 'local':
        model = [LocalDDP(model_module,
                          args.accumulate_allreduce_grads_in_fp32,
                          args.use_contiguous_buffers_in_local_ddp)
                 for model_module in model]

    else:
        raise NotImplementedError('Unknown DDP implementation specified: '
                                  '{}. Exiting.'.format(args.DDP_impl))
So let's look at Megatron's own DDP implementation.
5.2.1 Definition
The class definition itself is mostly a docstring: gradients are stored and accumulated in contiguous memory, with one unified buffer per tensor dtype, so the all-reduce can operate on a whole buffer at once.
class DistributedDataParallel(DistributedDataParallelBase):
    """DDP with contiguous buffers options to storre and accumulate gradients.
    This class:
        - has the potential to reduce memory fragmentation.
        - provides the option to do the gradient accumulation
          in a type other than the params type (for example fp32)

    Arguments:
        module: input model.
        accumulate_allreduce_grads_in_fp32: if true do the gradient accumulation
            and the gradient all-reduce all in in float32. If this option is
            true, we require `use_contiguous_buffers` to be true too.
        use_contiguous_buffers: if true, use a contiguous buffer to store the
            gradients.
    """
5.2.2 Initialization
The constructor's job is to lay out gradients of the same dtype in contiguous storage.
    def __init__(self, module,
                 accumulate_allreduce_grads_in_fp32,
                 use_contiguous_buffers):

        super(DistributedDataParallel, self).__init__(module)

        self.accumulate_allreduce_grads_in_fp32 \
            = accumulate_allreduce_grads_in_fp32
        self.use_contiguous_buffers = use_contiguous_buffers
        # If we are using fp32-accumulate-allreduce explicitly
        # this means we need main grads in a continous buffer.
        if self.accumulate_allreduce_grads_in_fp32:
            assert self.use_contiguous_buffers

        # ===================================
        # Rest of this part applies only to
        # the case we use continuous buffers.
        # ===================================
        self._grad_buffers = None
        if self.use_contiguous_buffers:  # only the contiguous-buffer case is considered here
            self._grad_buffers = {}  # define the buffers

            # Simple function to define buffer type.
            def _get_buffer_type(param):  # return the buffer dtype
                return torch.float if \
                    self.accumulate_allreduce_grads_in_fp32 else param.dtype

            # First calculate total number of elements per type.
            type_num_elements = {}
            for param in self.module.parameters():  # iterate over the model parameters
                if param.requires_grad:  # if gradients are needed
                    dtype = _get_buffer_type(param)  # get the parameter dtype
                    type_num_elements[dtype] = type_num_elements.get(dtype, 0) \
                                               + param.data.nelement()  # add this parameter's element count

            # type_num_elements now holds the element count for each dtype

            # Allocate the buffer.
            for dtype, num_elements in type_num_elements.items():  # iterate over the dtypes
                self._grad_buffers[dtype] = MemoryBuffer(num_elements, dtype)  # allocate the memory

            # Assume the back prop order is reverse the params order,
            # store the start index for the gradients.
            for param in self.module.parameters():  # iterate over the model parameters
                if param.requires_grad:  # if gradients are needed
                    dtype = _get_buffer_type(param)  # get the parameter dtype
                    type_num_elements[dtype] -= param.data.nelement()  # decrease the remaining size
                    # locate this parameter's slot in the MemoryBuffer
                    param.main_grad = self._grad_buffers[dtype].get(  # get the memory view for this parameter
                        param.data.shape, type_num_elements[dtype])

            # Backward hook.
            # Accumalation function for the gradients. We need
            # to store them so they don't go out of scope.
            self.grad_accs = []
            # Loop over all the parameters in the model.
            for param in self.module.parameters():  # iterate over the model parameters
                if param.requires_grad:  # if gradients are needed
                    # Expand so we get access to grad_fn.
                    param_tmp = param.expand_as(param)
                    # Get the gradient accumulator functtion.
                    grad_acc = param_tmp.grad_fn.next_functions[0][0]  # the gradient accumulator for this parameter
                    grad_acc.register_hook(self._make_param_hook(param))  # register the hook
                    self.grad_accs.append(grad_acc)  # keep the accumulators alive (bookkeeping)
5.2.3 Memory
MemoryBuffer is the memory abstraction.
class MemoryBuffer:

    def __init__(self, numel, dtype):
        self.numel = numel
        self.dtype = dtype
        self.data = torch.zeros(self.numel,  # allocate the memory
                                dtype=self.dtype,
                                device=torch.cuda.current_device(),
                                requires_grad=False)

    def zero(self):
        """Reset the buffer to zero."""
        self.data.zero_()

    def get(self, shape, start_index):
        """Return a tensor with the input `shape` as a view into the
        1-D data starting at `start_index`."""
        end_index = start_index + shape.numel()  # locate this tensor's position within the buffer
        assert end_index <= self.numel, \
            'requested tensor is out of the buffer range.'
        buffer_tensor = self.data[start_index:end_index]  # take the slice of memory
        buffer_tensor = buffer_tensor.view(shape)
        return buffer_tensor
5.2.4 Helper functions
Two helpers follow: one copies gradients into the buffer, the other zeroes the buffer.
    def _make_param_hook(self, param):
        """Create the all-reduce hook for backprop."""
        # Hook used for back-prop.
        def param_hook(*unused):
            # Add the gradient to the buffer.
            if param.grad.data is not None:
                param.main_grad.add_(param.grad.data)  # accumulate the gradient into the contiguous buffer
                # Now we can deallocate grad memory.
                param.grad = None
        return param_hook

    def zero_grad_buffer(self):
        """Set the grad buffer data to zero. Needs to be called at the
        begining of each iteration."""
        assert self._grad_buffers is not None, 'buffers are not initialized.'
        for _, buffer_ in self._grad_buffers.items():
            buffer_.zero()
Suppose the model has six parameters, three in fp32 and three in fp16; they are packed into two contiguous MemoryBuffers, one per dtype.
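The toy sketch below (our own illustration, not Megatron code, and ignoring the fp32-accumulation option) mimics that packing: it sizes one flat buffer per dtype and hands each parameter a main_grad view into its buffer.

# Illustration only: pack per-dtype gradient buffers for a toy model with
# three fp32 and three fp16 parameters, mirroring LocalDDP's constructor.
import torch

params = [torch.nn.Parameter(torch.randn(4, 4, dtype=dt))
          for dt in (torch.float32, torch.float32, torch.float32,
                     torch.float16, torch.float16, torch.float16)]

# First pass: total element count per dtype.
type_num_elements = {}
for p in params:
    type_num_elements[p.dtype] = type_num_elements.get(p.dtype, 0) + p.data.nelement()

# One flat buffer per dtype (kept on CPU here, just for illustration).
buffers = {dt: torch.zeros(n, dtype=dt) for dt, n in type_num_elements.items()}

# Second pass: carve out a view per parameter, walking backwards through the buffer.
for p in params:
    type_num_elements[p.dtype] -= p.data.nelement()
    start = type_num_elements[p.dtype]
    p.main_grad = buffers[p.dtype][start:start + p.data.nelement()].view(p.data.shape)

print({dt: buf.numel() for dt, buf in buffers.items()})
# {torch.float32: 48, torch.float16: 48}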
5.2.5 Gradient reduction
allreduce_gradients is the API this DDP class exposes; it is called from train_step later on.
    def allreduce_gradients(self):
        """Reduce gradients across data parallel ranks."""
        # If we have buffers, simply reduce the data in the buffer.
        if self._grad_buffers is not None:  # contiguous-buffer case
            for _, buffer_ in self._grad_buffers.items():  # iterate over the per-dtype buffers
                buffer_.data /= mpu.get_data_parallel_world_size()
                torch.distributed.all_reduce(  # reduce the whole buffer at once
                    buffer_.data, group=mpu.get_data_parallel_group())
        else:
            # Otherwise, bucketize and all-reduce
            buckets = {}  # fall back to bucketed reduction
            # Pack the buckets.
            for param in self.module.parameters():  # iterate over the parameters
                if param.requires_grad and param.grad is not None:
                    tp = param.data.type()
                    if tp not in buckets:
                        buckets[tp] = []
                    buckets[tp].append(param)  # gradients of the same dtype go into the same bucket
                    param.main_grad = param.grad

            # For each bucket, all-reduce and copy all-reduced grads.
            for tp in buckets:
                bucket = buckets[tp]
                grads = [param.grad.data for param in bucket]  # take the gradients out of the bucket
                coalesced = _flatten_dense_tensors(grads)  # flatten the gradients
                coalesced /= mpu.get_data_parallel_world_size()
                torch.distributed.all_reduce(  # all-reduce
                    coalesced, group=mpu.get_data_parallel_group())
                for buf, synced in zip(grads, _unflatten_dense_tensors(
                        coalesced, grads)):
                    buf.copy_(synced)
At run time, an all-reduce is performed separately on each of the two dtype-specific contiguous buffers.
0x06 Training
pretrain calls train to run the training loop.
    if args.do_train and args.train_iters > 0:
        iteration = train(forward_step_func,
                          model, optimizer, lr_scheduler,
                          train_data_iterator, valid_data_iterator)
6.1 The training loop
train follows the usual pattern, and the names are largely self-explanatory.
def train(forward_step_func, model, optimizer, lr_scheduler,
          train_data_iterator, valid_data_iterator):
    """Train the model function."""
    args = get_args()
    timers = get_timers()

    # Write args to tensorboard
    write_args_to_tensorboard()

    # Turn on training mode which enables dropout.
    for model_module in model:
        model_module.train()

    # Tracking loss.
    total_loss_dict = {}

    # Iterations.
    iteration = args.iteration

    report_memory_flag = True
    while iteration < args.train_iters:
        # Run one training step (see train_step below).
        loss_dict, skipped_iter, grad_norm, num_zeros_in_grad = \
            train_step(forward_step_func,
                       train_data_iterator,
                       model,
                       optimizer,
                       lr_scheduler)
        iteration += 1

        # (logging, periodic evaluation, and checkpointing are handled here;
        #  omitted in this excerpt)

        # Exiting based on duration
        if args.exit_duration_in_mins:
            train_time = (time.time() - _TRAIN_START_TIME) / 60.0
            done_cuda = torch.cuda.IntTensor(
                [train_time > args.exit_duration_in_mins])
            torch.distributed.all_reduce(
                done_cuda, op=torch.distributed.ReduceOp.MAX)
            done = done_cuda.item()
            if done:
                if not saved_checkpoint:
                    save_checkpoint_and_time(iteration, model, optimizer,
                                             lr_scheduler)
                sys.exit()

        # Exiting based on iterations
        if args.exit_interval and iteration % args.exit_interval == 0:
            if not saved_checkpoint:
                save_checkpoint_and_time(iteration, model, optimizer,
                                         lr_scheduler)
            torch.distributed.barrier()
            sys.exit()

    return iteration
6.2 The training step
train_step calls get_forward_backward_func to obtain a schedule: since we use pipeline parallelism, a schedule is needed to orchestrate how the forward and backward passes actually run.
def train_step(forward_step_func, data_iterator,
               model, optimizer, lr_scheduler):
    """Single training step."""
    args = get_args()
    timers = get_timers()

    # Set grad to zero.
    if args.DDP_impl == 'local' and args.use_contiguous_buffers_in_local_ddp:
        for partition in model:
            partition.zero_grad_buffer()
    optimizer.zero_grad()

    # Get the training schedule
    forward_backward_func = get_forward_backward_func()
    losses_reduced = forward_backward_func(  # run the forward and backward passes
        forward_step_func, data_iterator, model,
        optimizer, timers, forward_only=False)

    # Empty unused memory
    if args.empty_unused_memory_level >= 1:
        torch.cuda.empty_cache()

    # All-reduce if needed.
    if args.DDP_impl == 'local':
        for model_module in model:
            model_module.allreduce_gradients()

    # All-reduce word_embeddings' grad across first and last stages to ensure
    # that word_embeddings parameters stay in sync.
    # This should only run for models that support pipelined model parallelism
    # (BERT and GPT-2).
    if mpu.is_rank_in_embedding_group(ignore_virtual=True) and \
            mpu.get_pipeline_model_parallel_world_size() > 1:
        if mpu.is_pipeline_first_stage(ignore_virtual=True):
            unwrapped_model = model[0]
        elif mpu.is_pipeline_last_stage(ignore_virtual=True):
            unwrapped_model = model[-1]
        else:  # We do not support the interleaved schedule for T5 yet.
            unwrapped_model = model[0]
        unwrapped_model = unwrap_model(
            unwrapped_model, (torchDDP, LocalDDP, Float16Module))

        if unwrapped_model.share_word_embeddings:
            word_embeddings_weight = unwrapped_model.word_embeddings_weight()
            if args.DDP_impl == 'local':
                grad = word_embeddings_weight.main_grad
            else:
                grad = word_embeddings_weight.grad
            torch.distributed.all_reduce(grad, group=mpu.get_embedding_group())

    # Update parameters.
    update_successful, grad_norm, num_zeros_in_grad = optimizer.step()

    # Update learning rate.
    if update_successful:
        increment = get_num_microbatches() * \
                    args.micro_batch_size * \
                    args.data_parallel_size
        lr_scheduler.step(increment=increment)
        skipped_iter = 0
    else:
        skipped_iter = 1

    # Empty unused memory
    if args.empty_unused_memory_level >= 2:
        torch.cuda.empty_cache()

    if mpu.is_pipeline_last_stage(ignore_virtual=True):
        # Average loss across microbatches.
        loss_reduced = {}
        for key in losses_reduced[0]:
            losses_reduced_for_key = [x[key] for x in losses_reduced]
            loss_reduced[key] = sum(losses_reduced_for_key) / len(losses_reduced_for_key)
        return loss_reduced, skipped_iter, grad_norm, num_zeros_in_grad
    return {}, skipped_iter, grad_norm, num_zeros_in_grad
6.3 Getting the schedule
get_forward_backward_func picks the pipeline schedule. There are two flavors, pipeline flush (non-interleaved) and interleaved, and we analyze both in later articles.
def get_forward_backward_func():
    args = get_args()
    if mpu.get_pipeline_model_parallel_world_size() > 1:
        if args.virtual_pipeline_model_parallel_size is not None:
            forward_backward_func = forward_backward_pipelining_with_interleaving
        else:
            forward_backward_func = forward_backward_pipelining_without_interleaving
    else:
        forward_backward_func = forward_backward_no_pipelining
    return forward_backward_func
Putting the pieces together, the training flow expands into: train drives train_step, which obtains a schedule via get_forward_backward_func and runs the forward and backward passes before the optimizer and learning-rate steps.
This completes our look at Megatron's basic architecture; the next article covers how model parallelism is set up.