OpenAI Gym 是一个能够提供智能体统一 API 以及很多 RL 环境的库。有了它,我们就不需要写大把大把的样板代码了
在这篇文章中,我们会学习如何写下第一个有随机行为的智能体,并借此来进一步熟悉 RL 中的各种概念。在这篇文章结束时,你可能会理解以下内容:
- 将智能体插入 RL 框架所需的高层次要求
- 基本、纯 Python 实现的随机 RL 智能体
- OpenAI Gym
1. 剖析智能体
RL 世界中包含许多实体:
- 智能体:主动行动的人或物。实际上,智能体只是实现了某些策略的代码片段而已。这个策略根据观察决定每个时间点执行什么动作。
- 环境:某些世界的模型,它在智能体外部,负责提供观察并给予奖励。而且环境会根据智能体的动作改变自己的状态。
接下来,我们来探究如何实现它们。我们先从环境入手,先定义一个环境,限定交互步数,并且不管智能体执行任何动作,它都能给智能体返回随机奖励(先写一个简单的示例):
import random
from typing import List
class Environment:
def __init__(self):
# 环境初始化的内部状态
self.step_left = 10
def get_observation(self) -> List[float]:
# 该方法用于给智能体返回当前环境的观察
# 这里输出的观察向向量都是0,是因为我们没有给环境任何内部状态
return [0.0, 0.0, 0.0]
def get_actions(self) -> List[int]:
# 该方法允许智能体查询自己能执行的动作集
# 某些条件下,当环境变化的时候,智能体能执行的动作集也会发生改变
return [0, 1]
def is_done(self) -> bool:
# 给予智能体片段结束的信号
return self.steps_left == 0
def action(self, action: int) -> float:
# 环境的核心功能
# 它用于处理智能体的动作、返回该动作的奖励、更新已经执行的步数、拒绝执行已执行的片段
if self.is_done():
raise Exception("Game is over!")
self.steps_left -= 1
return random.random()
接下来我们来看看智能体部分,它更简单,只包含了两个部分:构造函数以及在环境中执行一步的方法:
class Agent:
def __init__(self):
# 初始化计数器,用来保存片段中智能体积累的总奖励
self.total_reward = 0.0
def step(self, env: Environment):
current_oba = env.get_observation()
actions = env.get_actions()
reward = env.action(random.choice(actions))
self.total_reward += reward
在上面这段代码中,step()
函数接受环境实例作为参数,并允许智能体执行以下操作:
- 观察环境
- 基于观察决定动作
- 向环境提交动作
- 获取当前步骤的奖励
对于我们这个例子,智能体比较笨,它在决定执行什么动作的时候会忽略得到的观察。取而代之的是,随机选择动作。下面还有一段胶水代码,它创建两个类并执行一次片段:
if __name__ == "__main__":
env = Environment()
agent = Agent()
while not env.is_done():
agent.step(env)
print("Total reward got: %.4f" % agent.total_reward)
前面这些简单的代码展示了 RL 模型的重要的基本概念。环境可以是极其复杂的物理模型,智能体也可以轻易地变成一个实现了最新 RL 算法的大型神经网络(NN),但是基本思想还是一致的——每一步,智能体都会从环境中得到观察,进行一番计算,最后选择要执行的动作。这个动作的结果就是奖励和新的观察。
你可能会问,如果基本思想是一样的,为什么还要从头开始实现呢?是否有人已经将其实现为一个通用库了呢?答案是肯定的,这样的框架已经存在了,但是在花时间讨论它们之前,先把你的开发环境准备好吧!
requirements如下所示(⚠️:其中gym版本尽量一致,或者不要用高版本;其他库可用最新版本):
atari-py==0.2.6
gym==0.15.3
numpy==1.17.2
opencv-python==4.1.1.26
tensorboard==2.0.1
torch==1.3.0
torchvision==0.4.1
pytorch-ignite==0.2.1
tensorboardX==1.9
tensorflow==2.0.0
ptan==0.6
2. OpenAI Gym API
2.1 动作空间
2.2 观察空间
2.3 环境
2.4 创建环境
2.5 车摆控制
我们来应用学到的知识探索 Gym 提供的最简单的 RL 环境:
>>> import gym
>>> e = gym.make("CartPole-v0")
这里我们导入了 gym 库,创建了一个叫做 CartPole(车摆系统)的环境。该环境来自经典的控制问题,其目的是控制底部附有木棒的平台。
这里的难点是,木棒会向左或向右倒,你需要在每一步,通过让平台往左或往右移动来保持平衡。
这个环境的观察是4个浮点数,包含了木棒质点的 $x$ 坐标、速度、与平台的角度以及角速度的信息。当然,通过应用一些数学和物理知识,将这些数字转换为动作来平衡木棒并不复杂,但问题是如何在不知道这些数字的确切含义、只知道奖励的情况下,学会平衡该系统?这个环境每执行一步,奖励都是1。片段会一直持续,直到木棒掉落为止,因此为了获得更多的累积奖励,我们需要以某种避免木棒掉落的方式平衡平台。
我们继续来编写代码:
>>> obs = e.reset()
>>> obs
array([ 0.04373323, -0.0498781 , -0.03481512, 0.01108434])
这里,先重置一下环境并获得第一个观察(在新创建环境时,总会重置一下它)。正如上文所说,观察结果是4个数字,我们来看一下如何提前知道这个信息。
>>> e.action_space
Discrete(2)
>>> e.observation_space
Box(4,)
action_space
字段是 Discrete
类型,所以动作只会是 0 或 1 ,其中 0 代表将平台推向左边,1代表推向右边。观察空间是 Box(4,)
,这代表大小为 4 的向量,其值在 [-inf, inf] 区间内。
>>> e.step(0)
(array([ 0.04273567, -0.24448391, -0.03459343, 0.29258258]), 1.0, False, {})
现在,通过执行动作 0 可以将平台推向左边,然后会获得包含 4 个元素的元组:
- 一个新的观察,即包含 4 个数字的新向量。
- 值为 1.0 的奖励。
- done 的标记为 False,表示片段还没有结束,目前的状态多少还是可以的。
- 环境的额外信息,在这里是一个空的字典。
接下来,对 action_space
和 observation_space
调用 Space
类的 sample()
方法。
>>> e.action_space.sample()
0
>>> e.action_space.sample()
1
>>> e.observation_space.sample()
array([ 2.4720373e+00, -5.7555515e+37, -2.8382450e-01, -9.5865417e+37],
dtype=float32)
>>> e.observation_space.sample()
array([-6.08082891e-01, 2.65997636e+38, 1.09545745e-01, -1.21019449e+38],
dtype=float32)
这个方法从底层空间返回一个随机样本,在 Discrete 动作空间的情况下,这意味着为 0 或 1 的随机数,而对于观察空间来说,这意味着包含 4 个数字的随机向量。对观察空间的随机采样看起来没什么用,确实是这样的,但当不知道如何执行动作的时候,从动作空间进行采样是有用的。在还不知道任何 RL 方法,却仍然想试一下 Gym 环境的时候,这个方法尤其方便。现在你知道如何为 CartPole 环境实现第一个行为随机的智能体了,我们来试一试。
3. 随机 CartPole 智能体
尽管这个环境比开头的那个例子里的环境复杂得多,但是智能体的代码却更短了。这就是重用性、抽象性以及第三方库的强大力量。
代码如下:
import gym
if __name__ == "__main__":
env = gym.make("CartPole-v0")
total_reward = 0.0
total_steps = 0
obs = env.reset()
while True:
action = env.action_space.sample()
obs, reward, done, _ = env.step(action)
total_reward += reward
total_steps += 1
if done:
break
print("Episode done in %d steps, total reward %.2f" %(total_steps, total_reward))
在该循环中,我们从动作空间中随机采样一个动作,然后让环境执行并返回下一个观察(obs)、reward 和 done 标记。如果片段结束,停止循环并展示执行了多少步以及累积获得了多少奖励。
随机智能体在木棒落地、片段结束之前,平均会执行 12~15 步。大部分 Gym 环境有一个“奖励边界”,它是智能体在 100 个连续片段中,为解决环境而应该得到的平均奖励。对于 CartPole 来说,这个边界是 195 ,这意味着,平均而言,智能体必须将木棒保持 195 个时间步长或更多。从这个角度来看,随机智能体貌似表现得很差。但是,不要失望,我们才刚刚起步,很快你就能解决 CartPole 以及其他很多有趣且富有挑战的环境了。
4. Gym 的额外功能:包装器和监控器
4.1 包装器
很多时候,你希望以某种通用的方式扩展环境的功能。例如,想象一个环境,它给了你一些观察,但是你想将它们累积缓存起来,用以提供智能体最近的 N 个观察。这在动态计算机游戏中是一个很常见的场景,比如单一一帧不足以了解游戏状态的完整信息。例如,你希望能够裁剪或预处理一些图像素以便智能体来消化这些信息,又或者你想以某种方式归一化奖励值。有相同结构的场景太多了,你可能想要将现有的环境“包装”起来并附加一些额外的逻辑。Gym 为这些场景提供了一个方便使用的框架——Wrapper
类。
Wrapper
类继承自 Env
类。它的构造函数只有一个参数,即要被包装的 Env 类的实例。为了附加额外的功能,需要重新定义想扩展的方法,例如 step() 或 reset()。唯一的要求就是需要调用超类中的原始方法。
为了处理更多特定的要求,例如 Wrapper 类只想要处理环境返回的观察或只处理动作,那么用 Wrapper 的子类过滤特定的信息即可。它们分别是:
- ObservationWrapper:需要重新定义父类的 observation(obs) 方法。obs 参数是被包装的环境给出的观察,这个方法需要返回给予智能体的观察。
- RewardWrapper:它暴露了一个 reward(rew) 方法,可以修改给予智能体的奖励值。
- ActionWrapper:需要覆盖 action(act) 方法,它能修改智能体传给包装环境的动作。
为了让它更实用,假设有一个场景,我们想要以 10% 的概率干涉智能体发出的动作流,将当前动作替换成随机动作。这看起来不是一个明智的决定,但是这个小技巧可以解决利用与探索问题,它是最实用、最强大的方法之一。通过发布随机动作,让智能体探索环境,时不时地偏离它原先的策略的轨迹。使用 ActionWrapper 类很容易就可以实现:
import gym
from typing import TypeVar
import random
Action = TypeVar('Action')
class RandomActionWrapper(gym.ActionWrapper):
def __init__(self, env, epsilon=0.1):
super(RandomActionWrapper, self).__init__(env)
self.epsilon = epsilon
先通过调用父类的 __init__
方法初始化包装器,并保存 epsilon(随机动作的概率)。
def action(self, action: Action) -> Action:
if random.random() < self.epsilon:
print("Random!")
return self.env.action_space.sample()
return action
我们需要覆盖这个方法,并通过它来修改智能体的动作。每一次都先掷骰子,都会有 epsilon 的概率从动作空间采样一个随机动作,用来替换智能体传给我们的动作。注意,这里用了 action_space
和包装抽象,这样就能写抽象的代码了,这适用于 Gym 的任意一个环境。另外,每次替换动作的时候必须将消息打印出来,以验证包装器是否生效。当然,在生产代码中,这不是必需的。
if __name__ == "__main__":
env = RandomActionWrapper(gym.make('CartPole-v0'))
是时候应用一下包装器了。创建一个普通的 CartPole 环境,并将其传入 Wrapper 构造函数。然后,将 Wrapper 类当成一耳光普通的 Env 实例,用它来取代原始的 CartPole。因为 Wrapper 类继承自 Env 类,并且暴露了相同的接口,我们可以任意地嵌套包装器。
obs = env.reset()
total_reward = 0.0
while True:
obs, reward, done, _ = env.step(0)
total_reward += reward
if done:
break
print("Reward got: %.2f" % total_reward)
除了智能体比较笨,每次都选择同样的 0 号动作外,代码几乎相同。通过运行代码,应该能看到包装器确实生效了。
如果愿意,可以在包装器创建时指定 epsilon 参数,验证这样的随机性平均下来,会提升智能体得到的分数。
4.2 监控器
它的实现方式与 Wrapper 类似,可以将智能体的性能信息写入文件,也可以选择将智能体的动作录下来。
下面来看一下如何将 Monitor 加入随机 CartPole 智能体中,唯一的区别就是下面这段代码:
if __name__ == "__main__":
env = gym.make("CartPole-v0")
env = gym.wrappers.Monitor(env, "recording")
传给 Monitor 类的第二个参数是监控结果存放的目录名。目录不应该存在,否则程序会抛出异常。