
本文为《类ChatGPT逐行代码解读》系列的第二篇,上一篇是:从零实现Transformer、ChatGLM-6B:从位置编码/缩放点积注意力/多头注意力开始
本文模型的特点是都加了RLHF,对于这4个模型而言:TRL、ChatLLaMA、ColossalChat、DeepSpeed Chat
总之,微软这个DeepSpeed Chat实现的不错,抠完它的关键代码后,你会发现和之前本博客内另一篇写的原理部分都一一对应起来了(如果你还没看过原理,建议先看此文:ChatGPT技术原理解析,只有懂原理才能更好的理解实现或实际实现,特别是该文的第三部分 ),而把论文、原理/算法、公式、代码一一对应,可以让你的理解有个质变
本文最早的标题是:从零实现带RLHF的类ChatGPT:从TRL/ChatLLaMA/ColossalChat到DeepSpeed Chat,后来因为要不断扩展DSC的内容,为避免本文越写越长,故最终分出了两篇文章
通过《ChatGPT技术原理解析》一文,我们已经知道了ChatGPT的三阶段训练过程,其中,阶段三的本质其实就是通过PPO的方式去微调LM
GitHub上有个TRL(Transformer Reinforcement Learning,基于『Hugging Face开发的Transformer库』),便是通过PPO的方式去微调LM,需要的数据便是三元组「query, response, reward」,具体如下图所示
PPO算法是一种具体的Actor-Critic算法实现,比如在对话机器人中,输入的prompt是state,输出的response是action,想要得到的策略就是怎么从prompt生成action能够得到最大的reward,也就是拟合人类的偏好。具体实现时,可以按如下两大步骤实现
以下是计算策略损失和价值损失的关键代码(来自trl/ppo_trainer.py at main · lvwerra/trl · GitHub的第971-1032行),且为方便大家阅读时一目了然,我特意给每一行的代码都加上了注释
def loss(self,old_logprobs: torch.FloatTensor, # 旧的对数概率,是前一步的策略输出values: torch.FloatTensor, # 价值函数的输出rewards: torch.FloatTensor, # 从环境中得到的奖励logits: torch.FloatTensor, # 策略网络的原始输出(未经softmax)vpreds: torch.FloatTensor, # 价值函数的预测值logprobs: torch.FloatTensor, # 当前策略输出的对数概率mask: torch.LongTensor, # 用于忽略某些元素(如填充元素)的掩码
):lastgaelam = 0 # 初始化lastgaelam,用于计算广义优势估计(GAE)advantages_reversed = [] # 初始化一个空列表,用于存储计算出的逆序优势值gen_len = rewards.shape[-1] # 获取奖励的长度,即序列的长度values = values * mask # 使用掩码对值进行过滤rewards = rewards * mask # 使用掩码对奖励进行过滤# 反向遍历时间步,计算每一步的优势值for t in reversed(range(gen_len)): # 获取下一个状态的值nextvalues = values[:, t + 1] if t < gen_len - 1 else 0.0 # 计算TD误差delta = rewards[:, t] + fig.gamma * nextvalues - values[:, t] # 更新lastgaelamlastgaelam = delta + fig.gamma * fig.lam * lastgaelam # 将计算出的优势值添加到列表中advantages_reversed.append(lastgaelam) # 对逆序的优势值进行逆序,使其按照原始顺序,然后将其堆叠起来并进行转置,得到最终的优势值advantages = torch.stack(advantages_reversed[::-1]).transpose(0, 1) # 计算回报,即优势值加上对应的value值returns = advantages + values# 对优势值进行掩码白化,即只对掩码部分进行白化处理advantages = masked_whiten(advantages, mask)# 从计算图中分离优势值,防止反向传播 advantages = advantages.detach() # 对预测值进行剪裁,防止预测值偏离真实值过远vpredclipped =
clip_by_value(vpreds, values - fig.cliprange_value, values + fig.cliprange_value)# 计算预测值与回报之间的平方损失vf_losses1 = (vpreds - returns) ** 2 # 计算剪裁后的预测值与回报之间的平方损失vf_losses2 = (vpredclipped - returns) ** 2 # 计算价值函数的损失,选择两种损失中的较大者,然后计算其平均值,并乘以0.5vf_loss = 0.5 * masked_mean(torch.max(vf_losses1, vf_losses2), mask) # 计算剪裁损失比原始损失大的部分的平均值vf_clipfrac = masked_(vf_losses2, vf_losses1).double(), mask) # 计算新旧策略的比例ratio = p(logprobs - old_logprobs) # 计算策略梯度损失pg_losses = -advantages * ratio # 计算剪裁后的策略梯度损失pg_losses2 =
-advantages * torch.clamp(ratio, 1.0 - fig.cliprange, 1.0 + fig.cliprange) # 计算策略损失,选择两种损失中的较大者,然后计算其平均值pg_loss = masked_mean(torch.max(pg_losses, pg_losses2), mask) # 计算剪裁损失比原始损失大的部分的平均值pg_clipfrac = masked_(pg_losses2, pg_losses).double(), mask)# 计算总损失,等于策略损失加上价值损失loss = pg_loss + fig.vf_coef * vf_loss
上面代码中 有两点值得解释下
计算回报时,为何是优势值加上对应的value值
别忘了,根据本博客中另一篇文章《RL极简入门》可知
优势函数A(s,a)定义为Q(s,a) - V(s),其中Q(s,a)是动作价值函数,表示在状态s采取动作a所能获得的预期回报
而V(s)则是状态价值函数,表示在状态s下依据当前策略所能获得的预期回报
因此,当我们计算returns时,实际上是在计算Q(s,a)的估计值,即预期的动作价值
所以,returns = advantages + values = (Q(s,a) - V(s)) + V(s) = Q(s,a)
这样,returns就代表了我们预期能在状态s采取动作a获得的回报
由于LLaMA没有使用RLHF方法,初创公司 Nebuly AI开源了RLHF版的LLaMA,即ChatLLaMA
其训练过程类似 ChatGPT,而通过本博客内的《ChatGPT技术原理解析》3.1节,可知训练三个模型(SFT、RM、RL/PPO)得先准备三套数据集
actor_training_data,即用于微调GPT3所用的数据,比如
[
{
"user_input": "here the input of the user",
"completion": "here the model completion"
}
]
actor_training_data如何而来呢,有4项途径
Anthropic HH RLHF:这个数据集由结构化的 {question/answer pairs} 组成,包括机器人选择和拒绝的答案;
Stanford Human Preferences Dataset (SHP):这个数据集是从选定的“提问”subreddits 中挑选出来的,并且包括基于最受支持的回答的范围广泛的 {question/answer pairs} 的问题
可以运行以下命令下载数据集:
python artifacts/download_dataset.py <dataset_name> --path <path_to_folder_for_download> --number_of_samples <N> 其中:
<dataset_name>对于 StanfordNLP/SHP 数据集,可以是“SHP”或“ARLHF”,对于 Anthropic/hh-rlhf 数据集,可以分别是“SHP”或“ARLHF”;
<path_to_folder_for_download>是要创建数据集的文件夹路径;
<N>是组成 reward_dataset.json 的样本数
使用 100% 个性化数据集
用户提供自己的个性化完整数据集,数据集必须是具有以下格式的 JSON 文件:
[
{
"user_input": "here the input of the user",
"completion": "here the model completion"
}
]
其中列表包含多个dictionaries,每个dictionary 对应一个数据样本,建议使用超过 1000 个数据样本来进行对actor的训练
创建完整的数据集,增加一些自定义数据样本,数据集可以从用户提供的一些提示+响应示例中综合生成(少数 => 10)
reward_training_data,用于训练一个奖励模型的数据,包含三部分的数据:
i) prompts,
ii) completion
iii) score of the completion assigned accordingly to the user feedback (the Human Feedback in RLHF,即对各个回答的评分score)
示例如下
[{
"user_input": "...",
"completion": "...",
"score": 1
},
...
]
同样的,奖励数据怎么来呢?有以下三种方式
{
"reward": "Here is the template for the reward model. The rules are:nn1.Rule 1nn2. Rule 2"
}
如果未提供模板,则使用默认模板artifacts/generate_rewards.py,注:所有模板都必须保存在一个名为 .json 的 JSON 文件中templates.json
获得unlabelled dataset后,您可以通过运行以下命令生成分数:
python artifacts/generate_rewards.py <dataset_path> --model <model_to_use> --temperature <t> --max_tokens <n> --reward_template <path_to_file.json> 其中,<dataset_path>要评分的reward dataset的路径;
<model_to_use>用于奖励的模型,默认建议使用text-davinci-003
<temperature>用于对模型进行评分的temperature,temperature =0.1;
<max_tokens>
<reward_template>,这是包含用于生成奖励的模板的文件的路径,如果未提供路径,将使用默认模版
这里值得注意的是,与instructGPT中的「人类通过对模型的输出进行排序,然后利用这些排序数据去训练一个RM」不同,ChatLLaMA直接训练一个RM对模型的输出进行打分 比如0-5分,且与人类的打分做MSE损失(减少RM打分与人类打分之间的差距)
REWARD_TEMPLATE = dict(template=("You have to evaluate the following chat with a score""between 0 and 5" 最后,可能你会问,从哪里看出来的用的MSE损失,答案是从另外一个文件里看出来的(具体是chatllama/rlhf/reward.py 文件的第282行)
class RewardTrainer:"""Class to train the reward modeldef __init__(self, config: ConfigReward) -> None:# save fig = config# load ward = RewardModel(config)# optimizerself.optimizer = torch.optim.ward.parameters(), lr=config.lr)# loss functionself.loss_function = MSELoss()// ... 用户提供他们个性化的完整数据集(至少需要 100 个数据样本),但数据集必须是以下格式的 JSON 文件,取名为:reward_training_data.json
[{"user_input": "here type the user input","completion": "here type the completion","score": 4.0},{"user_input": "here type the user input","completion": "random garbage","score": 0.0}
] 用户提供的少量示例和使用 LLM 综合扩展的数据集(通过self-instruct的方式提示LLM产生更多所需要的指令数据)
It can be provided in 2 different ways:
继续通过self-instruct的方式提示LLM产生更多所需要的指令数据)
需要将key rlhf添加到templates.json文件中,其中包含有关要执行的任务的信息以及 LLM 生成所需的额外上下文,这是模板的示例(所有模板必须保存在一个名为templates.json):
{
"rlhf": "Here is the template for the generating RLHF prompts. The task we want to perform is ..."
}
The user provides the full dataset with possible interactions with the model
数据集需要包含超过 1000 个提示示例(文件命名为rlhf_training_data.json):
[
{
"user_input": "here the example of user input"
}
]
chatllama/rlhf/reward.py中
首先定义了一个名为 Reward Model 的类,作为奖励模型或批评者模型(Critic Model)。Reward Model 是一个基于语言模型的模型,附加了一个头部head,用于预测给定的 token 序列的奖励(一个标量值),最后将CriticModel类设置为RewardModel类,以保持命名一致性
之后,定义类:RewardDatase用于训练奖励模型的数据集
RewardDataset 类是一个继承自 Dataset 的自定义数据集类,它的作用是从给定的 JSON 文件中读取数据,并将数据整理成适当的格式。JSON 文件应包含以下格式的数据:
class RewardDataset(Dataset):"""Dataset class for the reward modelread a json file with the following format:[{"user_input": "...","completion": "...","score": ...},...]Where:user_input: the initial input of the usercompletion: the completion generated by the modelscore: the score given by the user to the completion (or by the LLM)"""
其中 user_input 是用户的初始输入,completion 是模型生成的补全,而 score 是用户或LLM给予补全的分数
再定义一个RewardTrainer 类用于训练奖励模型,它初始化奖励模型、优化器、损失函数(具体如上文所说,或如282行所述的MSE损失函数)、数据集和数据加载器等。此外,它还支持使用 DeepSpeed 或 Accelerate(两种高性能深度学习训练框架)进行训练
RewardTrainer 类的主要方法有:
train:训练奖励模型。它执行训练循环,包括前向传播、计算损失、反向传播和优化器更新。在每个周期结束时,它还可以对模型进行验证(如果提供了验证数据集的话)
# 定义构造函数
def __init__(self, config: ConfigReward) -> None:# 保存配置对象fig = config# 加载模型ward = RewardModel(config)# 创建优化器self.optimizer = torch.optim.ward.parameters(), lr=config.lr)# 定义损失函数,用的交叉熵损失self.loss_function = MSELoss()# 检查验证数据集是否存在self.validation_flag = Falseif config.validation_dataset_path is not None:self.validation_flag = True# 创建数据集和数据加载器ain_dataset = ain_dataset_ain_dataloader = ain_dataset, batch_size=config.batch_size)# 如果有验证数据集,则创建验证数据集和数据加载器if self.validation_flag:self.eval_dataset = RewardDataset(config.validation_dataset_path)self.validation_dataloader = DataLoader(self.eval_dataset, batch_size=config.batch_size)# 初始化学习率调度器 - 学习率将下降到初始值的10%self.scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(self.optimizer,T_0=ain_dataset) // config.batch_size,T_mult=1,eta_min=config.lr * 0.1,last_epoch=-1,) def train(self,) -> None:# 训练奖励模型# # 打印开始训练奖励模型的消息print("Start Training the Reward Model") # 如果启用了 DeepSpeed,从训练数据加载器获取批次大小fig.deepspeed_enable: batch_size = ain_dataloader.batch_size else: batch_size = fig.batch_size # 从配置获取批次大小epochs = fig.epochs # 从配置获取训练轮次device = fig.device # 从配置获取设备# 从配置获取每次打印的迭代次数iteration_per_print = fig.iteration_per_print # 从配置获取检查点步骤checkpoint_steps = fig.checkpoint_steps # 计算训练数据集的迭代次数n_iter = int(ain_dataset) / batch_size) # 加载检查点并获取开始轮次和开始步骤start_epoch, start_step = self.load_checkpoint() # 初始化检查点计数器cnt_checkpoints = 1 # 训练循环for epoch in range(start_epoch, epochs): # 对于每个轮次ain() # 将奖励模型设置为训练模式# 遍历训练数据加载器中的每个输入for i,inputs in ain_dataloader): # 如果从检查点恢复,则跳过步骤if i < start_step:continue# 获取输入input_text = inputs[0] # 获取输入文本score = inputs[1] # 获取分数# 对输入进行分词_grad(): # 禁用梯度计算input_tokens = kenizer(input_text,return_tensors="pt",truncation=True,padding=True,) output = torch.as_tensor(score, dtype=torch.float32, device=device) # 将分数转换为张量# 前向传播fig.deepspeed_enable: # 如果启用了 DeepSpeedest_output = del_engine(input_tokens["input_ids"].to(device),input_tokens["attention_mask"].to(device),)[:, -1] # 使用模型引擎进行前向传播else:est_output = _reward(input_tokens["input_ids"].to(device),input_tokens["attention_mask"].to(device),) # 使用奖励模型进行前向传播# 计算损失函数loss = self.loss_function(est_output, output) # 将损失添加到训练统计数据中 aining_loss.append(loss.item()) # 反向传播fig.deepspeed_enable: # 如果启用了 del_engine.backward(loss) # 使用模型引擎进行反向传播del_engine.step() # 更新模型参数fig.accelerate_enable: # 如果启用了加速_grad() # 将优化器的梯度归零self.accelerator.backward(loss) # 使用加速器进行反向传播self.optimizer.step() # 更新模型参数self.scheduler.step() # 更新学习率调度器else: _grad() # 将优化器的梯度归零loss.backward() # 进行反向传播self.optimizer.step() # 更新模型参数self.scheduler.step() # 更新学习率调度器# 打印进度,如果当前迭代次数是打印间隔的整数倍if i % iteration_per_print == 0: print(f"Epoch: {epoch+1}/{epochs}, "f"Iteration: {i+1}/{n_iter}, "f"Training Loss: {loss.item()}")printed_est_output = [round(float(x), 1) for x in est_output.cpu().tolist()] # 对估计输出进行四舍五入print("prediction",printed_est_output,"target",score.cpu().tolist(),) # 打印预测值和目标值# 保存检查点,如果检查点计数器是检查点步骤的整数倍if cnt_checkpoints % checkpoint_steps == 0: self.save_checkpoint(epoch, i, epochs, n_iter) # 保存检查点cnt_checkpoints = 1 # 重置检查点计数器else: cnt_checkpoints += 1 # 检查点计数器加一# 验证if self.validation_flag: # 如果启用了验证ward.eval() # 将奖励模型设置为评估模式_grad(): # 禁用梯度计算for i, (text, score) in enumerate(self.validation_dataloader): # 遍历验证数据加载器中的每个输入# 对输入进行分词input_tokens = kenizer(text, return_tensors="pt", padding=True) # 对输入文本进行分词input_tokens = (device) # 将输入令牌移动到设备上# TODO: 检查输入令牌的长度,如果过长可能会导致问题output = sor(score, dtype=torch.float32).to(device) # 将分数转换为张量并移动到设备上# 前向传播est_output = _reward(input_tokens["input_ids"],input_tokens["attention_mask"],) # 使用奖励模型进行前向传播# 计算损失函数loss = self.loss_function(est_output, output) # 将损失添加到训练统计数据中aining_stats.validation_loss.append(loss.item()) # 打印进度,如果当前迭代次数是打印间隔的整数倍if i % iteration_per_print == 0: print(f"Epoch: {epoch+1}/{epochs}, "f"Iteration: {i+1}/{n_iter}, "f"Validation Loss: {loss.item()}") # 打印验证进度# 在恢复训练后重置 start_stepstart_step = 0# 训练结束后保存模型ward.save() # 保存奖励模型 总之,在 RewardTrainer 类的 train 方法中
首先会尝试从检查点恢复模型(如果有的话);
然后,它会遍历数据加载器中的所有输入,对每个输入执行前向传播、计算损失、反向传播和优化器更新;在每个周期结束时,如果提供了验证数据集,还会对模型进行验证;
最后,在训练完成后,将保存模型
此外,项目通过chatllama/rlhf/actor.py再训练一个actor,比如通过train方法训练一个基于transformer的模型,它包括了数据处理、模型训练、验证和模型保存等操作
train方法,它没有返回值。start_step重置为0。有了奖励函数和actor,便可以通过PPO算法优化强化学习任务中的策略(actor)和价值(critic)网络,具体如下图,设置内外两个循环
在内层循环中依次做如下处理(以下代码来源于:chatllama/chatllama/rlhf/trainer.py ):
首先是导入必须的库和模块,当然,主要是ActorCritic类
change_tokenization函数:用于在两个不同的分词器之间转换给定的tokens。check_model_family函数:检查两个配置是否属于相同的模型家族。ActorCritic类:包含了actor和critic模型,并用于在训练actor过程中为给定的序列生成动作和值。它包括以下方法: __init__:初始化actor和critic模型 def __init__(self, config: Config) -> None:super().__init__()fig = configself.actor = ActorModel(config.actor)# check if critic must be initialized from reward modelModelLoader.init_critic_from_itic = itic)# if the actor and critic use the same tokenizer is set to Trueself.use_same_tokenizer = False# debug flagself.debug = config.actor. load:加载模型,但未实现。save:将模型保存到路径。forward:基于给定的整个序列,使用actor的forward方法获取序列中每个token的logits,并使用critic的forward方法获取每个生成步骤的值。这个代码主要用于强化学习训练自然语言生成模型。ActorCritic类是其中的核心部分,它包含了actor和critic模型。这两个模型在训练过程中相互协作,用于生成动作和值。
其次,主要是关于一个用于生成动作、动作逻辑、价值和序列的生成函数,以及用于存储训练数据和生成训练示例的类
再之后,定义了一个名为 RLTrainer 的类,用于使用强化学习训练一个Actor-Critic模型。该类具有多个属性和方法,用于训练过程中的各种操作。
__init__ 方法中,初始化了训练器的各个组件,包括Actor-Critic模型、actor和critic优化器、reward模型、用于存储训练统计数据和对话记录的类、以及示例采样器save_checkpoint 方法保存了当前状态的Actor-Critic模型的检查点,包括当前的训练轮数、actor和critic模型的状态字典,以及它们各自的优化器的状态字典。load_checkpoint 方法加载了Actor-Critic模型的检查点,包括训练轮数、actor和critic模型的状态字典,以及它们各自的优化器的状态字典。如果没有找到检查点,则返回轮数0。如果actor和critic的检查点存在差异,则从两者中最小的轮数开始训练。再之后,调用 learn 方法更新actor和critic模型,并保存训练统计数据和对话记录
# get actor critic new probabilities and valuesactions_logits, values = self.actorcritic.forward(sequences_actor,sequences_mask_actor,sequences_critic,sequences_mask_critic,action_len_actor.item(),action_len_critic.item(),) # get action log probactions_prob = (torch.softmax(actions_logits, dim=-1).max(dim=-1).values)actions_log_prob = torch.log(actions_prob + self.eps)# compute entropy,一般表示为-sum「p(x)logp(x)」entropies = (actions_prob * actions_log_prob).sum(dim=-1)# compute KL divergence,一般表示为:-sum「p(x)log q(x)/p(x)」kl_div_loss = ((actions_prob * (old_actions_log_probs - actions_log_prob)).sum(dim=-1).mean()) # compute ratiosratios = (actions_log_prob - old_actions_log_probs).exp() # ratios即为重要性权重,exp代表求期望,括号里的environment_log_probs代表用于与环境交互的策略
ratios = p(log_probs - environment_log_probs)# 分别用sur_1、sur_2来计算公式的两部分
# 第一部分是重要性权重乘以优势函数
sur_1 = ratios * advs# 第二部分是具体的裁剪过程
sur_2 = torch.clamp(ratios, 1 - clip_eps, 1 + clip_eps) * advs# 最终看谁更小则取谁
clip_loss = -torch.min(sur_1,sur_2).mean() 更具体的实现,则可以如下所示 # 计算PPO总损失if check_model_fig.actor, itic):# 计算 TRL 中的折扣回报gamma = ainer.gamma_discounted# 初始化折扣回报矩阵discounted_rewards = s_like(old_values)# 遍历每个时间步for i in range(discounted_rewards.shape[1]):for j in range(i, discounted_rewards.shape[1]):# 计算折扣回报discounted_rewards[:, i] += (gamma ** (j - i) * rewards[:, j])# 计算优势值,与TRL 中旧值的符号相反advantages = (discounted_rewards - old_values)# normalize advantagesadvantages = (advantages - an(dim=-1)) / (advantages.std() + self.eps)surr1 = advantages * ratioselse:advantages = rewards - old_values[:, -1]surr1 = advantages * ratiossurr2 = (torch.clamp(ratios, 1 - actor_eps_clip, 1 + actor_eps_clip)* advantages) policy_loss = -torch.min(surr1, surr2) - beta_s * entropiespolicy_loss = an()loss = policy_loss + kl_div_loss 可能有读者看到这里 看迷糊了,即咋出来两个损失函数了,看起来是一个策略损失,一个KL散度损失,与我们在本博客里的另一篇文章《ChatGPT技术原理解析》中「3.1.3 InstructGPT训练阶段3:如何通过PPO算法进一步优化模型的策略」探讨的结果咋不太一样呢 ? 不急,我们先来分析下这两个损失函数 # compute entropy,一般表示为-sum「p(x)logp(x)」entropies = (actions_prob * actions_log_prob).sum(dim=-1) 另一个 KL散度损失(kl_div_loss),还是用于限制新策略与旧策略之间的差异,以免更新太快,导致学习不稳定 # compute KL divergence,一般表示为:-sum「p(x)log q(x)/p(x)」kl_div_loss = ((actions_prob * (old_actions_log_probs - actions_log_prob)).sum(dim=-1).mean()) 对应的公式为
值得一提的是,这里确实容易引发疑惑,毕竟上面的policy loss已经对新旧策略的比值 ratios = (actions_log_prob - old_actions_log_probs).exp() 做了截断处理,而这里又加一个对新旧策略差值的KL散度约束,未免有多此一举之嫌,比如在instructGPT的原理中便只有两者其一:关于策略梯度的损失就一个policy loss
最终,总的损失函数为:
其中, 是超参数,用于控制 KL 散度损失的权重
# check if loss item is NaNif torch.isnan(loss):raise ValueError("Loss is nan") # 按照损失更新 actor 模型参数# 使用 DeepSpeed 的 engine 对 loss 进行反向传播fig.actor.deepspeed_enable:actor_model_engine.backward(loss)actor_model_engine.step()# 如果启用了 PyTorch 的 fig.actor.accelerate_enable:# 将 actor 模型参数的梯度清零self._grad()# 使用 Accelerate 对 loss 进行反向传播actor_accelerator.backward(loss)# 使用 PyTorch 的优化器更新 actor 模型参数self.actor_optimizer.step()# 使用 PyTorch 的学习率调度器更新学习率self.actor_scheduler.step()else:self._grad()# 对 loss 进行反向传播loss.backward()self.actor_optimizer.step()self.actor_scheduler.step()
# compute value loss# 裁剪限制了值损失剪辑的变化速率# 使得value相比old_value的更新范围限制在[-critic_eps_clip, critic_eps_clip]之内,否则就截断# 说白了,截断的作用就是:最大不能大过critic_eps_clip,最小不能小过-critic_eps_clipvalue_loss_clipped = old_values + (values - old_values).clamp(-critic_eps_clip, critic_eps_clip)# 计算第一种值损失,即裁剪后的值与奖励之间的平方差value_loss1 = (value_loss_clipped - rewards) ** 2# 计算第二种值损失,即未裁剪的值与奖励之间的平方差value_loss2 = (values - rewards) ** 2# 选择两种值损失中较大的那个,并计算其均值value_loss = torch.max(value_loss1, value_loss2).mean() 本文发布后,有读者留言对这块表达疑惑,即怎么是先计算裁剪的损失,然后对比未裁剪的损失,然后两种损失中取更大呢,原因和上文第六部分最后解释的一样,便不再重复了 if torch.isnan(value_loss):raise ValueError("Value loss is nan") # upate itic.deepspeed_enable:critic_model_engine.backward(value_loss)critic_model_engine.step()itic.accelerate__grad()critic_accelerator.backward(itic_optimizer.step()itic_scheduler.step()_grad()value_loss.backward()itic_optimizer.step()itic_scheduler.step() # 将训练损失值添加到训练统计信息中aining_loss.append(# 将损失值从计算图中分离,移动到 CPU 上,并转换为 Python 数值类型loss.detach().cpu().item())# 将价值损失值添加到训练统计信息中aining_stats.value_loss.append(# 将价值损失值从计算图中分离,移动到 CPU 上,并转换为 Python 数值类型value_loss.detach().cpu().item()) # print iteration infoprint(f"Epoch {epoch+1}/{epochs}",f"Step {k+1}/{int(len(dataloader) / batch_size)}",f"Loss {loss.detach().cpu().item():.4f}",f"Value Loss {value_loss.detach().cpu().item():.4f}",) self.actorcritic.eval()print("End Learning") 最后的最后,定义了一个 train() 方法,使用 actor-critic 算法训练强化学习模型。方法首先初始化各种设置,如训练的总 episode 数量、每个 episode 的最大步数、批次大小和训练设备等。然后检查要用于学习的记忆数量是否是批次大小的倍数,以及总步数是否是更新步数的倍数。
该方法初始化记忆,加载检查点(如果有的话),如果是从头开始的新训练,则清除会话记录。然后循环遍历 episode 和 timestep,从示例数据集中抽取样本,为 actor 和 critic 进行分词,生成动作和值的序列,计算动作日志概率,计算奖励。存储每个 episode/timestep 的记忆,并将完成(解码后的动作)记录在会话日志中。
在一定数量的 timestep 后,使用记忆进行学习,并计算平均奖励。该过程重复进行,直到训练完成。该方法在训练结束时保存模型和会话日志。
据介绍(介绍页面,该页面的翻译之一,代码地址),Colossal-AI 开源了基于 LLaMA-7B 模型的包含完整 RLHF 流程的类 Chat 模型复现方案 ColossalChat
ColossalChat 收集并清洗了社交平台上人们的真实提问场景作为种子数据集,然后利用 self-instruct 技术扩充数据(通过prompt OpenAI API,花费约 900 美元进行标注),最终生成了10.4万条问答的中、英双语数据集(这是数据的开源地址)
他们的说法是,对比其他 self-instruct 方法生成的数据集,该数据集的种子数据更加真实、丰富,生成的数据集涵盖的话题更多,该数据可以同时用于微调和 RLHF 训练,通过高质量的数据,ColossalChat 能进行更好地对话交互,同时支持中文
关于训练方式:类似instructGPT/ChatGPT的训练三步骤(如果忘了,务必复习下此文的3.1节)
具体而言,为两个阶段进行:
先看下整体的代码架构图
接下来,我们看下一些关键实现
首先通过ColossalAI/applications/Chat/coati/trainer/sft.py,训练一个SFT模型
import math # 导入Python的数学库
import time # 导入Python的时间库
from abc import ABC # 从Python的抽象基类库中导入ABC基类
from typing import Optional # 导入Python类型注解库中的Optional, 表示某个类型值可能为空import loralib as lora # 导入一个名为loralib的库并重命名为lora
import torch # 导入PyTorch库
import torch.distributed as dist # 导入PyTorch分布式计算库
import wandb # 导入Weights & Biases库,一般用于实验跟踪和版本控制
dels.loss import GPTLMLoss # 导入coati库中的GPTLMLoss模型
from torch import nn # 导入PyTorch的神经网络库
from torch.optim import Adam, Optimizer # 导入PyTorch优化器库中的Adam和Optimizer类
from torch.optim.lr_scheduler import LambdaLR # 导入PyTorch优化器库中的LambdaLR类,一般用于动态调整学习率
from torch.utils.data import DataLoader # 导入PyTorch数据处理库中的DataLoader类,用于加载数据
from torch.utils.data.distributed import DistributedSampler # 导入PyTorch分布式计算库中的DistributedSampler类,用于在分布式训练中采样数据
from tqdm import tqdm # 导入进度条库tqdm
kenization_utils_base import PreTrainedTokenizerBase # 导入transformers库中的PreTrainedTokenizerBase类,用于处理预训练模型的令牌化
ainer import get_scheduler # 导入transformers库中的get_scheduler函数,用于获取学习率调整策略from colossalai.logging import get_dist_logger # 导入colossalai库中的分布式日志记录函数get_dist_loggerfrom .strategies import Strategy # 导入当前目录下strategies文件中的Strategy类
from .utils import is_rank_0 # 导入当前目录下utils文件中的is_rank_0函数,用于检查当前进程是否为主进程# 下面是定义一个名为SFTTrainer的类,该类继承自abc库的ABC抽象基类
class SFTTrainer(ABC):"""Trainer to use while training reward model.Args:model (Module): the model to trainstrategy (Strategy): the strategy to use for trainingoptim(Optimizer): the optimizer to use for trainingtrain_dataloader: the dataloader to use for trainingeval_dataloader: the dataloader to use for evaluationbatch_size (int, defaults to 1): the batch size while trainingmax_epochs (int, defaults to 2): the number of epochs to trainoptim_kwargs (dict, defaults to {'lr':1e-4}): the kwargs to use while initializing optimizer"""# 下面是初始化函数,初始化SFTTrainer类的实例def __init__(self,model, # 输入参数model,即将训练的模型strategy: Strategy, # 输入参数strategy,即训练的策略optim: Optimizer, # 输入参数optim,即训练的优化器train_dataloader: DataLoader, # 输入参数train_dataloader,即训练的数据加载器eval_dataloader: DataLoader = None, # 输入参数eval_dataloader,即评估的数据加载器,默认为Nonebatch_size: int = 1, # 输入参数batch_size,即每批训练的样本数量,默认为1max_epochs: int = 2, # 输入参数max_epochs,即训练的最大轮数,默认为2accimulation_steps: int = 8, # 输入参数accimulation_steps,即梯度积累的步数,默认为8) -> None: # 初始化函数的返回值类型为Nonesuper().__init__() # 调用父类的初始化函数self.strategy = strategy # 将输入参数strategy赋值给实例变量self.strategyself.epochs = max_epochs # 将输入参数max_epochs赋值给实例变量ain_dataloader = train_dataloader # 将输入参数train_dataloader赋值给实例变量ain_dataloaderself.eval_dataloader = eval_dataloader # 将输入参数eval_dataloader赋值给实例变量self.eval_dataloader# 调用策略的setup_model方法对模型进行设置,并将返回的模型赋值给实例变量del = strategy.setup_model(model)if "DDP" in str(self.strategy): # 如果策略的字符串表示中包含"DDPdel = dule # 将模型的module属性赋值给实例变量del# 调用策略的setup_optimizer方法对优化器进行设置,并将返回的优化器赋值给实例变量self.optimizerself.optimizer = strategy.setup_optimizer(optim, del)self.accimulation_steps = accimulation_steps # 将输入参数accimulation_steps赋值给实例变量self.accimulation_stepsnum_update_steps_per_epoch = len(train_dataloader) // self.accimulation_steps # 计算每个训练轮次的更新步数max_steps = il(self.epochs * num_update_steps_per_epoch) # 计算最大更新步数# 获取学习率调度器,并赋值给实例变量self.schedulerself.scheduler = get_scheduler("cosine", # 学习率调度策略为"cosine"self.optimizer, # 优化器为实例变量self.optimizernum_warmup_stepsil(max_steps * 0.03), # 预热步数为最大更新步数的3%num_training_steps=max_steps) # 训练步数为最大更新步数# 下面是SFTTrainer类的fit方法,用于训练模型def fit(self, logger, log_interval=10): # 输入参数为logger,即日志记录器,以及log_interval,即日志记录间隔,默认为10# 初始化Weights & Biases的实验,并设置项目名为"Coati",实验名为当前时间wandb.init(project="Coati", name=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))wandb.del) # 开始监视实例变量deltotal_loss = 0 # 定义变量total_loss并初始化为0,用于记录总损失# 定义一个进度条,长度为训练轮次数,描述信息为'Epochs',如果当前进程不是主进程则禁用进度条# epoch_bar = tqdm(range(self.epochs), desc='Epochs', disable=not is_rank_0())# 定义一个进度条,长度为训练步数,描述信息为'steps',如果当前进程不是主进程则禁用进度条step_bar = tqdm(range(ain_dataloader) // self.accimulation_steps * self.epochs),desc=f'steps',disable=not is_rank_0())for epoch in range(self.epochs): # 遍历每一个训练轮次# 定义一个进度条,长度为训练样本数,描述信息为'Train process for{epoch}',如果当前进程不是主进程则禁用进度条# process_bar = tqdm(range(ain_dataloader)), desc=f'Train process for{epoch}', disable=not is_rank_0())# 训练模式ain()for batch_id, batch in ain_dataloader): # 遍历每一个训练样本# 将样本中的"input_ids"字段送到当前设备上,并赋值给变量prompt_idsprompt_ids = batch["input_ids"].to(torch.cuda.current_device())# 将样本中的"attention_mask"字段送到当前设备上,并赋值给变量p_maskp_mask = batch["attention_mask"].to(torch.cuda.current_device())# 将样本中的"labels"字段送到当前设备上,并赋值给变量labelslabels = batch["labels"].to(torch.cuda.current_device())# prompt_ids = prompt_ids.squeeze(1).cuda()# p_mask = p_mask.squeeze(1).cuda()# prompt_logits = del(prompt_ids, attention_mask=p_mask, labels=labels)# 对模型进行前向传播,并将返回的结果赋值给变量outputsoutputs = del(prompt_ids, attention_mask=p_mask, labels=labels)loss = outputs.loss # 获取outputs中的loss属性,并赋值给变量lossprompt_logits = outputs.logits # 获取outputs中的logits属性,并赋值给变量prompt_logitsif loss >= 2.5: # 如果loss大于或等于2.5logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}") # 在日志中记录警告信息loss = loss / self.accimulation_steps # 将loss除以梯度积累步数total_loss += loss.item() # 将loss加入到total_loss中loss.backward() # 对loss进行反向传播if (batch_id + 1) % self.accimulation_steps == 0: # 如果当前批次是梯度积累步数的整数倍self.optimizer.step() # 对优化器执行一步优化_grad() # 清空优化器的梯度self.scheduler.step() # 对学习率调度器执行一步调度if is_rank_0(): # 如果当前进程是主进程wandb.log({"train_loss": total_loss / self.accimulation_steps}) # 在Weights & Biases中记录训练损失total_loss = 0 # 将total_loss重置为0step_bar.update(1) # 更新步骤进度条# process_bar.update(1) # 更新训练进度条if (batch_id + 1) % (self.accimulation_steps * log_interval) == 0: # 如果当前批次是日志记录间隔的整数倍self.evaluate(epoch, logger, log_interval) # 进行一次评估if self.eval_dataloader: # 如果存在评估数据加载器self.evaluate(epoch, logger) # 进行一次评估if is_rank_0(): # 如果当前进程是主进程torch.del.state_dict(), f"coati_checkpoints/epoch_{epoch}.pth") # 保存模型的状态字典wandb.save(f"coati_checkpoints/epoch_{epoch}.pth") # 在Weights & Biases中保存模型的状态字典# epoch_bar.update(1) # 更新轮次进度条wandb.finish() # 结束Weights & Biases的实验# 下面是SFTTrainer类的evaluate方法,用于评估模型def evaluate(self, epoch, logger, log_interval=10): # 输入参数为epoch,即轮次,logger,即日志记录器,以及log_interval,即日志记录间隔,默认为10# 打印一条日志信息,内容为"Start "logger.info("Start ")del.eval() # 将模型切换到评估模式total_loss = 0 # 定义变量total_loss并初始化为0,用于记录总损失# 定义一个进度条,长度为评估样本数,描述信息为'Eval process',如果当前进程不是主进程则禁用进度条# process_bar = tqdm(range(len(self.eval_dataloader)), desc='Eval process', disable=not is_rank_0())_grad(): # 禁止计算梯度for batch_id, batch in enumerate(self.eval_dataloader): # 遍历每一个评估样本# 将样本中的"input_ids"字段送到当前设备上,并赋值给变量prompt_idsprompt_ids = batch["input_ids"].to(torch.cuda.current_device())# 将样本中的"attention_mask"字段送到当前设备上,并赋值给变量p_maskp_mask = batch["attention_mask"].to(torch.cuda.current_device())# 将样本中的"labels"字段送到当前设备上,并赋值给变量labelslabels = batch["labels"].to(torch.cuda.current_device())# 对模型进行前向传播,并将返回的结果赋值给变量outputsoutputs = del(prompt_ids, attention_mask=p_mask, labels=labels)loss = outputs.loss # 获取outputs中的loss属性,并赋值给变量lossif loss >= 2.5: # 如果loss大于或等于2.5logger.warning(f"batch_id:{batch_id}, abnormal loss: {loss}") # 在日志中记录警告信息total_loss += loss.item() # 将loss加入到total_loss中if (batch_id + 1) % log_interval == 0: # 如果当前批次是日志记录间隔的整数倍if is_rank_0(): # 如果当前进程是主进程wandb.log({"eval_loss": total_loss / log_interval}) # 在Weights & Biases中记录评估损失total_loss = 0 # 将total_loss重置为0# process_bar.update(1) # 更新评估进度条logger.info(f"Finish evaluation process for epoch {epoch}") # 打印一条日志信息,内容为"Finish evaluation process for epoch {epoch}"
其次,通过ColossalAI/applications/Chat/coati/trainer/rm.py 训练一个奖励模型
from abc import ABC # 导入 abc 模块(抽象基类模块)
from datetime import datetime # 导入 datetime 模块,用于处理日期和时间
from typing import Optional # 导入 typing 模块中的 Optional 类型,它表示一个类型可能是 None
import pandas as pd # 导入 pandas 库,用于数据分析和操作
import torch # 导入 PyTorch 库,一个用于深度学习的开源库
import torch.distributed as dist # 导入 PyTorch 分布式计算模块
from torch.optim import Optimizer, lr_scheduler # 导入 PyTorch 中的优化器和学习率调度器# 导入 PyTorch 的数据加载器和数据集模块
from torch.utils.data import DataLoader, Dataset, DistributedSampler
from tqdm import tqdm # 导入 tqdm,一个用于打印进度条的库# 导入 transformers 库的 tokenization_utils_base 模块中的 PreTrainedTokenizerBase 类,它用于处理预训练的 tokenizer
kenization_utils_base import PreTrainedTokenizerBasefrom .strategies import Strategy # 从当前包的 strategies 模块中导入 Strategy 类
from .utils import is_rank_0 # 从当前包的 utils 模块中导入 is_rank_0 函数# 定义 RewardModelTrainer 类,它继承自 ABC(抽象基类)
class RewardModelTrainer(ABC):"""Trainer to use while training reward model.Args:这个类继承了 ABC 抽象基类。它接受以下参数:model:待训练的模型strategy:训练策略optim:优化器loss_fn:损失函数train_dataset:训练数据集valid_dataset:验证数据集eval_dataset:评估数据集batch_size:批次大小(默认为1)max_epochs:最大训练轮数(默认为2)"""# 初始化 RewardModelTrainer 类的实例def __init__(self,model,strategy: Strategy,optim: Optimizer,loss_fn,train_dataset: Dataset,valid_dataset: Dataset,eval_dataset: Dataset,batch_size: int = 1,max_epochs: int = 1,) -> None:# 调用父类(ABC)的初始化方法super().__init__()# 将传入的训练策略保存到实例变量中self.strategy = strategy# 将传入的最大训练轮数保存到实例变量中self.epochs = max_epochs# 初始化训练采样器为 Nonetrain_sampler = None# 如果当前运行环境已经初始化了分布式计算,并且世界尺寸(即参与分布式计算的进程数)大于 1if dist.is_initialized() _world_size() > 1:# 创建一个分布式采样器train_sampler = DistributedSampler(train_dataset, shuffle=True, seed=42, drop_last=True)# 创建一个用于训练的数据加载器,如果 train_sampler 为 None,则将 shuffle 设为 ain_dataloader = DataLoader(train_dataset,shuffle=(train_sampler is None),sampler=train_sampler,batch_size=batch_size)# 创建一个用于验证的数据加载器self.valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)# 创建一个用于评估的数据加载器self.eval_dataloader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=True)# 调用策略的 setup_model 方法设置模型,并将结果保存到实例变量中del = strategy.setup_model(model)# 将传入的损失函数保存到实例变量中self.loss_fn = loss_fn# 调用策略的 setup_optimizer 方法设置优化器,并将结果保存到实例变量中self.optimizer = strategy.setup_optimizer(optim, del)# 创建一个余弦退火学习率调度器,并将结果保存到实例变量中self.scheduler = lr_scheduler.CosineAnnealingLR(self.optimizer, ain_dataloader.__len__() // 100)# 定义一个方法,用于评估给定数据加载器上的准确率,并计算选定奖励和拒绝奖励之间的平均距离def eval_acc(self, dataloader):# 初始化距离、计数器和准确率dist = 0on = 0cnt = 0# 将模型设置为评估模式del.eval()# 禁用梯度计算,以节省计算资源_grad():# 遍历数据加载器中的每一批数据for chosen_ids, c_mask, reject_ids, r_mask in dataloader:# 将 chosen_ids,c_mask,reject_ids 和 r_mask 移动到当前设备上,并去掉第一个维度chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device())c_mask = c_mask.squeeze(1).to(torch.cuda.current_device())reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device())r_mask = r_mask.squeeze(1).to(torch.cuda.current_device())# 将 chosen_ids 和 c_mask 传入模型,计算 chosen_rewardchosen_reward = del(chosen_ids, attention_mask=c_mask)# 将 reject_ids 和 r_mask 传入模型,计算 reject_rewardreject_reward = del(reject_ids, attention_mask=r_mask)# 遍历 chosen_reward 的每个元素for i in range(len(chosen_reward)):# 计数器加一cnt += 1# 如果 chosen_reward 大于 reject_reward,那么准确率加一if chosen_reward[i] > reject_reward[i]:on += 1# 更新距离dist += (chosen_reward - reject_reward).mean().item()# 计算距离的平均值dist_mean = dist / len(dataloader)# 计算准确率acc = on / cnt# 将模型设置为训练模式ain()# 返回平均距离和准确率return dist_mean, acc# 定义一个方法,用于训练模型def fit(self):# 获取当前的日期和时间time = w()# 创建一个进度条,表示训练轮数epoch_bar = tqdm(range(self.epochs), desc='Train epoch', disable=not is_rank_0())# 遍历每一个训练轮for epoch in range(self.epochs):# 创建一个进度条,表示当前训练轮的训练步数step_bar = tqdm(ain_dataloader.__len__()),desc='Train step of epoch %d' % epoch,disable=not is_rank_0())# 将模型设置为训练模式ain()# 初始化计数器、准确率和距离cnt = 0acc = 0dist = 0# 遍历训练数据加载器中的每一批数据for chosen_ids, c_mask, reject_ids, r_mask ain_dataloader:# 将 chosen_ids,c_mask,reject_ids 和 r_mask 移动到当前设备上,并去掉第一个维度chosen_ids = chosen_ids.squeeze(1).to(torch.cuda.current_device())c_mask = c_mask.squeeze(1).to(torch.cuda.current_device())reject_ids = reject_ids.squeeze(1).to(torch.cuda.current_device())r_mask = r_mask.squeeze(1).to(torch.cuda.current_device())# 将 chosen_ids 和 c_mask 传入模型,计算 chosen_rewardchosen_reward = del(chosen_ids, attention_mask=c_mask)# 将 reject_ids 和 r_mask 传入模型,计算 reject_rewardreject_reward = del(reject_ids, attention_mask=r_mask)# 调用损失函数,计算损失loss = self.loss_fn(chosen_reward, reject_reward)# 调用策略的 backward 方法,计算梯度self.strategy.backward(loss, del, self.optimizer)# 调用策略的 optimizer_step 方法,更新模型参数self.strategy.optimizer_step(self.optimizer)# 将优化器的梯度缓存清零_grad()# 计数器加一cnt += 1# 如果计数器达到 100if cnt == 100:# 调用学习率调度器的 step 方法,更新学习率self.scheduler.step()# 计算验证数据加载器上的平均距离和准确率dist, acc = self.eval_acc(self.valid_dataloader)# 重置计数器cnt = 0# 如果当前进程是 rank 0if is_rank_0():# 创建一个 DataFrame 来存储步数、损失、距离和准确率log = pd.DataFrame([[step_bar.n, loss.item(), dist, acc]],columns=['Step', 'Loss', 'Distance', 'Accuracy'])# 将 DataFrame 保存到 CSV 文件中_csv(str(time) + '.csv', mode='a', header=False)# 更新进度条step_bar.update(100)# 更新进度条epoch_bar.update(1)# 如果当前进程是 rank 0if is_rank_0():# 打印训练结束的消息print('Training finished!')# 计算评估数据加载器上的平均距离和准确率dist, acc = self.eval_acc(self.eval_dataloader)# 创建一个 DataFrame 来存储损失、距离和准确率log = pd.DataFrame([[ain_dataloader.__len__(), 'N/A', dist, acc]],columns=['Step', 'Loss', 'Distance', 'Accuracy'])# 将 DataFrame 保存到 CSV 文件中_csv(str(time) + '.csv', mode='a', header=False)
最后,通过ColossalAI/applications/Chat/coati/trainer/ppo.py to start PPO training
from typing import Any, Callable, Dict, List, Optional # 导入一些类型别名,用于类型注解
import torch # 导入PyTorch库,这是一个机器学习库,广泛用于深度学习模型的建立和训练
as nn # 导入PyTorch的神经网络模块
perience_maker import Experience, NaiveExperienceMaker # 导入Experience和NaiveExperienceMaker,前者用于保存Agent的经验,后者用于创建Experience对象
dels.base import Actor, Critic # 导入Actor和Critic,他们是PPO算法中的关键组成部分
ation_utils import update_model_kwargs_fn # 导入函数update_model_kwargs_fn,用于更新模型的参数
dels.loss import PolicyLoss, ValueLoss # 导入PolicyLoss和ValueLoss,分别计算策略损失和价值损失
play_buffer import NaiveReplayBuffer # 导入NaiveReplayBuffer,用于保存和回放经验
from torch.optim import Optimizer # 导入Optimizer,这是优化算法的基类
kenization_utils_base import PreTrainedTokenizerBase # 导入预训练的tokenizer基类,用于处理文本数据
from .base import Trainer # 导入Trainer类,这是训练循环的基类
from .callbacks import Callback # 导入Callback类,用于在训练过程中的某些阶段执行特定的函数
from .strategies import Strategy # 导入Strategy类,它定义了模型参数更新的策略# PPOTrainer类的定义,继承自Trainer
class PPOTrainer(Trainer): # 类的初始化函数,接受许多参数def __init__(self,strategy: Strategy, # 用于更新模型参数的策略actor: Actor, # 用于选择动作的模型critic: Critic, # 用于评估动作的模型reward_model: nn.Module, # 用于计算奖励的模型initial_model: Actor, # 用于生成初始策略的模型actor_optim: Optimizer, # 用于优化actor的优化器critic_optim: Optimizer, # 用于优化critic的优化器kl_coef: float = 0.1, # kl散度系数ptx_coef: float = 0.9, # ptx系数train_batch_size: int = 8, # 训练批大小buffer_limit: int = 0, # 缓冲区大小限制buffer_cpu_offload: bool = True, # 是否在cpu上处理缓冲区eps_clip: float = 0.2, # epsilon剪裁值value_clip: float = 0.4, # 价值剪裁值experience_batch_size: int = 8, # 经验批大小max_epochs: int = 1, # 最大训练周期数tokenizer: Optional[Callable[[Any], dict]] = None, # 用于文本处理的tokenizersample_replay_buffer: bool = False, # 是否从回放缓冲区中抽样dataloader_pin_memory: bool = True, # 数据加载器是否针对内存callbacks: List[Callback] = [], # 在训练过程中的某些阶段执行的函数列表**generate_kwargs) -> None: # 其他生成参数# 创造经验生成器experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef)# 创造经验回放缓冲区replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload)# 根据策略和actor设置默认的生成参数generate_kwargs = _set_default_generate_kwargs(strategy, generate_kwargs, actor)# 调用父类的初始化函数super().__init__(strategy, experience_maker, replay_buffer, experience_batch_size, max_epochs, tokenizer,sample_replay_buffer, dataloader_pin_memory, callbacks, **generate_kwargs)# 初始化actor和criticself.actor = itic = critic# 初始化损失函数self.actor_loss_fn = PolicyLoss(eps_itic_loss_fn = ValueLoss(value_clip)self.ptx_loss_fn = nn.CrossEntropyLoss(ignore_index=-100)self.ptx_coef = ptx_coef# 初始化优化器self.actor_optim = itic_optim = critic_optim# 训练步骤函数,根据经验对象计算actor和critic的损失,并使用策略进行反向传播和优化器更新def training_step(self, experience: Experience) -> Dict[str, float]:ain() # 将actor设置为训练模式ain() # 将critic设置为训练模式# 计算策略损失num_actions = experience.action_mask.size(1)action_log_probs = self.actor(experience.sequences, num_actions, attention_mask=experience.attention_mask)actor_loss = self.actor_loss_fn(action_log_probs,experience.action_log_probs,experience.advantages,action_mask=experience.action_mask)# 计算ptx损失if self.ptx_coef != 0:ptx = next(iter(self.pretrain_dataloader))['input_ids'].to(torch.cuda.current_device())label = next(iter(self.pretrain_dataloader))['labels'].to(torch.cuda.current_device())[:, 1:]attention_mask = next(iter(self.pretrain_dataloader))['attention_mask'].to(torch.cuda.current_device())ptx_log_probs = _base_model()(ptx, attention_mask=attention_mask)['logits'][..., :-1, :]ptx_loss = self.ptx_loss_fn(ptx_log_probs.view(-1, ptx_log_probs.size(-1)), label.view(-1))actor_loss = ptx_loss * self.ptx_coef + actor_loss * (1 - self.ptx_coef)self.strategy.backward(actor_loss, self.actor, self.actor_optim) # 使用策略进行反向传播self.strategy.optimizer_step(self.actor_optim) # 使用策略进行优化器步进self._grad() # 清零优化器的梯度# 计算价值损失values = itic(experience.sequences,action_mask=experience.action_mask,attention_mask=experience.attention_mask)critic_loss = itic_loss_fn(values,experience.ward,action_mask=experience.action_mask)self.strategy.backward(critic_loss, itic, itic_optim) # 使用策略进行反向传播self.strategy.optimizer_itic_optim) # 使用策略进行优化器步进_grad() # 清零优化器的梯度return {'reward': an().item()} # 返回平均奖励# 根据策略和actor设置默认的生成参数
def _set_default_generate_kwargs(strategy: Strategy, generate_kwargs: dict, actor: Actor) -> None:origin_model = strategy._unwrap_actor(actor)new_kwargs = {**generate_kwargs}# 使用huggingface模型的方法直接生成输入if 'prepare_inputs_fn' not in generate_kwargs and hasattr(origin_model, 'prepare_inputs_for_generation'):new_kwargs['prepare_inputs_fn'] = origin_model.prepare_inputs_for_generationif 'update_model_kwargs_fn' not in generate_kwargs:new_kwargs['update_model_kwargs_fn'] = update_model_kwargs_fnreturn new_kwargs# 保存模型的函数
def save_model(self, path: str, only_rank0: bool = False, tokenizer: Optional[PreTrainedTokenizerBase] = None) -> None:self.strategy.save_model(model=self.actor, path=path, only_rank0=only_rank0, tokenizer=tokenizer) # 使用策略保存模型
在获得最终模型权重后,还可通过量化降低推理硬件成本,并启动在线推理服务,仅需单张约 4GB 显存的 GPU 即可完成 70 亿参数模型推理服务部署
更多请参见另一篇文章:从零实现带RLHF的类ChatGPT:逐行解析微软DeepSpeed Chat
本文发布于:2024-02-27 18:02:44,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/1709109678114095.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
| 留言与评论(共有 0 条评论) |