柔性Actor-Critic算法原理及应用

释放双眼,带上耳机,听听看~!
本文介绍了柔性 Actor-Critic(Soft Actor-Critic,SAC)算法的原理和应用,包括最大化熵增强学习的概念、Q 值函数和策略函数的优化方式,以及SAC算法在处理Q值估计偏差问题上的应用。

前言

  柔性 Actor-Critic(Soft Actor-Critic,SAC)算法采用了最大化熵的想法。学习的目标是最大化熵正则化的累积奖励而不只是累计奖励,从而鼓励更多的探索。

maxπθE[∑tγt(r(St,At)+αH(πθ(⋅∣St)))]underset{pi _{theta } }{max} E[sum_{t}^{}gamma ^{t}(r(S_{t}, A_{t})+alpha H(pi _{theta }(cdot|S_{t}) )) ]

  这里αalpha是正则化系数。最大化熵增强学习这个想法已经被很多论文,包括 (Fox et al., 2016; Haarnojaet al., 2017; Levine et al., 2013; Nachum et al., 2017; Ziebart et al., 2008) 提及。

SAC算法原理

  SAC采用在价值函数和策略函数之间进行交替优化的方式来学习,而不只是通过估计策略πpi的 Q 值来提升策略。令Qϕ(s,a)Q_{phi } (s,a) 表示 Q 值函数,πθpi _{theta }表示策略函数。这里我们考虑连续动作的设定并假设 πθ 的输出为一个正态分布的期望和方差。和本书前面提到的方法类似,Q 值函数可以通过最小化柔性Bellman 残差来学习:

JQ(ϕ)=E[(Q(St,At)−r(St,At)−γEst+1[Vϕ~(st+1)])2]J_{Q}(phi )=E[(Q(S_{t}, A_{t})-r(S_{t}, A_{t})-gamma E_{s_{t+1}}[V_{tilde{phi } }(s_{t+1}) ])^{2} ]

  这里Vϕ~(s)=Eπθ[Qϕ~(s,a)−αlogπθ(a∣s)]V_{tilde{phi }} (s)=E_{pi _{theta } } [Q_{tilde{phi }} (s,a)-alpha logpi _{theta }(a|s)] Qϕ~Q_{tilde{phi }}表示参数ϕ~tilde{phi }由 Q 值函数的参数ϕphi 的指数移动平均数得到的目标 Q 值网络。策略函数πθpi _{theta }可以通过最小化以下的kl散度得到。

Jπ(θ)=Es∼D[Ea∼πθ[αlogπθ(a∣s)−Qϕ(s,a)]]J_{pi } (theta )=E_{ssim D } [E_{asim pi _{theta }}[alpha logpi _{theta }(a|s)-Q_{phi } (s,a)] ]

  实际中,SAC 也使用了两个 Q 值函数(同时还有两个Q值目标函数)来处理 Q 值估计的偏差问题,也就是令 Qϕ(s,a)=min(Qϕ1(s,a),Qϕ2(s,a))Q_{phi } (s,a )=min(Q_{phi_{1} } (s,a ),Q_{phi_{2} } (s,a ))。注意到Jπ(θ)J_{pi } (theta )中的期望也依赖于策略πθpi _{theta },我们可以使用似然比例梯度估计的方法来优化Jπ(θ)J_{pi } (theta )(Williams, 1992)。在连续动作空间的设定下,我们也可以用策略网络的重参数化来优化。这样往往能够减少梯度估计的方差。再参数化的做法将πθpi _{theta }表示成一个使用状态 s 和标准正态样本ϵepsilon 作为其输入的函数直接输出动作 a:

a=fθ(s,ϵ)a=f_{theta } (s,epsilon)

代入Jπ(θ)J_{pi } (theta )的式子中

Jπ(θ)=Es∼D,ϵ∼N[αlogπθ(fθ(s,ϵ)∣s)−Qϕ(s,fθ(s,ϵ))]J_{pi } (theta )=E_{ssim D,epsilon sim N }[alpha logpi_{theta }( f_{theta } (s,epsilon)|s)-Q_{phi }(s,f_{theta } (s,epsilon) ) ]

