LOADING

加载过慢请开启缓存 浏览器默认开启

强化学习实战1——交叉熵方法

Chapter1 交叉熵方法

1.1 RL方法的分类

交叉熵方法属于 无模型基于策略 的方法类别。

所有的 RL 方法可以被分成以下几类:

  1. 无模型 或 基于模型
  2. 基于价值 或 基于策略
  3. 在线策略 或 离线策略

交叉熵方法是无模型的、基于策略的在线策略的方法,这意味着:

  1. 它不构建环境的任何模型,只告诉智能体每一步需要做什么
  2. 它计算智能体的策略
  3. 它从环境中获取新数据

1.2 交叉熵方法的实践

交叉熵方法是基于策略的,非线性函数(神经网络)生成策略,它针对每一个观察都告诉智能体应该执行什么动作

实践中,策略通常表示为动作的概率分布,这和分类问题很像,策略的数量和要执行的动作数量相同

这种抽象让智能体变得非常简单:

将从环境中得到的观察传给NN,得到的动作的概率分布,使用概率分布来进行随机采样以获得要执行的动作。

在智能体的一生中,它的经历被表示成片段,每个片段都由一系列的观察(智能体从环境中获得的)、动作(智能体发出的)和奖励(由动作产生的)组成。

假设一共有四个片段(注意每个片段都有不同的 $o_i$、$a_i$、$r_i$值),每个单元格表示智能体在片段中的一步。由于环境的随机性以及智能体选择动作的不同方式,某些片段会比其他片段好。交叉熵方法的核心是将差的片段丢掉,并用好的片段来训练。所以,该方法的步骤如下:

  1. 使用当前的模型和环境产生N次片段
  2. 计算每个片段的总奖励,并确定奖励边界。通常使用总奖励的百分位来确定,例如50或70
  3. 将奖励在边界之下的片段去掉
  4. 用观察值作为输入、智能体产生的动作作为目标输出,训练剩余的“精英”片段
  5. 从第一步开始反复,知道得到满意的结果

1.3 交叉熵方法在 CartPole 中的应用

模型的核心是有 1 个隐藏层的NN,带有整流线性函数(Rectified Linear Unit, ReLU)以及 128 个隐藏层神经元。

其他超参数基本是随机设置的,并没有调优过,因为这个方法本身鲁棒性很好,并且收敛得很快。

HIDDEN_SIZE = 128
BATCH_SIZE = 16
PERCENTILE = 70

我们将常量放在文件的最上面,它们包含了隐藏层中神经元的数量、在每次迭代中训练的片段数(16),以及用来过滤精英片段的奖励边界百分位。这里使用70作为奖励边界,这意味着会留下按奖励排序后前30%的片段

from torch import nn

class Net(nn.Module):
    def __init__(self, obs_size, hidden_size, n_actions):
        super(Net, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions)
        )
        
    def forward(self, x):
        return self.net(x)

NN 并没有什么特别之处,它将从环境中得到的单个观察结果作为输入向量,并输出一个数字作为可以执行的动作。

NN 的输出是动作的概率分布,所以一个比较直接的方法是在最后一层使用一个非线性的softmax。但是,在前面的 NN 中,我们不使用 softmax 来增加训练过程中数值的稳定性。

比起先计算 softmax (使用了幂运算)再计算交叉熵损失(使用了对数概率),我们使用pytorch 的 nn.CrossEntropyLoss 类,它将 softmax 和交叉熵合二为一,能够提供更好的数值稳定性。CrossEntropyLoss 要求参数是NN中的原始、未归一化的值。

from collections import namedtuple

Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple(
    'EpisodeStep', field_names=['observation', 'action']
)

在这里,我们定义了两个命名元组类型的帮助类:

  1. EpisodeStep:这个用于表示智能体在片段中执行的一步,同时它会保存来自环境的观察和采取的动作。在精英片段的训练中我们会用到它
  2. Episode:这是单个片段,它保存了总的无折扣奖励以及EpisodeStep集合
# 用片段生成批的函数
def iterate_batches(env, net, batch_size):
    batch = []
    episode_reward = 0.0
    episode_steps = []
    obs = env.reset()
    sm = nn.Softmax(dim=1)    # 用来将 NN 的输出转换成动作的概率分布

    while True:
        obs_v = torch.FloatTensor([obs])
        act_probs_v = sm(net(obs_v))
        act_probs = act_probs_v.data.numpy()[0]

        action = np.random.choice(len(act_probs), p=act_probs)
        next_obs, reward, is_done, _ = env.step(action)

        episode_reward += reward
        step = EpisodeStep(observation=obs, action=action)
        episode_steps.append(step)

        if is_done:
            e = Episode(reward=episode_reward, steps=episode_steps)
            batch.append(e)
            episode_reward = 0.0
            episode_steps = []
            next_obs = env.reset()
            if len(batch) == batch_size:
                yield batch
                batch = []

            obs = next_obs

上述函数接受环境(Gym库中的Env实例)、NN 以及每个迭代需要生成的片段数作为输入

在每次迭代中,将当前的观察转换成 PyTorch 张量,并将其传入 NN 以获得动作概率分布。有以下几件事需要注意:

  1. 所有 pytorch 中的 nn.Module 实例都接受一批数据,对于 NN 也是一样的,所以我们将观察(在 cartpole 中为一个由 4 个数字组成的向量)转换成 1*4 大小的张量
  2. 由于没有在 NN 的输出使用非线性函数,它会输出一个原始的动作分数,因此需要将其用 softmax 函数处理
  3. NN 和 softmax 层都返回包含了梯度的张量,所以我们需要通过访问 tensor.data 字段来将其数据取出来,然后将张量转换成 Numpy 数组,该数组和输入一样,有同样的二维结构,0轴是批的维度,所以我们需要获取第一个元素,这样才能得到动作概率的一维向量。

