文章目錄
系列文章 0x0. 前言 0x1. Supervised finetuning (SFT) 教程翻譯 如何訓練模型 如何對SFT checkpoint進行評測? 模型和數據 來自OPT-1.3B及其SFT變體(使用不同微調數據)的提示示例 一些參數解釋和可訓練的最大模型 其它 0x2. 評測腳本解讀 0x3. 訓練腳本解讀 0x3.1 頭文件相關解析 create_prompt_dataset解析 工具函數解析 print_rank_0 to_device save_hf_format set_random_seed get_all_reduce_mean get_optimizer_grouped_parameters save_zero_three_model load_hf_tokenizer convert_linear_layer_to_lora convert_lora_to_linear_layer only_optimize_lora_parameters create_hf_model 0x3.2 main.py主體解析 parse_args解析 main函數解析 0x4. hybrid_engine的細節 & log 0x5. 總結
系列文章
DeepSpeed-Chat 打造類ChatGPT全流程 筆記一
【DeepSpeed 教程翻譯】三,在 DeepSpeed中使用 PyTorch Profiler和Flops Profiler
DeepSpeed結合Megatron-LM訓練GPT2模型筆記(上)
【DeepSpeed 教程翻譯】二,Megatron-LM GPT2,Zero 和 ZeRO-Offload
【DeepSpeed 教程翻譯】開始,安裝細節和CIFAR-10 Tutorial
0x0. 前言
在 DeepSpeed-Chat 打造類ChatGPT全流程 筆記一 中跑通了DeepSpeed Chat的訓練和推理流程,DeepSpeed Chat的訓練流程包含監督指令微調(SFT),Reward模型微調,基于人類反饋的強化學習(RLHF)三個步驟。接著上面文章的todo,這篇文章主要是解析一下監督指令微調(SFT)階段的代碼實現。
0x1. Supervised finetuning (SFT) 教程翻譯
監督微調(SFT)與在自然語言任務(例如,WikiText-103)上的標準語言模型微調非常相似。主要的區別來自于數據集資源,SFT將收集高質量的查詢-回答對來微調模型以達到人類更傾向的生成結果。
如何訓練模型
我們提供了多個腳本用于在單個GPU(例如,單個A6000-48G,V100-32G,A100-40G等),單節點(例如,8/16x V100-32G,8 A100-40G/80G)和多節點設置(例如,64x A100-80G)上進行訓練,這些可以在 training_scripts 目錄中找到。例如,如果你有一個單獨的A6000-48G,你可以簡單地運行對應的腳本
training_scripts/single_gpu/run_1.3b.sh
來訓練一個OPT-1.3b模型。我們的單節點腳本很容易擴展到多節點系統。
如何對SFT checkpoint進行評測?
一旦你使用上述代碼完成訓練,你可以簡單地執行 bash evaluation_scripts/run_prompt.sh
它會要求用戶提供兩個模型的路徑:(a) 原始預訓練模型(即 --model_name_or_path_baseline facebook/opt-1.3b)和 (b) 微調后的模型(即 --model_name_or_path_finetune output/check_base)。"prompt_eval.py" 包含了幾個可以根據你的喜好進行更新的提示。
模型和數據
由于GPT3沒有開源的checkpoint,我們使用了Meta OPT家族的預訓練模型(即facebook/opt-1.3b)。你也可以使用其他預訓練模型(如GPT-Neo,Bloom等)。至于數據集,我們也使用了來自Huggingface數據集的開源數據集,具體如下:
Dahoas/rm-static Dahoas/full-hh-rlhf Dahoas/synthetic-instruct-gptj-pairwise yitingxie/rlhf-reward-datasets openai/webgpt_comparisons stanfordnlp/SHP
感謝DeepSpeed RLHF的數據抽象和融合技術,我們現在可以將多個數據源合并用于訓練。然而,重要的是要注意,不同的數據集可能使用不同的提示詞(例如,Dohas/rm-static使用"Human:"表示查詢,"Assistant:"表示回答)。因此,用戶必須自行對齊這些提示。在我們的例子中,我們一致使用了Dohas/rm-static的格式。通過我們的評估,我們發現整合多樣化的數據集可以提高模型的質量。請參考下一節以獲取不同查詢-答案對的示例。
來自OPT-1.3B及其SFT變體(使用不同微調數據)的提示示例
在這里插入圖片描述
一些參數解釋和可訓練的最大模型
main.py文件中使用的大多數參數都有清晰的解釋,如果你有解碼器模型微調的經驗,通常很容易理解。然而,如果你對其中任何一個不清楚,請不要猶豫在GitHub問題上向我們求助。在這一部分,我們提供了一些具體的參數解釋和它們的使用方法。
參數 | 解釋 | 注意事項 |
---|---|---|
--data_path | 用于微調模型的數據 | 你可以指定多個數據資源來訓練模型,例如:Dahoas/rm-static Dahoas/full-hh-rlhf |
--data_split | 為三步訓練切分數據 | 根據InstructGPT,我們提供了切分數據集的能力,使得每個分區只在一個步驟中使用。設置為"2,4,4"意味著我們分別使用20%,40%,40%的數據在每個步驟中。如果你只做SFT,或者你發現在不同步驟中使用重疊數據是可以的/有幫助的,你可以將它改為"10,0,0"。 |
--sft_only_data_path | 用于微調模型的單響應數據 | 對于只在步驟1中使用的單響應數據,你應該將它們作為這個參數的一部分,而不是上面的data_path參數。這個參數中的數據集將不會被切分,而只在步驟1中全面使用。 |
--gradient_checkpoint | 為模型啟用梯度檢查點(也稱為激活檢查點) | 這可以顯著降低訓練內存成本 |
--offload | DeepSpeed特定功能。將模型卸載到CPT/NVME以節省內存 | 這可以在內存消耗較少的情況下訓練更大的模型。但是它會減慢訓練的速度。 |
--zero_stage | DeepSpeed特定功能,適用于多GPU系統 | 這可以幫助將模型/優化器分布在多個GPU上。請參見https://www.deepspeed.ai/tutorials/zero/ |
--lora_dim | 當它大于0時,將啟用LoRA | 通常,LoRA需要更大的學習率才能更好地收斂 |
--lora_module_name | 啟用LoRA模塊的范圍。 | |
--only_optimize_lora | 凍結所有其他參數,只優化LoRA相關參數 | |
--gradient_checkpoint, --lora_dim, only_optimize_lora | 當啟用LoRA和梯度檢查點時,不能啟用只優化LoRA | 如果全部啟用,將影響梯度流(也就是由PyTorch支持的auto-grad系統后端) |
對于用戶來說,一個重要的考慮是確定他們可以使用當前系統訓練的最大模型大小。在這里,我們提供了一個估計這個限制的方法。假設你不使用卸載功能,并啟用(i)零階段3(如果使用多個GPU),(ii)梯度檢查點,以及(iii)LoRA,那么你可以訓練的大致最大模型大小(以十億參數為單位)可以估計為"總GPU內存(GB)除以3"。例如,如果你有一臺單一的A6000-48G GPU,你可能可以訓練最多16十億參數的模型。需要注意的是,這只是一個粗略的估計,你應該自己驗證。
其它
從InstructGPT的工作中,我們建議為了得到更好的人類偏好的答案,讓模型過度擬合(即更長的訓練周期)。通過我們的探索,我們發現這對于較小模型的微調,如OPT-1.3B,特別有幫助。值得注意的是,我們在腳本中提供的超參數并沒有經過大量的調整。因此,我們鼓勵用戶和實踐者自己找到最優的配置。此外,我們的系統可以很容易地擴展到其他語言,如中文和日語。為了證明這一點,我們在"training_scripts/other_language"目錄下包含了兩個例子。
0x2. 評測腳本解讀
DeepSpeedExamples/applications/DeepSpeed-Chat/training/step1_supervised_finetuning/evaluation_scripts/run_prompt.sh這個腳本是對輸入的模型進行評測,我們看一下內容:
# You can provide two models to compare the performance of the baseline and the finetuned model export CUDA_VISIBLE_DEVICES=0 python prompt_eval.py --model_name_or_path_baseline XXX --model_name_or_path_finetune XXX
這表示我們可以調用prompt_eval.py來對baseline模型和finetune模型進行對比評測。所以評測的實現就是這里的prompt_eval.py,接下來對prompt_eval.py做一個解讀。
首先是頭文件導入部分,除了一些常規的需要用到的包還從DeepSpeed Chat的utils下面導入了2個函數。
#DeepSpeedTeam importargparse importlogging importtorch importsys importos fromtransformersimport( AutoModelForCausalLM,) sys.path.append( os.path.abspath(os.path.join(os.path.dirname(__file__),os.path.pardir))) fromutils.model.model_utilsimportcreate_hf_model#從utils下面導入自定義函數 fromutils.utilsimportload_hf_tokenizer logger=logging.getLogger(__name__)
接下來解析一下create_hf_model和load_hf_tokenizer兩個函數的實現。首先是create_hf_model函數:
#這段代碼定義了一個名為create_hf_model的函數,該函數的作用是創建或加載一個預訓練模型。該函數的主要參數包括: #model_class:模型的類別,例如GPT-2、BERT等。 #tokenizer:用于模型的分詞器。 #ds_config:DeepSpeed的配置參數。 #rlhf_training:一個標志,用來表示是否正在進行RLHF(ReinforcementLearningfromHumanFeedback)訓練。 #disable_dropout:一個標志,用來表示是否禁用dropout。Dropout是一種防止過擬合的技術。 defcreate_hf_model(model_class, model_name_or_path, tokenizer, ds_config=None, rlhf_training=False, disable_dropout=False): #根據model_name_or_path從預訓練模型獲取模型配置model_config。 model_config=AutoConfig.from_pretrained(model_name_or_path) #如果disable_dropout為真,則將模型配置中的dropout設為0.0。 ifdisable_dropout: model_config.dropout=0.0 #Note:dschfisdefinedinfunctionscopetoavoidglobaleffects #https://huggingface.co/docs/transformers/main_classes/deepspeed#nontrainer-deepspeed-integration #根據ds_config中的設置,創建DeepSpeed的配置對象dschf,以便進行DeepSpeed優化。 ifds_configisnotNoneandds_config["zero_optimization"]["stage"]==3: dschf=HfDeepSpeedConfig(ds_config) else: dschf=None #根據rlhf_training的值,確定是從配置中創建模型還是從預訓練模型中加載模型。如果rlhf_training為真,則根據模型配置創建模型;否則,從預訓練模型加載模型。 ifrlhf_training: #theweightloadingishandledbycreatecriticmodel model=model_class.from_config(model_config) else: model=model_class.from_pretrained( model_name_or_path, from_tf=bool(".ckpt"inmodel_name_or_path), config=model_config) #將模型的結束標記和填充標記設為分詞器的結束標記id。 model.config.end_token_id=tokenizer.eos_token_id model.config.pad_token_id=model.config.eos_token_id #調整模型的詞匯表大小,使其為8的倍數。這樣做的目的是為了在某些硬件(如GPU)上提高效率。 model.resize_token_embeddings(int( 8* math.ceil(len(tokenizer)/8.0)))#makethevocabsizemultipleof8 returnmodel
然后是load_hf_tokenizer函數:
#這段代碼定義了一個名為load_hf_tokenizer的函數,該函數的作用是加載一個指定的分詞器。這個函數的參數有: #model_name_or_path:模型的名稱或者預訓練模型的路徑。 #fast_tokenizer:是否使用FastTokenizer,這是一種更快的分詞器實現。 defload_hf_tokenizer(model_name_or_path,fast_tokenizer=True): #檢查model_name_or_path是否存在。如果存在,可能意味著這是一個本地的模型路徑。 ifos.path.exists(model_name_or_path): #Locallytokenizerloadinghassomeissue,soweneedtoforcedownload #如果是本地模型,再檢查在該路徑下的"config.json"文件是否存在。如果存在,打開該文件,并從文件中讀取"_name_or_path"字段,將其賦值給model_name,然后通過AutoTokenizer.from_pretrained函數,使用該模型名稱和fast_tokenizer參數,從HuggingFace模型庫下載并加載分詞器。 model_json=os.path.join(model_name_or_path,"config.json") ifos.path.exists(model_json): model_json_file=json.load(open(model_json)) model_name=model_json_file["_name_or_path"] tokenizer=AutoTokenizer.from_pretrained(model_name, fast_tokenizer=True) else: #如果model_name_or_path不存在,直接使用AutoTokenizer.from_pretrained函數,使用model_name_or_path和fast_tokenizer參數,從HuggingFace模型庫下載并加載分詞器。 tokenizer=AutoTokenizer.from_pretrained(model_name_or_path, fast_tokenizer=True) returntokenizer
接下來是參數解析部分,解析如下:
#這段代碼定義了一個名為parse_args的函數,該函數的作用是解析命令行參數。它使用Python的argparse庫來完成這個工作。下面是每個參數的詳解: defparse_args(): parser=argparse.ArgumentParser(description="EvalthefinetuedSFTmodel") #model_name_or_path_baseline:基線模型的路徑,這是一個必須提供的參數(required=True)。 parser.add_argument( "--model_name_or_path_baseline", type=str, help="Pathtobaselinemodel", required=True, ) #model_name_or_path_finetune:微調后模型的路徑,這也是一個必須提供的參數。 parser.add_argument( "--model_name_or_path_finetune", type=str, help="Pathtopretrainedmodel", required=True, ) #num_beams:用于指定集束搜索的集束寬度,其默認值為1。 parser.add_argument( "--num_beams", type=int, default=1, help='Specifynumofbeams', ) #num_beam_groups:用于指定集束搜索的組數,其默認值為1。 parser.add_argument( "--num_beam_groups", type=int, default=1, help='Specifynumofbeams', ) #top_k:用于指定在Top-K采樣中考慮的最高可能性詞匯的數量,其默認值為4。 parser.add_argument( "--top_k", type=int, default=4, help='Specifynumofbeams', ) #penalty_alpha:懲罰因子,其默認值為0.6。 parser.add_argument( "--penalty_alpha", type=float, default=0.6, help='Specifynumofbeams', ) #num_return_sequences:生成序列的數量,其默認值為1。 parser.add_argument( "--num_return_sequences", type=int, default=1, help='Specifynumofreturnsequences', ) #max_new_tokens:生成的最大新token數,其默認值為100。 parser.add_argument( "--max_new_tokens", type=int, default=100, help='Specifynumofreturnsequences', ) #language:語言類型,可以是"English"、"Chinese"或"Japanese",默認為"English"。 parser.add_argument("--language", type=str, default="English", choices=["English","Chinese","Japanese"]) #parser.parse_args()這個函數將解析命令行參數,并將結果保存在一個Namespace對象中。這個對象被返回,可以在其他地方使用這些參數。 args=parser.parse_args() returnargs
接下來是generate函數的解析:
#這個函數是用來利用訓練好的模型生成文本的,它接受以下參數 #model:已經訓練好的模型。 #tokenizer:用于將文本轉換為模型可理解的輸入的工具。 #inputs:模型的輸入數據。 #num_beams:在使用束搜索算法時的束寬,其默認值為1。 #num_beam_groups:在使用分組束搜索時的組數,默認為1。 #do_sample:是否進行隨機采樣。如果設為True,則在生成過程中會隨機選擇下一個單詞,而不是僅選擇最可能的單詞。默認為False。 #num_return_sequences:模型返回的序列數,默認為1。 #max_new_tokens:模型生成的最大新token數,即最大生成文本的長度,默認為100。 defgenerate(model, tokenizer, inputs, num_beams=1, num_beam_groups=1, do_sample=False, num_return_sequences=1, max_new_tokens=100): #函數首先使用模型的generate方法,根據提供的參數生成文本。 generate_ids=model.generate(inputs.input_ids, num_beams=num_beams, num_beam_groups=num_beam_groups, do_sample=do_sample, num_return_sequences=num_return_sequences, max_new_tokens=max_new_tokens) #使用tokenizer的batch_decode方法將生成的令牌ID解碼為可讀的文本。注意,這里跳過了特殊的令牌(如填充和開始/結束令牌),并且不會清理tokenize產生的額外空格。 result=tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False) returnresult
接下來是generate_constrastive_search函數的解析:
#這個函數叫做generate_constrastive_search,它是用于利用訓練好的模型進行對比搜索生成文本的。這個函數接受以下參數: #model:已經訓練好的模型。 #tokenizer:用于將文本轉換為模型可理解的輸入的工具。 #inputs:模型的輸入數據。 #top_k:在每一步生成時,只考慮概率最高的top_k個候選項,然后進行隨機抽樣。默認為4。 #penalty_alpha:用于懲罰新生成的token與原始輸入之間的差異,默認為0.6。 #num_return_sequences:模型返回的序列數,默認為1。 #max_new_tokens:模型生成的最大新token數,即最大生成文本的長度,默認為100。 defgenerate_constrastive_search(model, tokenizer, inputs, top_k=4, penalty_alpha=0.6, num_return_sequences=1, max_new_tokens=100): #函數首先使用模型的generate方法,根據提供的參數生成文本。注意這里使用了模型的一個特殊的生成方式,這種方式在每一步生成時,只考慮概率最高的top_k個候選項,然后進行隨機抽樣,同時使用了一個懲罰因子penalty_alpha來懲罰新生成的token與原始輸入之間的差異。 generate_ids=model.generate(inputs.input_ids, top_k=top_k, penalty_alpha=penalty_alpha, num_return_sequences=num_return_sequences, max_new_tokens=max_new_tokens) #然后,使用tokenizer的batch_decode方法將生成的tokenID解碼為可讀的文本。注意,這里跳過了特殊的token(如填充和開始/結束token),并且不會清理token化產生的額外空格。 result=tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False) returnresult
接下來是一個簡單的打印工具函數:
#gen_output:這是一個列表,其中包含了我們希望打印的內容,每一項都是一段文本。 defprint_utils(gen_output): #函數會遍歷gen_output列表中的每一項,然后將每一項都打印出來。為了在不同項之間增加一些可視化的分隔,函數在每一項前后都額外打印了一個空行。 foriinrange(len(gen_output)): print() print(gen_output[i]) print()
然后是prompt_eval這個函數,這個函數prompt_eval的目的是評估和比較基線模型(model_baseline)和微調過的模型(model_fintuned)對于一組提示(prompts)的生成性能。讓我們逐行進行解析:
#輸入參數包括:args(命令行參數)、model_baseline(基線模型)、model_fintuned(微調模型)、tokenizer(用于編碼和解碼的分詞器)、device(指定運行模型的設備)、prompts(一組要評估的提示)。 defprompt_eval(args,model_baseline,model_fintuned,tokenizer,device, prompts): #對于prompts中的每一個提示,我們都做以下操作: forpromptinprompts: #使用分詞器將提示轉換為模型所需的輸入格式,并將其移至指定的設備上。 inputs=tokenizer(prompt,return_tensors="pt").to(device) #打印一條消息表示我們現在正在進行基線模型的生成。 print("==========Baseline:Greedy=========") #然后,我們調用之前定義的generate函數使用貪婪搜索方法生成文本,并使用print_utils函數打印生成的結果。 r_base=generate(model_baseline, tokenizer, inputs, num_beams=1, num_return_sequences=args.num_return_sequences, max_new_tokens=args.max_new_tokens) print_utils(r_base) #打印一條消息表示我們現在正在進行微調模型的生成。 print("==========finetune:Greedy=========") #同樣地,我們調用generate函數使用貪婪搜索方法生成文本,并使用print_utils函數打印生成的結果。 r_finetune_g=generate(model_fintuned, tokenizer, inputs, num_beams=1, num_return_sequences=args.num_return_sequences, max_new_tokens=args.max_new_tokens) print_utils(r_finetune_g) #注意:在此函數中,貪婪搜索被用作基線方法。然而,該函數還提供了其他幾種搜索策略的例子,包括多項式采樣、束搜索、束搜索多項式采樣、多樣性束搜索和對比搜索。這些策略在此函數中都被注釋掉了,但你可以根據需要去掉注釋,使用這些策略。 #print("==========finetune:Multinomialsampling=========") #r_finetune_m=generate(model_fintuned,tokenizer,inputs, #num_beams=1, #do_sample=True, #num_return_sequences=args.num_return_sequences, #max_new_tokens=args.max_new_tokens) #print_utils(r_finetune_m) #print("==========finetune:BeamSearch=========") #r_finetune_b=generate(model_fintuned,tokenizer,inputs, #num_beams=args.num_beams, #num_return_sequences=args.num_return_sequences, #max_new_tokens=args.max_new_tokens) #print_utils(r_finetune_b) #print("==========finetune:Beam-searchmultinomialsampling=========") #r_finetune_s=generate(model_fintuned,tokenizer,inputs, #num_beams=args.num_beams, #do_sample=True, #num_return_sequences=args.num_return_sequences, #max_new_tokens=args.max_new_tokens) #print_utils(r_finetune_s) #print("==========finetune:DiverseBeamSearch=========") #r_finetune_d=generate(model_fintuned,tokenizer,inputs, #num_beams=args.num_beams, #num_beam_groups=args.num_beam_groups, #num_return_sequences=args.num_return_sequences, #max_new_tokens=args.max_new_tokens) #print_utils(r_finetune_d) #print("==========finetune:ConstrastiveSearch=========") #r_finetune_c=generate_constrastive_search(model_fintuned,tokenizer,inputs, #top_k=args.top_k, #penalty_alpha=args.penalty_alpha, #num_return_sequences=args.num_return_sequences, #max_new_tokens=args.max_new_tokens) #print_utils(r_finetune_c) #最后,打印一條消息表示這個提示的處理已經結束。然后打印兩個空行作為分隔。 print("====================promptend=============================") print() print()
解析main函數:
#main函數負責解析命令行參數、準備模型和分詞器、定義提示,然后使用這些來評估和比較基線模型和微調模型。 defmain(): #這個main函數是整個腳本的入口點。它首先通過parse_args函數解析命令行參數。然后它設置了運行模型的設備為第一個GPU。 args=parse_args() device=torch.device("cuda:0") #接著,它使用load_hf_tokenizer函數加載分詞器,然后使用create_hf_model函數創建基線模型(model_baseline)和微調模型(model_fintuned) tokenizer=load_hf_tokenizer(args.model_name_or_path_baseline, fast_tokenizer=True) model_baseline=create_hf_model(AutoModelForCausalLM, args.model_name_or_path_baseline, tokenizer,None) model_fintuned=create_hf_model(AutoModelForCausalLM, args.model_name_or_path_finetune, tokenizer,None) #然后,這些模型被移動到指定的設備上。 model_baseline.to(device) model_fintuned.to(device) #在接下來的部分,函數定義了一組用于評估的提示。注意,這里特別指出,如果提示以空格結束,那么沒有經過微調的原始模型有可能會陷入停滯并無法產生響應。微調過的模型在這方面表現得更好。因此,這里所有的提示都以冒號":"結束,以使得比較更有意義。 #這個腳本支持英文、中文和日文的評估,它通過args.language參數判斷用戶選擇的語言,并根據此選擇加載對應的提示。 ifargs.language=="English": prompts=[ "Human:PleasetellmeaboutMicrosoftinafewsentence?Assistant:", "Human:Explainthemoonlandingtoa6yearoldinafewsentences.Assistant:", "Human:Writeashortpoemaboutawisefrog.Assistant:", "Human:WhowaspresidentoftheUnitedStatesin1955?Assistant:", "Human:Howdoesatelescopework?Assistant:", "Human:Whydobirdsmigratesouthforthewinter?Assistant:" ] elifargs.language=="Chinese": prompts=[ "Human:請用幾句話介紹一下微軟?Assistant:", "Human:用幾句話向6歲的孩子解釋登月。Assistant:", "Human:寫一首關于一只聰明的青蛙的短詩。Assistant:", "Human:誰是1955年的美國總統?Assistant:","Human:望遠鏡是如何工作的?Assistant:", "Human:鳥類為什么要南遷過冬?Assistant:" ] elifargs.language=="Japanese": prompts=[ "Human:マイクロソフトについて簡単に教えてください。Assistant:", "Human:6歳児に月面著陸を短い文で説明する。Assistant:", "Human:賢いカエルについて短い詩を書いてください。Assistant:", "Human:1955年のアメリカ合衆國大統領は誰?Assistant:", "Human:望遠鏡はどのように機能しますか?Assistant:", "Human:鳥が冬に南に移動するのはなぜですか?Assistant:" ] #最后,它調用prompt_eval函數,將所有的參數和提示傳遞給它,以進行模型評估。 prompt_eval(args,model_baseline,model_fintuned,tokenizer,device, prompts)
0x3. 訓練腳本解讀
訓練過程都集中在DeepSpeedExamples/applications/DeepSpeed-Chat/training/step1_supervised_finetuning/main.py這個腳本,我們逐行解析一下這個腳本。
0x3.1 頭文件相關解析
#DeepSpeedTeam #首先,它導入了Python的標準庫,如argparse(用于解析命令行參數),os和math。 #然后,它導入了PyTorch庫,這是一個用于深度學習的開源庫,同時也導入了一些PyTorch的輔助模塊,如DataLoader(用于加載數據)、 #RandomSampler和SequentialSampler(用于數據抽樣)以及DistributedSampler(用于在分布式設置中進行數據抽樣)。 importargparse importos importmath importsys importtorch fromtorch.utils.dataimportDataLoader,RandomSampler,SequentialSampler fromtorch.utils.data.distributedimportDistributedSampler #接下來,它導入了HuggingFace的transformers庫的一些模塊,包括用于因果語言建模的模型(AutoModelForCausalLM),優化器調度類型(SchedulerType),默認的數據整理函數(default_data_collator)和獲取優化器調度器的函數(get_scheduler)。 fromtransformersimport( AutoModelForCausalLM, SchedulerType, default_data_collator, get_scheduler, ) #然后,它導入了deepspeed庫,這是一個為大規模模型訓練優化的庫。它也導入了deepspeed庫中的一些模塊,包括優化器類(DeepSpeedCPUAdam和FusedAdam) importdeepspeed fromdeepspeed.ops.adamimportDeepSpeedCPUAdam,FusedAdam #之后,它將當前腳本的父目錄添加到系統路徑中,以便可以從該目錄下的utils目錄導入一些自定義函數和模塊。 sys.path.append( os.path.abspath(os.path.join(os.path.dirname(__file__),os.path.pardir))) #最后,它從utils目錄中導入了一些自定義模塊和函數,包括數據處理函數(create_prompt_dataset),打印和設備轉換函數(print_rank_0和to_device), #模型保存函數(save_hf_format),隨機種子設置函數(set_random_seed),求平均函數(get_all_reduce_mean), #獲取優化器參數組的函數(get_optimizer_grouped_parameters),保存和加載模型的函數(save_zero_three_model和load_hf_tokenizer), #以及創建模型和處理模型的函數(create_hf_model)。這些函數在腳本中的后續部分都將被使用。 fromutils.data.data_utilsimportcreate_prompt_dataset fromutils.utilsimportprint_rank_0,to_device,save_hf_format,set_random_seed,get_all_reduce_mean,get_optimizer_grouped_parameters,save_zero_three_model,load_hf_tokenizer fromutils.ds_utilsimportget_train_ds_config fromutils.module.loraimportconvert_linear_layer_to_lora,convert_lora_to_linear_layer,only_optimize_lora_parameters fromutils.model.model_utilsimportcreate_hf_model
create_prompt_dataset解析
create_prompt_dataset這個函數實際上直接或者間接的用到了utils/data中raw_dataset.py和data_utils.py,為了搞清楚這個函數,我們需要對這兩個文件做一個解析。
首先解析一下raw_dataset.py。這里先定義了一個PromptRawDataset類:
#DeepSpeedTeam fromdatasetsimportload_dataset fromtorch.utils.dataimportSubset importre #這段代碼定義了一個名為PromptRawDataset的類,這個類是一個模板類,用于處理和組織模型輸入數據的格式。 #如果有新的數據集需要進行處理,可以繼承這個類并實現相應的方法來確保數據的統一格式和接口。 classPromptRawDataset(object): #首先,這個類的構造函數__init__接收四個參數:output_path(輸出路徑),seed(隨機種子), #local_rank(本地等級)和dataset_name(數據集名稱)。 #在構造函數中,如果數據集名稱不是'local/jsonfile', #那么會使用HuggingFace的datasets庫的load_dataset函數來加載數據集。 def__init__(self,output_path,seed,local_rank,dataset_name): self.output_path=output_path self.seed=seed self.local_rank=local_rank ifnotdataset_name=='local/jsonfile': self.raw_datasets=load_dataset(dataset_name) #然后,這個類定義了一些方法,這些方法在默認情況下并沒有實現(只是返回None或者空操作), #這是因為這個類只是一個模板類,這些方法需要在實際使用時在子類中具體實現。 defget_train_data(self):#獲取訓練數據 return defget_eval_data(self):#獲取評估數據 return #Thepromptshouldbeintheformatof:"Human:"+actual_prompt_sentence+"Assistant:" #get_prompt方法用于獲取樣本中的prompt(提示,這是模型的輸入)。 defget_prompt(self,sample): return #Thechosenresponseshouldbeintheformatof:""+actual_response_sentence #get_chosen方法用于獲取樣本中的chosen(已選的回應,這是模型需要生成的目標輸出)。 defget_chosen(self,sample): return #Therejectedresponseshouldbeintheformatof:""+actual_response_sentence #Ifthedatasetdoesnothaverejectedresponse,returnNone #get_rejected方法用于獲取樣本中的rejected(被拒絕的回應,這可能用于一些特定的訓練場景,比如在對抗訓練中,但如果數據集中沒有這樣的數據,可以返回None)。 defget_rejected(self,sample): return #獲取樣本中的prompt和chosen defget_prompt_and_chosen(self,sample): return #獲取樣本中的prompt和rejected defget_prompt_and_rejected(self,sample): return
接下來就是每個具體數據集的定義,我這里以 OpenaiWebgptcomparisonsDataset 為例解析一下,剩下的讀者又需要可以自行理解:
#Englishdataset #這個類OpenaiWebgptcomparisonsDataset繼承自PromptRawDataset類, #針對"openai/webgpt_comparisons"這個具體的數據集進行了特化。 classOpenaiWebgptcomparisonsDataset(PromptRawDataset): #在構造函數__init__中,調用了父類的構造函數,并設定了dataset_name和dataset_name_clean兩個屬性, #分別為"openai/webgpt_comparisons"和"openai_webgpt_comparisons"。 def__init__(self,output_path,seed,local_rank,dataset_name): super().__init__(output_path,seed,local_rank,dataset_name) self.dataset_name="openai/webgpt_comparisons" self.dataset_name_clean="openai_webgpt_comparisons" #get_train_data和get_eval_data方法分別從raw_datasets中獲取訓練數據和測試數據。 #它們與之前的DahoasRmstaticDataset類不同之處在于,它們使用get_raw_dataset_split_index #方法對訓練數據進行了劃分,將其劃分為訓練集和驗證集,并返回對應的數據子集。 defget_train_data(self): from.data_utilsimportget_raw_dataset_split_index dataset=self.raw_datasets["train"] index=get_raw_dataset_split_index(self.local_rank,self.output_path, self.dataset_name_clean, self.seed,"train_eval","9,1",0, len(dataset)) dataset=Subset(dataset,index) returndataset defget_eval_data(self): from.data_utilsimportget_raw_dataset_split_index dataset=self.raw_datasets["train"] index=get_raw_dataset_split_index(self.local_rank,self.output_path, self.dataset_name_clean, self.seed,"train_eval","9,1",1, len(dataset)) dataset=Subset(dataset,index) returndataset #get_prompt,get_chosen和get_rejected方法分別從樣本中獲取提示,已選回應和被拒絕的回應。 #這里假定樣本是一個字典,其中包含了名為'question','score_0','score_1','answer_0'和'answer_1'的字段。 #其中,'question'字段是一個字典,包含了'full_text'字段。這個字段包含了人類提出的問題。 #'score_0'和'score_1'字段是字符串,表示對'answer_0'和'answer_1'的評分。 #如果'score_0'大于等于'score_1',那么'answer_0'就是已選回應,'answer_1'就是被拒絕的回應,反之亦然。 #在獲取已選回應和被拒絕的回應時,還對回應進行了處理, #去除了所有形如"[...]"或"(...)"的文本,然后在回應前添加了一個空格。 defget_prompt(self,sample): return"Human:"+sample['question']['full_text']+"Assistant:" defget_chosen(self,sample): iffloat(sample['score_0'])>=float(sample['score_1']): response=sample['answer_0'] else: response=sample['answer_1'] #Thisdatahascitationsquarebracketsandnumbers(e.g.,"[1]"). #Rightnowwearenotdoingbrowser-assistedfinetuning,thuswe #removethesecitationstoavoidconfusingthemodel. response=re.sub(r"[([].*?[)]]","",response) response=re.sub(r"[([].*?[)]]","",response) return""+response defget_rejected(self,sample): iffloat(sample['score_0'])=float(sample['score_1']): response=sample['answer_0'] else: response=sample['answer_1'] response=re.sub(r"[([].*?[)]]","",response) response=re.sub(r"[([].*?[)]]","",response) return"Human:"+sample['question'][ 'full_text']+"Assistant:"+response defget_prompt_and_rejected(self,sample): iffloat(sample['score_0'])
接著解析 data_utils.py:
#DeepSpeedTeam """ Partofthecodewasadoptedfromhttps://github.com/microsoft/Megatron-DeepSpeed/blob/main/megatron/data/dataset_utils.py """ importtorch fromtorch.utils.dataimportDataset,Subset,ConcatDataset fromtorch.nn.utils.rnnimportpad_sequence importtorch.nn.functionalasF fromdatasetsimportload_dataset importnumpyasnp importos importhashlib#Python的內置庫,提供了一系列散列函數,如MD5、SHA1等。 fromitertoolsimportchain#Python的內置庫,提供了一系列用于操作迭代器的函數。 from.importraw_datasets #這段代碼定義了一個名為get_raw_dataset的函數,其主要作用是根據傳入的數據集名稱dataset_name #返回一個適當的PromptRawDataset子類的實例。 #這個函數有四個參數:dataset_name,output_path,seed和local_rank。 defget_raw_dataset(dataset_name,output_path,seed,local_rank): #在函數中,根據dataset_name的具體值來創建不同的PromptRawDataset子類的實例。 #例如,如果dataset_name為"Dahoas/rm-static",那么就創建一個DahoasRmstaticDataset的實例; #如果dataset_name為"Dahoas/full-hh-rlhf",那么就創建一個DahoasFullhhrlhfDataset的實例,以此類推。 if"Dahoas/rm-static"indataset_name: returnraw_datasets.DahoasRmstaticDataset(output_path,seed, local_rank,dataset_name) elif"Dahoas/full-hh-rlhf"indataset_name: returnraw_datasets.DahoasFullhhrlhfDataset(output_path,seed, local_rank,dataset_name) elif"Dahoas/synthetic-instruct-gptj-pairwise"indataset_name: returnraw_datasets.DahoasSyntheticinstructgptjpairwiseDataset( output_path,seed,local_rank,dataset_name) elif"yitingxie/rlhf-reward-datasets"indataset_name: returnraw_datasets.YitingxieRlhfrewarddatasetsDataset( output_path,seed,local_rank,dataset_name) elif"openai/webgpt_comparisons"indataset_name: returnraw_datasets.OpenaiWebgptcomparisonsDataset( output_path,seed,local_rank,dataset_name) elif"stanfordnlp/SHP"indataset_name: returnraw_datasets.StanfordnlpSHPDataset(output_path,seed, local_rank,dataset_name) elif"pvduy/sharegpt_alpaca_oa_vicuna_format"indataset_name: returnraw_datasets.PvduySharegptalpacaoavicunaformatDataset( output_path,seed,local_rank,dataset_name) elif"wangrui6/Zhihu-KOL"indataset_name: returnraw_datasets.Wangrui6ZhihuKOLDataset(output_path,seed, local_rank,dataset_name) elif"Cohere/miracl-zh-queries-22-12"indataset_name: returnraw_datasets.CohereMiraclzhqueries2212Dataset( output_path,seed,local_rank,dataset_name) elif"Hello-SimpleAI/HC3-Chinese"indataset_name: returnraw_datasets.HelloSimpleAIHC3ChineseDataset( output_path,seed,local_rank,dataset_name) elif"mkqa-Chinese"indataset_name: returnraw_datasets.MkqaChineseDataset(output_path,seed,local_rank, "mkqa") elif"mkqa-Japanese"indataset_name: returnraw_datasets.MkqaJapaneseDataset(output_path,seed,local_rank, "mkqa") elif"Cohere/miracl-ja-queries-22-12"indataset_name: returnraw_datasets.CohereMiracljaqueries2212Dataset( output_path,seed,local_rank,dataset_name) elif"lmqg/qg_jaquad"indataset_name: returnraw_datasets.LmqgQgjaquadDataset(output_path,seed,local_rank, dataset_name) elif"lmqg/qag_jaquad"indataset_name: returnraw_datasets.LmqgQagjaquadDataset(output_path,seed,local_rank, dataset_name) #如果dataset_name是"local/jsonfile",則會檢查在路徑chat_path+'/data/train.json' #和chat_path+'/data/eval.json'下是否存在文件。如果存在,則創建一個LocalJsonFileDataset的實例; #如果不存在,則拋出一個RuntimeError異常。 elif"local/jsonfile"indataset_name: chat_path=os.path.abspath( os.path.join(os.path.dirname(__file__),os.path.pardir, os.path.pardir,os.path.pardir)) ifnot(os.path.isfile(chat_path+'/data/train.json') andos.path.isfile(chat_path+'/data/eval.json')): raiseRuntimeError( f"Pleasecheckboththetrain.jsonandeval.jsonfilesinyourapplications/DeepSpeed-Chat/datadirectory." ) returnraw_datasets.LocalJsonFileDataset(output_path,seed,local_rank, dataset_name,chat_path) else: #如果dataset_name沒有在以上的所有條件中匹配到,那么函數也會拋出一個RuntimeError異常,表示沒有為這個數據集的配置。 raiseRuntimeError( f"Wedonothaveconfigsfordataset{dataset_name},butyoucanadditbyyourselfinraw_datasets.py." )
再看下 get_shuffle_idx函數:
#這個函數的作用是生成一個大小為size的亂序索引數組,它接受兩個參數:seed和size。 defget_shuffle_idx(seed,size): np_rng=np.random.RandomState(seed=seed)#創建一個NumPy的隨機狀態生成器對象np_rng,seed是隨機種子,確定了隨機數的生成序列。 dtype_=np.uint32#設置其為NumPy的uint32類型,這是一個無符號32位整數類型。 ifsize>=(np.iinfo(np.uint32).max-1):#如果size大于或等于uint32的最大值減一,這里減一是為了防止可能的溢出。 dtype_=np.int64#則將dtype_改為int64,這是一個64位的有符號整數類型。 shuffle_idx=np.arange(start=0,stop=size,step=1,dtype=dtype_)#創建一個由0開始,步長為1,到size結束(不包含size),并且數據類型為dtype_的等差數列,將其賦值給shuffle_idx。 np_rng.shuffle(shuffle_idx)#使用np_rng隨機狀態生成器對shuffle_idx進行隨機排列,這樣就打亂了shuffle_idx的順序。 returnshuffle_idx#返回亂序后的shuffle_idx。
接著解析get_raw_dataset_split_index函數:
#這個函數主要是根據提供的參數分割數據集,并生成一個分割索引。 #它首先檢查索引文件是否存在,如果不存在,則生成分割索引,并保存到文件。 #然后,它從文件中加載索引,并返回索引列表。 #它接受8個參數:local_rank、輸出路徑、數據集名稱、種子、split_name、data_split、split_index和數據大小。 defget_raw_dataset_split_index(local_rank,output_path,dataset_name,seed, split_name,data_split,split_index, data_size): #根據給定參數生成索引文件名。 index_file_name=f"{output_path}/{dataset_name}_seed{seed}_{split_name}_{data_split}_{split_index}.npy" #reindexeachtimewhenusinglocaljsonfilesinceit'smorelikelytogetmodified #如果索引文件不存在,或者數據集名為'jsonfile',則執行下面的操作。 if(notos.path.isfile(index_file_name))or(dataset_name=='jsonfile'): splits=[float(s)forsindata_split.split(',')]#將data_split(以逗號分隔的字符串)分割為浮點數列表。 splits_sum=sum(splits)#計算所有分割的總和。 splits=[split/splits_sumforsplitinsplits]#將每個分割值歸一化,使其和為1。 splits_index=[0]#初始化分割索引列表,起始值為0 forindex,splitinenumerate(splits):#遍歷分割列表。 #將新的索引(當前索引加上歸一化的分割值與數據大小的乘積)添加到索引列表。 splits_index.append(splits_index[index]+ int(round(split*float(data_size)))) #計算最后一個索引與數據大小的差值。 diff=splits_index[-1]-data_size #遍歷除第一個外的所有索引。 forindexinrange(1,len(splits_index)): #將差值從每個索引中減去,以確保最后一個索引等于數據大小。 splits_index[index]-=diff #斷言最后一個索引等于數據大小 assertsplits_index[-1]==data_size #生成一個亂序的索引。 shuffle_idx=get_shuffle_idx(seed,data_size) forsplit_iinrange(len(splits)):#遍歷每個分割。 #根據給定參數生成亂序索引分割文件名。 shuffle_idx_split_file_name=f"{output_path}/{dataset_name}_seed{seed}_{split_name}_{data_split}_{split_i}.npy" #提取亂序索引的一個分割。 shuffle_idx_split=shuffle_idx[ splits_index[split_i]:splits_index[split_i+1]] #將亂序索引分割保存到文件。 np.save(shuffle_idx_split_file_name, shuffle_idx_split, allow_pickle=True) #加載索引文件。 index=np.load(index_file_name,allow_pickle=True) #將索引數組轉換為列表并返回。 returnindex.tolist()
接下來解析一下繼承自Dataset的PromptDataset類:
#這是一個自定義的PromptDataset類,它繼承自torch.utils.data.Dataset。 #這是一個數據集類,通常被用于PyTorch中數據的加載和預處理。 classPromptDataset(Dataset): #類的構造函數,它接受五個參數:prompt_dataset、chosen_dataset、reject_dataset、pad_token_id和train_phase。 def__init__(self,prompt_dataset,chosen_dataset,reject_dataset, pad_token_id,train_phase)->None: super().__init__()#調用父類torch.utils.data.Dataset的構造函數。 self.prompt_dataset=prompt_dataset#將傳入的參數賦值給類的成員變量。 self.chosen_dataset=chosen_dataset self.reject_dataset=reject_dataset self.pad_token_id=pad_token_id self.train_phase=train_phase def__len__(self):#定義類的__len__方法,它返回數據集的長度。這是PyTorch數據集的必要方法。 length=len(self.chosen_dataset)#初始設定數據集長度為chosen_dataset的長度。 ifself.train_phase==3: length=len(self.prompt_dataset)#如果訓練階段為3,則數據集長度設定為prompt_dataset的長度。 returnlength#返回計算得出的數據集長度。 #定義類的__getitem__方法,它接受一個參數idx,返回索引idx處的數據。這是PyTorch數據集的必要方法。 def__getitem__(self,idx): #如果訓練階段為1,則返回一個字典,包含input_ids、attention_mask和labels,它們都來自chosen_dataset的索引idx處。 ifself.train_phase==1: return{ "input_ids":self.chosen_dataset[idx]["input_ids"], "attention_mask":self.chosen_dataset[idx]["attention_mask"], "labels":self.chosen_dataset[idx]["input_ids"] } #如果訓練階段為2,則返回來自chosen_dataset和reject_dataset的input_ids和attention_mask。 elifself.train_phase==2: returnself.chosen_dataset[idx]["input_ids"],self.chosen_dataset[idx]["attention_mask"], self.reject_dataset[idx]["input_ids"],self.reject_dataset[idx]["attention_mask"] #如果訓練階段為3,則返回來自prompt_dataset的input_ids、attention_mask和pad_token_id elifself.train_phase==3: returnself.prompt_dataset[idx]["input_ids"],self.prompt_dataset[idx]["attention_mask"], self.pad_token_id
接著再解析一下create_dataset_split函數:
#這是一個名為create_dataset_split的函數,它的功能是根據給定的訓練階段(train_phase),創建并返回相應的數據集分割。 #具體來說,它為每個訓練階段生成不同的數據集列表,并將它們放入PromptDataset對象中。 #函數接受6個參數:當前數據集(current_dataset)、原始數據集(raw_dataset)、訓練階段(train_phase)、 #分詞器(tokenizer)、會話結束標記(end_of_conversation_token)和最大序列長度(max_seq_len)。 defcreate_dataset_split(current_dataset,raw_dataset,train_phase,tokenizer, end_of_conversation_token,max_seq_len): #創建三個空的列表,用于存儲對話提示(prompt_dataset)、選定的對話(chosen_dataset)和被拒絕的對話(reject_dataset)。 prompt_dataset=[] chosen_dataset=[] reject_dataset=[] #如果訓練階段為1,則將接受的對話進行分詞并添加到chosen_dataset中。 iftrain_phase==1: #遍歷當前數據集。 fori,tmp_datainenumerate(current_dataset): #tokenizethetext #從原始數據集中獲取對話提示和接受的對話。 chosen_sentence=raw_dataset.get_prompt_and_chosen( tmp_data)#theacceptresponse #如果接受的對話不為空,則將其分詞并添加到chosen_dataset中。 ifchosen_sentenceisnotNone: chosen_sentence+=end_of_conversation_token chosen_token=tokenizer(chosen_sentence, max_length=max_seq_len, padding="max_length", truncation=True, return_tensors="pt") chosen_token["input_ids"]=chosen_token["input_ids"].squeeze( 0) chosen_token["attention_mask"]=chosen_token[ "attention_mask"].squeeze(0) chosen_dataset.append(chosen_token) #如果訓練階段為2,則將接受和被拒絕的對話都進行分詞并分別添加到chosen_dataset和reject_dataset中。 eliftrain_phase==2: fori,tmp_datainenumerate(current_dataset): #tokenizethetext chosen_sentence=raw_dataset.get_prompt_and_chosen( tmp_data)#theacceptresponse reject_sentence=raw_dataset.get_prompt_and_rejected( tmp_data)#theacceptresponse ifchosen_sentenceisnotNoneandreject_sentenceisnotNone: chosen_sentence+=end_of_conversation_token#theacceptresponse reject_sentence+=end_of_conversation_token chosen_token=tokenizer(chosen_sentence, max_length=max_seq_len, padding="max_length", truncation=True, return_tensors="pt") reject_token=tokenizer(reject_sentence, max_length=max_seq_len, padding="max_length", truncation=True, return_tensors="pt") chosen_token["input_ids"]=chosen_token["input_ids"] chosen_token["attention_mask"]=chosen_token["attention_mask"] chosen_dataset.append(chosen_token) reject_token["input_ids"]=reject_token["input_ids"] reject_token["attention_mask"]=reject_token["attention_mask"] reject_dataset.append(reject_token) #如果訓練階段為3,則將對話提示進行分詞并添加到prompt_dataset中。 eliftrain_phase==3: fori,tmp_datainenumerate(current_dataset): #tokenizethetext prompt=raw_dataset.get_prompt(tmp_data) ifpromptisnotNone: prompt_token=tokenizer(prompt,return_tensors="pt") prompt_token["input_ids"]=prompt_token["input_ids"] prompt_token["attention_mask"]=prompt_token["attention_mask"] forkey_wordin["input_ids","attention_mask"]: length=prompt_token[key_word].size()[-1] iflength>max_seq_len: y=prompt_token[key_word].squeeze(0)[length- (max_seq_len- 1):].flip(0) else: y=prompt_token[key_word].squeeze(0).flip(0) prompt_token[key_word]=y prompt_dataset.append(prompt_token) #創建一個新的PromptDataset對象,并返回。這個對象包含了對話提示、接受的對話和被拒絕的對話的數據集,以及分詞器的填充標記ID和訓練階段。 returnPromptDataset(prompt_dataset,chosen_dataset,reject_dataset, tokenizer.pad_token_id,train_phase)
接下來再解析一下create_dataset這個函數:
這段代碼定義了一個函數create_dataset,主要負責創建訓練數據集和評估數據集,具體的功能細節如下: defcreate_dataset(local_rank,dataset_name,data_split,output_path, train_phase,seed,tokenizer,end_of_conversation_token, max_seq_len): #調用get_raw_dataset函數,該函數根據提供的數據集名稱、輸出路徑、隨機種子和local_rank等參數,從各種預定義的數據集中獲取所需的原始數據集。 raw_dataset=get_raw_dataset(dataset_name,output_path,seed,local_rank) train_dataset=raw_dataset.get_train_data()#從原始數據集中獲取訓練數據。 #獲取訓練數據集的索引,涉及數據的分割。 train_index=get_raw_dataset_split_index(local_rank,output_path, raw_dataset.dataset_name_clean, seed,"train",data_split, train_phase-1, len(train_dataset)) #根據上一步獲取的索引,創建訓練數據的子集。 train_dataset=Subset(train_dataset,train_index) #調用create_dataset_split函數對上一步獲得的數據子集進行進一步處理, #這可能包括對文本的標記化(tokenization),并且創建一個PromptDataset對象。 train_dataset=create_dataset_split(train_dataset,raw_dataset, train_phase,tokenizer, end_of_conversation_token, max_seq_len) #是用于創建評估數據集的,步驟與訓練數據集的創建基本相同。 eval_dataset=raw_dataset.get_eval_data() eval_index=get_raw_dataset_split_index(local_rank,output_path, raw_dataset.dataset_name_clean, seed,"eval", data_split,train_phase-1, len(eval_dataset)) eval_dataset=Subset(eval_dataset,eval_index) eval_dataset=create_dataset_split(eval_dataset,raw_dataset,train_phase, tokenizer,end_of_conversation_token, max_seq_len) returntrain_dataset,eval_dataset
這里需要額外注意一點,create_dataset_split 這個函數傳入的參數是 train_phase,而不是像 get_raw_dataset_split_index 函數那樣傳入 train_phase-1,這是為什么?這是因為train_phase用來標識在模型訓練過程中的哪個階段,是一個直接的指示符,與數據分割或索引無關。它用來在create_dataset_split函數中確定當前處于哪個訓練階段,并根據不同的訓練階段對數據進行不同的處理。而在調用get_raw_dataset_split_index函數時,傳入的是train_phase - 1,這是因為在這個函數中,我們需要根據當前訓練階段的前一個階段(由train_phase - 1表示)的數據分割或索引情況,來決定如何對當前階段的數據進行分割或索引。create_dataset_split主要關注如何根據訓練階段對數據進行處理,而get_raw_dataset_split_index主要關注如何根據前一個訓練階段的數據分割或索引情況,對當前階段的數據進行分割或索引。
回收本節的開頭,我們解析create_prompt_dataset函數:
#這個函數的主要目的是創建一個包含訓練和評估數據集的“提示”數據集,并將這兩個數據集保存在指定的文件中。具體來說: defcreate_prompt_dataset(local_rank, data_path, data_split, output_path, train_phase, seed, tokenizer, max_seq_len, end_of_conversation_token="<|endoftext|>", sft_only_data_path=[], reload=False): """ Createsthepromptdataset """ #os.makedirs(output_path,exist_ok=True):創建輸出目錄,如果目錄已經存在則不會引發異常。 os.makedirs(output_path,exist_ok=True) #構造文件名,這個文件名包含了很多有關數據集和模型的信息,如數據路徑、數據分割、訓練階段、 #隨機種子、tokenizer的名稱、最大序列長度等。然后將這個文件名哈希化,以避免文件名過長。 fname="_".join(data_path) sft_cache_key="_".join(sft_only_data_path) tokenizer_name=tokenizer.init_kwargs["name_or_path"].replace("/","_") fname=f"{fname}_split{data_split}_phase{train_phase}_seed{seed}_tokenizer{tokenizer_name}_seqlen{max_seq_len}_sft{sft_cache_key}" fname="_".join(fname.split("/")) fname=hashlib.sha256(fname.encode()).hexdigest( )#hashthefilenametoavoidtoolongfilename #構造訓練數據集和評估數據集的文件路徑。 train_fname=f"{output_path}/traindata_{fname}.pt" eval_fname=f"{output_path}/evaldata_{fname}.pt" #檢查訓練數據集和評估數據集的文件是否都已經存在,如果存在,則表示緩存已經找到,否則表示需要創建緩存。 cache_found=os.path.isfile(train_fname)andos.path.isfile(eval_fname) buf_create_cache=torch.ByteTensor([notcache_found]).cuda() torch.distributed.all_reduce(buf_create_cache) #如果當前進程是主進程(local_rank<=?0)并且需要創建緩存,就執行以下操作。 ????if?local_rank?<=?0?and?(buf_create_cache.item()?!=?0?or?reload): ????????#?如果只有一個數據集,直接調用create_dataset函數創建訓練數據集和評估數據集。 ????????if?len(data_path)?==?1:??#?Single?dataset. ????????????train_dataset,?eval_dataset?=?create_dataset( ????????????????local_rank,?data_path[0],?data_split,?output_path,?train_phase, ????????????????seed,?tokenizer,?end_of_conversation_token,?max_seq_len) ????????else:??#?Blending?datasets. ????????????#?如果有多個數據集,對每個數據集都調用create_dataset函數,并把得到的訓練數據集和評估數據集添加到對應的列表中, ????????????train_datasets?=?[] ????????????eval_datasets?=?[] ????????????train_size?=?0 ????????????eval_size?=?0 ????????????for?d_path?in?data_path: ????????????????train_dataset,?eval_dataset?=?create_dataset( ????????????????????local_rank,?d_path,?data_split,?output_path,?train_phase, ????????????????????seed,?tokenizer,?end_of_conversation_token,?max_seq_len) ????????????????train_datasets.append(train_dataset) ????????????????eval_datasets.append(eval_dataset) ????????????????train_size?+=?len(train_dataset) ????????????????eval_size?+=?len(eval_dataset) ????????????#?然后使用ConcatDataset和Subset函數合并數據集。 ????????????train_dataset?=?ConcatDataset(train_datasets) ????????????shuffle_idx?=?get_shuffle_idx(seed,?train_size) ????????????train_dataset?=?Subset(train_dataset,?shuffle_idx.tolist()) ????????????eval_dataset?=?ConcatDataset(eval_datasets) ????????????shuffle_idx?=?get_shuffle_idx(seed,?eval_size) ????????????eval_dataset?=?Subset(eval_dataset,?shuffle_idx.tolist()) ????????#?Append?the?SFT-only?dataset?if?it?exists,?and?current?phase?is?1(SFT). ????????#?如果當前是第一階段的訓練(SFT)并且指定了僅用于SFT的數據集,那么對這些數據集執行類似的操作, ????????#?然后把得到的訓練數據集和評估數據集添加到原有的數據集中。 ????????if?train_phase?==?1?and?sft_only_data_path: ????????????sft_train_datasets?=?[] ????????????sft_eval_datasets?=?[] ????????????sft_train_size?=?0 ????????????sft_eval_size?=?0 ????????????for?sft_path?in?sft_only_data_path: ????????????????sft_train_dataset,?sft_eval_dataset?=?create_dataset( ????????????????????local_rank, ????????????????????sft_path, ????????????????????"10,0,0", ????????????????????output_path, ????????????????????train_phase, ????????????????????seed, ????????????????????tokenizer, ????????????????????end_of_conversation_token, ????????????????????max_seq_len, ????????????????) ????????????????sft_train_datasets.append(sft_train_dataset) ????????????????sft_eval_datasets.append(sft_eval_dataset) ????????????????sft_train_size?+=?len(sft_train_dataset) ????????????????sft_eval_size?+=?len(sft_eval_dataset) ????????????if?sft_train_datasets:??#?Check?if?sft_train_datasets?is?not?empty ????????????????sft_train_dataset?=?ConcatDataset(sft_train_datasets) ????????????????train_dataset?=?ConcatDataset( ????????????????????[train_dataset,?sft_train_dataset]) ????????????????shuffle_idx?=?get_shuffle_idx(seed,?len(train_dataset)) ????????????????train_dataset?=?Subset(train_dataset,?shuffle_idx.tolist()) ????????????if?sft_eval_datasets:??#?Check?if?sft_eval_datasets?is?not?empty ????????????????sft_eval_dataset?=?ConcatDataset(sft_eval_datasets) ????????????????eval_dataset?=?ConcatDataset([eval_dataset,?sft_eval_dataset]) ????????????????shuffle_idx?=?get_shuffle_idx(seed,?len(eval_dataset)) ????????????????eval_dataset?=?Subset(eval_dataset,?shuffle_idx.tolist()) ????????#?把訓練數據集和評估數據集保存到對應的文件中。 ????????torch.save(train_dataset,?train_fname) ????????torch.save(eval_dataset,?eval_fname) ????#?在多進程環境中,確保所有進程都完成了數據集的保存操作。 ????torch.distributed.barrier() ????return?torch.load(train_fname),?torch.load(eval_fname)
工具函數解析
這里解析一下下面導入的代碼,在DeepSpeedExamples/applications/DeepSpeed-Chat/training/utils/utils.py這個路徑下:
fromutils.utilsimportprint_rank_0,to_device,save_hf_format,set_random_seed,get_all_reduce_mean,get_optimizer_grouped_parameters,save_zero_three_model,load_hf_tokenizer
print_rank_0
#在rank0也就是masterrank打印信息,防止每個機器或GPU都打印消息造成大量重復信息 defprint_rank_0(msg,rank=0): ifrank<=?0: ????????print(msg)
to_device
#這個函數的作用是把一個數據批次移動到指定的設備上。在PyTorch中,數據默認是在CPU上的, #如果要在GPU上進行運算,就需要先把數據移到GPU上。這個函數通過遍歷批次中的所有元素并 #調用to(device)方法來實現這一點。如果某個元素不能被移到指定的設備上 #(例如,這個元素是一個字符串或者其他不能在GPU上使用的類型),那么就直接保留這個元素,不進行任何操作。 defto_device(batch,device): output={} fork,vinbatch.items(): try: output[k]=v.to(device) except: output[k]=v returnoutput
save_hf_format
#這段代碼的功能是將訓練好的模型以HuggingFace格式保存,這樣之后就可以使用HuggingFace庫的from_pretrained方法加載了。 defsave_hf_format(model,tokenizer,args,sub_folder=""): #usedtosavehuggingfaceformat,sowecanuseitforhf.from_pretrained #首先,這行代碼檢查模型是否有'module'這個屬性。這在PyTorch中是一個常見的模式, #當模型被封裝在torch.nn.DataParallel或torch.nn.parallel.DistributedDataParallel時, #模型的所有屬性都會被存儲在'module'屬性中。所以這行代碼的目的是確保我們總是在原始模型上進行操作,而不是并行化的包裝器。 model_to_save=model.moduleifhasattr(model,'module')elsemodel #定義模型的配置和權重的文件名。 CONFIG_NAME="config.json" WEIGHTS_NAME="pytorch_model.bin" #定義模型保存的目錄路徑。 output_dir=os.path.join(args.output_dir,sub_folder) #如果模型保存的目錄不存在,那么創建這個目錄。 os.makedirs(output_dir,exist_ok=True) #定義模型權重和配置的保存路徑。 output_model_file=os.path.join(output_dir,WEIGHTS_NAME) output_config_file=os.path.join(output_dir,CONFIG_NAME) #獲取模型的狀態字典,這個字典包含了模型的所有權重。 save_dict=model_to_save.state_dict() #遍歷狀態字典中的所有鍵。這里使用list(save_dict.keys())而不是直接使用 #save_dict.keys()是因為我們在遍歷過程中會修改字典,直接遍歷字典的鍵在Python中會導致錯誤。 forkeyinlist(save_dict.keys()): #if"lora"inkey::如果鍵中包含"lora",那么刪除這個鍵值對。 #"lora"可能是某種特定的模型組件或參數,這里將其排除在保存的模型權重之外。 if"lora"inkey: delsave_dict[key] #將處理過的狀態字典保存到文件中。 torch.save(save_dict,output_model_file) #將模型的配置保存為JSON文件。 model_to_save.config.to_json_file(output_config_file) #將分詞器的詞匯表保存到指定的目錄。 tokenizer.save_vocabulary(output_dir)
set_random_seed
#這段代碼的目的是為所有可能用到的隨機數生成器設定種子,以確保實驗結果的可復現性。 #隨機數生成器的種子是生成隨機數序列的初始值,當我們用同一個種子初始化隨機數生成器時,它會生成完全一樣的隨機數序列。 defset_random_seed(seed): #首先檢查種子是否是None。如果是None,那么就跳過這段代碼,隨機數生成器將會用一個隨機的種子。 ifseedisnotNone: set_seed(seed)#這一行調用transformers庫的set_seed的函數,將種子設定為指定的值。 random.seed(seed)#將Python內置的隨機數生成器的種子設定為指定的值。 np.random.seed(seed)#將NumPy的隨機數生成器的種子設定為指定的值。 torch.manual_seed(seed)#將PyTorch的隨機數生成器的種子設定為指定的值。 torch.cuda.manual_seed_all(seed)#將PyTorch的所有GPU隨機數生成器的種子設定為指定的值。
get_all_reduce_mean
#這段代碼是在分布式訓練環境中進行平均值計算的函數,通過這段代碼, #所有的處理器(或者叫節點)上的同一個tensor都會被加和起來,然后除以總的處理器數,得到平均值。 defget_all_reduce_mean(tensor): #這行代碼執行一個分布式的reduce操作。reduce操作是指所有處理器中的同一個tensor都被某種方式結合起來。 #在這個例子中,torch.distributed.ReduceOp.SUM表示所有處理器上的tensor將被加和起來。 #加和的結果會在所有處理器上都可用。 torch.distributed.all_reduce(tensor,op=torch.distributed.ReduceOp.SUM) #這行代碼將前一步得到的加和結果除以處理器的數量(也叫作worldsize)。 #這樣,tensor就變成了所有處理器上原始tensor的平均值。 tensor=tensor/torch.distributed.get_world_size() #最后,這個平均值tensor被返回。在所有處理器上,這個函數返回的tensor都是相同的, #等于所有處理器上原始tensor的平均值。 returntensor
get_optimizer_grouped_parameters
#這段代碼的作用是將模型中的參數分組以便于在優化器中使用。它將模型參數分為兩組: #一組需要進行權重衰減(L2正則化)的參數,另一組不需要進行權重衰減的參數。 defget_optimizer_grouped_parameters(model, weight_decay, no_decay_name_list=[ "bias","LayerNorm.weight" ]): #它定義了一個列表optimizer_grouped_parameters,其中包含兩個字典。每個字典都對應一個參數組,包含"params"和"weight_decay"這兩個關鍵字。 optimizer_grouped_parameters=[ #在第一個字典中,它從模型參數中選出那些名稱不包含"bias"或"LayerNorm.weight" #且需要求梯度的參數。這些參數在優化過程中會應用weight_decay作為權重衰減項。 { "params":[ pforn,pinmodel.named_parameters() if(notany(ndinn forndinno_decay_name_list)andp.requires_grad) ], "weight_decay": weight_decay, }, #在第二個字典中,它選出那些名稱包含"bias"或"LayerNorm.weight"且需要求梯度的參數。 #這些參數在優化過程中不會應用權重衰減,即其"weight_decay"值為0。 { "params":[ pforn,pinmodel.named_parameters() if(any(ndinn forndinno_decay_name_list)andp.requires_grad) ], "weight_decay": 0.0, }, ] returnoptimizer_grouped_parameters
這種參數的分組策略是很常見的。比如在訓練Transformer模型時,通常會為權重和偏置項設定不同的學習策略。這是因為權重衰減對于防止過擬合很有幫助,但對于某些參數(如偏置項或者層歸一化的權重)可能會導致性能下降,因此常常會排除這些參數不進行權重衰減。
save_zero_three_model
#這個函數的主要功能是篩選出那些在DeepSpeedZero3優化中被離線存儲,但在當前還未獲取的參數。 #在DeepSpeedZero3優化中,一些模型參數在使用過后會被離線存儲,以此釋放GPU顯存。 #當這些參數需要再次被使用時,需要先獲取到本地。 def_z3_params_to_fetch(param_list): #這個條件語句判斷一個參數是否是被DeepSpeedZero3優化過的,且其狀態為"未獲取"(NOT_AVAILABLE)。 #對于被DeepSpeedZero3優化過的參數,它們有一個ds_id屬性和一個ds_status屬性,其中ds_status表示參數的當前狀態。 return[ pforpinparam_list ifhasattr(p,'ds_id')andp.ds_status==ZeroParamStatus.NOT_AVAILABLE ] #這個函數的主要作用是保存一個使用了DeepSpeedZero優化(可能為stage3)的模型。 #DeepSpeed的Zero優化技術是為了解決模型參數、優化器狀態和梯度等內存占用問題, #通過這種方式,可以訓練比當前GPU內存更大的模型。 defsave_zero_three_model(model_ema,global_rank,save_dir,zero_stage=0): #首先,檢查輸入的zero_stage是否為3,確定是否使用了DeepSpeedZero階段3優化。 zero_stage_3=(zero_stage==3) #然后,確保保存模型的目錄存在。 os.makedirs(save_dir,exist_ok=True) #定義模型權重保存的完整路徑。 WEIGHTS_NAME="pytorch_model.bin" output_model_file=os.path.join(save_dir,WEIGHTS_NAME) #如果模型是被包裹在其它結構(如DataParallel或DistributedDataParallel)中的,我們需要取出真實的模型實例。 model_to_save=model_ema.moduleifhasattr(model_ema, 'module')elsemodel_ema ifnotzero_stage_3: #如果沒有使用Zero階段3優化,直接使用PyTorch的torch.save函數保存模型狀態。 ifglobal_rank==0: torch.save(model_to_save.state_dict(),output_model_file) else: #如果使用了Zero階段3優化,因為模型的部分參數和優化器狀態在不同的設備上,所以需要先將它們收集起來。 output_state_dict={} fork,vinmodel_to_save.named_parameters(): ifhasattr(v,'ds_id'): #deepspeed.zero.GatheredParameters是DeepSpeed提供的一個上下文管理器, #它可以將分布在多個設備上的參數收集到一起。這部分參數保存在CPU上。 withdeepspeed.zero.GatheredParameters(_z3_params_to_fetch([v ]), enabled=zero_stage_3): v_p=v.data.cpu() else: v_p=v.cpu() #然后,將收集好的參數(并且不包含“lora”關鍵字的參數)添加到輸出狀態字典中。 ifglobal_rank==0and"lora"notink: output_state_dict[k]=v_p #最后,再使用torch.save函數保存模型狀態。 ifglobal_rank==0: torch.save(output_state_dict,output_model_file) #同時為了節省內存,使用del關鍵字刪除了存儲參數的字典。 deloutput_state_dict
load_hf_tokenizer
#這個函數的目標是加載一個HuggingFacetokenizer,這個tokenizer是用來將文本數據轉化為模型可以接受的形式的。 #model_name_or_path:這個參數可以是一個模型名字或者一個本地路徑,指示從哪里獲取tokenizer。 #fast_tokenizer=True:這個參數指示是否使用HuggingFace的"fast"tokenizer。"fast"tokenizer是用Rust編寫的,運行速度更快。 defload_hf_tokenizer(model_name_or_path,fast_tokenizer=True): #這行代碼檢查model_name_or_path是否是一個存在的路徑。如果是,那么就嘗試從本地文件系統中加載tokenizer。 ifos.path.exists(model_name_or_path): #Locallytokenizerloadinghassomeissue,soweneedtoforcedownload #這行代碼拼接路徑,嘗試找到模型的config文件。 model_json=os.path.join(model_name_or_path,"config.json") #這行代碼檢查config文件是否存在。如果存在,那么就嘗試從config文件中獲取模型名字。 ifos.path.exists(model_json): model_json_file=json.load(open(model_json))#這行代碼加載config文件。 model_name=model_json_file["_name_or_path"]#這行代碼從config文件中獲取模型名字。 tokenizer=AutoTokenizer.from_pretrained(model_name, fast_tokenizer=True)#這行代碼使用模型名字從HuggingFace的模型庫中下載并加載tokenizer。 else:#如果model_name_or_path不是一個存在的路徑,那么就認為它是一個模型名字。 tokenizer=AutoTokenizer.from_pretrained(model_name_or_path, fast_tokenizer=True)#這行代碼使用模型名字從HuggingFace的模型庫中下載并加載tokenizer。 returntokenizer
再解析一下DeepSpeedExamples/applications/DeepSpeed-Chat/training/utils/ds_utils.py導入的get_train_ds_config函數:
#DeepSpeedTeam GLOBAL_BATCH_SIZE=32 MICRO_BATCH_SIZE=4 #這個函數的主要目標是生成一個字典,用來配置DeepSpeed訓練模型的各種參數。 #DeepSpeed是一個用于加速大規模模型訓練的庫,其中包括一種叫做ZeRO(ZeroRedundancyOptimizer)的優化技術, #可以大大減少模型訓練過程中的顯存和內存占用。 defget_train_ds_config(offload, stage=2, enable_hybrid_engine=False, inference_tp_size=1, release_inference_cache=False, pin_parameters=True, tp_gather_partition_size=8, max_out_tokens=512): #這行代碼首先定義了一個變量device。如果參數offload為True,那么就將device設置為"cpu", #表示模型的參數和優化器狀態將被"offload"到CPU內存中,以減少GPU顯存的占用。 #如果offload為False,那么device就被設置為"none",表示不進行offload。 device="cpu"ifoffloadelse"none" #這行代碼定義了一個字典zero_opt_dict,包含了ZeRO優化器的各種配置參數。 zero_opt_dict={ "stage":stage,#ZeRO的階段,可以為0、1、2、3,階段越高,顯存優化效果越好,但計算復雜度也越高。 "offload_param":{#offload_param和offload_optimizer這兩個字段控制是否將模型的參數和優化器狀態offload到CPU內存。 "device":device }, "offload_optimizer":{ "device":device }, #這三個字段只有在ZeROstage3時有效,用于控制參數offloading的細節。 "stage3_param_persistence_threshold":1e4, "stage3_max_live_parameters":3e7, "stage3_prefetch_bucket_size":3e7, #這個字段控制是否使用顯存更高效的線性層實現。由于當前的顯存高效實現可能會導致數值不穩定,所以這里默認設為False。 "memory_efficient_linear":False } #返回一個配置字典,這個字典包含了訓練模型的各種參數。 return{ "train_batch_size":GLOBAL_BATCH_SIZE,#全局的批次大小 "train_micro_batch_size_per_gpu":MICRO_BATCH_SIZE,#每個GPU的微批次大小。 "steps_per_print":10,#每隔10步打印一次訓練信息。 "zero_optimization":zero_opt_dict,#之前定義的ZeRO配置字典。 "fp16":{#控制是否使用混合精度訓練,以及lossscaling的窗口大小。 "enabled":True, "loss_scale_window":100 }, "gradient_clipping":1.0,#梯度裁剪的閾值。 "prescale_gradients":False,#是否在梯度累計之前就進行梯度縮放,通常用于防止梯度下溢。 "wall_clock_breakdown":False,#是否進行每步訓練時間的詳細分析。 "hybrid_engine":{ "enabled":enable_hybrid_engine, "max_out_tokens":max_out_tokens, "inference_tp_size":inference_tp_size, "release_inference_cache":release_inference_cache, "pin_parameters":pin_parameters, "tp_gather_partition_size":tp_gather_partition_size, } }
再解析一下DeepSpeedExamples/applications/DeepSpeed-Chat/training/utils/module/lora.py導入的convert_linear_layer_to_lora, convert_lora_to_linear_layer, only_optimize_lora_parameters 這幾個函數。
在解析這幾個函數之前需要先解析一下 LinearLayer_LoRA 這個類:
#LinearLayer_LoRA是一個實現了LoRA(Low-RankAdaptation)策略的PyTorch模塊。 #LoRA是一種在大型預訓練模型(如GPT-3或者BERT)微調中使用的策略,可以提高微調的效果。 #這個類是一個簡單的LoRA實現,只支持線性層。 classLinearLayer_LoRA(nn.Module): #ansimpleimplementationofLoRA #fornowonlysupportLinearLayer #初始化函數,定義了一些參數,如權重(weight)、LoRA維度(lora_dim)、 #LoRA縮放因子(lora_scaling)、LoRAdropout(lora_droppout)以及偏置(bias)。 def__init__(self, weight, lora_dim=0, lora_scaling=1, lora_droppout=0, bias=None): super(LinearLayer_LoRA,self).__init__() self.weight=weight self.bias=bias #在初始化過程中,首先檢查lora_dim是否大于0,如果不是,就拋出異常。 iflora_dim<=?0: ????????????raise?ValueError( ????????????????"You?are?training?to?use?LoRA,?whose?reduced?dim?should?be?larger?than?1" ????????????) ????????#?然后嘗試獲取權重的形狀,如果權重是在?ZeRO?階段3中的參數,那么通過?ds_shape?屬性獲取形狀; ????????#?否則直接使用?shape?屬性。 ????????try: ????????????#?for?zero?stage?3 ????????????rows,?columns?=?weight.ds_shape ????????except: ????????????rows,?columns?=?weight.shape ????????#?然后初始化?LoRA?的左右兩個權重矩陣,并設置它們為可學習的參數。 ????????self.lora_right_weight?=?nn.Parameter(torch.zeros( ????????????columns, ????????????lora_dim))??#?apply?transpose?so?in?forward?we?do?not?need?to ????????self.lora_left_weight?=?nn.Parameter(torch.zeros(lora_dim,?rows)) ????????self.lora_scaling?=?lora_scaling?/?lora_dim ????????#?如果?lora_droppout?大于0,則創建一個?Dropout?層;否則創建一個?Identity?層。 ????????if?lora_droppout?>0: self.lora_dropout=nn.Dropout(lora_droppout) else: self.lora_dropout=nn.Identity() #然后調用reset_parameters方法對LoRA權重進行初始化。 self.reset_parameters() #disabletheoriginalweightgradient #最后,關閉原始權重的梯度,設置LoRA融合標志位為False。 self.weight.requires_grad=False #fuseLoRAtotheoriginalweight self.fuse_lora=False defeval(self): self.lora_dropout.eval()#將模型設置為評估模式,這時候Dropout層會停止工作。 #self.fuse_lora_weight() deftrain(self,mode=True): self.lora_dropout.train(mode)#將模型設置為訓練模式,這時候Dropout層會開始工作。 #self.unfuse_lora_weight() defreset_parameters(self):#初始化LoRA權重的方法。右權重使用kaiming均勻分布進行初始化,左權重初始化為全0。 nn.init.kaiming_uniform_(self.lora_right_weight,a=math.sqrt(5)) nn.init.zeros_(self.lora_left_weight) #fuse_lora_weight(self)和unfuse_lora_weight(self): #這兩個方法用于將LoRA權重融合到原始權重中,或者從原始權重中解融合。 #融合操作實質上是將原始權重與LoRA權重的乘積(縮放后)相加。 deffuse_lora_weight(self): ifnotself.fuse_lora: self.weight.data+=self.lora_scaling*torch.matmul( self.lora_left_weight.t(),self.lora_right_weight.t()) self.fuse_lora=True defunfuse_lora_weight(self): ifself.fuse_lora: self.weight.data-=self.lora_scaling*torch.matmul( self.lora_left_weight.t(),self.lora_right_weight.t()) self.fuse_lora=False #前向傳播函數。如果LoRA權重已融合,則直接對輸入進行線性變換; #否則,會額外計算一個LoRA項,該項是輸入通過Dropout層,然后與LoRA權重相乘得到的。 defforward(self,input): ifself.fuse_lora: returnF.linear(input,self.weight,self.bias) else: returnF.linear( input,self.weight, self.bias)+(self.lora_dropout(input)@self.lora_right_weight @self.lora_left_weight)*self.lora_scaling
convert_linear_layer_to_lora
#這個函數convert_linear_layer_to_lora是用來將模型中的線性層轉換為LoRA層的。 #在訓練深度學習模型時,這種方法能夠在保持預訓練模型參數不變的同時,通過添加額外的參數來微調模型。 #convertthelinearlayertoLoRA defconvert_linear_layer_to_lora(model, part_module_name, lora_dim=0, lora_scaling=1, lora_droppout=0): repalce_name=[] #函數首先遍歷模型中的所有模塊(model.named_modules()),找出名稱中包含part_module_name的線性層(nn.Linear), #并將這些層的名稱添加到repalce_name列表中。 forname,moduleinmodel.named_modules(): ifisinstance(module,nn.Linear)andpart_module_nameinname: repalce_name.append(name) #然后,函數遍歷repalce_name列表,使用recursive_getattr函數獲取模型中對應名稱的模塊。 #這些模塊是需要被替換成LoRA層的線性層。 fornameinrepalce_name: module=recursive_getattr(model,name) #對于每一個需要被替換的模塊,函數創建一個LinearLayer_LoRA實例tmp, #并將其傳輸到與原始模塊相同的設備和數據類型上。創建LinearLayer_LoRA實例時, #需要傳入原始模塊的權重、偏置以及LoRA層的一些參數,如lora_dim、lora_scaling和lora_droppout。 tmp=LinearLayer_LoRA( module.weight,lora_dim,lora_scaling,lora_droppout, module.bias).to(module.weight.device).to(module.weight.dtype) #創建完LinearLayer_LoRA實例后,函數使用recursive_setattr函數將原始模塊替換為LinearLayer_LoRA實例。 recursive_setattr(model,name,tmp) returnmodel
convert_lora_to_linear_layer
#這個函數的主要功能是篩選出那些在DeepSpeedZero3優化中被離線存儲,但在當前還未獲取的參數。 #在DeepSpeedZero3優化中,一些模型參數在使用過后會被離線存儲,以此釋放GPU顯存。 #當這些參數需要再次被使用時,需要先獲取到本地。 def_z3_params_to_fetch(param_list): #這個條件語句判斷一個參數是否是被DeepSpeedZero3優化過的,且其狀態為"未獲取"(NOT_AVAILABLE)。 #對于被DeepSpeedZero3優化過的參數,它們有一個ds_id屬性和一個ds_status屬性,其中ds_status表示參數的當前狀態。 return[ pforpinparam_list ifhasattr(p,'ds_id')andp.ds_status==ZeroParamStatus.NOT_AVAILABLE ] #這個函數convert_lora_to_linear_layer是用來將模型中的LoRA層轉換回線性層的。 #在訓練深度學習模型時,這個操作可以用于在訓練完LoRA層后,將模型恢復到原始的狀態, #以便進行下一步的操作,如模型的保存、加載等。 #converttheLoRAlayertolinearlayer defconvert_lora_to_linear_layer(model): repalce_name=[] #函數首先遍歷模型中的所有模塊(model.named_modules()),找出所有的LoRA層(LinearLayer_LoRA), #并將這些層的名稱添加到repalce_name列表中。 forname,moduleinmodel.named_modules(): ifisinstance(module,LinearLayer_LoRA): repalce_name.append(name) #然后,函數遍歷repalce_name列表,使用recursive_getattr函數獲取模型中對應名稱的LoRA層。 fornameinrepalce_name: module=recursive_getattr(model,name) #對于每一個LoRA層,函數首先檢查是否處于zerostage3(DeepSpeed的一個特性,用于在多GPU訓練中減少內存占用)。 #如果是,則設置zero_stage_3為True。 zero_stage_3=hasattr(module.weight,'ds_id') withdeepspeed.zero.GatheredParameters(_z3_params_to_fetch([ module.weight,module.bias,module.lora_left_weight, module.lora_right_weight ]), modifier_rank=0, enabled=zero_stage_3): module.fuse_lora_weight() returnmodel
only_optimize_lora_parameters
#這個函數的作用是關閉模型中除LoRA參數之外的所有參數的梯度。這意味著在訓練過程中,只有LoRA參數會被優化,其他參數保持不變。 defonly_optimize_lora_parameters(model): #turnoffthegradientofalltheparametersexcepttheLoRAparameters #遍歷模型的所有參數。每個參數都有一個唯一的名稱name和對應的參數值param。 forname,paraminmodel.named_parameters(): #查當前參數的名稱是否包含lora_right_weight或lora_left_weight。 #這是因為在LoRA(Low-RankAdaptation)中,只有這兩種參數是需要優化的。 if"lora_right_weight"innameor"lora_left_weight"inname: #param.requires_grad=True如果參數名包含lora_right_weight或lora_left_weight, #則設置參數的requires_grad屬性為True,表示需要對此參數進行梯度下降優化。 param.requires_grad=True else: param.requires_grad=False returnmodel
create_hf_model
#這段代碼定義了一個名為create_hf_model的函數,該函數的作用是創建或加載一個預訓練模型。該函數的主要參數包括: #model_class:模型的類別,例如GPT-2、BERT等。 #tokenizer:用于模型的分詞器。 #ds_config:DeepSpeed的配置參數。 #rlhf_training:一個標志,用來表示是否正在進行RLHF(ReinforcementLearningfromHumanFeedback)訓練。 #disable_dropout:一個標志,用來表示是否禁用dropout。Dropout是一種防止過擬合的技術。 defcreate_hf_model(model_class, model_name_or_path, tokenizer, ds_config=None, rlhf_training=False, disable_dropout=False): #根據model_name_or_path從預訓練模型獲取模型配置model_config。 model_config=AutoConfig.from_pretrained(model_name_or_path) #如果disable_dropout為真,則將模型配置中的dropout設為0.0。 ifdisable_dropout: model_config.dropout=0.0 #Note:dschfisdefinedinfunctionscopetoavoidglobaleffects #https://huggingface.co/docs/transformers/main_classes/deepspeed#nontrainer-deepspeed-integration #根據ds_config中的設置,創建DeepSpeed的配置對象dschf,以便進行DeepSpeed優化。 ifds_configisnotNoneandds_config["zero_optimization"]["stage"]==3: dschf=HfDeepSpeedConfig(ds_config) else: dschf=None #根據rlhf_training的值,確定是從配置中創建模型還是從預訓練模型中加載模型。如果rlhf_training為真,則根據模型配置創建模型;否則,從預訓練模型加載模型。 ifrlhf_training: #theweightloadingishandledbycreatecriticmodel model=model_class.from_config(model_config) else: model=model_class.from_pretrained( model_name_or_path, from_tf=bool(".ckpt"inmodel_name_or_path), config=model_config) #將模型的結束標記和填充標記設為分詞器的結束標記id。 model.config.end_token_id=tokenizer.eos_token_id model.config.pad_token_id=model.config.eos_token_id #調整模型的詞匯表大小,使其為8的倍數。這樣做的目的是為了在某些硬件(如GPU)上提高效率。 model.resize_token_embeddings(int( 8* math.ceil(len(tokenizer)/8.0)))#makethevocabsizemultipleof8 returnmodel
0x3.2 main.py主體解析
parse_args解析
defparse_args(): #創建一個argparse的解析器對象,這個對象可以添加命令行參數和處理它們。description參數提供了一個對程序的簡單描述。 parser=argparse.ArgumentParser( description= "Finetuneatransformersmodelonacausallanguagemodelingtask") parser.add_argument('--data_path', nargs='*', default=['Dahoas/rm-static'], help='Pathtothetrainingdataset.Acceptedformat:' '1)asingledatapath,2)multipledatasetsinthe' 'form:dataset1-pathdataset2-path...') parser.add_argument('--data_split', type=str, default='2,4,4', help='Comma-separatedlistofproportionsfortraining' 'phase1,2,and3data.Forexamplethesplit`6,2,2`' 'willuse60%ofdataforphase1,20%forphase2' 'and20%forphase3.') parser.add_argument( '--sft_only_data_path', nargs='*', default=[], help='PathtothedatasetforonlyusinginSFTphase.') parser.add_argument( '--data_output_path', type=str, default='/data_turbo/home/zhangxiaoyu/data_files/', help= 'Wheretostorethedata-relatedfilessuchasshuffleindex.Thisneedstobeonalocalstorageofanode(notonasharedstorage)' ) parser.add_argument( "--model_name_or_path", type=str, help= "Pathtopretrainedmodelormodelidentifierfromhuggingface.co/models.", required=True, ) parser.add_argument( "--per_device_train_batch_size", type=int, default=16, help="Batchsize(perdevice)forthetrainingdataloader.", ) parser.add_argument( "--per_device_eval_batch_size", type=int, default=16, help="Batchsize(perdevice)fortheevaluationdataloader.", ) parser.add_argument( "--max_seq_len", type=int, default=512, help="Themaximumsequencelength.", ) parser.add_argument( "--learning_rate", type=float, default=1e-3, help= "Initiallearningrate(afterthepotentialwarmupperiod)touse.", ) parser.add_argument("--weight_decay", type=float, default=0., help="Weightdecaytouse.") parser.add_argument("--num_train_epochs", type=int, default=1, help="Totalnumberoftrainingepochstoperform.") parser.add_argument( "--gradient_accumulation_steps", type=int, default=1, help= "Numberofupdatesstepstoaccumulatebeforeperformingabackward/updatepass.", ) parser.add_argument( "--lr_scheduler_type", type=SchedulerType, default="cosine", help="Theschedulertypetouse.", choices=[ "linear","cosine","cosine_with_restarts","polynomial", "constant","constant_with_warmup" ], ) parser.add_argument( "--num_warmup_steps", type=int, default=0, help="Numberofstepsforthewarmupinthelrscheduler.") parser.add_argument("--output_dir", type=str, default=None, help="Wheretostorethemodel.") parser.add_argument("--seed", type=int, default=1234, help="Aseedforreproducibletraining.") parser.add_argument("--local_rank", type=int, default=-1, help="local_rankfordistributedtrainingongpus") parser.add_argument('--gradient_checkpointing', action='store_true', help='EnableHFgradientcheckpointingformodel.') parser.add_argument('--disable_dropout', action='store_true', help='Disablethedropoutofthemodel.') #deepspeedfeatures parser.add_argument('--offload', action='store_true', help='EnableZeROOffloadtechniques.') parser.add_argument( '--zero_stage', type=int, default=0, help='ZeROoptimizationstageforActormodel(andclones).') ##LoRAforefficienttrainingsetting parser.add_argument("--lora_dim", type=int, default=0, help="If>0,useLoRAforefficienttraining.") parser.add_argument("--lora_module_name", type=str, default="decoder.layers.", help="ThescopeofLoRA.") parser.add_argument('--only_optimize_lora', action='store_true', help='OnlyoptimizetheLoRAparameters.') #這一行將DeepSpeed的配置參數添加到解析器中。 parser=deepspeed.add_config_arguments(parser) #這一行解析命令行參數并將它們存儲在args對象中 args=parser.parse_args() #Validatesettings #在這個代碼塊中,驗證一些特定的參數設置是否合法。 #例如,如果同時啟用了gradient_checkpointing和僅優化LoRA參數,那么將會拋出一個錯誤。 ifargs.gradient_checkpointingandargs.lora_dim>0: assert( notargs.only_optimize_lora ),"--gradient_checkpointingand--only_optimize_loracannotbeenabledatthesametime." returnargs
main函數解析
接下來是訓練部分的核心函數,也是全文最后一個函數main的解析。
#這個函數是主函數,是訓練語言模型的主流程,主要步驟包括解析命令行參數、 #設置設備、準備數據、定義模型、配置優化器和學習率調度器、進行訓練和評估等。 defmain(): #解析命令行參數。 args=parse_args() #如果本地排名為-1,說明不在分布式訓練環境下,設備設置為"cuda"; #否則根據args.local_rank設置對應的cuda設備,并初始化分布式訓練。 ifargs.local_rank==-1: device=torch.device("cuda") else: torch.cuda.set_device(args.local_rank) device=torch.device("cuda",args.local_rank) #Initializesthedistributedbackendwhichwilltakecareofsychronizingnodes/GPUs #torch.distributed.init_process_group(backend='nccl') deepspeed.init_distributed() #獲取全局rank。 args.global_rank=torch.distributed.get_rank() #獲取deepspeed的訓練配置。 ds_config=get_train_ds_config(offload=args.offload, stage=args.zero_stage) #在配置中設置訓練時每個GPU的微批次大小和總的批次大小。 ds_config[ 'train_micro_batch_size_per_gpu']=args.per_device_train_batch_size ds_config[ 'train_batch_size']=args.per_device_train_batch_size*torch.distributed.get_world_size( )*args.gradient_accumulation_steps #Ifpassedalong,setthetrainingseednow. #設置隨機種子以保證結果的可復現性。 set_random_seed(args.seed) #在此處添加一個barrier操作,確保所有進程都執行到這一點后再繼續執行后續操作。 torch.distributed.barrier() #加載預訓練模型對應的分詞器。 tokenizer=load_hf_tokenizer(args.model_name_or_path,fast_tokenizer=True) tokenizer.pad_token=tokenizer.eos_token #makesuretokenizerisrightpadinourlogic tokenizer.padding_side='right' #創建預訓練模型。 model=create_hf_model(AutoModelForCausalLM, args.model_name_or_path, tokenizer, ds_config, disable_dropout=args.disable_dropout) #如果參數lora_dim大于0,將模型的線性層轉換為LoRa層;如果只優化LoRa參數,關閉其他參數的梯度。 ifargs.lora_dim>0: model=convert_linear_layer_to_lora(model,args.lora_module_name, args.lora_dim) ifargs.only_optimize_lora: model=only_optimize_lora_parameters(model) #Preparethedata #創建數據集和數據加載器:包括訓練集和驗證集,以及對應的采樣器和數據加載器。 train_phase=1 train_dataset,eval_dataset=create_prompt_dataset( args.local_rank, args.data_path, args.data_split, args.data_output_path, train_phase, args.seed, tokenizer, args.max_seq_len, sft_only_data_path=args.sft_only_data_path) #DataLoaderscreation: ifargs.local_rank==-1: train_sampler=RandomSampler(train_dataset) eval_sampler=SequentialSampler(eval_dataset) else: train_sampler=DistributedSampler(train_dataset) eval_sampler=DistributedSampler(eval_dataset) train_dataloader=DataLoader(train_dataset, collate_fn=default_data_collator, sampler=train_sampler, batch_size=args.per_device_train_batch_size) eval_dataloader=DataLoader(eval_dataset, collate_fn=default_data_collator, sampler=eval_sampler, batch_size=args.per_device_eval_batch_size) #定義模型評估函數,用于計算模型在驗證集上的困惑度。 defevaluation(model,eval_dataloader): model.eval()#將模型切換為評估模式。 losses=0#初始化loss。 forstep,batchinenumerate(eval_dataloader):#對于評估數據集中的每一個batch。 batch=to_device(batch,device)#將batch數據移到對應的設備上。 withtorch.no_grad():#在此上下文管理器中,不計算梯度,這樣可以節省存儲和計算資源。 #將batch數據輸入模型,進行前向計算。 outputs=model(**batch) loss=outputs.loss#取出模型的輸出中的loss。 losses+=loss.float()#將當前的loss累加到總的losses中。 losses=losses/(step+1)#計算平均的loss。 try: perplexity=torch.exp(losses)#嘗試計算模型的困惑度,如果捕捉到溢出錯誤,將困惑度設置為無窮大。 exceptOverflowError: perplexity=float("inf") #嘗試在所有設備上計算困惑度的平均值,如果發生任何錯誤,就忽略。 try: perplexity=get_all_reduce_mean(perplexity).item() except: pass returnperplexity #根據是否使用權重衰減將模型參數分為兩組。 #Splitweightsintwogroups,onewithweightdecayandtheothernot. optimizer_grouped_parameters=get_optimizer_grouped_parameters( model,args.weight_decay) #選擇優化器類型,如果啟用了梯度Offload,使用DeepSpeedCPUAdam,否則使用FusedAdam。 AdamOptimizer=DeepSpeedCPUAdamifargs.offloadelseFusedAdam #創建優化器。 optimizer=AdamOptimizer(optimizer_grouped_parameters, lr=args.learning_rate, betas=(0.9,0.95)) #計算每個epoch的更新步數。 num_update_steps_per_epoch=math.ceil( len(train_dataloader)/args.gradient_accumulation_steps) #創建學習率調度器。 lr_scheduler=get_scheduler( name=args.lr_scheduler_type, optimizer=optimizer, num_warmup_steps=args.num_warmup_steps, num_training_steps=args.num_train_epochs*num_update_steps_per_epoch, ) #使用deepspeed初始化模型、優化器和學習率調度器。 model,optimizer,_,lr_scheduler=deepspeed.initialize( model=model, optimizer=optimizer, args=args, config=ds_config, lr_scheduler=lr_scheduler, dist_init_required=True) #如果啟用了梯度檢查點,那么在模型中也啟用梯度檢查點。 ifargs.gradient_checkpointing: model.gradient_checkpointing_enable() #Train! #使用print_rank_0函數在主節點(global_rank為0的節點)打印開始訓練的信息。 print_rank_0("*****Runningtraining*****",args.global_rank) #在主節點打印在第0個epoch(訓練開始前)進行模型評估的信息。 print_rank_0( f"*****Evaluatingperplexity,Epoch{0}/{args.num_train_epochs}*****", args.global_rank) #調用evaluation函數對模型進行評估,得到模型的困惑度。 perplexity=evaluation(model,eval_dataloader) #在主節點打印模型的困惑度。 print_rank_0(f"ppl:{perplexity}",args.global_rank) #循環args.num_train_epochs輪進行訓練。 forepochinrange(args.num_train_epochs): print_rank_0( f"BeginningofEpoch{epoch+1}/{args.num_train_epochs},TotalMicroBatches{len(train_dataloader)}", args.global_rank)#在每個epoch開始時,在主節點打印開始新的訓練周期的信息。 model.train()#將模型設置為訓練模式。 forstep,batchinenumerate(train_dataloader):#對于訓練數據集中的每一個batch。 batch=to_device(batch,device)#將batch數據移到對應的設備上。 outputs=model(**batch,use_cache=False)#將batch數據輸入模型,進行前向計算。 loss=outputs.loss#取出模型的輸出中的loss。 model.backward(loss)#進行反向傳播,計算梯度。 model.step()#更新模型的參數。 #Evaluateperplexityonthevalidationset. #在每個epoch結束后,在主節點打印開始評估的信息。 print_rank_0( f"*****Evaluatingperplexity,Epoch{epoch+1}/{args.num_train_epochs}*****", args.global_rank) #對模型進行評估,得到模型的困惑度。 perplexity=evaluation(model,eval_dataloader) #在主節點打印模型的困惑度。 print_rank_0(f"ppl:{perplexity}",args.global_rank) #更新模型中的epoch計數。 model.tput_timer.update_epoch_count() #如果設置了輸出目錄,進行以下操作。 ifargs.output_dirisnotNone: #在主節點打印開始保存模型的信息。 print_rank_0('savingthefinalmodel...',args.global_rank) #將模型中的LoRA層轉換為線性層。 model=convert_lora_to_linear_layer(model) #如果是主節點,進行以下操作。 ifargs.global_rank==0: #以HuggingFace的模型格式保存模型。 save_hf_format(model,tokenizer,args) #如果使用了ZeroRedundancyOptimizer(Zero)的第三階段,進行以下操作。 ifargs.zero_stage==3: #Forzerostage3,eachgpuonlyhasapartofthemodel,soweneedaspecialsavefunction #使用特殊的保存函數保存模型。在Zero的第三階段,每個GPU只有模型的一部分,所以需要特殊的保存函數。 save_zero_three_model(model, args.global_rank, args.output_dir, zero_stage=args.zero_stage)
0x4. hybrid_engine的細節 & log
從訓練過程的輸出日志來看hybrid_engine是默認關閉的,DeepSpeed-Chat 打造類ChatGPT全流程 筆記一 里面提到DeepSpeed Hybrid Engine是用在加速 RLHF 流程中最耗時的部分也就是第三步,而本文介紹的監督指令微調是第一步,所以即使開啟hybrid_engine加速效果應該也比較有限,所以這里默認關閉。
hybrid_engine的優化方法和原理在后續文章中繼續探索。
這里分享一下我復現官方sample訓練的第一階段的log:https://paste.ubuntu.com/p/vcG49hQmCW/
0x5. 總結
這篇文章解析了DeepSpeed Chat中監督指令微調這個過程的源碼,這個過程和一般的PyTorch DDP分布式訓練區別不是特別大,主要是自定義prompt數據集以及將普通的訓練流程中的組件如模型,優化器,學習率調度器等等,使用DeepSpeed來warp一下,來用上DeepSpeed提供的Zero,Gradient Checkpoint(注意這個其實就是activation checkpoint)等特性。本文是完全按照訓練流程順序閱讀代碼,并補全了訓練過程中所有涉及到的工具函數或者新的特性如LoRA微調的代碼解析。DeepSpeed Chat這部分代碼寫得比較清晰易懂,因為是在接口層面來使用DeepSpeed,相當于基于DeepSpeed做應用所以代碼中不會涉及到DeepSpeed的底層代碼,只需要關注算法流程。但這個代碼在LoRA微調這部分感覺設計的耦合性有一點高,如果要新增新的微調方式比如QLoRA可能寫法就不太優雅了。
-
源碼
+關注
關注
8文章
652瀏覽量
29454 -
模型
+關注
關注
1文章
3308瀏覽量
49223 -
自然語言
+關注
關注
1文章
291瀏覽量
13400
原文標題:0x5. 總結
文章出處:【微信號:GiantPandaCV,微信公眾號:GiantPandaCV】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論