式子中 N 表示标准正态分布,πθpi _{theta }现在被表示为fθ f_{theta }

最后,SAC 还提供了自动调节正则化参数αalpha方法。该方法通过最小化以下损失函数实现。

J(α)=Ea∼πθ[−αlogπθ(a∣s)−ακ]J(alpha )=E_{asimpi_{theta } }[-alpha logpi_{theta }( a|s)-alpha kappa ]

  这里κkappa是一个可以理解为目标熵的超参数。这种更新αalpha的方法被称为自动熵调节方法。其背后的原理是在给定每一步平均熵至少为κkappa的约束下,原来的策略优化问题的对偶形式。对自动熵调节方法的严格表述感兴趣的读者,可以参考 SAC 的论文 (Haarnoja et al., 2018)。下面是SAC的伪代码。

柔性Actor-Critic算法原理及应用

SAC算法代码详解

以SAC: Pendulum-v0为例:

  AC 使用了离线策略的方式对随机策略进行优化。它最大的特点是使用了熵正则项,但也使用了一些 TD3 中的技术。其目标 Q 值的计算使用了两个 Q 网络中的最小值和策略π(a∣s)pi(a|s)的对数概率。例子中的代码使用了这些类:ReplayBuffer、SoftQNetwork、PolicyNetwork 和 SAC。

  其中 ReplayBuffer 和 SoftQNetwork 类与 TD3 中的 ReplayBuffer 和 QNetwork 类一样,这里就不再赘述,直接介绍后续的代码。

class ReplayBuffer: 
    def __init__(self, capacity):
        ......
    def push(self, state, action, reward, next_state, done):
        ......
    def sample(self, batch_size):
        ......
    def __len__(self):
        ......
class SoftQNetwork(Model): 
    def __init__(self, num_inputs, num_actions, hidden_dim, init_w=3e-3):
        ......
    def forward(self, input):
        ......

PolicyNetwork 类也和 TD3 的十分相似。不同之处在于,SAC 使用了一个随机策略网络,而不是 TD3 中的确定性策略网络。

class PolicyNetwork(Model):
    def __init__(self, num_inputs, num_actions, hidden_dim, action_range=1.,
                 init_w=3e-3, log_std_min=-20, log_std_max=2): 
                ......
    def forward(self, state): # 前向传播
                ......
    def evaluate(self, state, epsilon=1e-6): # 进行评估
                ......
    def get_action(self, state, greedy=False): # 获取动作
                ......
    def sample_action(self): # 采样动作
                ......

随机策略网络输出了动作和对数标准差来描述动作分布。因此网络有两层输出

class PolicyNetwork(Model):
    def __init__(self, num_inputs, num_actions, hidden_dim, action_range=1.,
                 init_w=3e-3, log_std_min=-20, log_std_max=2):
        super(PolicyNetwork, self).__init__()
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max
        w_init = tf.keras.initializers.glorot_normal(seed=None)
        self.linear1 = Dense(n_units=hidden_dim, act=tf.nn.relu, W_init=w_init,
                             in_channels=num_inputs, name=’policy1’)
        self.linear2 = Dense(n_units=hidden_dim, act=tf.nn.relu, W_init=w_init,
                             in_channels=hidden_dim, name=’policy2’)
        self.linear3 = Dense(n_units=hidden_dim, act=tf.nn.relu, W_init=w_init,
                             in_channels=hidden_dim, name=’policy3’)
        self.mean_linear = Dense(n_units=num_actions, W_init=w_init,
                                 b_init=tf.random_uniform_initializer(-init_w, init_w),
                                 in_channels=hidden_dim, name=’policy_mean’)
        self.log_std_linear = Dense(n_units=num_actions, W_init=w_init,
                                    b_init=tf.random_uniform_initializer(-init_w, init_w),
                                    in_channels=hidden_dim, name=’policy_logstd’)
        self.action_range = action_range
        self.num_actions = num_actions

