提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
前言
我真的是醉了,刚开始无意间看到超级马里奥的强化学习,我就想学,然后。。。花了我零零散散两周的时间来从最初的Q_Learning,sarsa到高级的DQN,PG,DDPG再到最后的AC,A3C,PPO,我真的觉得没点看头,不知道学完之后能不能搞SuperMario,很不错,tf2.x的马里奥被我复现了。
一、安装库
之前学习的一直是Gym的环境,直接找到马里奥的游戏就是gym_super_mario_bros,
在这里遇到了个 很大的问题,没有C++ 工具库,在安装了几个G的VS软件以及插件后得以解决,我把它归为疑难杂症,在以往遇到这种bug时,直接去找该库的whl然后直接pip install whl所在路径就可以,但是这个库即使在gym官网拿到了它也需要C++支持,所以逼不得已安装了Visual Studio。
二、使用步骤
在这边主要解决的问题是看懂gym以及基本理解代码块思想,因为学习的时候是普通的迷宫,锤子游戏,并没有这么复杂的游戏环境;而起初只是为了复现而复现,最后模型跑出来都不知道怎么预测了,毕竟即使存储了观测值,动作价值等参数,如果不配游戏环境,这些ckpt模型存储的参数剖析出来又有啥用呢?后面在逐行写上自己理解的注释后才想到一个方法,我断点续训,训练后面的轮次时将环境游戏界面渲染打开不就可以实时看到之前的效果了吗?所以我就将注释打好了。
这里主要的是定义多种游戏环境,也就是多线程操作游戏环境
import multiprocessing as mp
import time
import cv2
from multiprocessing import freeze_support
# from IPython.display import clear_output
from gym import Wrapper
from gym.spaces import Box
import gym_super_mario_bros
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT, RIGHT_ONLY
from nes_py.wrappers import JoypadSpace
import numpy as np
import tensorflow as tf
from tensorflow.keras import Input, Model, Sequential
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Conv2D, Dense, Flatten, Layer
# tf.compat.v1.disable_eager_execution() # 关闭动态图机制
class MultipleEnvironments(): # 游戏环境线程搭建
# num_envs线程数量,create_env创建游戏环境的方法
def __init__(self, num_envs, create_env, *args):
# 只有线程大于0才继续下去
assert num_envs > 0
# multiprocessing创建多个管道
# 返回两个连接对象,代表管道的两端,一般用于进程或者线程之间的通信
self.agent_conns, self.env_conns = zip(*[mp.Pipe() for _ in range(num_envs)])
print(self.env_conns)
# exit(0)
# 根据管道开始创建线程并启动
for conn in self.env_conns:
process = mp.Process(target=self.run, args=(conn, create_env, *args))
process.start()
# 重写run方法,线程启动函数
@staticmethod
def run(conn, create_env, *args):
# 获得初始化环境
env = create_env(*args)
while True:
# 创建默认线程缓冲区,存储指令和动作,创建环境指令
request, action = conn.recv()
# 根据指令,发送给线程操作
if request == 'step':
conn.send(env.step(action))
elif request == 'reset':
conn.send(env.reset())
elif request == 'render':
env.render()
elif request == 'close':
env.close()
break
# 判断环境是否有收到额外的参数
elif hasattr(env, request):
# 将额外参数传递给线程
conn.send(getattr(env, request))
else:
raise NotImplementedError
def __getattr__(self, name):
# 线程名如果在self中有存在,就返回线程名的信息
if name in self.__dict__:
return self.__dict__[name]
# 如果管道入口已关闭,报错
assert not env.agent_conns[0].closed, 'Environment closed.'
# 管道将线程名信息存入缓冲区
self.agent_conns[0].send([name, None])
return self.agent_conns[0].recv()
def step(self, actions):
# 如果管道入口已关闭,报错
assert not env.agent_conns[0].closed, 'Environment closed.'
# 将管道入口连接和动作遍历存放在step指令中
for conn, action in zip(self.agent_conns, actions):
conn.send(['step', action.item()])
# 所有管道的信息存储到各自的缓冲区内
return tuple(zip(*[conn.recv() for conn in self.agent_conns]))
def reset(self):
# 如果管道入口已关闭,报错
assert not env.agent_conns[0].closed, 'Environment closed.'
# 重置指令,重置指令不需要参数env.reset(),缓冲区存储None
for conn in self.agent_conns:
conn.send(['reset', None])
return tuple(conn.recv() for conn in self.agent_conns)
def render(self):
# 界面渲染,界面渲染指令不需要参数env.render(),缓冲区存储None
assert not env.agent_conns[0].closed, 'Environment closed.'
for conn in self.agent_conns:
conn.send(['render', None])
def close(self):
# 同上,关闭环境,关闭管道,关闭线程
assert not env.agent_conns[0].closed, 'Environment closed.'
for conn in self.agent_conns:
conn.send(['close', None])
conn.close()
for conn in self.env_conns:
conn.close()
在这块就是记忆化了,这里并不像Q_Learning那样全部存储,不然这么多层神经网络反向传递的数据,单单几个一维数组存不下的,所以这里只是起对每一回合的存储作用,这样的好处也不言而喻了,有绝对的空间优势,有绝对的查询效率。
class Memory():
# 默认初始化
def __init__(self):
self.reset()
# 存储当前环境下状态,动作,惩罚系数,价值,下一个观测值,是否结束游戏,等存入数组
def store(self, state, action, prob, reward, next_state, done):
self.states.insert(0, state)
self.actions.insert(0, action)
self.probs.insert(0, prob)
self.rewards.insert(0, reward)
self.next_states.insert(0, next_state)
self.dones.insert(0, done)
# 将所有变量初始化
def reset(self):
self.states = []
self.actions = []
self.probs = []
self.rewards = []
self.next_states = []
self.dones = []
神经网络模型32*3步长为2的四个卷积层再加上后面的一些全连接,训练模型时也增加了几个分类的全连接,比如给约束项policy的softmax分类连接层,以及对价值的结果单分类值。
# 建立神经网络特征值,tf2.x
class Feature(Layer):
def __init__(self):
super().__init__()
# 序列号创建卷积模型
self.model = Sequential([
# 32*3步长为2的卷积层*4
Conv2D(32, 3, strides=2, activation='relu', padding='same'),
Conv2D(32, 3, strides=2, activation='relu', padding='same'),
Conv2D(32, 3, strides=2, activation='relu', padding='same'),
Conv2D(32, 3, strides=2, activation='relu', padding='same'),
# 将多维数据降维到1维
Flatten(),
# 建立一层全连接
Dense(512, activation='relu'),
])
def call(self, x):
return self.model(x)
神经网络的训练与断点续训都在这里,clip散度,这里的神经网络并没有直观的展示Actor_Critic的样式,常用的优化方法有两种,一种是Adaptive KL Penalty Coefficient, 另一种是Clipped Surrogate Objective,在了解了部分大牛的说法下,clip的方法更好。
# 神经网络模型训练
class PPOTrainer():
# 初始化传入环境的初始化信息与状态,
# agent可操作的动作数量,
# lambda衰减率,
# γ衰减率
# 学习率
# 利用探索率
# 训练步数
# 交叉熵系数
# 断点续训存储
def __init__(
self,
obs_shape,
act_n,
lmbda=0.97,
gamma=0.99,
lr=2e-4,
eps_clip=0.2,
train_step=10,
entropy_coef=0.05,
checkpoint_path='mario',
):
# 创建记忆库
self.memory = Memory()
self.lmbda = lmbda
self.gamma = gamma
self.lr = lr
self.obs_shape = obs_shape
self.act_n = act_n
self.eps_clip = eps_clip
self.train_step = train_step
self.entropy_coef = entropy_coef
# 创建模型后得到约束项,价值,编译模型
self.policy, self.value, self.train_model = self.build_model()
# 创建检查点,每次传入相同的编译模型以及优化器才可以断点续训
ckpt = tf.train.Checkpoint(
train_model=self.train_model,
optimizer=self.train_model.optimizer,
)
# 存储ckpt模型,默认四个文件,如果隐藏tf构建图形化界面的话就只保存三个
self.ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=3)
def build_model(self):
# 输入初始化环境
s_input = Input(self.obs_shape)
# 惩罚系数
prob_old_input = Input([])
# 动作
action_old_input = Input([], dtype='int32')
# 输入gamma衰减率
gae_input = Input([])
# 目标价值输入
v_target_input = Input([])
# 调用神经网络
feature = Feature()
# 传入环境初始化参数
x = feature(s_input)
# 定义对当前动作多分类的全连接层
policy_dense = Dense(self.act_n, activation='softmax')
# 定义一个只有唯一输出的全连接层
value_dense = Dense(1)
# 创建全连接层动作约束项
prob = policy_dense(x)
# 创建价值全连接层
v = value_dense(x)
# 模型调用输入初始化环境调用神经网络,最后输出惩罚系数
policy = Model(inputs=s_input, outputs=prob)
# 同上,输出价值
value = Model(inputs=s_input, outputs=v)
# 在惩罚系数中,根据动作参数进行切片,获得该历史动作对惩罚系数的影响
prob_cur = tf.gather(prob, action_old_input, batch_dims=1)
# 该动作的惩罚系数比上历史惩罚系数的概率
ratio = prob_cur / (prob_old_input + 1e-3)
# 乘上gamma衰减率
surr1 = ratio * gae_input
# 对约束项系数,定义惩罚评估标准
surr2 = K.clip(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * gae_input
# 第二项为熵值计算,由于是按照动作概率采样,因此计算时不需再乘上概率,并且只需要计算当前动作概率的对数
policy_loss = -K.mean(K.minimum(surr1, surr2)) + K.mean(K.log(prob_cur + 1e-3)) * self.entropy_coef
value_loss = K.mean((v[:, 0] - v_target_input) ** 2)
# 约束项与价值损失
loss = policy_loss + value_loss
# 训练模型传入所有输入变量,输出loss,
train_model = Model(inputs=[s_input, prob_old_input, action_old_input, gae_input, v_target_input], outputs=loss)
train_model.add_loss(loss) # 添加loss计算方法
# 添加Adam优化器开始编译训练,最后返回约束项,价值,编译模型
train_model.compile(tf.keras.optimizers.Adam(self.lr))
return policy, value, train_model
def choose_action(self, states):
# 该状态动作选择
# states.shape: (env_num, height, width, skip_frames)
# 获得约束项的惩罚系数
probs = self.policy.predict(states) # shape: (env_num, act_n)
# 在惩罚系数中去随机选择动作
actions = [np.random.choice(range(self.act_n), p=prob) for prob in probs] # shape: (env_num)
# 返回动作和该动作的历史惩罚系数
return actions, probs[np.arange(len(probs)), actions]
# 存储,根据memory类,存储当前状态,动作,惩罚系数,价值,动作后状态,是否结束
def store(self, states, actions, probs, rewards, next_states, dones):
self.memory.store(states, actions, probs, rewards, next_states, dones)
def update_model(self, batch_size=128):
# 将记忆库数据转换成np数组
states = np.array(self.memory.states) # shape: (-1, env_num, height, width, skip_frames)
actions = np.array(self.memory.actions) # shape: (-1, env_num)
probs = np.array(self.memory.probs) # shape: (-1, env_num)
rewards = np.array(self.memory.rewards) # shape: (-1, env_num)
next_states = np.array(self.memory.next_states) # shape: (-1, env_num, height, width, skip_frames)
dones = np.array(self.memory.dones) # shape: (-1, env_num)
# 当前状态系数
env_num = states.shape[1]
# 调整参数形状为一维,便于存储
states = states.reshape([-1, *states.shape[2:]])
next_states = next_states.reshape([-1, *next_states.shape[2:]])
actions = actions.flatten()
probs = probs.flatten()
# 开始学习
for step in range(self.train_step):
# 得到该状态和下一个状态的观测值
v = self.value.predict(states, batch_size=batch_size)
v_next = self.value.predict(next_states, batch_size=batch_size)
v = v.reshape([v.shape[0] // env_num, env_num])
v_next = v_next.reshape([v_next.shape[0] // env_num, env_num])
# Q现实=价值+gamma衰减率*下一个观测值*是否完成
v_target = rewards + self.gamma * v_next * ~dones
# TD误差=Q现实-Q观测
td_errors = v_target - v
gae_lst = []
adv = 0
for delta in td_errors:
adv = self.gamma * self.lmbda * adv + delta
gae_lst.append(adv)
# 获得所有衰减情况
gaes = np.array(gae_lst)
gaes = gaes.flatten()
# 将Q现实降维
v_target = v_target.flatten()
# 使用神经网络开始学习
self.train_model.fit([states, probs, actions, gaes, v_target], batch_size=batch_size)
# 重置记忆库
self.memory.reset()
def save(self):
# 保存模型
self.ckpt_manager.save()
def load(self):
# 如果有就断点续训
if self.ckpt_manager.latest_checkpoint:
print("断点续训", '-' * 20)
# exit(0)
status = agent.ckpt_manager.checkpoint.restore(self.ckpt_manager.latest_checkpoint)
status.run_restore_ops() # 关闭动态图后需要添加这句执行restore操作 断点续训
对游戏环境的自定义和界面灰度化,有利于卷积的更好训练
# 处理游戏界面色彩
def process_frame(frame, height, width):
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
frame = cv2.resize(frame, (width, height))[:, :, None] / 255.
return frame
# 自定义环境
class CustomEnvironment(Wrapper):
def __init__(self, env, height, width):
super().__init__(env)
# 观测空间设置为传入初始环境的信息,高,宽
self.observation_space = Box(low=0, high=1, shape=(height, width, 1))
self.height = height
self.width = width
# 行动
def step(self, action):
# 开始行动,传入动作,获取游戏下一个状态,该动作的价值,是否结束,没用信息
state, reward, done, info = self.env.step(action)
# 让彩色图片灰度化。有更利于卷积
state = process_frame(state, self.height, self.width)
if done:
# 结束后如果拿到旗子了就获得50奖励,否则-50
if info["flag_get"]:
reward += 50
else:
reward -= 50
# reward / 10减少价值差距
return state, reward / 10., done, info
# 重置环境时顺便变成灰度图
def reset(self):
return process_frame(self.env.reset(), self.height, self.width)
# 回合制获取总价值
class SkipFrame(Wrapper):
def __init__(self, env, skip=4):
super().__init__(env)
# 观测空间设置为传入初始环境的信息,高,宽
self.observation_space = Box(
low=0,
high=1,
shape=(*self.env.observation_space.shape[:-1], skip)
)
self.skip = skip
def step(self, action):
# 总价值
total_reward = 0
states = []
# 开始行动,传入动作,获取游戏下一个状态,该动作的价值,是否结束,没用信息
state, reward, done, info = self.env.step(action)
for i in range(self.skip):
if not done:
# 并未结束时添加总价值,记录状态
state, reward, done, info = self.env.step(action)
total_reward += reward
states.append(state)
else:
# 只记录状态
states.append(state)
# 将状态连接返回该回合信息
states = np.concatenate(states, axis=-1)
return states.astype(np.float32), total_reward, done, info
def reset(self):
state = self.env.reset()
states = np.concatenate([state for _ in range(self.skip)], axis=-1)
return states.astype(np.float32)
# 自动重置
class AutoReset(Wrapper):
def __init__(self, env):
super().__init__(env)
def step(self, action):
# 开始行动,传入动作,获取游戏下一个状态,该动作的价值,是否结束,没用信息
state, reward, done, info = self.env.step(action)
# 如果结束了就直接重置环境
if done:
state = self.env.reset()
return state, reward, done, info
def reset(self):
return self.env.reset()
# 创建gym_super_mario_bros的游戏环境SuperMarioBros-{world}-{stage}-v0
# world大关卡,stage小关卡,v0版本
def create_env(world, stage, action_type, height, width):
env = gym_super_mario_bros.make(f'SuperMarioBros-{world}-{stage}-v0')
# 创建手柄空间,执行传入的动作,
# 自定义长宽
# 跳帧操作
# 自动重置操作
env = JoypadSpace(env, action_type)
env = CustomEnvironment(env, height, width)
env = SkipFrame(env)
env = AutoReset(env)
return env
开始传入参数进行训练和预测
if __name__ == '__main__':
# 定义一轮之内的最大步数
max_step = 512
num_envs = 8 # 线程数量
height = 84 #游戏界面高
width = 84#游戏界面宽
world = 1 #大关卡
stage = 2 #小关卡
# 没有骚操作,只有简单的动作,ctrl点进去有其他两种组合动作,一个超简单, 一个复杂
action_type = SIMPLE_MOVEMENT
# 多线程调用
env = MultipleEnvironments(num_envs, create_env, world, stage, action_type, height, width)
session = tf.compat.v1.InteractiveSession() # 关闭动态图后,ckpt_manager.save()需要有默认的session
try:
# 调用神经网络
agent = PPOTrainer(
env.observation_space.shape,
env.action_space.n,
train_step=10,
lr=1e-4,
entropy_coef=0.05,
checkpoint_path=f'mario_{world}_{stage}'
)
agent.load() # 断点续训
# 重置环境
states = env.reset()
# 训练轮次
for epoch in range(1, 501):
# 当前轮次所有尝试的最远距离
max_pos = 0
min_pos = np.inf
#开始操作
for step in range(max_step):
env.render() # 开启图形化界面
# 根据状态选择动作,获得动作组合以及惩罚系数
actions, probs = agent.choose_action(np.stack(states, axis=0))
# 根据动作组合开始行动,这里的组合是指连招,比如右上跳, ['right', 'A', 'B']这组合是一个动作指令
next_states, rewards, dones, infos = env.step(actions)
# 存储记忆库
agent.store(states, actions, probs, rewards, next_states, dones)
# 下一步
states = next_states
# 获取当前回合最远距离,最近距离
max_pos = max(max_pos, max([info['x_pos'] for info in infos]))
min_pos = min(min_pos, min([info['x_pos'] if done else np.inf for info, done in zip(infos, dones)]))
# clear_output() # jupyter notebook 清屏
print(f'epoch: {epoch} | max position: {max_pos} | min position: {min_pos}')
# 学习
agent.update_model(batch_size=256)
# 每10轮保存一次模型
if epoch % 10 == 0:
agent.save()
finally:
env.close()
session.close()
总结
快关门了,匆匆忙忙写完下班。
重新整理了一下,上传到自己的gitee,以后回顾的时候再来复习复现。