既然有了动作的概率分布,只需要使用 Numpy 的 random.choice() 函数对分布进行采样,就能获得当前步骤该选择的动作。然后,将动作传给环境来获得下一个观察、奖励以及片段是否结束的标记

奖励被加入当前片段的总奖励,片段的步骤列表也添加来一个(observation, action)对。注意,保存的是用来选择动作的观察,而不是动作执行后从环境返回的观察。这些都是需要牢记的微小但重要的细节。

在这个函数的逻辑处理中,要理解一个非常重要的方面是:NN 的训练和片段的生成是同时进行的。它们并不是完全并行的,但是每积累了足够(16)的片段之后,控制权将转移到调用方,调用方会用梯度下降来训练 NN。所以,每当 yield 返回时, NN 都会稍微有点进步

def filter_batch(batch, percentile):
    rewards = list(map(lambda s: s.reward, batch))
    reward_bound = np.percentile(rewards, percentile)
    reward_mean = float(np.mean(rewards))

    train_obs = []
    train_act = []
    for reward, steps in batch:
        if reward < reward_bound:
            continue
        train_obs.extend(map(lambda step: step.observation, steps))
        train_act.extend(map(lambda step: step.action, steps))

        train_obs_v = torch.FloatTensor(train_obs)
        train_act_v = torch.LongTensor(train_act)
        return train_obs_v, train_act_v, value_bound, reward_mean    # 最后两个值只用于写入 TensorBoard,以检验智能体的性能

这个函数是交叉熵的核心——根据给定的一批片段和百分位值,计算奖励边界,以用于过滤要用于训练的精英片段。

为了获得奖励边界,我们将使用 Numpy 的 percentile 函数,该函数根据给定的值列表和百分位计算百分位的值。

然后再计算平均奖励用于监控

然后过滤片段。针对批中的每个片段,检查其总奖励值是否高于边界,如果高于,则将其观察和动作添加到要训练的列表中。

1.4 完整代码

#!/usr/bin/env python3
import gym
from collections import namedtuple
import numpy as np
from tensorboardX import SummaryWriter

import torch
import torch.nn as nn
import torch.optim as optim


HIDDEN_SIZE = 128
BATCH_SIZE = 16
PERCENTILE = 70


class Net(nn.Module):
    def __init__(self, obs_size, hidden_size, n_actions):
        super(Net, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions)
        )

    def forward(self, x):
        return self.net(x)


Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])


def iterate_batches(env, net, batch_size):
    batch = []
    episode_reward = 0.0
    episode_steps = []
    obs = env.reset()
    sm = nn.Softmax(dim=1)
    while True:
        obs_v = torch.FloatTensor([obs])
        act_probs_v = sm(net(obs_v))
        act_probs = act_probs_v.data.numpy()[0]
        action = np.random.choice(len(act_probs), p=act_probs)
        next_obs, reward, is_done, _ = env.step(action)
        episode_reward += reward
        step = EpisodeStep(observation=obs, action=action)
        episode_steps.append(step)
        if is_done:
            e = Episode(reward=episode_reward, steps=episode_steps)
            batch.append(e)
            episode_reward = 0.0
            episode_steps = []
            next_obs = env.reset()
            if len(batch) == batch_size:
                yield batch
                batch = []
        obs = next_obs


def filter_batch(batch, percentile):
    rewards = list(map(lambda s: s.reward, batch))
    reward_bound = np.percentile(rewards, percentile)
    reward_mean = float(np.mean(rewards))

    train_obs = []
    train_act = []
    for reward, steps in batch:
        if reward < reward_bound:
            continue
        train_obs.extend(map(lambda step: step.observation, steps))
        train_act.extend(map(lambda step: step.action, steps))

    train_obs_v = torch.FloatTensor(train_obs)
    train_act_v = torch.LongTensor(train_act)
    return train_obs_v, train_act_v, reward_bound, reward_mean


if __name__ == "__main__":
    env = gym.make("CartPole-v0")
    # env = gym.wrappers.Monitor(env, directory="mon", force=True)
    obs_size = env.observation_space.shape[0]
    n_actions = env.action_space.n

    net = Net(obs_size, HIDDEN_SIZE, n_actions)
    objective = nn.CrossEntropyLoss()
    optimizer = optim.Adam(params=net.parameters(), lr=0.01)
    writer = SummaryWriter(comment="-cartpole")

    for iter_no, batch in enumerate(iterate_batches(
            env, net, BATCH_SIZE)):
        obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)
        
        optimizer.zero_grad()
        action_scores_v = net(obs_v)
        loss_v = objective(action_scores_v, acts_v)
        loss_v.backward()
        optimizer.step()
        print("%d: loss=%.3f, reward_mean=%.1f, rw_bound=%.1f" % (
            iter_no, loss_v.item(), reward_m, reward_b))
        writer.add_scalar("loss", loss_v.item(), iter_no)
        writer.add_scalar("reward_bound", reward_b, iter_no)
        writer.add_scalar("reward_mean", reward_m, iter_no)
        if reward_m > 199:
            print("Solved!")
            break
    writer.close()