这里在 forward() 函数中的对数标准差上进行截断,防止标准差过大。

def forward(self, state):
    x = self.linear1(state)
    x = self.linear2(x)
    x = self.linear3(x)
    mean = self.mean_linear(x)
    log_std = self.log_std_linear(x)
    log_std = tf.clip_by_value(log_std, self.log_std_min, self.log_std_max)
    return mean, log_std

evaluate() 函数使用重参数技术从动作分布上采样动作,这样可以保证梯度能够反向传播。
函数也计算了采样动作在原始动作分布上的对数概率。

def evaluate(self, state, epsilon=1e-6):
    state = state.astype(np.float32)
    mean, log_std = self.forward(state)
    std = tf.math.exp(log_std) # 评估时不进行裁剪,裁剪会影响梯度
    normal = Normal(0, 1)
    z = normal.sample(mean.shape)
    action_0 = tf.math.tanh(mean + std * z) 
    action = self.action_range * action_0
    log_prob = Normal(mean, std).log_prob(mean + std * z) - tf.math.log(1. -
                                                                        action_0 ** 2 + epsilon) - np.log(self.action_range)
    log_prob = tf.reduce_sum(log_prob, axis=1)[:, np.newaxis]
    # 由于 reduce_sum 减少了 1 个维度,这里将维度扩展回来
    return action, log_prob, z, mean, log_std

get_action() 函数是前面函数的简单版。它只需要从动作分布上采样动作即可。

def get_action(self, state, greedy=False):
    mean, log_std = self.forward([state])
    std = tf.math.exp(log_std)
    normal = Normal(0, 1)
    z = normal.sample(mean.shape)
    action = self.action_range * tf.math.tanh(
        mean + std * z
    ) # 动作分布使用 TanhNormal 分布; 这里使用了重参数技术
    action = self.action_range * tf.math.tanh(mean) if greedy else action
    return action.numpy()[0]

sample_action() 函数更加简单。它只用在训练刚开始的时候采集第一次更新所需的数据。

def sample_action(self, ):
    a = tf.random.uniform([self.num_actions], -1, 1)
    return self.action_range * a.numpy()

SAC 的结构如下:

class SAC():
    def __init__(self, state_dim, action_dim, replay_buffer, hidden_dim, action_range,
                 soft_q_lr=3e-4, policy_lr=3e-4, alpha_lr=3e-4): # 建立网络及变量
            ......
    def target_ini(self, net, target_net): # 初始化目标网络时所需的硬拷贝更新
            ......
    def target_soft_update(self, net, target_net, soft_tau): # 更新目标网络时所用到的软更
            ......
    def update(self, batch_size, reward_scale=10., auto_entropy=True,
               target_entropy=-2, gamma=0.99, soft_tau=1e-2): # 更新 SAC 中所有的网络
            ......
    def save(self): # 存储训练参数
            ......
    def load(self): # 载入训练参数
            ......

SAC 算法中有 5 个网络,分别是 2 个 soft Q 网络及其目标网络,以及一个随机策略网络。另外还需要一个 alpha 变量来作为熵正则化的权衡系数。

class SAC():
    def __init__(self, state_dim, action_dim, replay_buffer, hidden_dim, action_range,
                 soft_q_lr=3e-4, policy_lr=3e-4, alpha_lr=3e-4):
        self.replay_buffer = replay_buffer
        self.soft_q_net1 = SoftQNetwork(state_dim, action_dim, hidden_dim)
        self.soft_q_net2 = SoftQNetwork(state_dim, action_dim, hidden_dim)
        self.target_soft_q_net1 = SoftQNetwork(state_dim, action_dim, hidden_dim)
        self.target_soft_q_net2 = SoftQNetwork(state_dim, action_dim, hidden_dim)
        self.policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim, action_range)
        self.log_alpha = tf.Variable(0, dtype=np.float32, name=’log_alpha’)
        self.alpha = tf.math.exp(self.log_alpha)
        print(’Soft Q Network (1,2): ’, self.soft_q_net1)
        print(’Policy Network: ’, self.policy_net)
        self.soft_q_net1.train()
        self.soft_q_net2.train()
        self.target_soft_q_net1.eval()
        self.target_soft_q_net2.eval()
        self.policy_net.train()
        self.target_soft_q_net1 = self.target_ini(self.soft_q_net1,
                                                  self.target_soft_q_net1)
        self.target_soft_q_net2 = self.target_ini(self.soft_q_net2,
                                                  self.target_soft_q_net2)
        self.soft_q_optimizer1 = tf.optimizers.Adam(soft_q_lr)
        self.soft_q_optimizer2 = tf.optimizers.Adam(soft_q_lr)
        self.policy_optimizer = tf.optimizers.Adam(policy_lr)
        self.alpha_optimizer = tf.optimizers.Adam(alpha_lr)

