PyTorch專欄(二十三): 強化學習(DQN)教程
- 2019 年 12 月 10 日
- 筆記
作者 | News
編輯 | 奇予紀
出品 | 磐創AI團隊出品
本教程介紹如何使用PyTorch從OpenAI Gym(https://gym.openai.com/)中的 CartPole-v0 任務上訓練一個Deep Q Learning (DQN) 代理。
1.任務
代理人必須在兩個動作之間做出決定 – 向左或向右移動推車 – 以使連接到它的桿保持直立。您可以在Gym網站(https://gym.openai.com/envs/CartPole-v0)上找到官方排行榜,裡面包含各種演算法以及可視化。

當代理觀察環境的當前狀態並選擇動作時,環境轉換到新狀態,並且還返回指示動作的後果的獎勵。在此任務中,每增加一個時間步長的獎勵為+1,如果桿落得太遠或者推車距離中心超過2.4個單位,則環境終止。這意味著更好的表現場景將持續更長的時間,以及積累更大的回報。
CartPole任務的設計使得代理的輸入是4個實際值,表示環境狀態(位置,速度等)。然而,神經網路可以純粹通過觀察場景來解決任務,因此我們將使用以cart為中心的螢幕修補程式作為輸入。也因為如此,我們的結果與官方排行榜的結果無法直接比較 – 因為我們的任務要困難得多。而且不幸的是,這確實減慢了訓練速度,因為我們必須渲染所有幀。
嚴格地說,我們將狀態顯示為當前螢幕修補程式與前一個修補程式之間的差異。這將允許代理從一個影像中考慮桿的速度。
2.需要的包
首先,讓我們導入所需的包。首先,我們需要gym(https://gym.openai.com/docs)來得到環境(使用pip install gym
)。我們還將使用PyTorch中的以下內容:
- 神經網路(
torch.nn
) - 優化(
torch.optim
) - 自動分化(
torch.autograd
) 視覺任務的實用程式(torchvision
)- 一個單獨的包
import gym import math import random import numpy as np import matplotlib import matplotlib.pyplot as plt from collections import namedtuple from itertools import count from PIL import Image import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F import torchvision.transforms as T env = gym.make('CartPole-v0').unwrapped # set up matplotlib is_ipython = 'inline' in matplotlib.get_backend() if is_ipython: from IPython import display plt.ion() # if gpu is to be used device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
3. 復現記憶(Replay Memory)
我們將使用經驗重播記憶來訓練我們的DQN。它存儲代理觀察到的轉換,允許我們之後重用此數據。通過隨機抽樣,轉換構建相關的一個批次。已經表明經驗重播記憶極大地穩定並改善了DQN訓練程式。
為此,我們需要兩個階段:
Transition
:一個命名元組,表示我們環境中的單個轉換。它實際上將(狀態,動作)對映射到它們的(next_state,reward)結果,狀態是螢幕差異影像,如稍後所述。ReplayMemory
:有界大小的循環緩衝區,用於保存最近觀察到的過渡。它還實現了一個.sample()
方法,用於為訓練選擇隨機batch的轉換。
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward')) class ReplayMemory(object): def __init__(self, capacity): self.capacity = capacity self.memory = [] self.position = 0 def push(self, *args): """Saves a transition.""" if len(self.memory) < self.capacity: self.memory.append(None) self.memory[self.position] = Transition(*args) self.position = (self.position + 1) % self.capacity def sample(self, batch_size): return random.sample(self.memory, batch_size) def __len__(self): return len(self.memory)
現在,讓我們定義我們的模型。但首先,讓我們快速回顧一下DQN是什麼。
4. DQN 演算法
我們的環境是確定性的,因此為了簡單起見,這裡給出的所有方程式也是確定性的。在強化學習文獻中,它們還包含對環境中隨機轉變的期望。
我們的目標是訓練出一種政策,試圖最大化折現累積獎勵

,其中

也稱為回報。折扣

應該是介於0和1之間的常數,以確保總和收斂。對於我們的代理來說,對比不確定的遠期未來,它更看重它們相當有信心的不久的將來。
Q-learning背後的主要思想是,如果我們有一個函數

,它可以告訴我們的回報是什麼,如果我們要在給定狀態下採取行動,那麼我們可以輕鬆地構建最大化我們獎勵的政策:

但是,我們不了解世界的一切,因此我們無法訪問

。但是,由於神經網路是通用函數逼近器,我們可以簡單地創建一個並訓練從而使得它類似於

。
對於我們的訓練更新規則,我們將使用一個事實,即某些策略的每個

函數都服從 Bellman 方程:

平等的兩邊之間的差異被稱為時間差異誤差,

:

為了最大限度地降低此錯誤,我們將使用Huber損失(https://en.wikipedia.org/wiki/Huber_loss)。當誤差很小時,Huber損失就像均方誤差一樣,但是當誤差很大時,就像平均絕對誤差一樣 – 當

的估計雜訊很多時,這使得它對異常值更加魯棒。
我們通過從重放記憶體中取樣的一批轉換

來計算:

5. Q_網路(Q_network)
我們的模型將是一個卷積神經網路,它接收當前和之前的螢幕修補程式之間的差異。它有兩個輸出,分別代表

和

(其中s是網路的輸入)。實際上,網路正在嘗試預測在給定當前輸入的情況下採取每個動作的預期回報。
class DQN(nn.Module): def __init__(self, h, w, outputs): super(DQN, self).__init__() self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2) self.bn1 = nn.BatchNorm2d(16) self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2) self.bn2 = nn.BatchNorm2d(32) self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2) self.bn3 = nn.BatchNorm2d(32) # 線性輸入連接的數量取決於conv2d層的輸出,因此取決於輸入影像的大小,因此請對其進行計算。 def conv2d_size_out(size, kernel_size = 5, stride = 2): return (size - (kernel_size - 1) - 1) // stride + 1 convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w))) convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h))) linear_input_size = convw * convh * 32 self.head = nn.Linear(linear_input_size, outputs) # 使用一個元素調用以確定下一個操作,或在優化期間調用batch。返回tensor([[left0exp,right0exp]...]). def forward(self, x): x = F.relu(self.bn1(self.conv1(x))) x = F.relu(self.bn2(self.conv2(x))) x = F.relu(self.bn3(self.conv3(x))) return self.head(x.view(x.size(0), -1))
6. 輸入提取
下面的程式碼是用於從環境中提取和處理渲染影像的實用程式。它使用了torchvision
軟體包,可以輕鬆構成影像變換。運行單元後,它將顯示一個提取的示例修補程式。
resize = T.Compose([T.ToPILImage(), T.Resize(40, interpolation=Image.CUBIC), T.ToTensor()]) def get_cart_location(screen_width): world_width = env.x_threshold * 2 scale = screen_width / world_width return int(env.state[0] * scale + screen_width / 2.0) # MIDDLE OF CART def get_screen(): # gym要求的返回螢幕是400x600x3,但有時更大,如800x1200x3。 將其轉換為torch order(CHW)。 screen = env.render(mode='rgb_array').transpose((2, 0, 1)) # cart位於下半部分,因此不包括螢幕的頂部和底部 _, screen_height, screen_width = screen.shape screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)] view_width = int(screen_width * 0.6) cart_location = get_cart_location(screen_width) if cart_location < view_width // 2: slice_range = slice(view_width) elif cart_location > (screen_width - view_width // 2): slice_range = slice(-view_width, None) else: slice_range = slice(cart_location - view_width // 2, cart_location + view_width // 2) # 去掉邊緣,使得我們有一個以cart為中心的方形影像 screen = screen[:, :, slice_range] # 轉換為float類型,重新縮放,轉換為torch張量 # (this doesn't require a copy) screen = np.ascontiguousarray(screen, dtype=np.float32) / 255 screen = torch.from_numpy(screen) # 調整大小並添加batch維度(BCHW) return resize(screen).unsqueeze(0).to(device) env.reset() plt.figure() plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(), interpolation='none') plt.title('Example extracted screen') plt.show()
7. 訓練
7.1 超參數和實用程式
這個單元實例化我們的模型及其優化器,並定義了一些實用程式:
select_action
:將根據epsilon貪婪政策選擇一項行動。簡而言之,我們有時會使用我們的模型來選擇動作,有時我們只會統一取樣。選擇隨機操作的概率將從EPS_START
開始,並將以指數方式向EPS_END
衰減。EPS_DECAY
控制衰減的速度plot_durations
:幫助繪製episodes的持續時間,以及過去100個episodes的平均值(官方評估中使用的度量)。該圖將位於包含主要訓練循環的單元下方,並將在每個episodes後更新。
BATCH_SIZE = 128 GAMMA = 0.999 EPS_START = 0.9 EPS_END = 0.05 EPS_DECAY = 200 TARGET_UPDATE = 10 # 獲取螢幕大小,以便我們可以根據AI gym返回的形狀正確初始化圖層。 # 此時的典型尺寸接近3x40x90 # 這是get_screen()中的限幅和縮小渲染緩衝區的結果 init_screen = get_screen() _, _, screen_height, screen_width = init_screen.shape # 從gym行動空間中獲取行動數量 n_actions = env.action_space.n policy_net = DQN(screen_height, screen_width, n_actions).to(device) target_net = DQN(screen_height, screen_width, n_actions).to(device) target_net.load_state_dict(policy_net.state_dict()) target_net.eval() optimizer = optim.RMSprop(policy_net.parameters()) memory = ReplayMemory(10000) steps_done = 0 def select_action(state): global steps_done sample = random.random() eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY) steps_done += 1 if sample > eps_threshold: with torch.no_grad(): # t.max(1)將返回每行的最大列值。 # 最大結果的第二列是找到最大元素的索引,因此我們選擇具有較大預期獎勵的行動。 return policy_net(state).max(1)[1].view(1, 1) else: return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long) episode_durations = [] def plot_durations(): plt.figure(2) plt.clf() durations_t = torch.tensor(episode_durations, dtype=torch.float) plt.title('Training...') plt.xlabel('Episode') plt.ylabel('Duration') plt.plot(durations_t.numpy()) # 取100個episode的平均值並繪製它們 if len(durations_t) >= 100: means = durations_t.unfold(0, 100, 1).mean(1).view(-1) means = torch.cat((torch.zeros(99), means)) plt.plot(means.numpy()) plt.pause(0.001) # 暫停一下,以便更新圖表 if is_ipython: display.clear_output(wait=True) display.display(plt.gcf())
8. 訓練循環
在這裡,您可以找到執行優化的單個步驟的optimize_model
函數。
它首先對一個batch進行取樣,將所有張量連接成一個整體,計算

