PyTorch Deep Learning in Action: Playing Pong with Deep Q-Learning

0. Preface

We have already seen how to use deep Q-learning to play the CartPole game in Gym. In this section, we look at the more complex Pong game and learn how to combine deep Q-learning with a fixed target network to play it, while replacing the plain feed-forward network with a model based on convolutional neural networks (Convolutional Neural Networks, CNN).

1. Deep Q-Learning Model with a Fixed Target Network

1.1 Model Input

In this section, our goal is to build an agent that plays table tennis against the computer and beats it; the agent is expected to score 21 points. We train the agent to play Pong with the following strategy:
Crop the irrelevant parts of the image to obtain the current frame of the game (the state):

[Figure: model input]

In the image above, we take the raw frame and crop away its top and bottom pixels.

1.2 Model Strategy

To build the deep Q-learning model with a fixed target network, we use the following strategy:

  • Stack four consecutive frames: the agent needs a sequence of states to tell whether the ball is moving towards it
  • In the initial phase, the agent plays by taking random actions and stores the current state, the next state, the action taken, and the reward in memory; only the most recent 10,000 transitions are kept, and older history is discarded
  • Build a prediction network that takes a sample of states from memory and predicts the value of each possible action
  • Define a target network, which starts as a copy of the prediction network
  • Update the target network once for every 1,000 updates of the prediction network
  • At the end of each 1,000-update interval, the target network's weights are set equal to the prediction network's weights
  • Use the target network to compute the Q-value of the best action in the next state
  • For the action chosen by the prediction network, we expect it to predict the sum of the immediate reward and the Q-value of the best action in the next state (see the toy sketch after this list)
  • Minimize the MSE loss of the prediction network
  • Let the agent keep playing until the reward is maximized
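
The interplay of the two networks in a single update can be illustrated with a toy sketch. All tensors below are made up purely for illustration (3 sampled transitions, 6 possible actions); the real networks and the real update step are defined in the implementation that follows.

import torch
import torch.nn.functional as F

gamma = 0.99
q_pred_all = torch.randn(3, 6)              # prediction network output Q(s, .)
q_next_all = torch.randn(3, 6)              # target network output Q_target(s', .)
actions = torch.tensor([[0], [2], [5]])     # actions actually taken
rewards = torch.tensor([[0.], [1.], [-1.]])
dones = torch.tensor([[0.], [0.], [1.]])    # 1 if the episode ended at s'

q_expected = q_pred_all.gather(1, actions)  # Q(s, a) of the chosen actions
q_target = rewards + gamma * q_next_all.max(1)[0].unsqueeze(1) * (1 - dones)
loss = F.mse_loss(q_expected, q_target)     # minimized w.r.t. the prediction network only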

2. Implementing Deep Q-Learning for Pong

Following the strategy from the previous subsection, we use PyTorch to implement an agent that maximizes the reward in Pong.

(1) Import the required libraries and set up the game environment:

import random
from collections import namedtuple, deque

import cv2
import gym
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

env = gym.make('PongDeterministic-v0')

(2) Obtain the sizes of the state space and the action space:

state_size = env.observation_space.shape[0]
action_size = env.action_space.n
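
Printing the spaces shows what these numbers are; the values in the comments assume the classic Gym Atari observation of 210x160 RGB frames and Pong's six discrete actions. Note that the CNN defined later consumes stacked 4x80x80 preprocessed frames, so state_size is passed to DQNetwork but not actually used by the architecture.

print(env.observation_space.shape)   # (210, 160, 3) -> state_size above is 210
print(env.action_space.n)            # 6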

(3) Define the preprocessing function that removes the irrelevant top and bottom pixels:

def preprocess_frame(frame):
    # Subtract the background colour, crop the rows above and below the playing
    # area (34:-16), keep every second pixel in both dimensions, and average
    # over the colour channels to obtain a single 80x80 channel scaled by 255.
    bkg_color = np.array([144, 72, 17])
    img = np.mean(frame[34:-16:2, ::2] - bkg_color, axis=-1) / 255.
    return img
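
A quick shape check of the preprocessing (a sketch assuming the classic Gym API, where env.reset() returns a raw 210x160x3 Atari frame):

raw_frame = env.reset()
processed = preprocess_frame(raw_frame)
print(processed.shape)   # (80, 80): 160 of the 210 rows kept, every second pixel in both dimensions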

(4) Define the function that stacks four consecutive game frames.

The function takes the frame stack stacked_frames, the current state state, and the flag is_new_episode as input:

def stack_frames(stacked_frames, state, is_new_episode):
    # Preprocess frame
    frame = preprocess_frame(state)
    stack_size = 4

If this is a new episode, start from the initial frame:

    if is_new_episode:
        # Clear stacked_frames
        stacked_frames = deque([np.zeros((80,80), dtype=np.uint8) for i in range(stack_size)], maxlen=4)
        # Because we're in a new episode, copy the same frame 4x
        for i in range(stack_size):
            stacked_frames.append(frame) 
        # Stack the frames
        stacked_state = np.stack(stacked_frames, axis=2).transpose(2, 0, 1)

Otherwise, remove the oldest frame from stacked_frames and append the latest one:

    else:
        # Append frame to deque, automatically removes the oldest frame
        stacked_frames.append(frame)
        # Build the stacked state (first dimension specifies different frames)
        stacked_state = np.stack(stacked_frames, axis=2).transpose(2, 0, 1) 
    return stacked_state, stacked_frames
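
A quick usage sketch: on the first call of an episode the same preprocessed frame is repeated four times, so the stacked state has shape (4, 80, 80):

frames = deque([np.zeros((80, 80)) for _ in range(4)], maxlen=4)
stacked_state, frames = stack_frames(frames, env.reset(), True)
print(stacked_state.shape)   # (4, 80, 80)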

(5) Define the network architecture DQNetwork:

class DQNetwork(nn.Module):
    def __init__(self, states, action_size):
        super(DQNetwork, self).__init__()
        
        self.conv1 = nn.Conv2d(4, 32, (8, 8), stride=4)
        self.conv2 = nn.Conv2d(32, 64, (4, 4), stride=2)
        self.conv3 = nn.Conv2d(64, 64, (3, 3), stride=1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(2304, 512)
        self.fc2 = nn.Linear(512, action_size)
        
    def forward(self, state): 
        x = F.relu(self.conv1(state))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
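
The 2304 in fc1 comes from the convolutional output shape: an 80x80 input shrinks to 19x19 after conv1 (8x8, stride 4), to 8x8 after conv2 (4x4, stride 2), and to 6x6 after conv3 (3x3, stride 1), giving 64 x 6 x 6 = 2304 features. A quick check with a dummy batch:

net = DQNetwork(state_size, action_size).to(device)
dummy = torch.zeros(1, 4, 80, 80).to(device)
print(net(dummy).shape)   # torch.Size([1, action_size]): one Q-value per action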

(6) Define the Agent class.

Define the __init__ method:

class Agent():
    def __init__(self, state_size, action_size):
        
        self.state_size = state_size
        self.action_size = action_size
        self.seed = random.seed(0)

        ## hyperparameters
        self.buffer_size = 10000
        self.batch_size = 32
        self.gamma = 0.99
        self.lr = 0.0001
        self.update_every = 4
        self.update_every_target = 1000 
        self.learn_every_target_counter = 0
        # Q-Network
        self.local = DQNetwork(state_size, action_size).to(device)
        self.target = DQNetwork(state_size, action_size).to(device)
        self.optimizer = optim.Adam(self.local.parameters(), lr=self.lr)

        # Replay memory
        self.memory = deque(maxlen=self.buffer_size) 
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
        # Initialize time step (for updating every few steps)
        self.t_step = 0

Compared with the CartPole agent, the __init__ method adds the target network (self.target) and the frequency at which it is updated (update_every_target).

Define the weight-update method step:

    def step(self, state, action, reward, next_state, done):
        # Save experience in replay memory
        self.memory.append(self.experience(state[None], action, reward, next_state[None], done))
        
        # Learn every update_every time steps.
        self.t_step = (self.t_step + 1) % self.update_every
        if self.t_step == 0:
            # If enough samples are available in memory, get a random subset and learn
            if len(self.memory) > self.batch_size:
                experiences = self.sample_experiences()
                self.learn(experiences, self.gamma)

Define the act method, which returns the action to take for a given state:

    def act(self, state, eps=0.):
        # Epsilon-greedy action selection
        if random.random() > eps:
            state = torch.from_numpy(state).float().unsqueeze(0).to(device)
            self.local.eval()
            with torch.no_grad():
                action_values = self.local(state)
            self.local.train()
            return np.argmax(action_values.cpu().data.numpy())
        else:
            return random.choice(np.arange(self.action_size))

Define the learn function, which trains the prediction model:

    def learn(self, experiences, gamma):
        self.learn_every_target_counter+=1
        states, actions, rewards, next_states, dones = experiences
       # Get expected Q values from local model
        Q_expected = self.local(states).gather(1, actions)

        # Get max predicted Q values (for next states) from target model
        Q_targets_next = self.target(next_states).detach().max(1)[0].unsqueeze(1)
        # Compute Q targets for current state
        Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))
        
        # Compute loss
        loss = F.mse_loss(Q_expected, Q_targets)

        # Minimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # ------------------- update target network ------------------- #
        if self.learn_every_target_counter % self.update_every_target == 0:
            self.target_update() 

In the code above, Q_targets_next is computed with the target model rather than the prediction model, and the target network is refreshed after every 1,000 learning steps; learn_every_target_counter is the counter used to decide when the target model should be updated.

Define the target_update function, which updates the target model:

    def target_update(self):
        print('target updating')
        self.target.load_state_dict(self.local.state_dict())

Define the sample_experiences function, which samples experiences from memory:

    def sample_experiences(self):
        experiences = random.sample(self.memory, k=self.batch_size)        
        states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
        actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)        
        return (states, actions, rewards, next_states, dones)

(7) Create the agent object:

agent = Agent(state_size, action_size)
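
As a quick smoke test of the (still untrained) agent, we can ask it for one greedy action on a freshly stacked state; at this point the output is just an arbitrary action index:

frames = deque([np.zeros((80, 80)) for _ in range(4)], maxlen=4)
s, frames = stack_frames(frames, env.reset(), True)
print(agent.act(s, eps=0.))   # greedy action index from the untrained prediction network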

(8) Define the parameters used to train the agent:

n_episodes=5000
max_t=5000
eps_start=1.0
eps_end=0.02
eps_decay=0.995
scores = [] # list containing scores from each episode
scores_window = deque(maxlen=100) # last 100 scores
eps = eps_start
stack_size = 4
stacked_frames = deque([np.zeros((80,80), dtype=np.uint8) for i in range(stack_size)], maxlen=stack_size) 
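
With eps_decay=0.995, exploration falls off fairly quickly: epsilon reaches the eps_end floor of 0.02 after roughly ln(0.02)/ln(0.995), about 780 episodes, after which the agent acts almost purely greedily. You can check this directly:

import math
print(math.log(eps_end) / math.log(eps_decay))   # ~780.5 episodes until eps hits eps_end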

(9) Train the agent:

for i_episode in range(1, n_episodes+1):
    state = env.reset()
    state, frames = stack_frames(stacked_frames, state, True)
    score = 0
    for i in range(max_t):
        action = agent.act(state, eps)
        next_state, reward, done, _ = env.step(action)
        next_state, frames = stack_frames(frames, next_state, False)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break 
    scores_window.append(score) # save most recent score
    scores.append(score) # save most recent score
    eps = max(eps_end, eps_decay*eps) # decrease epsilon
    print('\rEpisode {}\tReward {} \tAverage Score: {:.2f} \tEpsilon: {}'.format(i_episode,score,np.mean(scores_window), eps), end="")
    if i_episode % 100 == 0:
        print('\rEpisode {}\tAverage Score: {:.2f} \tEpsilon: {}'.format(i_episode, np.mean(scores_window), eps))
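
Training for the full 5,000 episodes can take a long time. A common addition, not part of the original code, is to checkpoint the prediction network's weights so the run can be evaluated or resumed later (the file name below is just a placeholder):

torch.save(agent.local.state_dict(), 'pong_dqn.pth')      # save the learned weights
agent.local.load_state_dict(torch.load('pong_dqn.pth'))   # reload them later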

The change in score as the number of training episodes increases can be plotted as follows:

plt.plot(scores)
plt.title('Scores over increasing episodes')
plt.show()

[Figure: model performance monitoring (scores over increasing episodes)]

As the plot above shows, the agent gradually learns to play Pong and achieves increasingly high rewards.

Related Links

PyTorch深度学习实战(1)——神经网络与模型训练过程详解
PyTorch深度学习实战(3)——使用PyTorch构建神经网络
PyTorch深度学习实战(11)——卷积神经网络
PyTorch深度学习实战(45)——强化学习
PyTorch深度学习实战(46)——深度Q学习