这里我们介绍一下 update() 函数。其他函数和之前 TD3 的代码一样,这里不做赘述。和往常一样,在 update() 函数的开始,我们先从回放缓存中采样数据。对奖励值进行正则化,以提高训练效果。

def update(self, batch_size, reward_scale=10., auto_entropy=True, target_entropy=-2,
           gamma=0.99, soft_tau=1e-2):
    state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
    reward = reward[:, np.newaxis] 
    done = done[:, np.newaxis]
    reward = reward_scale * (reward - np.mean(reward, axis=0)) / (
            np.std(reward, axis=0) + 1e-6
    )

在这之后,我们将基于下一个状态值计算相应的 Q 值。SAC 使用了两个目标网络输出中较小的值,这里和 TD3 相同。但是与之不同的是,SAC 在计算目标 Q 值的时候增加了熵正则项。这里的 log_prob 部分是一个权衡策略随机性的熵值。

new_next_action, next_log_prob, _, _, _ = self.policy_net.evaluate(next_state)
target_q_input = tf.concat([next_state, new_next_action], 1) # 第 0 维是样本数量
target_q_min = tf.minimum(
    self.target_soft_q_net1(target_q_input),
    self.target_soft_q_net2(target_q_input)
) - self.alpha * next_log_prob
target_q_value = reward + (1 - done) * gamma * target_q_min

在计算 Q 值之后,训练 Q 网络就很简单了。

q_input = tf.concat([state, action], 1)
with tf.GradientTape() as q1_tape:
    predicted_q_value1 = self.soft_q_net1(q_input)
    q_value_loss1 =
        tf.reduce_mean(tf.losses.mean_squared_error(predicted_q_value1,
                                                    target_q_value))
q1_grad = q1_tape.gradient(q_value_loss1, self.soft_q_net1.trainable_weights)
self.soft_q_optimizer1.apply_gradients(zip(q1_grad,
                                           self.soft_q_net1.trainable_weights))
with tf.GradientTape() as q2_tape:
    predicted_q_value2 = self.soft_q_net2(q_input)
    q_value_loss2 =
        tf.reduce_mean(tf.losses.mean_squared_error(predicted_q_value2,
                                                    target_q_value))
q2_grad = q2_tape.gradient(q_value_loss2, self.soft_q_net2.trainable_weights)
self.soft_q_optimizer2.apply_gradients(zip(q2_grad,
                                           self.soft_q_net2.trainable_weights))

这里的策略损失考虑了额外的熵项。通过最大化损失函数,可以训练策略来使预期回报和熵之间的权衡达到最佳。

with tf.GradientTape() as p_tape:
    new_action, log_prob, z, mean, log_std = self.policy_net.evaluate(state)
    new_q_input = tf.concat([state, new_action], 1) # 第 0 维是样本数量
    # 实现方式一
    predicted_new_q_value = tf.minimum(self.soft_q_net1(new_q_input),
                                       self.soft_q_net2(new_q_input))
    # 实现方式二
    # predicted_new_q_value = self.soft_q_net1(new_q_input)
    policy_loss = tf.reduce_mean(self.alpha * log_prob - predicted_new_q_value)
