Stable-Baselines 3 Partial Source Code Walkthrough: ./common/on_policy_algorithm.py
Preface
Read through the PPO-related source code to see how the standard library builds the PPO algorithm and its various tricks, as a reference for my own re-implementation.
Jumping through the definitions in PyCharm, you can see that the PPO class ultimately inherits from a base class, which is exactly what this .py file contains.
So let's start reading the source code from here. :)
Imports
import sys
import time
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union

import numpy as np
import torch as th
from gym import spaces

from stable_baselines3.common.base_class import BaseAlgorithm
from stable_baselines3.common.buffers import DictRolloutBuffer, RolloutBuffer
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.type_aliases import GymEnv, MaybeCallback, Schedule
from stable_baselines3.common.utils import obs_as_tensor, safe_mean
from stable_baselines3.common.vec_env import VecEnv

# Type variable used in the learn() signature below (defined in the same source file)
SelfOnPolicyAlgorithm = TypeVar("SelfOnPolicyAlgorithm", bound="OnPolicyAlgorithm")
The OnPolicyAlgorithm class
This class is the middle layer of the PPO class hierarchy, sitting between the low-level base class and the higher-level PPO class.
It covers on-policy algorithms, for example A2C and PPO.
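As a quick sanity check of this hierarchy (not part of the original source), here is a minimal sketch, assuming stable-baselines3 is installed, that prints the method resolution order of PPO:

# Minimal sketch, assuming stable-baselines3 is installed; nothing is trained here.
from stable_baselines3 import PPO
from stable_baselines3.common.base_class import BaseAlgorithm
from stable_baselines3.common.on_policy_algorithm import OnPolicyAlgorithm

# Expected chain: PPO -> OnPolicyAlgorithm -> BaseAlgorithm -> ... -> object
print([cls.__name__ for cls in PPO.__mro__])
assert issubclass(PPO, OnPolicyAlgorithm) and issubclass(OnPolicyAlgorithm, BaseAlgorithm)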
policy, env and learning_rate: the same as in the base class base_class.py
n_steps: the number of time steps to run per environment before each update. The docstring notes that the effective batch is n_steps * n_envs, i.e. when several copies of the environment are trained in parallel, it is the steps per sub-environment multiplied by the number of environments
batch_size: the minibatch size used when consuming the collected experience
gamma, gae_lambda, clip_range, clip_range_vf: parameters with default values, namely the discount factor, the GAE factor that trades off bias and variance in the advantage estimate, the clipping range for the policy objective (probability ratio), and the clipping range for the value function
normalize_advantage: flag indicating whether to normalize the advantage
ent_coef, vf_coef: the entropy coefficient and the value-function coefficient in the loss calculation
max_grad_norm: the maximum gradient norm, i.e. the clipping threshold applied during gradient descent
use_sde, sde_sample_freq: generalized State Dependent Exploration (gSDE), only applicable to continuous action spaces; the same as in the base class base_class.py
target_kl: limits how large the KL divergence of each update may be, because clipping alone cannot prevent excessively large updates
monitor_wrapper: flag indicating whether to wrap the environment in the Monitor wrapper provided by Gym
_init_setup_model: whether to build the model, i.e. whether to create and initialize the network while constructing the instance
A brief construction sketch using these parameters is shown below.
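As a rough illustration (not from the original post), this is a minimal sketch of how these arguments are typically supplied through the PPO subclass, which forwards most of them to OnPolicyAlgorithm.__init__. The environment id and all hyperparameter values here are arbitrary assumptions for demonstration:

# Minimal sketch, assuming gym and stable-baselines3 are installed.
# "CartPole-v1" and every hyperparameter value below are arbitrary illustration choices.
from stable_baselines3 import PPO

model = PPO(
    policy="MlpPolicy",    # forwarded to OnPolicyAlgorithm as `policy`
    env="CartPole-v1",     # `env`: a registered Gym id or a VecEnv
    learning_rate=3e-4,    # constant, or a schedule f(progress_remaining)
    n_steps=2048,          # rollout length per environment before each update
    batch_size=64,         # PPO-level minibatch size
    gamma=0.99,            # discount factor
    gae_lambda=0.95,       # GAE bias/variance trade-off
    clip_range=0.2,        # clipping range for the probability ratio
    ent_coef=0.0,          # entropy coefficient in the loss
    vf_coef=0.5,           # value-function coefficient in the loss
    max_grad_norm=0.5,     # gradient clipping threshold
    verbose=1,
)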
class OnPolicyAlgorithm(BaseAlgorithm):
    """
    The base for On-Policy algorithms (ex: A2C/PPO).

    :param policy: The policy model to use (MlpPolicy, CnnPolicy, ...)
    :param env: The environment to learn from (if registered in Gym, can be str)
    :param learning_rate: The learning rate, it can be a function
        of the current progress remaining (from 1 to 0)
    :param n_steps: The number of steps to run for each environment per update
        (i.e. batch size is n_steps * n_env where n_env is number of environment copies running in parallel)
    :param gamma: Discount factor
    :param gae_lambda: Factor for trade-off of bias vs variance for Generalized Advantage Estimator.
        Equivalent to classic advantage when set to 1.
    :param ent_coef: Entropy coefficient for the loss calculation
    :param vf_coef: Value function coefficient for the loss calculation
    :param max_grad_norm: The maximum value for the gradient clipping
    :param use_sde: Whether to use generalized State Dependent Exploration (gSDE)
        instead of action noise exploration (default: False)
    :param sde_sample_freq: Sample a new noise matrix every n steps when using gSDE
        Default: -1 (only sample at the beginning of the rollout)
    :param tensorboard_log: the log location for tensorboard (if None, no logging)
    :param monitor_wrapper: When creating an environment, whether to wrap it
        or not in a Monitor wrapper.
    :param policy_kwargs: additional arguments to be passed to the policy on creation
    :param verbose: Verbosity level: 0 for no output, 1 for info messages (such as device or wrappers used), 2 for
        debug messages
    :param seed: Seed for the pseudo random generators
    :param device: Device (cpu, cuda, ...) on which the code should be run.
        Setting it to auto, the code will be run on the GPU if possible.
    :param _init_setup_model: Whether or not to build the network at the creation of the instance
    :param supported_action_spaces: The action spaces supported by the algorithm.
    """

    def __init__(
        self,
        policy: Union[str, Type[ActorCriticPolicy]],
        env: Union[GymEnv, str],
        learning_rate: Union[float, Schedule],
        n_steps: int,
        gamma: float,
        gae_lambda: float,
        ent_coef: float,
        vf_coef: float,
        max_grad_norm: float,
        use_sde: bool,
        sde_sample_freq: int,
        tensorboard_log: Optional[str] = None,
        monitor_wrapper: bool = True,
        policy_kwargs: Optional[Dict[str, Any]] = None,
        verbose: int = 0,
        seed: Optional[int] = None,
        device: Union[th.device, str] = "auto",
        _init_setup_model: bool = True,
        supported_action_spaces: Optional[Tuple[spaces.Space, ...]] = None,
    ):
        super().__init__(
            policy=policy,
            env=env,
            learning_rate=learning_rate,
            policy_kwargs=policy_kwargs,
            verbose=verbose,
            device=device,
            use_sde=use_sde,
            sde_sample_freq=sde_sample_freq,
            support_multi_env=True,
            seed=seed,
            tensorboard_log=tensorboard_log,
            supported_action_spaces=supported_action_spaces,
        )

        self.n_steps = n_steps
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.ent_coef = ent_coef
        self.vf_coef = vf_coef
        self.max_grad_norm = max_grad_norm
        self.rollout_buffer = None

        # Build the model by calling _setup_model() below
        if _init_setup_model:
            self._setup_model()

    def _setup_model(self) -> None:
        # Initialize the learning rate so it can be called as a schedule
        self._setup_lr_schedule()
        # Set the random seed
        self.set_random_seed(self.seed)

        # Choose the rollout buffer class: DictRolloutBuffer if the observation
        # space is spaces.Dict, otherwise RolloutBuffer
        buffer_cls = DictRolloutBuffer if isinstance(self.observation_space, spaces.Dict) else RolloutBuffer

        # Instantiate the rollout buffer from that class, passing in the device,
        # the discount factor, the GAE hyperparameter and the number of environments
        self.rollout_buffer = buffer_cls(
            self.n_steps,
            self.observation_space,
            self.action_space,
            device=self.device,
            gamma=self.gamma,
            gae_lambda=self.gae_lambda,
            n_envs=self.n_envs,
        )
        # Initialize the policy with the observation space, action space, callable
        # learning-rate schedule, the gSDE flag, and any user-supplied policy kwargs
        # (network architecture, activation function, ...)
        self.policy = self.policy_class(  # pytype:disable=not-instantiable
            self.observation_space,
            self.action_space,
            self.lr_schedule,
            use_sde=self.use_sde,
            **self.policy_kwargs  # pytype:disable=not-instantiable
        )
        # Move the policy to GPU/CPU
        self.policy = self.policy.to(self.device)

    def collect_rollouts(
        self,
        env: VecEnv,
        callback: BaseCallback,
        rollout_buffer: RolloutBuffer,
        n_rollout_steps: int,
    ) -> bool:
        # Collect interaction data from the environment.
        # This method uses the current policy and fills the RolloutBuffer with the interaction history.
        # "Rollout" here is the model-free notion, not the rollout concept from model-based RL or planning.
        # env: the environment used for training
        # callback: callback invoked at every time step
        # rollout_buffer: buffer the collected experience is written into
        # n_rollout_steps: number of experiences to collect per environment
        # Returns True if the buffer collected that many experiences,
        # False if the callback terminated the rollout early.
        """
        Collect experiences using the current policy and fill a ``RolloutBuffer``.
        The term rollout here refers to the model-free notion and should not
        be used with the concept of rollout used in model-based RL or planning.

        :param env: The training environment
        :param callback: Callback that will be called at each step
            (and at the beginning and end of the rollout)
        :param rollout_buffer: Buffer to fill with rollouts
        :param n_rollout_steps: Number of experiences to collect per environment
        :return: True if function returned with at least `n_rollout_steps`
            collected, False if callback terminated rollout prematurely.
        """
        assert self._last_obs is not None, "No previous observation was provided"
        # Switch to eval mode (this affects batch norm / dropout)
        self.policy.set_training_mode(False)

        # Reset the rollout buffer; when using gSDE, also reset the policy noise
        n_steps = 0
        rollout_buffer.reset()
        # Sample new weights for the state dependent exploration
        if self.use_sde:
            self.policy.reset_noise(env.num_envs)

        # The callback runs on_rollout_start(); jumping to its definition did not show a concrete implementation
        callback.on_rollout_start()

        while n_steps < n_rollout_steps:
            # If gSDE is used and the sampling frequency is reached, reset the policy noise
            if self.use_sde and self.sde_sample_freq > 0 and n_steps % self.sde_sample_freq == 0:
                # Sample a new noise matrix
                self.policy.reset_noise(env.num_envs)

            # With gradients disabled, convert the observation to a tensor and feed it to the
            # policy to get actions, values and log probabilities, then move the actions to numpy
            with th.no_grad():
                # Convert to pytorch tensor or to TensorDict
                obs_tensor = obs_as_tensor(self._last_obs, self.device)
                actions, values, log_probs = self.policy(obs_tensor)
            actions = actions.cpu().numpy()

            # Rescale and perform action
            # Clip the actions to the lower/upper bounds of the action space
            clipped_actions = actions
            # Clip the actions to avoid out of bound error
            if isinstance(self.action_space, spaces.Box):
                clipped_actions = np.clip(actions, self.action_space.low, self.action_space.high)

            # Step the environment with the actions, returning new observations,
            # rewards, done flags and extra info
            new_obs, rewards, dones, infos = env.step(clipped_actions)

            # Handle the callback and update the timestep counter / info buffer
            self.num_timesteps += env.num_envs

            # Give access to local variables
            callback.update_locals(locals())
            if callback.on_step() is False:
                return False

            self._update_info_buffer(infos)
            n_steps += 1

            # If the action space is discrete, reshape the actions into a column vector
            if isinstance(self.action_space, spaces.Discrete):
                # Reshape in case of discrete action
                actions = actions.reshape(-1, 1)

            # Check whether the episode ended due to a time limit;
            # if so, bootstrap the reward with the value of the terminal observation
            # Handle timeout by bootstraping with value function
            # see GitHub issue #633
            for idx, done in enumerate(dones):
                if (
                    done
                    and infos[idx].get("terminal_observation") is not None
                    and infos[idx].get("TimeLimit.truncated", False)
                ):
                    terminal_obs = self.policy.obs_to_tensor(infos[idx]["terminal_observation"])[0]
                    with th.no_grad():
                        terminal_value = self.policy.predict_values(terminal_obs)[0]
                    rewards[idx] += self.gamma * terminal_value

            # The buffer stores the previous observation, the action, the reward,
            # the episode-start flags, the values and the log probabilities
            rollout_buffer.add(self._last_obs, actions, rewards, self._last_episode_starts, values, log_probs)
            self._last_obs = new_obs
            self._last_episode_starts = dones

        # Compute the value of the last state
        with th.no_grad():
            # Compute value for the last timestep
            values = self.policy.predict_values(obs_as_tensor(new_obs, self.device))

        # Compute returns and advantages
        rollout_buffer.compute_returns_and_advantage(last_values=values, dones=dones)

        callback.on_rollout_end()

        return True

    def train(self) -> None:
        # This is the parent-class placeholder; it is overridden in the concrete PPO subclass
        """
        Consume current rollout data and update policy parameters.
        Implemented by individual algorithms.
        """
        raise NotImplementedError

    def learn(
        self: SelfOnPolicyAlgorithm,
        total_timesteps: int,
        callback: MaybeCallback = None,
        log_interval: int = 1,
        tb_log_name: str = "OnPolicyAlgorithm",
        reset_num_timesteps: bool = True,
        progress_bar: bool = False,
    ) -> SelfOnPolicyAlgorithm:
        iteration = 0

        # Set up the learning run
        total_timesteps, callback = self._setup_learn(
            total_timesteps,
            callback,
            reset_num_timesteps,
            tb_log_name,
            progress_bar,
        )

        callback.on_training_start(locals(), globals())

        while self.num_timesteps < total_timesteps:
            # Collect data in the environment using collect_rollouts() above, then keep training;
            # if the rollout was interrupted, break out of the loop below
            continue_training = self.collect_rollouts(self.env, callback, self.rollout_buffer, n_rollout_steps=self.n_steps)

            if continue_training is False:
                break

            # Increment the iteration counter and update the remaining-progress value
            # (used e.g. for learning-rate schedules)
            iteration += 1
            self._update_current_progress_remaining(self.num_timesteps, total_timesteps)

            # Log training information to the console at the predefined frequency
            # Display training infos
            if log_interval is not None and iteration % log_interval == 0:
                time_elapsed = max((time.time_ns() - self.start_time) / 1e9, sys.float_info.epsilon)
                fps = int((self.num_timesteps - self._num_timesteps_at_start) / time_elapsed)
                self.logger.record("time/iterations", iteration, exclude="tensorboard")
                if len(self.ep_info_buffer) > 0 and len(self.ep_info_buffer[0]) > 0:
                    self.logger.record("rollout/ep_rew_mean", safe_mean([ep_info["r"] for ep_info in self.ep_info_buffer]))
                    self.logger.record("rollout/ep_len_mean", safe_mean([ep_info["l"] for ep_info in self.ep_info_buffer]))
                self.logger.record("time/fps", fps)
                self.logger.record("time/time_elapsed", int(time_elapsed), exclude="tensorboard")
                self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard")
                self.logger.dump(step=self.num_timesteps)

            self.train()

        callback.on_training_end()

        return self

    def _get_torch_save_params(self) -> Tuple[List[str], List[str]]:
        state_dicts = ["policy", "policy.optimizer"]

        return state_dicts, []
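To tie the pieces together: learn() repeatedly calls collect_rollouts() to fill the RolloutBuffer and then train() (implemented by the subclass) to update the policy. Below is a minimal usage sketch of that loop through the PPO subclass; the environment id, n_steps value, timestep count and save filename are arbitrary assumptions, not from the original post:

# Minimal sketch, assuming gym and stable-baselines3 are installed.
# "CartPole-v1", n_steps=256, total_timesteps=10_000 and the filename are arbitrary choices.
import gym
from stable_baselines3 import PPO

env = gym.make("CartPole-v1")
model = PPO("MlpPolicy", env, n_steps=256, verbose=1)

# Internally this runs the OnPolicyAlgorithm.learn() loop shown above:
# collect_rollouts() fills the buffer, then PPO.train() consumes it.
model.learn(total_timesteps=10_000)

# The saved parameters correspond to _get_torch_save_params():
# the policy's state_dict and its optimizer's state_dict.
model.save("ppo_cartpole_demo")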