和

,並將它們組合成我們的損失。通過定義,如果s是終端狀態,則設置

。我們還使用目標網路來計算

以增加穩定性。目標網路的權重在大多數時間保持凍結狀態,但每隔一段時間就會更新策略網路的權重。這通常是一系列步驟,但為了簡單起見,我們將使用episodes。
def optimize_model(): if len(memory) < BATCH_SIZE: return transitions = memory.sample(BATCH_SIZE) # 轉置batch(有關詳細說明,請參閱https://stackoverflow.com/a/19343/3343043)。 # 這會將過渡的batch數組轉換為batch數組的過渡。 batch = Transition(*zip(*transitions)) # 計算非最終狀態的掩碼並連接batch元素(最終狀態將是模擬結束後的狀態) non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.uint8) non_final_next_states = torch.cat([s for s in batch.next_state if s is not None]) state_batch = torch.cat(batch.state) action_batch = torch.cat(batch.action) reward_batch = torch.cat(batch.reward) # 計算Q(s_t,a) - 模型計算Q(s_t),然後我們選擇所採取的動作列。 # 這些是根據policy_net對每個batch狀態採取的操作 state_action_values = policy_net(state_batch).gather(1, action_batch) # 計算所有下一個狀態的V(s_{t+1}) # non_final_next_states的操作的預期值是基於「較舊的」target_net計算的; # 用max(1)[0]選擇最佳獎勵。這是基於掩碼合併的,這樣我們就可以得到預期的狀態值,或者在狀態是最終的情況下為0。 next_state_values = torch.zeros(BATCH_SIZE, device=device) next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach() # 計算預期的Q值 expected_state_action_values = (next_state_values * GAMMA) + reward_batch # 計算Huber損失 loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1)) # 優化模型 optimizer.zero_grad() loss.backward() for param in policy_net.parameters(): param.grad.data.clamp_(-1, 1) optimizer.step()
下面,您可以找到主要的訓練循環。在開始時,我們重置環境並初始state
張量。然後,我們取樣一個動作並執行它,觀察下一個螢幕和獎勵(總是1),並優化我們的模型一次。當episode結束時(我們的模型失敗),我們重新開始循環。
下面,num_episodes設置為小數值。您應該下載筆記型電腦並運行更多的epsiodes,例如300+以進行有意義的持續時間改進。
num_episodes = 50 for i_episode in range(num_episodes): # 初始化環境和狀態 env.reset() last_screen = get_screen() current_screen = get_screen() state = current_screen - last_screen for t in count(): # 選擇動作並執行 action = select_action(state) _, reward, done, _ = env.step(action.item()) reward = torch.tensor([reward], device=device) # 觀察新的狀態 last_screen = current_screen current_screen = get_screen() if not done: next_state = current_screen - last_screen else: next_state = None # 在記憶中存儲過渡 memory.push(state, action, next_state, reward) # 移動到下一個狀態 state = next_state # 執行優化的一個步驟(在目標網路上) optimize_model() if done: episode_durations.append(t + 1) plot_durations() break # 更新目標網路,複製DQN中的所有權重和偏差 if i_episode % TARGET_UPDATE == 0: target_net.load_state_dict(policy_net.state_dict()) print('Complete') env.render() env.close() plt.ioff() plt.show()
以下是說明整體結果數據流的圖表。

可以隨機選擇或根據策略選擇操作,從gym環境中獲取下一步樣本。我們將結果記錄在重放記憶體中,並在每次迭代時運行優化步驟。優化從重放記憶體中選擇一個隨機batch來進行新策略的訓練。「較舊的」target_net也用於優化以計算預期的Q值; 它會偶爾更新以保持最新狀態。