p_grad = p_tape.gradient(policy_loss, self.policy_net.trainable_weights)
self.policy_optimizer.apply_gradients(zip(p_grad,
                                          self.policy_net.trainable_weights))

最后,我们要更新熵权衡系数 alpha 和目标网络。

if auto_entropy is True:
    with tf.GradientTape() as alpha_tape:
        alpha_loss = -tf.reduce_mean((self.log_alpha * (log_prob +
                                                        target_entropy)))
    alpha_grad = alpha_tape.gradient(alpha_loss, [self.log_alpha])
    self.alpha_optimizer.apply_gradients(zip(alpha_grad, [self.log_alpha]))
    self.alpha = tf.math.exp(self.log_alpha)
else: 
    self.alpha = 1.
    alpha_loss = 0
# 软更新目标价值网络
self.target_soft_q_net1 = self.target_soft_update(self.soft_q_net1,
                                                  self.target_soft_q_net1, soft_tau)
self.target_soft_q_net2 = self.target_soft_update(self.soft_q_net2,
                                                  self.target_soft_q_net2, soft_tau)

训练的主循环和 TD3 一样,先建立环境和智能体。

env = gym.make(ENV_ID).unwrapped
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_range = env.action_space.high # 缩放动作,[-action_range, action_range]
# 设置随机种子,方便复现效果
env.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)
replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)
# 初始化智能体
agent = SAC(state_dim, action_dim, action_range, HIDDEN_DIM,
            replay_buffer, SOFT_Q_LR, POLICY_LR, ALPHA_LR)
t0 = time.time()

之后,使用智能体和环境交互,并存储用于更新的采样数据。在第一次更新之前,用随机动作来采集数据。

# 训练循环
if args.train:
    frame_idx = 0
    all_episode_reward = []
    # 这里需要进行一次额外的调用,来使内部函数进行一些初始化操作,让其可以正常使用
    # model.forward 函数
    state = env.reset().astype(np.float32)
    agent.policy_net([state])
    for episode in range(TRAIN_EPISODES):
        state = env.reset().astype(np.float32)
        episode_reward = 0
        for step in range(MAX_STEPS):
            if RENDER:
                env.render()
            if frame_idx > EXPLORE_STEPS:
                action = agent.policy_net.get_action(state)
            else:
                action = agent.policy_net.sample_action()
            next_state, reward, done, _ = env.step(action)
            next_state = next_state.astype(np.float32)
            done = 1 if done is True else 0
            replay_buffer.push(state, action, reward, next_state, done)
            state = next_state
            episode_reward += reward
            frame_idx += 1

采集到足够的数据后,我们可以开始在每步进行更新。

if len(replay_buffer) > BATCH_SIZE:
    for i in range(UPDATE_ITR):
        agent.update(
            BATCH_SIZE, reward_scale=REWARD_SCALE,
            auto_entropy=AUTO_ENTROPY,
        target_entropy=-1. * action_dim
        )
if done:
    break

通过上述步骤,智能体就可以通过不断更新变得越来越强了。增加下面的代码可以更好地显示训练过程。

if episode == 0:
    all_episode_reward.append(episode_reward)
else:
    all_episode_reward.append(all_episode_reward[-1] * 0.9 + episode_reward *
                              0.1)
print(
    ’Training | Episode: {}/{} | Episode Reward: {:.4f} | Running Time:
      {:.4f}’.format(
      episode+1, TRAIN_EPISODES, episode_reward,
      time.time() - t0
    )
)

最后,存储模型并且绘制学习曲线。

agent.save()
plt.plot(all_episode_reward)
if not os.path.exists(’image’):
    os.makedirs(’image’)
plt.savefig(os.path.join(’image’, ’sac.png’))
本网站的内容主要来自互联网上的各种资源,仅供参考和信息分享之用,不代表本网站拥有相关版权或知识产权。如您认为内容侵犯您的权益,请联系我们,我们将尽快采取行动,包括删除或更正。
AI教程

深度学习模型服务:定义、挑战和设计

2023-12-18 11:08:14

AI教程

扩散模型:图像生成的新思路

2023-12-18 11:17:14

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索