Chapter1 交叉熵方法
1.1 RL方法的分类
交叉熵方法属于 无模型 和 基于策略 的方法类别。
所有的 RL 方法可以被分成以下几类:
- 无模型 或 基于模型
- 基于价值 或 基于策略
- 在线策略 或 离线策略
交叉熵方法是无模型的、基于策略的在线策略的方法,这意味着:
- 它不构建环境的任何模型,只告诉智能体每一步需要做什么
- 它计算智能体的策略
- 它从环境中获取新数据
1.2 交叉熵方法的实践
交叉熵方法是基于策略的,非线性函数(神经网络)生成策略,它针对每一个观察都告诉智能体应该执行什么动作
实践中,策略通常表示为动作的概率分布,这和分类问题很像,策略的数量和要执行的动作数量相同
这种抽象让智能体变得非常简单:
将从环境中得到的观察传给NN,得到的动作的概率分布,使用概率分布来进行随机采样以获得要执行的动作。
在智能体的一生中,它的经历被表示成片段,每个片段都由一系列的观察(智能体从环境中获得的)、动作(智能体发出的)和奖励(由动作产生的)组成。
假设一共有四个片段(注意每个片段都有不同的 $o_i$、$a_i$、$r_i$值),每个单元格表示智能体在片段中的一步。由于环境的随机性以及智能体选择动作的不同方式,某些片段会比其他片段好。交叉熵方法的核心是将差的片段丢掉,并用好的片段来训练。所以,该方法的步骤如下:
- 使用当前的模型和环境产生N次片段
- 计算每个片段的总奖励,并确定奖励边界。通常使用总奖励的百分位来确定,例如50或70
- 将奖励在边界之下的片段去掉
- 用观察值作为输入、智能体产生的动作作为目标输出,训练剩余的“精英”片段
- 从第一步开始反复,知道得到满意的结果
1.3 交叉熵方法在 CartPole 中的应用
模型的核心是有 1 个隐藏层的NN,带有整流线性函数(Rectified Linear Unit, ReLU)以及 128 个隐藏层神经元。
其他超参数基本是随机设置的,并没有调优过,因为这个方法本身鲁棒性很好,并且收敛得很快。
HIDDEN_SIZE = 128
BATCH_SIZE = 16
PERCENTILE = 70
我们将常量放在文件的最上面,它们包含了隐藏层中神经元的数量、在每次迭代中训练的片段数(16),以及用来过滤精英片段的奖励边界百分位。这里使用70作为奖励边界,这意味着会留下按奖励排序后前30%的片段
from torch import nn
class Net(nn.Module):
def __init__(self, obs_size, hidden_size, n_actions):
super(Net, self).__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, n_actions)
)
def forward(self, x):
return self.net(x)
NN 并没有什么特别之处,它将从环境中得到的单个观察结果作为输入向量,并输出一个数字作为可以执行的动作。
NN 的输出是动作的概率分布,所以一个比较直接的方法是在最后一层使用一个非线性的softmax。但是,在前面的 NN 中,我们不使用 softmax 来增加训练过程中数值的稳定性。
比起先计算 softmax (使用了幂运算)再计算交叉熵损失(使用了对数概率),我们使用pytorch 的 nn.CrossEntropyLoss
类,它将 softmax 和交叉熵合二为一,能够提供更好的数值稳定性。CrossEntropyLoss
要求参数是NN中的原始、未归一化的值。
from collections import namedtuple
Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple(
'EpisodeStep', field_names=['observation', 'action']
)
在这里,我们定义了两个命名元组类型的帮助类:
- EpisodeStep:这个用于表示智能体在片段中执行的一步,同时它会保存来自环境的观察和采取的动作。在精英片段的训练中我们会用到它
- Episode:这是单个片段,它保存了总的无折扣奖励以及EpisodeStep集合
# 用片段生成批的函数
def iterate_batches(env, net, batch_size):
batch = []
episode_reward = 0.0
episode_steps = []
obs = env.reset()
sm = nn.Softmax(dim=1) # 用来将 NN 的输出转换成动作的概率分布
while True:
obs_v = torch.FloatTensor([obs])
act_probs_v = sm(net(obs_v))
act_probs = act_probs_v.data.numpy()[0]
action = np.random.choice(len(act_probs), p=act_probs)
next_obs, reward, is_done, _ = env.step(action)
episode_reward += reward
step = EpisodeStep(observation=obs, action=action)
episode_steps.append(step)
if is_done:
e = Episode(reward=episode_reward, steps=episode_steps)
batch.append(e)
episode_reward = 0.0
episode_steps = []
next_obs = env.reset()
if len(batch) == batch_size:
yield batch
batch = []
obs = next_obs
上述函数接受环境(Gym库中的Env
实例)、NN 以及每个迭代需要生成的片段数作为输入
在每次迭代中,将当前的观察转换成 PyTorch 张量,并将其传入 NN 以获得动作概率分布。有以下几件事需要注意:
- 所有 pytorch 中的
nn.Module
实例都接受一批数据,对于 NN 也是一样的,所以我们将观察(在 cartpole 中为一个由 4 个数字组成的向量)转换成 1*4 大小的张量 - 由于没有在 NN 的输出使用非线性函数,它会输出一个原始的动作分数,因此需要将其用
softmax
函数处理 - NN 和 softmax 层都返回包含了梯度的张量,所以我们需要通过访问
tensor.data
字段来将其数据取出来,然后将张量转换成 Numpy 数组,该数组和输入一样,有同样的二维结构,0轴是批的维度,所以我们需要获取第一个元素,这样才能得到动作概率的一维向量。
既然有了动作的概率分布,只需要使用 Numpy 的 random.choice()
函数对分布进行采样,就能获得当前步骤该选择的动作。然后,将动作传给环境来获得下一个观察、奖励以及片段是否结束的标记
奖励被加入当前片段的总奖励,片段的步骤列表也添加来一个(observation, action)对。注意,保存的是用来选择动作的观察,而不是动作执行后从环境返回的观察。这些都是需要牢记的微小但重要的细节。
在这个函数的逻辑处理中,要理解一个非常重要的方面是:NN 的训练和片段的生成是同时进行的。它们并不是完全并行的,但是每积累了足够(16)的片段之后,控制权将转移到调用方,调用方会用梯度下降来训练 NN。所以,每当 yield 返回时, NN 都会稍微有点进步
def filter_batch(batch, percentile):
rewards = list(map(lambda s: s.reward, batch))
reward_bound = np.percentile(rewards, percentile)
reward_mean = float(np.mean(rewards))
train_obs = []
train_act = []
for reward, steps in batch:
if reward < reward_bound:
continue
train_obs.extend(map(lambda step: step.observation, steps))
train_act.extend(map(lambda step: step.action, steps))
train_obs_v = torch.FloatTensor(train_obs)
train_act_v = torch.LongTensor(train_act)
return train_obs_v, train_act_v, value_bound, reward_mean # 最后两个值只用于写入 TensorBoard,以检验智能体的性能
这个函数是交叉熵的核心——根据给定的一批片段和百分位值,计算奖励边界,以用于过滤要用于训练的精英片段。
为了获得奖励边界,我们将使用 Numpy 的 percentile
函数,该函数根据给定的值列表和百分位计算百分位的值。
然后再计算平均奖励用于监控
然后过滤片段。针对批中的每个片段,检查其总奖励值是否高于边界,如果高于,则将其观察和动作添加到要训练的列表中。
1.4 完整代码
#!/usr/bin/env python3
import gym
from collections import namedtuple
import numpy as np
from tensorboardX import SummaryWriter
import torch
import torch.nn as nn
import torch.optim as optim
HIDDEN_SIZE = 128
BATCH_SIZE = 16
PERCENTILE = 70
class Net(nn.Module):
def __init__(self, obs_size, hidden_size, n_actions):
super(Net, self).__init__()
self.net = nn.Sequential(
nn.Linear(obs_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, n_actions)
)
def forward(self, x):
return self.net(x)
Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])
def iterate_batches(env, net, batch_size):
batch = []
episode_reward = 0.0
episode_steps = []
obs = env.reset()
sm = nn.Softmax(dim=1)
while True:
obs_v = torch.FloatTensor([obs])
act_probs_v = sm(net(obs_v))
act_probs = act_probs_v.data.numpy()[0]
action = np.random.choice(len(act_probs), p=act_probs)
next_obs, reward, is_done, _ = env.step(action)
episode_reward += reward
step = EpisodeStep(observation=obs, action=action)
episode_steps.append(step)
if is_done:
e = Episode(reward=episode_reward, steps=episode_steps)
batch.append(e)
episode_reward = 0.0
episode_steps = []
next_obs = env.reset()
if len(batch) == batch_size:
yield batch
batch = []
obs = next_obs
def filter_batch(batch, percentile):
rewards = list(map(lambda s: s.reward, batch))
reward_bound = np.percentile(rewards, percentile)
reward_mean = float(np.mean(rewards))
train_obs = []
train_act = []
for reward, steps in batch:
if reward < reward_bound:
continue
train_obs.extend(map(lambda step: step.observation, steps))
train_act.extend(map(lambda step: step.action, steps))
train_obs_v = torch.FloatTensor(train_obs)
train_act_v = torch.LongTensor(train_act)
return train_obs_v, train_act_v, reward_bound, reward_mean
if __name__ == "__main__":
env = gym.make("CartPole-v0")
# env = gym.wrappers.Monitor(env, directory="mon", force=True)
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
net = Net(obs_size, HIDDEN_SIZE, n_actions)
objective = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=0.01)
writer = SummaryWriter(comment="-cartpole")
for iter_no, batch in enumerate(iterate_batches(
env, net, BATCH_SIZE)):
obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)
optimizer.zero_grad()
action_scores_v = net(obs_v)
loss_v = objective(action_scores_v, acts_v)
loss_v.backward()
optimizer.step()
print("%d: loss=%.3f, reward_mean=%.1f, rw_bound=%.1f" % (
iter_no, loss_v.item(), reward_m, reward_b))
writer.add_scalar("loss", loss_v.item(), iter_no)
writer.add_scalar("reward_bound", reward_b, iter_no)
writer.add_scalar("reward_mean", reward_m, iter_no)
if reward_m > 199:
print("Solved!")
break
writer.close()