《Scikit-Learn、Keras與TensorFlow機器學習實用指南(第二版)》第18章 強化學習

  • 2020 年 2 月 13 日
  • 筆記

(第二部分:深度學習) 第10章 使用Keras搭建人工神經網路 第11章 訓練深度神經網路 第12章 使用TensorFlow自定義模型並訓練 第13章 使用TensorFlow載入和預處理數據 第14章 使用卷積神經網路實現深度電腦視覺 第15章 使用RNN和CNN處理序列 第16章 使用RNN和注意力機制進行自然語言處理 第17章 使用自編碼器和GAN做表徵學習和生成式學習 第18章 強化學習 [第19章 規模化訓練和部署TensorFlow模型]


強化學習(RL)如今是機器學習的一大令人激動的領域,也是最老的領域之一。自從 1950 年被發明出來後,它被用於一些有趣的應用,尤其是在遊戲(例如 TD-Gammon,一個西洋雙陸棋程式)和機器控制領域,但是從未弄出什麼大新聞。直到 2013 年一個革命性的發展:來自英國的研究者發起了Deepmind 項目,這個項目可以學習去玩任何從頭開始的 Atari 遊戲,在多數遊戲中,比人類玩的還好,它僅使用像素作為輸入而沒有使用遊戲規則的任何先驗知識。這是一系列令人驚嘆的壯舉中的第一個,並在 2016 年 3 月以他們的系統AlphaGo戰勝了世界圍棋冠軍李世石而告終。從未有程式能勉強打敗這個遊戲的大師,更不用說世界冠軍了。今天,RL 的整個領域正在沸騰著新的想法,其都具有廣泛的應用範圍。DeepMind 在 2014 被Google以超過 5 億美元收購。

DeepMind是怎麼做到的呢?事後看來,原理似乎相當簡單:他們將深度學習運用到強化學習領域,結果卻超越了他們最瘋狂的設想。在本章中,我們將首先解釋強化學習是什麼,以及它擅長於什麼,然後我們將介紹兩個在深度強化學習領域最重要的技術:策略梯度和深度 Q 網路(DQN),包括討論馬爾可夫決策過程(MDP)。我們將使用這些技術來訓練一個模型來平衡移動車上的杆子;然後,我會介紹TF-Agents庫,這個庫利用先進的演算法,可以大大簡化創建RL系統,然後我們會用這個系統來玩Breakout,一個著名的 Atari 遊戲。本章最後,會介紹強化學習領域的最新進展。

學習優化獎勵

在強化學習中,智慧體在環境(environment)中觀察(observation)並且做出決策(action),隨後它會得到獎勵(reward)。它的目標是去學習如何行動能最大化期望獎勵。如果你不在意擬人化的話,可以認為正獎勵是愉快,負獎勵是痛苦(這樣的話獎勵一詞就有點誤導了)。總之,智慧體在環境中行動,並且在實驗和錯誤中去學習最大化它的愉快,最小化它的痛苦。

這是一個相當廣泛的設置,可以適用於各種各樣的任務。以下是幾個例子(詳見圖 16-1):

  1. 智慧體可以是控制一個機器人的程式。在此例中,環境就是真實的世界,智慧體通過許多的感測器例如攝像機或者觸覺感測器來觀察,它可以通過給電機發送訊號來行動。它可以被編程設置為如果到達了目的地就得到正獎勵,如果浪費時間,或者走錯方向,或摔倒了就得到負獎勵。
  2. 智慧體可以是控制 Ms.Pac-Man 的程式。在此例中,環境是 Atari 遊戲的模擬器,行為是 9 個操縱桿位(上下左右中間等等),觀察是螢幕,回報就是遊戲點數。
  3. 相似地,智慧體也可以是棋盤遊戲的程式,例如圍棋。
  4. 智慧體也可以不用去控制一個實體(或虛擬的)去移動。例如它可以是一個智慧恆溫器,當它調整到目標溫度以節能時會得到正獎勵,當人們需要自己去調節溫度時它會得到負獎勵,所以智慧體必須學會預見人們的需要。
  5. 智慧體也可以去觀測股票市場價格以實時決定買賣。獎勵的依據為掙錢或者賠錢。

圖18-1 強化學習案例:(a)行走機器人,(b)Ms.Pac-Man遊戲,(c)圍棋玩家,(d)恆溫器,(e)自動交易員

其實沒有正獎勵也是可以的,例如智慧體在迷宮內移動,它每分每秒都得到一個負獎勵,所以它要儘可能快的找到出口!還有很多適合強化學習的領域,例如自動駕駛汽車,推薦系統,在網頁上放廣告,或者控制一個影像分類系統讓它明白它應該關注於什麼。

策略搜索

智慧體用於改變行為的演算法稱為策略。例如,策略可以是一個把觀測當輸入,行為當做輸出的神經網路(見圖16-2)。

圖18-2 用神經網路策略做加強學習

這個策略可以是你能想到的任何演算法,它甚至可以是非確定性的。事實上,在某些任務中,策略根本不必觀察環境!舉個例子,例如,考慮一個真空吸塵器,它的獎勵是在 30 分鐘內撿起的灰塵數量。它的策略可以是每秒以概率p向前移動,或者以概率1-p隨機地向左或向右旋轉。旋轉角度將是-r+r之間的隨機角度,因為該策略涉及一些隨機性,所以稱為隨機策略。機器人將有一個不確定的軌跡,它保證它最終會到達任何可以到達的地方,並撿起所有的灰塵。問題是:30分鐘後它會撿起多少灰塵?

怎麼訓練這樣的機器人?你能調整的策略參數只有兩個:概率p和角度範圍r。一個想法是這些參數嘗試許多不同的值,並選擇執行最佳的組合(見圖 18-3)。這是一個策略搜索的例子,在這種情況下使用暴力方法。然而,當策略空間太大(通常情況下),以這樣的方式找到一組好的參數就像是大海撈針。

圖18-3 策略空間中的四個點以及機器人的對應行為

另一種搜尋策略空間的方法是遺傳演算法。例如你可以隨機創造一個包含 100 個策略的第一代基因,隨後殺死 80 個糟糕的策略,隨後讓 20 個倖存策略繁衍 4 代。一個後代只是它父輩基因的複製品加上一些隨機變異。倖存的策略加上他們的後代共同構成了第二代。你可以繼續以這種方式迭代代,直到找到一個好的策略。

另一種方法是使用優化技術,通過評估獎勵關於策略參數的梯度,然後通過跟隨梯度向更高的獎勵(梯度上升)調整這些參數。這種方法被稱為策略梯度(policy gradient, PG),我們將在本章後面詳細討論。例如,回到真空吸塵器機器人,你可以稍微增加概率P並評估這是否增加了機器人在 30 分鐘內拾起的灰塵的量;如果確實增加了,就相對應增加p,否則減少p。我們將使用 Tensorflow 來實現 PG 演算法,但是在這之前我們需要為智慧體創造一個生存的環境,所以現在是介紹 OpenAI Gym的時候了。

OpenAI Gym 介紹

強化學習的一個挑戰是,為了訓練對象,首先需要有一個工作環境。如果你想設計一個可以學習 Atari 遊戲的程式,你需要一個 Atari 遊戲模擬器。如果你想設計一個步行機器人,那麼環境就是真實的世界,你可以直接在這個環境中訓練你的機器人,但是這有其局限性:如果機器人從懸崖上掉下來,你不能僅僅點擊「撤消」。你也不能加快時間;增加更多的計算能力不會讓機器人移動得更快。一般來說,同時訓練 1000 個機器人是非常昂貴的。簡而言之,訓練在現實世界中是困難和緩慢的,所以你通常需要一個模擬環境,至少需要引導訓練。例如,你可以使用PyBullet或MuJoCo來做3D物理模擬。

OpenAI Gym 是一個工具包,它提供各種各樣的模擬環境(Atari 遊戲,棋盤遊戲,2D 和 3D 物理模擬等等),所以你可以訓練,比較,或開發新的 RL 演算法。

安裝之前,如果你是用虛擬環境創建的獨立的環境,需要先激活:

$ cd $ML_PATH                # 工作目錄 (e.g., $HOME/ml)  $ source my_env/bin/activate # Linux or MacOS  $ .my_envScriptsactivate  # Windows

接下來安裝 OpenAI gym。可通過pip安裝:

$ python3 -m pip install --upgrade gym

取決於系統,你可能還要安裝Mesa OpenGL Utility(GLU)庫(比如,在Ubuntu 18.04上,你需要運行apt install libglu1-mesa)。這個庫用來渲染第一個環境。接著,打開一個Python終端或Jupyter notebook,用make()創建一個環境:

>>> import gym  >>> env = gym.make("CartPole-v1")  >>> obs = env.reset()  >>> obs  array([-0.01258566, -0.00156614,  0.04207708, -0.00180545])

這裡創建了一個 CartPole 環境。這是一個 2D 模擬,其中推車可以被左右加速,以平衡放置在它上面的平衡桿(見圖 18-4)。你可以用gym.envs.registry.all()獲得所有可用的環境。在創建環境之後,需要使用reset()初始化。這會返回第一個觀察結果。觀察取決於環境的類型。對於 CartPole 環境,每個觀測是包含四個浮點數的 1D Numpy 向量:這些浮點數代表推車的水平位置(0.0 為中心)、速度(正是右)、桿的角度(0.0 為垂直)及角速度(正為順時針)。

render()方法展示環境(見圖18-4)。在Windows上,這需要安裝X Server,比如VcXsrv或Xming:

>>> env.render()  True

圖18-4 CartPole環境

提示:如果你在使用無頭伺服器(即,沒有顯示器),比如雲上的虛擬機,渲染就會失敗。解決的唯一方法是使用假X server,比如Xvfb 或 Xdummy。例如,裝好Xvfb之後(Ubuntu或Debian上運行apt install xvfb),用這條命令啟動Python:xvfb-run -s "-screen 0 1400x900x24" python3。或者,安裝Xvfb和pyvirtualdisplay(這個庫包裝了Xvfb),在程式啟動處運行pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

如果你想讓render()讓影像以一個 Numpy 數組格式返回,可以將mode參數設置為rgb_array(注意,這個環境會渲染環境到螢幕上):

>>> img = env.render(mode="rgb_array")  >>> img.shape  # height, width, channels (3=RGB)  (800, 1200, 3)

詢問環境,可以採取的可能行動:

>>> env.action_space  Discrete(2)

Discrete(2)的意思是可能的行動是整數0和1,表示向左(0)或向右(1)加速。其它的環境可能有其它離散的行動,或其它種類的行動(例如,連續性行動)。因為棍子是向右偏的(obs[2] > 0),讓車子向右加速:

>>> action = 1  # accelerate right  >>> obs, reward, done, info = env.step(action)  >>> obs  array([-0.01261699,  0.19292789,  0.04204097, -0.28092127])  >>> reward  1.0  >>> done  False  >>> info  {}

step()方法執行給定的動作並返回四個值:

obs:

這是新的觀測,小車現在正在向右走(obs[1]>0,註:當前速度為正,向右為正)。平衡桿仍然向右傾斜(obs[2]>0),但是他的角速度現在為負(obs[3]<0),所以它在下一步後可能會向左傾斜。

reward

在這個環境中,無論你做什麼,每一步都會得到 1.0 獎勵,所以遊戲的目標就是儘可能長的運行。

done

當遊戲結束時這個值會為True。當平衡桿傾斜太多、或越過螢幕、或超過200步時會發生這種情況。之後,必須重新設置環境才能重新使用。

info

該字典可以在其他環境中提供額外資訊用於調試或訓練。例如,在一些遊戲中,可以指示agent還剩多少條命。

提示:使用完環境後,應當調用它的close()方法釋放資源。

讓我們硬編碼一個簡單的策略,當桿向左傾斜時向左邊加速,當桿向右傾斜時加速向右邊加速。我們使用這個策略來獲得超過 500 步的平均回報:

def basic_policy(obs):      angle = obs[2]      return 0 if angle < 0 else 1    totals = []  for episode in range(500):      episode_rewards = 0      obs = env.reset()      for step in range(200):          action = basic_policy(obs)          obs, reward, done, info = env.step(action)          episode_rewards += reward          if done:              break      totals.append(episode_rewards)

這段程式碼不難。讓我們看看結果:

>>> import numpy as np  >>> np.mean(totals), np.std(totals), np.min(totals), np.max(totals)  (41.718, 8.858356280936096, 24.0, 68.0)

即使有 500 次嘗試,這一策略從未使平衡桿在超過 68 個連續的步驟里保持直立。結果太好。如果你看一下 Juyter Notebook 中的模擬,你會發現,推車越來越強烈地左右擺動,直到平衡桿傾斜過度。讓我們看看神經網路是否能提出更好的策略。

神經網路策略

讓我們創建一個神經網路策略。就像之前我們編碼的策略一樣,這個神經網路將把觀察作為輸入,輸出要執行的動作。更確切地說,它將估計每個動作的概率,然後我們將根據估計的概率隨機地選擇一個動作(見圖 18-5)。在 CartPole 環境中,只有兩種可能的動作(左或右),所以我們只需要一個輸出神經元。它將輸出動作 0(左)的概率p,動作 1(右)的概率顯然將是1 - p。例如,如果它輸出 0.7,那麼我們將以 70% 的概率選擇動作 0,以 30% 的概率選擇動作 1。

圖18-5 神經網路策略

你可能奇怪為什麼我們根據神經網路給出的概率來選擇隨機的動作,而不是選擇最高分數的動作。這種方法使智慧體在探索新的行為利用那些已知可行的行動之間找到正確的平衡。舉個類比:假設你第一次去餐館,所有的菜看起來同樣吸引人,所以你隨機挑選一個。如果菜好吃,你可以增加下一次點它的概率,但是你不應該把這個概率提高到 100%,否則你將永遠不會嘗試其他菜肴,其中一些甚至比你嘗試的更好。

還要注意,在這個特定的環境中,過去的動作和觀察可以被放心地忽略,因為每個觀察都包含環境的完整狀態。如果有一些隱藏狀態,那麼你也需要考慮過去的行為和觀察。例如,如果環境僅僅揭示了推車的位置,而不是它的速度,那麼你不僅要考慮當前的觀測,還要考慮先前的觀測,以便估計當前的速度。另一個例子是當觀測是有雜訊的的,在這種情況下,通常你想用過去的觀察來估計最可能的當前狀態。因此,CartPole 問題是簡單的;觀測是無雜訊的,而且它們包含環境的全狀態。

下面是用tf.keras創建這個神經網路策略的程式碼:

import tensorflow as tf  from tensorflow import keras    n_inputs = 4 # == env.observation_space.shape[0]    model = keras.models.Sequential([      keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]),      keras.layers.Dense(1, activation="sigmoid"),  ])

在導入之後,我們使用Sequential模型定義策略網路。輸入的數量是觀測空間的大小(在 CartPole 的情況下是 4 個),我們只有 5 個隱藏單元,並且我們只有 1 個輸出概率(向左的概率),所以輸出層只需一個使用sigmoid的神經元就成。如果超過兩個動作,每個動作就要有一個神經元,然後使用softmax激活函數。

好了,現在我們有一個可以觀察和輸出動作的神經網路了,那我們怎麼訓練它呢?

評價行為:信用分配問題

如果我們知道每一步的最佳動作,我們可以像通常一樣訓練神經網路,通過最小化估計概率和目標概率之間的交叉熵。這只是通常的監督學習。然而,在強化學習中,智慧體獲得的指導的唯一途徑是通過獎勵,獎勵通常是稀疏的和延遲的。例如,如果智慧體在 100 個步驟內設法平衡桿,它怎麼知道它採取的 100 個行動中的哪一個是好的,哪些是壞的?它所知道的是,在最後一次行動之後,杆子墜落了,但最後一次行動肯定不是負全責的。這被稱為信用分配問題:當智慧體得到獎勵時,很難知道哪些行為應該被信任(或責備)。如果一隻狗在表現優秀幾小時後才得到獎勵,它會明白它做對了什麼嗎?

為了解決這個問題,一個通常的策略是基於這個動作後得分的總和來評估這個個動作,通常在每個步驟中應用衰減因子r。例如(見圖 18-6),如果一個智慧體決定連續三次向右,在第一步之後得到 +10 獎勵,第二步後得到 0,最後在第三步之後得到 -50,然後假設我們使用衰減率r=0.8,那麼第一個動作將得到10 +r×0 + r2×(-50)=-22的分數。如果衰減率接近 0,那麼與即時獎勵相比,未來的獎勵不會有多大意義。相反,如果衰減率接近 1,那麼對未來的獎勵幾乎等於即時回報。典型的衰減率通常從 0.9 到 0.99之間。如果衰減率為 0.95,那麼未來 13 步的獎勵大約是即時獎勵的一半(0.9513×0.5),而當衰減率為 0.99,未來 69 步的獎勵是即時獎勵的一半。在 CartPole 環境下,行為具有相當短期的影響,因此選擇 0.95 的折扣率是合理的。

圖18-6 計算行動的回報:未來衰減求和

當然,一個好的動作可能會緊跟著一串壞動作,這些動作會導致平衡桿迅速下降,從而導致一個好的動作得到一個低分數(類似的,一個好行動者有時會在一部爛片中扮演主角)。然而,如果我們花足夠多的時間來訓練遊戲,平均下來好的行為會得到比壞的更好的分數。因此,為了獲得相當可靠的動作分數,我們必須運行很多次並將所有動作分數歸一化(通過減去平均值併除以標準偏差)。之後,我們可以合理地假設消極得分的行為是壞的,而積極得分的行為是好的。現在我們有一個方法來評估每一個動作,我們已經準備好使用策略梯度來訓練我們的第一個智慧體。讓我們看看如何做。

策略梯度

正如前面所討論的,PG 演算法通過遵循更高回報的梯度來優化策略參數。一種流行的 PG 演算法,稱為增強演算法,在 1929 由 Ronald Williams 提出。這是一個常見的變體:

  1. 首先,讓神經網路策略玩幾次遊戲,並在每一步計算梯度,這使得智慧體更可能選擇行為,但不應用這些梯度。
  2. 運行幾次後,計算每個動作的得分(使用前面段落中描述的方法)。
  3. 如果一個動作的分數是正的,這意味著動作是好的,可應用較早計算的梯度,以便將來有更大的的概率選擇這個動作。但是,如果分數是負的,這意味著動作是壞的,要應用相反梯度來使得這個動作在將來採取的可能性更低。我們的方法就是簡單地將每個梯度向量乘以相應的動作得分。
  4. 最後,計算所有得到的梯度向量的平均值,並使用它來執行梯度下降步驟。

讓我們使用 tf.keras 實現這個演算法。我們將訓練我們早先建立的神經網路策略,讓它學會平衡車上的平衡桿。首先,需要一個能執行一步的函數。假定做出的動作都是對的,激素親戚損失和梯度(梯度會保存一會,根據動作的結果再對其修改):

def play_one_step(env, obs, model, loss_fn):      with tf.GradientTape() as tape:          left_proba = model(obs[np.newaxis])          action = (tf.random.uniform([1, 1]) > left_proba)          y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)          loss = tf.reduce_mean(loss_fn(y_target, left_proba))      grads = tape.gradient(loss, model.trainable_variables)      obs, reward, done, info = env.step(int(action[0, 0].numpy()))      return obs, reward, done, grads

逐行看程式碼:

  • GradientTape程式碼塊內,先調用模型,傳入一個觀察(將觀察變形為包含單個實例的批次)。輸出是向左的概率。
  • 然後,選取一個0到1之間的浮點數,檢查是否大於left_proba。概率為left_proba時,actionFalse;概率為1-left_proba時,actionTrue。當將這個布爾值轉變為數字時,動作是0(左)或1(右)及對應的概率。
  • 接著,定義向左的目標概率:1減去動作(浮點值)。如果動作是0(左),則向左的目標概率等於1。如果動作是1(右),則目標概率等於0。
  • 然後使用損失函數計算損失,使用記錄器計算模型可訓練變數的損失梯度。這些梯度會在後面應用前,根據動作的結果做微調。
  • 最後,執行選擇的動作,無論是否結束,返回新的觀察、獎勵,和剛剛計算的梯度。

現在,創建另一個函數基於play_one_step()的多次執行函數,返回所有獎勵和每個周期和步驟的梯度:

def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):      all_rewards = []      all_grads = []      for episode in range(n_episodes):          current_rewards = []          current_grads = []          obs = env.reset()          for step in range(n_max_steps):              obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)              current_rewards.append(reward)              current_grads.append(grads)              if done:                  break          all_rewards.append(current_rewards)          all_grads.append(current_grads)      return all_rewards, all_grads

這段程式碼返回了獎勵列表(每個周期一個獎勵列表,每個步驟一個獎勵),還有一個梯度列表(每個周期一個梯度列表,每個步驟一個梯度元組,每個元組每個變臉有一個梯度張量)。

演算法會使用play_multiple_episodes()函數,多次執行遊戲(比如,10次),然後會檢查所有獎勵,做衰減,然後歸一化。要這麼做,需要多個函數:第一個計算每個步驟的未來衰減獎勵的和,第二個歸一化所有這些衰減獎勵(減去平均值,除以標準差):

def discount_rewards(rewards, discount_factor):      discounted = np.array(rewards)      for step in range(len(rewards) - 2, -1, -1):          discounted[step] += discounted[step + 1] * discount_factor      return discounted    def discount_and_normalize_rewards(all_rewards, discount_factor):      all_discounted_rewards = [discount_rewards(rewards, discount_factor)                                for rewards in all_rewards]      flat_rewards = np.concatenate(all_discounted_rewards)      reward_mean = flat_rewards.mean()      reward_std = flat_rewards.std()      return [(discounted_rewards - reward_mean) / reward_std              for discounted_rewards in all_discounted_rewards]

檢測其是否有效:

>>> discount_rewards([10, 0, -50], discount_factor=0.8)  array([-22, -40, -50])  >>> discount_and_normalize_rewards([[10, 0, -50], [10, 20]],  ...                                discount_factor=0.8)  ...  [array([-0.28435071, -0.86597718, -1.18910299]),   array([1.26665318, 1.0727777 ])]

調用discount_rewards(),返回了我們想要的結果(見圖18-6)。可以確認函數discount_and_normalize_rewards()返回了每個周期每個步驟的歸一化的行動的結果。可以看到,第一個周期的表現比第二個周期的表現糟糕,所以歸一化的結果都是負的;第一個周期中的動作都是不好的,而第二個周期中的動作被認為是好的。

可以準備運行演算法了!現在定義超參數。運行150個訓練迭代,每次迭代完成10次周期,每個周期最多200個步驟。衰減因子是0.95:

n_iterations = 150  n_episodes_per_update = 10  n_max_steps = 200  discount_factor = 0.95

還需要一個優化器和損失函數。優化器用普通的Adam就成,學習率用0.01,因為是二元分類器,使用二元交叉熵損失函數:

optimizer = keras.optimizers.Adam(lr=0.01)  loss_fn = keras.losses.binary_crossentropy

接下來創建和運行訓練循環。

for iteration in range(n_iterations):      all_rewards, all_grads = play_multiple_episodes(          env, n_episodes_per_update, n_max_steps, model, loss_fn)      all_final_rewards = discount_and_normalize_rewards(all_rewards,                                                         discount_factor)      all_mean_grads = []      for var_index in range(len(model.trainable_variables)):          mean_grads = tf.reduce_mean(              [final_reward * all_grads[episode_index][step][var_index]               for episode_index, final_rewards in enumerate(all_final_rewards)                   for step, final_reward in enumerate(final_rewards)], axis=0)          all_mean_grads.append(mean_grads)      optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

逐行看下程式碼:

  • 在每次訓練迭代,循環調用play_multiple_episodes(),這個函數玩10次遊戲,返回每個周期和步驟的獎勵和梯度。
  • 然後調用discount_and_normalize_rewards()計算每個動作的歸一化結果(程式碼中是final_reward)。這樣可以測量每個動作的好壞結果。
  • 接著,循環每個可訓練變數,計算每個變數的梯度加權平均,權重是final_reward
  • 最後,將這些平均梯度應用於優化器:微調模型的變數。

就是這樣。這段程式碼可以訓練神經網路策略,模型可以學習保持棍子的平衡(可以嘗試notebook中的「策略梯度」部分)。每個周期的平均獎勵會非常接近200(200是環境默認的最大值)。成功!

提示:研究人員試圖找到一種即使當智慧體最初對環境一無所知時也能很好工作的演算法。然而,除非你正在寫論文,否則你應該儘可能多地將先前的知識注入到智慧體中,因為它會極大地加速訓練。例如,因為知道棍子要盡量垂直,你可以添加與棍子角度成正比的負獎勵。這可以讓獎勵不那麼分散,是訓練加速。此外,如果你已經有一個相當好的策略,你可以訓練神經網路模仿它,然後使用策略梯度來改進它。

儘管它相對簡單,但是該演算法是非常強大的。你可以用它來解決更難的問題,而不僅僅是平衡一輛手推車上的平衡桿。事實上,因為樣本不足,必須多次玩遊戲,才能取得更大進展。但這個演算法是更強大演算法的基礎,比如Actor-Critic演算法(後面會介紹)。

現在我們來看看另一個流行的演算法族。與 PG 演算法直接嘗試優化策略以增加獎勵相反,我們現在看的演算法不那麼直接:智慧體學習去估計每個狀態的未來衰減獎勵的期望總和,或者在每個狀態中的每個行為未來衰減獎勵的期望和。然後,使用這些知識來決定如何行動。為了理解這些演算法,我們必須首先介紹馬爾可夫決策過程(MDP)。

馬爾可夫決策過程

在二十世紀初,數學家 Andrey Markov 研究了沒有記憶的隨機過程,稱為馬爾可夫鏈。這樣的過程具有固定數量的狀態,並且在每個步驟中隨機地從一個狀態演化到另一個狀態。它從狀態S演變為狀態S'的概率是固定的,它只依賴於(S, S')對,而不是依賴於過去的狀態(系統沒有記憶)。

圖 18-7 展示了一個具有四個狀態的馬爾可夫鏈的例子。假設該過程從狀態S0開始,並且在下一步驟中有 70% 的概率保持在該狀態不變中。最終,它必然離開那個狀態,並且永遠不會回來,因為沒有其他狀態回到S0。如果它進入狀態S1,那麼它很可能會進入狀態S2(90% 的概率),然後立即回到狀態S1(以 100% 的概率)。它可以在這兩個狀態之間交替多次,但最終它會落入狀態S3並永遠留在那裡(這是一個終端狀態)。馬爾可夫鏈可以有非常不同的動力學,它們在熱力學、化學、統計學等方面有著廣泛的應用。

圖18-7 馬爾科夫鏈案例

馬爾可夫決策過程最初是在 20 世紀 50 年代由 Richard Bellman 描述的。它們類似於馬爾可夫鏈,但有一個不同:在狀態轉移的每一步中,一個智慧體可以選擇幾種可能的動作中的一個,並且過渡概率取決於所選擇的動作。此外,一些狀態過渡返回一些獎勵(正或負),智慧體的目標是找到一個策略,隨著時間的推移將最大限度地提高獎勵。

例如,圖 18-8 中所示的 MDP 在每個步驟中具有三個狀態(用圓圈表示)和三個可能的離散動作(用菱形表示)。

圖18-8 馬爾科夫決策過程案例

如果從狀態S0開始,可以在動作A0A1A2之間進行選擇。如果它選擇動作A1,它就保持在狀態S0中,並且沒有任何獎勵。因此,如果願意的話,它可以決定永遠呆在那裡。但是,如果它選擇動作A0,它有 70% 的概率獲得 +10 獎勵,並保持在狀態S0。然後,它可以一次又一次地嘗試獲得儘可能多的獎勵。但它將在狀態S1中結束這樣的行為。在狀態S1中,它只有兩種可能的動作:A0A2。它可以通過反覆選擇動作A0來選擇停留,或者它可以選擇動作A2移動到狀態S2並得到 -50 獎勵。在狀態S2中,除了採取行動A1之外,別無選擇,這將最有可能引導它回到狀態S0,在途中獲得 +40 的獎勵。通過觀察這個 MDP,你能猜出哪一個策略會隨著時間的推移而獲得最大的回報嗎?在狀態S0中,A0是最好的選擇,在狀態S2中,智慧體別無選擇,只能採取行動A1,但是在狀態S1中,智慧體否應該保持不動(A0)或通過火(A2),這是不明確的。

Bellman 找到了一種估計任何狀態S的最佳狀態值的方法,記作V(s),它是智慧體在其採取最佳行為達到狀態s後所有衰減未來獎勵的總和的平均期望。他證明,如果智慧體的行為最佳,那麼就適用于貝爾曼最優性公式(見公式 18-1)。這個遞歸公式表示,如果智慧體最優地運行,那麼當前狀態的最優值等於在採取一個最優動作之後平均得到的獎勵,加上該動作可能導致的所有可能的下一個狀態的期望最優值。

公式18-1 貝爾曼最優性公式

其中:

  • T(s, a, s′)為智慧體選擇動作a時從狀態s到狀態s'的概率。例如,圖18-8中,T(s2, a1, s0) = 0.8。
  • R(s, a, s′)為智慧體選擇以動作a從狀態s到狀態s'的過程中得到的獎勵。例如圖18-8中,R(s2, a1, s0) = +40。
  • γ為衰減率。

這個等式直接引出了一種演算法,該演算法可以精確估計每個可能狀態的最優狀態值:首先將所有狀態值估計初始化為零,然後用數值迭代演算法迭代更新它們(見公式 18-2)。一個顯著的結果是,給定足夠的時間,這些估計保證收斂到最優狀態值,對應於最優策略。

公式18-2 數值迭代演算法

在這個公式中,Vk(s)是在k次演算法迭代對狀態s的估計。

筆記:該演算法是動態規劃的一個例子,它將了一個複雜的問題(在這種情況下,估計潛在的未來衰減獎勵的總和)變為可處理的子問題,可以迭代地處理(在這種情況下,找到最大化平均報酬與下一個衰減狀態值的和的動作)

了解最佳狀態值可能是有用的,特別是評估策略,但它沒有明確地告訴智慧體要做什麼。幸運的是,Bellman 發現了一種非常類似的演算法來估計最優狀態-動作值(state-action values),通常稱為 Q 值。狀態行動(S, A)對的最優 Q 值,記為Q*(s, a),是智慧體在到達狀態S,然後選擇動作A之後平均衰減未來獎勵的期望的總和。但是在它看到這個動作的結果之前,假設它在該動作之後的動作是最優的。

下面是它的工作原理:再次,通過初始化所有的 Q 值估計為零,然後使用 Q 值迭代演算法更新它們(參見公式 18-3)。

公式18-3 Q值迭代演算法

一旦你有了最佳的 Q 值,定義最優的策略π*(s),就沒什麼作用了:當智慧體處於狀態S時,它應該選擇具有最高 Q 值的動作,用於該狀態:

讓我們把這個演算法應用到圖 18-8 所示的 MDP 中。首先,我們需要定義 MDP:

transition_probabilities = [ # shape=[s, a, s']          [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],          [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],          [None, [0.8, 0.1, 0.1], None]]  rewards = [ # shape=[s, a, s']          [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],          [[0, 0, 0], [0, 0, 0], [0, 0, -50]],          [[0, 0, 0], [+40, 0, 0], [0, 0, 0]]]  possible_actions = [[0, 1, 2], [0, 2], [1]]

例如,要想知道經過動作a1,從s2到s0的過渡概率,我們需要查詢transition_probabilities[2][1][0](等於0.8)。相似的,要得到獎勵,需要查詢rewards[2][1][0](等於 +40)。要得到s2的可能的動作,需要查詢possible_actions[2](結果是a1)。然後,必須將Q-值初始化為0(對於不可能的動作,Q-值設為 –∞):

Q_values = np.full((3, 3), -np.inf) # -np.inf for impossible actions  for state, actions in enumerate(possible_actions):      Q_values[state, actions] = 0.0  # for all possible actions

現在運行Q-值迭代演算法。它反覆對Q-值的每個狀態和可能的動作應用公式18-3:

gamma = 0.90 # the discount factor    for iteration in range(50):      Q_prev = Q_values.copy()      for s in range(3):          for a in possible_actions[s]:              Q_values[s, a] = np.sum([                      transition_probabilities[s][a][sp]                      * (rewards[s][a][sp] + gamma * np.max(Q_prev[sp]))                  for sp in range(3)])

Q-值的結果如下:

>>> Q_values  array([[18.91891892, 17.02702702, 13.62162162],         [ 0.        ,        -inf, -4.87971488],         [       -inf, 50.13365013,        -inf]])

例如,當智慧體處於狀態s0,選擇動a1,衰減未來獎勵的期望和大約是17.0。

對於每個狀態,查詢擁有最高Q-值的動作:

>>> np.argmax(Q_values, axis=1) # optimal action for each state  array([0, 0, 1])

這樣就得到了衰減因子等於0.9時,這個MDP的最佳策略是什麼:狀態s0時選擇動作a0;在狀態s1時選擇動作a0;在狀態s2時選擇動作a1。有趣的是,如果將衰減因子提高到0.95,最佳策略發生了改變:在狀態s1時,最佳動作變為a2(通過火!)。道理很明顯,如果未來期望越高,忍受當前的痛苦是值得的。

時間差分學習

具有離散動作的強化學習問題通常可以被建模為馬爾可夫決策過程,但是智慧體最初不知道轉移概率是什麼(它不知道T(s, a, s′)),並且它不知道獎勵會是什麼(它不知道R(s, a, s′))。它必須經歷每一個狀態和每一次轉變並且至少知道一次獎勵,並且如果要對轉移概率進行合理的估計,就必須經歷多次。

時間差分學習(TD 學習)演算法與數值迭代演算法非常類似,但考慮到智慧體僅具有 MDP 的部分知識。一般來說,我們假設智慧體最初只知道可能的狀態和動作,沒有更多了。智慧體使用探索策略,例如,純粹的隨機策略來探索 MDP,並且隨著它的發展,TD 學習演算法基於實際觀察到的轉換和獎勵來更新狀態值的估計(見公式 18-4)。

公式18-4 TD學習演算法

在這個公式中:

  • α是學習率(例如 0.01)。
  • r + γ · Vk(s′)被稱為TD目標。
  • δk(s, r, s′)被稱為TD誤差。

公式的第一種形式的更為準確的表達,是使用

,它的意思是ak+1 ← (1 – α) · ak + α ·bk,公式18-4的第一行可以重寫為:

提示:TD學習和隨機梯度下降有許多相似點,特別是TD學習每次只處理一個樣本。另外,和隨機梯度下降一樣,如果逐漸降低學習率,是能做到收斂的(否則,會在最佳Q-值附近反覆跳躍)。

對於每個狀態S,該演算法只跟蹤智慧體離開該狀態時立即獲得的獎勵的平均值,再加上它期望稍後得到的獎勵(假設它的行為最佳)。

Q-學習

類似地,Q-學習演算法是 Q 值迭代演算法的改編版本,其適應轉移概率和回報在初始未知的情況(見公式18-5)。Q-學習通過觀察智慧體玩遊戲,逐漸提高Q-值的估計。一旦有了準確(或接近)的Q-值估計,則選擇具有最高Q-值的動作(即,貪婪策略)。

公式18-5 Q學習演算法

對於每一個狀態動作對(s,a),該演算法跟蹤智慧體在以動作A離開狀態S時獲得的即時獎勵平均值R,加上它期望稍後得到的獎勵。由於目標策略將最優地運行,所以我們取下一狀態的 Q 值估計的最大值。

以下是如何實現 Q-學習演算法。首先,需要讓一個智慧體探索環境。要這麼做的話,我們需要一個步驟函數,好讓智慧體執行一個動作,並返回結果狀態和獎勵:

def step(state, action):      probas = transition_probabilities[state][action]      next_state = np.random.choice([0, 1, 2], p=probas)      reward = rewards[state][action][next_state]      return next_state, reward

現在,實現智慧體的探索策略。因為狀態空間很小,使用簡單隨機策略就可以。如果長時間運行演算法,智慧體會多次訪問每個狀態,也會多次嘗試每個可能的動作:

def exploration_policy(state):      return np.random.choice(possible_actions[state])

然後,和之前一樣初始化Q-值,使用學習率遞降的方式運行Q-學習演算法(使用第11章介紹過的指數調度演算法):

alpha0 = 0.05 # initial learning rate  decay = 0.005 # learning rate decay  gamma = 0.90 # discount factor  state = 0 # initial state    for iteration in range(10000):      action = exploration_policy(state)      next_state, reward = step(state, action)      next_value = np.max(Q_values[next_state])      alpha = alpha0 / (1 + iteration * decay)      Q_values[state, action] *= 1 - alpha      Q_values[state, action] += alpha * (reward + gamma * next_value)      state = next_state

演算法會覆蓋最優Q-值,但會經歷多次迭代,可能有許多超參數調節。見圖18-9,Q-值迭代演算法(左)覆蓋速度很快,只用了不到20次迭代,而Q-學習演算法(右)用了8000次迭代才覆蓋完。很明顯,不知道過渡概率或獎勵,使得找到最佳策略顯著變難!

圖18-9 Q-值迭代演算法(左)對比Q-學習演算法(右)

Q-學習被稱為離線策略演算法,因為正在訓練的策略不是正在執行的策略:在前面的例子中,被執行的策略(探索策略)是完全隨機的,而訓練的演算法總會選擇具有最高Q-值的動作。相反的,策略梯度下降演算法是在線演算法:使用訓練的策略探索世界。令人驚訝的是,該演算法能夠通過觀察智慧體的隨機行為(例如當你的老師是一個醉猴子時,學習打高爾夫球)學習最佳策略。我們能做得更好嗎?

探索策略

當然,只有在探索策略充分探索 MDP 的情況下,Q 學習才能起作用。儘管一個純粹的隨機策略保證最終訪問每一個狀態和每個轉換多次,但可能需要很長的時間這樣做。因此,一個更好的選擇是使用 ε 貪婪策略:在每個步驟中,它以概率ε隨機地或以概率為1-ε貪婪地選擇具有最高 Q 值的動作。ε 貪婪策略的優點(與完全隨機策略相比)是,它將花費越來越多的時間來探索環境中有趣的部分,因為 Q 值估計越來越好,同時仍花費一些時間訪問 MDP 的未知區域。以ε為很高的值(例如,1)開始,然後逐漸減小它(例如,下降到 0.05)是很常見的。

或者,不依賴於探索的可能性,另一種方法是鼓勵探索策略來嘗試它以前沒有嘗試過的行動。這可以被實現為加到 Q-值估計的獎勵,如公式 18-6 所示。

公式18-6 使用探索函數的Q-學習

在這個公式中:

  • N(s′, a′)計算了在狀態s時選擇動作a的次數
  • f(Q, N)是一個探索函數,例如f(Q, N) = Q + κ/(1 + N),其中κ是一個好奇超參數,它測量智慧體被吸引到未知狀態的程度。

近似 Q 學習和深度Q-學習

Q 學習的主要問題是,它不能很好地擴展到具有許多狀態和動作的大(甚至中等)的 MDP。例如,假如你想用 Q 學習來訓練一個智慧體去玩 Ms. Pac-Man(圖18-1)。Ms. Pac-Man 可以吃超過 150 粒粒子,每一粒都可以存在或不存在(即已經吃過)。因此,可能狀態的數目大於 2150 ≈ 1045。空間大小比地球的的總原子數要多得多,所以你絕對無法追蹤每一個 Q 值的估計值。

解決方案是找到一個函數Qθ(s,a),使用可管理數量的參數(根據矢量θ)來近似 Q 值。這被稱為近似 Q 學習。多年來,人們都是手工在狀態中提取併線性組合特徵(例如,最近的鬼的距離,它們的方向等)來估計 Q 值,但在2013年, DeepMind 表明使用深度神經網路可以工作得更好,特別是對於複雜的問題。它不需要任何特徵工程。用於估計 Q 值的 DNN 被稱為深度 Q 網路(DQN),並且使用近似 Q 學習的 DQN 被稱為深度 Q 學習。

如何訓練DQN呢?這裡用DQN在給定的狀態動作對(s,a),來估計Q-值。感謝Bellman,我們知道這個近似Q-值要接近在狀態s執行動作a的獎勵r,加上之前的衰減獎勵。要估計未來衰減獎勵的和,我們只需在下一個狀態s',對於所有可能的動作a『執行DQN。針對每個可能的動作,獲得了近似的Q-值。然後挑選最高的,並做衰減,就得到了未來衰減獎勵的和。通過將獎勵r和未來衰減獎勵估計相加,得到了狀態動作對(s, a)的目標Q-值 y(s, a),見公式18-7。

公式18-7 目標Q-值

有了這個目標Q-值,可以使用梯度下降運行一步訓練演算法。具體地,要最小化Q-值Q(s, a)和目標Q-值的平方根方差(或使用Huber損失降低演算法對大誤差的敏感度)。這就是基礎的深度Q-學習演算法。下面用其處理平衡車問題。

實現深度Q-學習

首先需要的是一個深度Q-網路。理論上,需要一個輸入是狀態-動作對、輸出是近似Q-值的神經網路,但在實際中,使用輸入是狀態、輸出是每個可能動作的近似Q值的神經網路,會更加高效。要處理CartPole環境,我們不需要非常複雜的神經網路;只要幾個隱藏層就夠了:

env = gym.make("CartPole-v0")  input_shape = [4] # == env.observation_space.shape  n_outputs = 2 # == env.action_space.n    model = keras.models.Sequential([      keras.layers.Dense(32, activation="elu", input_shape=input_shape),      keras.layers.Dense(32, activation="elu"),      keras.layers.Dense(n_outputs)  ])

使用這個DQN選擇一個動作,選擇Q-值最大的動作。要保證智慧體探索環境,使用的是ε-貪婪策略(即,選擇概率為ε的隨機動作):

def epsilon_greedy_policy(state, epsilon=0):      if np.random.rand() < epsilon:          return np.random.randint(2)      else:          Q_values = model.predict(state[np.newaxis])          return np.argmax(Q_values[0])

不僅只根據最新的經驗訓練DQN,將所有經驗存儲在接力快取(或接力記憶)中,每次訓練迭代,從中隨機取樣一個批次。這樣可以降低訓練批次中的經驗相關性,可以極大的提高訓練效果。如下,使用雙端列表實現:

from collections import deque    replay_buffer = deque(maxlen=2000)

提示:雙端列表是一個鏈表,每個元素指向後一個和前一個元素。插入和刪除元素都非常快,但雙端列表越長,隨機訪問越慢。如果需要一個非常大的接力快取,可以使用環狀快取;見notebook中的「Deque vs Rotating List」章節。

每個經驗包含五個元素:狀態,智慧體選擇的動作,獎勵,下一個狀態,一個知識是否結束的布爾值(done)。需要一個小函數從接力快取隨機取樣。返回的是五個NumPy數組,對應五個經驗:

def sample_experiences(batch_size):      indices = np.random.randint(len(replay_buffer), size=batch_size)      batch = [replay_buffer[index] for index in indices]      states, actions, rewards, next_states, dones = [          np.array([experience[field_index] for experience in batch])          for field_index in range(5)]      return states, actions, rewards, next_states, dones

再創建一個使用ε-貪婪策略的單次玩遊戲函數,然後將結果經驗存儲在接力快取中:

def play_one_step(env, state, epsilon):      action = epsilon_greedy_policy(state, epsilon)      next_state, reward, done, info = env.step(action)      replay_buffer.append((state, action, reward, next_state, done))      return next_state, reward, done, info

最後,再創建最後一個批次取樣函數,用單次梯度下降訓練這個DQN:

batch_size = 32  discount_factor = 0.95  optimizer = keras.optimizers.Adam(lr=1e-3)  loss_fn = keras.losses.mean_squared_error    def training_step(batch_size):      experiences = sample_experiences(batch_size)      states, actions, rewards, next_states, dones = experiences      next_Q_values = model.predict(next_states)      max_next_Q_values = np.max(next_Q_values, axis=1)      target_Q_values = (rewards +                         (1 - dones) * discount_factor * max_next_Q_values)      mask = tf.one_hot(actions, n_outputs)      with tf.GradientTape() as tape:          all_Q_values = model(states)          Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)          loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))      grads = tape.gradient(loss, model.trainable_variables)      optimizer.apply_gradients(zip(grads, model.trainable_variables))

逐行看下程式碼:

  • 首先定義一些超參數,並創建優化器和損失函數。
  • 然後創建training_step()函數。先取樣經驗批次,然後使用DQN預測每個可能動作的每個經驗的下一狀態的Q-值。因為假定智慧體採取最佳行動,所以只保留下一狀態的最大Q-值。接著,我們使用公式18-7計算每個經驗的狀態-動作對的目標Q-值。
  • 接著,使用DQN計算每個有經驗的狀態-動作對的Q-值。但是,DQN還會輸出其它可能動作的Q-值,不僅是智慧體選擇的動作。所以,必須遮掩不需要的Q-值。tf.one_hot()函數可以方便地將動作下標的數組轉別為mask。例如,如果前三個經驗分別包含動作1,1,0,則mask會以[[0, 1], [0, 1], [1, 0],...]開頭。然後將DQN的輸出乘以這個mask,就可以排除所有不需要的Q-值。然後,按列求和,去除所有的零,只保留有經驗的狀態-動作對的Q-值。得到張量Q_values,包含批次中每個經驗的預測的Q-值。
  • 然後,計算損失:即有經驗的狀態-動作對的目標Q-值和預測Q-值的均方誤差。
  • 最後,對可訓練變數,用梯度下降步驟減小損失。

這是最難的部分。現在,訓練模型就簡單了:

for episode in range(600):      obs = env.reset()      for step in range(200):          epsilon = max(1 - episode / 500, 0.01)          obs, reward, done, info = play_one_step(env, obs, epsilon)          if done:              break      if episode > 50:          training_step(batch_size)

跑600次遊戲,每次最多200步。在每一步,先計算ε-貪婪策略的epsilon值:這個值在500個周期內,從1線性降到0.01。然後調用play_one_step()函數,用ε-貪婪策略挑選動作,然後執行並在接力快取中記錄經驗。如果周期結束,就退出循環。最後,如果超過了50個周期,就調用training_step()函數,用從接力快取取出的批次樣本訓練模型。玩50個周期,而不訓練的原因是給接力快取一些時間來填充(如果等待的不夠久,則接力快取中的樣本散度太小)。像上面這樣,我們就實現了深度Q-學習演算法。

圖18-10展示了智慧體在每個周期獲得的總獎勵。

圖18-10 深度Q學習演算法的學習曲線

可以看到,在前300個周期,演算法的進步不大(部分是因為ε在一開始時非常高),然後表現突然提升到了200(環境最高值)。這說明演算法效果不錯,並且比策略梯度演算法快得多!但僅僅幾個周期之後,性能就驟降到了25。這被稱為「災難性遺忘」,這是所有RL演算法都面臨的大問題:隨著智慧體探索環境,不斷更新策略,但是在環境的一部分學到的內容可能和之前學到的內容相悖。經驗是關聯的,學習環境不斷改變 —— 這不利於梯度下降!如果增加接力快取的大小,可以減輕這個問題。但真實的情況是,強化學習很難:訓練通常不穩定,需要嘗試許多超參數值和隨機種子。例如,如果改變每層神經元的數量,從32到30或34,模型表現不會超過100(DQN只有一個隱藏層時,可能更穩定)。

筆記:強化學習非常困難,很大程度是因為訓練的不穩定性,以及巨大的超參數和隨機種子的不穩定性。就像Andrej Karpathy說的:「監督學習自己就能工作,強化學習被迫工作」。你需要時間、耐心、毅力,還有一點運氣。這是為什麼強化學習不是常用的深度學習演算法的原因。除了AlphaGo和Atari遊戲,還有一些其它應用:例如,Google使用RL優化數據中心的費用,也用於一些機器人應用的超參數調節,和推薦系統。

你可能想為什麼我們不畫出損失。事實證明損失不是模型表現的好指標。就算損失下降,智慧體的表現也可能更糟(例如,智慧體困在了環境中,則DQN開始對區域過擬合)。相反的,損失可能變大,但智慧體表現不錯(例如,如果DQN知道Q-值,就能提高預測的品質,智慧體就能表現得更好,得到更多獎勵,但因為DQN還設置了更大的目標,所以誤差增加了)。

我們現在學的基本的深度Q-學習演算法,在玩Atari時太不穩定。DeepMind是怎麼做的呢?他們調節了演算法。

深度Q-學習的變體

下面看幾個深度Q-學習演算法的變體,它們不僅訓練穩定而且很快。

固定Q-值目標

在基本的深度Q-學習演算法中,模型不僅做預測還自己設置目標。有點像一隻狗追自己的尾巴。回饋循環使得網路不穩定:會發生分叉、搖擺、凍結,等等。要解決問題,DeepMind在2013年的論文中使用了兩個DQN,而不是一個:第一個是在線模型,它在每一步進行學習,並移動智慧體;另一個是目標模型只定義目標。目標模型只是在線模型的克隆:

target = keras.models.clone_model(model)  target.set_weights(model.get_weights())

然後,在training_step()函數中,只需要變動一行,使用目標模型計算接下來狀態的Q-值:

next_Q_values = target.predict(next_states)

最後,在訓練循環中,必須每隔一段周期(比如,每50個周期),將在線模型的權重複制到目標模型中:

if episode % 50 == 0:      target.set_weights(model.get_weights())

因為目標模型更新的沒有在線模型頻繁,Q-值目標更加穩定,前面討論回饋循環減弱了。這個方法是DeepMind在2013年的論文中提出的方法之一,可以讓智慧體從零學習Atari遊戲。要穩定訓練,他們使用的學習率是0.00025,很小,每隔10000步才更新目標模型,接力快取的大小是1百萬。並且epsilon降低的很慢,用1百萬步從1降到0.1,他們讓演算法運行了5000萬步。

本章後面會用這些超參數,使用TF-Agents庫訓練DQN智慧體來玩Breakout。在此之前,再看另一個性能更好的DQN變體。

雙DQN

2015年的論文中,DeepMind調節了他們的DQN演算法,提高了性能,也穩定化了訓練。他們稱這個變體為雙DQN。演算法更新的原因,是觀察到目標網路傾向於高估Q-值。事實上,假設所有動作都一樣好:目標模型預測的Q-值應該一樣,但因為是估計值,其中一些可能存在更大的幾率。目標模型會選擇最大的Q-值,最大的Q-值要比平均Q-值稍大,就像高估真正的Q值(就像在測量池塘深度時,測量隨機水波的最高峰)。要修正這個問題,他們提出使用在線模型,而不是目標模型,來選擇下一狀態的最佳動作,只用目標模型估計這些最佳動作的Q-值。下面是改善後的training_step()函數:

def training_step(batch_size):      experiences = sample_experiences(batch_size)      states, actions, rewards, next_states, dones = experiences      next_Q_values = model.predict(next_states)      best_next_actions = np.argmax(next_Q_values, axis=1)      next_mask = tf.one_hot(best_next_actions, n_outputs).numpy()      next_best_Q_values = (target.predict(next_states) * next_mask).sum(axis=1)      target_Q_values = (rewards +                         (1 - dones) * discount_factor * next_best_Q_values)      mask = tf.one_hot(actions, n_outputs)      [...] # the rest is the same as earlier

幾個月之後,人們又提出了另一個改進的DQN演算法。

優先經驗接力

除了均勻地從接力快取取樣經驗,如果更頻繁地取樣重要經驗如何呢?這個主意被稱為重要性取樣(importance sampling,IS)或優先經驗接力(prioritized experience replay,PER),是在2015年的論文中由DeepMind發表的。

更具體的,可以導致快速學習成果的經驗被稱為重要經驗。但如何估計呢?一個可行的方法是測量TD誤差的大小 δ = r + γ·V(s′) – V(s) 。大TD誤差說明過渡 (s, r, s′) 很值得學習。當經驗記錄在接力快取中,它的重要性被設為非常大的值,保證可以快速取樣。但是,一旦被取樣(以及每次取樣時),就計算RD誤差δ,這個經驗的優先度設為p = |δ| (加上一個小常數,保證每個經驗的取樣概率不是零)。取樣優先度為p的概率P正比於pζ,ζ是調整取樣貪婪度的超參數:當ζ=0時,就是均勻取樣,ζ=1時,就是完全的重要性取樣。在論文中,作者使用的是ζ=0.6,最優值取決於任務。

但有一點要注意,因為樣本偏向重要經驗,必須要在訓練時,根據重要性降低經驗的重要性,否則模型會對重要經驗過擬合。更加清楚的講,重要經驗取樣更頻繁,但訓練時的權重要小。要這麼做,將每個經驗的訓練權重定義為w = (n P)–β,n是接力快取的經驗數,β是平衡重要性偏向的超參數(0是不偏向,1是完全偏向)。在論文中,作者一開始使用的是β=0.4,在訓練結束,提高到了β=1。最佳值取決於任務,如果你提高了一個,也要提高其它的值。

接下來是最後一個重要的DQN演算法的變體。

對決DQN

對決DQN演算法(DDQN,不要與雙DQN混淆)是DeepMind在另一篇2015年的論文中提出的。要明白原理,首先狀態-動作對(s,a)的Q-值,可以表示為Q(s, a) = V(s) + A(s, a),其中V(s)是狀態s的值,A(s, a)是狀態s採取行動a的結果。另外,狀態的值等於狀態最佳動作a的Q-值(因為最優策略會選最佳動作),因此V(s) = Q(s, a),即A(s, a*) = 0。在對決DQN中,模型估計狀態值和每個動作的結果。因為最佳動作的結果是0,模型減去最大預測結果。下面是一個簡單的對決DQN,用Functional API實現:

K = keras.backend  input_states = keras.layers.Input(shape=[4])  hidden1 = keras.layers.Dense(32, activation="elu")(input_states)  hidden2 = keras.layers.Dense(32, activation="elu")(hidden1)  state_values = keras.layers.Dense(1)(hidden2)  raw_advantages = keras.layers.Dense(n_outputs)(hidden2)  advantages = raw_advantages - K.max(raw_advantages, axis=1, keepdims=True)  Q_values = state_values + advantages  model = keras.Model(inputs=[input_states], outputs=[Q_values])

演算法的其餘部分和之前一樣。事實上,你可以創建一個雙對決DQN,並結合優先經驗隊列!更為一般地,許多RL方法都可以結合起來,就像DeepMind在2017年的論文展示的。論文的作者將六個不同的方法結合起來,訓練了一個智慧體,稱為「彩虹」,表現很好。

不過,要實現所有這些方法,進行調試、微調,並且訓練模型需要很多工作。因此,不要重新草輪子,最好的方法是復用可擴展的、使用效果好的庫,比如TF-Agents。

TF-Agents庫

TF-Agents 庫是基於TensorFlow實現的強化學習庫,Google開發並在2018年開源。和OpenAI Gym一樣,它提供了許多現成的環境(包括了OpenAI Gym環境的包裝),還支援PyBullet庫(用於3D物理模擬),DeepMind的DM控制庫(基於MuJoCo的物理引擎),Unity的ML-Agents庫(模擬了許多3D環境)。它還使用了許多RL演算法,包括REINFORCE、DQN、DDQN,和各種RL組件,比如高效接力快取和指標。TF-Agents速度快、可擴展、便於使用、可自定義:你可以創建自己的環境和神經網路,可以對任意組件自定義。在這一節,我們使用TF-Agents訓練一個智慧體玩Breakout,一個有名的Atari遊戲(見圖18-11),使用的是DQN演算法(可以換成任何你想用的演算法)。

圖18-11 Breakout遊戲

安裝 TF-Agents

先安裝 TF-Agents 。可以使用pip安裝(如果使用的是虛擬環境,一定要先激活;如果不激活,要使用選項--user,或用管理員許可權):

$ python3 -m pip install --upgrade tf-agents

警告:寫作本書時,TF-Agents還很新,每天都有新進展,因此API可能會和現在有所不同 —— 但大體相同。如果程式碼不能運行,我會更新Jupyter notebook。

然後,創建一個TF-Agents包裝了OpenAI GGym的Breakout的環境。要這麼做,需要先安裝OpenAI Gym的Atari依賴:

$ python3 -m pip install --upgrade 'gym[atari]'

這條命令安裝了atari-py,這是Arcade學習環境的Python介面,這個學習環境是基於Atari 2600模擬器Stella。

TF-Agents 環境

如果一切正常,就能引入TF-Agents,創建Breakout環境了:

>>> from tf_agents.environments import suite_gym  >>> env = suite_gym.load("Breakout-v4")  >>> env  <tf_agents.environments.wrappers.TimeLimit at 0x10c523c18>

這是OpenAI Gym環境的包裝,可以通過屬性gym訪問:

>>> env.gym  <gym.envs.atari.atari_env.AtariEnv at 0x24dcab940>

TF-Agents 環境和 OpenAI Gym 環境非常相似,但有些差別。首先,reset()方法不返回觀察;返回的是TimeStep對象,它包裝了觀察,和一些其它資訊:

>>> env.reset()  TimeStep(step_type=array(0, dtype=int32),           reward=array(0., dtype=float32),           discount=array(1., dtype=float32),           observation=array([[[0., 0., 0.], [0., 0., 0.],...]]], dtype=float32))

step()方法返回的也是TimeStep對象:

>>> env.step(1) # Fire  TimeStep(step_type=array(1, dtype=int32),           reward=array(0., dtype=float32),           discount=array(1., dtype=float32),           observation=array([[[0., 0., 0.], [0., 0., 0.],...]]], dtype=float32))

屬性rewardobservation是獎勵和觀察,與OpenAI Gym相同(除了reward表示為NumPy數組)。對於周期的第一個時間步,屬性step_type等於0,1是中間步,2後最後一步。可以調用時間步的is_last()方法,檢測是否是最後一步。最後,discount屬性指明了在這個時間步的衰減率。在這個例子中的值等於1,所以沒有任何衰減。可以通過在載入環境時設置discount參數,定義衰減因子。

筆記:在任何時候,你可以通過調用方法current_time_step() method.訪問環境的當前時間步。

環境配置

TF-Agents 環境提供了配置,包括觀察、動作、時間步,以及它們的形狀、數據類型、名字,還有最小值和最大值:

>>> env.observation_spec()  BoundedArraySpec(shape=(210, 160, 3), dtype=dtype('float32'), name=None,                   minimum=[[[0. 0. 0.], [0. 0. 0.],...]],                   maximum=[[[255., 255., 255.], [255., 255., 255.], ...]])  >>> env.action_spec()  BoundedArraySpec(shape=(), dtype=dtype('int64'), name=None,                   minimum=0, maximum=3)  >>> env.time_step_spec()  TimeStep(step_type=ArraySpec(shape=(), dtype=dtype('int32'), name='step_type'),           reward=ArraySpec(shape=(), dtype=dtype('float32'), name='reward'),           discount=BoundedArraySpec(shape=(), ..., minimum=0.0, maximum=1.0),           observation=BoundedArraySpec(shape=(210, 160, 3), ...))

可以看到,觀察就是Atari螢幕的截圖,用形狀是 [210, 160, 3] 的NumPy數組表示。要渲染環境,可以調用env.render(mode="human"),如果想用NumPy數組的形式返回圖片,可以調用env.render(mode="rgb_array")(與OpenAI Gym不同,這是默認模式)。

有四個可能的動作。Gym的Atari環境有另一個方法,可以知道每個動作對應什麼:

>>> env.gym.get_action_meanings()  ['NOOP', 'FIRE', 'RIGHT', 'LEFT']

提示:配置是配置類的一個實例,可以是嵌套列表、字典。如果配置是嵌套的,則配置對象必須匹配配置的嵌套結構。例如,如果觀察配置是 {"sensors": ArraySpec(shape=[2]), "camera": ArraySpec(shape=[100, 100])} ,有效觀察應該是 {"sensors": np.array([1.5, 3.5]), "camera": np.array(...)}tf.nest包提供了工具處理嵌套結構(即,nests)。

觀察結果很大,所以需要做降取樣,並轉換成灰度。這樣可以加速訓練,減少記憶體使用。要這麼做,要使用環境包裝器。

環境包裝器和Atari預處理

TF-Agents在tf_agents.environments.wrappers中,提供了一些環境包裝器。正如名字,它們可以包裝環境,轉發每個調用,還可以添加其它功能。以下是一些常見的包裝器:

ActionClipWrapper

  • 根據動作配置裁剪動作。

ActionDiscretizeWrapper

  • 將連續動作空間量化到離散的動作空間。例如,如果原始環境的動作空間是 -1.0 到 +1.0 的連續範圍,但是如果想用演算法支援離散的動作空間,比如 DQN,就可以用discrete_env = ActionDiscretizeWrapper(env, num_actions=5)包裝環境,新的discrete_env有離散的可能動作空間:0、1、2、3、4。這些動作對應原始環境的動作-1.0、-0.5、0.0、0.5、1.0。

ActionRepeat

  • 將每個動作重複n次,並積累獎勵。在許多環境中,這麼做可以顯著加速訓練。

RunStats

  • 記錄環境數據,比如步驟數和周期數。

TimeLimit

  • 超過最大的時間步數,則中斷環境。

VideoWrapper

  • 記錄環境的影片。

要創建包裝環境,需要先創建一個包裝器,將包裝過的環境傳遞給構造器。例如,下面的程式碼將一個環境包裝在ActionRepeat中,讓每個動作重複四次:

from tf_agents.environments.wrappers import ActionRepeat    repeating_env = ActionRepeat(env, times=4)

OpenAI Gym 在gym.wrappers中有一些環境包裝器。但它們是用來包裝Gym環境,不是TF-Agents環境,所以要使用的話,必須用Gym包裝器包裝Gym環境,再用TF-Agents包裝器再包裝起來。suite_gym.wrap_env()函數可以實現,只要傳入Gym環境和Gym包裝器列表,和/或 TF-Agents 包裝器的列表。另外,suite_gym.load()函數既能創建Gym環境,如果傳入包裝器,也能做包裝。每個包裝器在包裝時沒有參數,所以如果想設置參數,必須傳入lambda。例如,下面的程式碼創建了一個Breakout環境,每個周期最多運行10000步,每個動作重複四次:

from gym.wrappers import TimeLimit    limited_repeating_env = suite_gym.load(      "Breakout-v4",      gym_env_wrappers=[lambda env: TimeLimit(env, max_episode_steps=10000)],      env_wrappers=[lambda env: ActionRepeat(env, times=4)])

對於Atari環境,大多數論文使用了標準預處理步驟,TF-Agents 提供了便捷的AtariPreprocessing包裝器做預處理。以下是支援的預處理:

灰度和降取樣

  • 將觀察轉換為灰度,並降取樣(默認是84 × 84像素)

最大池化

  • 遊戲的最後兩幀使用1 × 1過濾器做最大池化。是為了去除閃爍點。

跳幀

  • 智慧體每隔n個幀做一次觀察(默認是4),對於每一幀,動作都要重複幾次,並收集所有的獎勵。這麼做可以有效加速遊戲,因為獎勵延遲降低,訓練也加速了。

丟命損失

在某些遊戲中,獎勵是基於得分的,所以智慧體死掉的話,不會立即受到懲罰。一種方法是當死掉時,立即結束遊戲。這種做法有些爭議,所以默認是關掉的。

因為默認Atari環境已經應用了隨機跳幀和最大池化,我們需要載入原生不跳幀的變體,BreakoutNoFrameskip-v4。另外,從Breakout遊戲中的一幀並不能知道球的方向和速度,這會使得智慧體很難玩好遊戲(除非這是一個RNN智慧體,它可以在步驟之間保存狀態)。應對方法之一是使用一個環境包裝器,沿著每個頻道維度,將多個幀疊起來做輸出。FrameStack4包裝器實現了這個策略,返回四個幀的棧式結果。下面就創建一個包裝過的Atari環境。

from tf_agents.environments import suite_atari  from tf_agents.environments.atari_preprocessing import AtariPreprocessing  from tf_agents.environments.atari_wrappers import FrameStack4    max_episode_steps = 27000 # <=> 108k ALE frames since 1 step = 4 frames  environment_name = "BreakoutNoFrameskip-v4"    env = suite_atari.load(      environment_name,      max_episode_steps=max_episode_steps,      gym_env_wrappers=[AtariPreprocessing, FrameStack4])

預處理的結果展示在圖18-12中。可以看到解析度更低了,但足夠玩遊戲了。另外,幀沿著頻道維度疊起來,所以紅色表示的是三步之前到現在的幀,綠色是從兩步之前,藍色是前一幀,粉色是當前幀。根據這一幀的觀察,智慧體可以看到球是像左下角移動的,所以應該繼續將板子向左移動(和前面一步相同)。

圖18-12 預處理Breakout觀察

最後,可以將環境包裝進TFPyEnvironment

from tf_agents.environments.tf_py_environment import TFPyEnvironment    tf_env = TFPyEnvironment(env)

這樣就能在TensorFlow圖中使用這個環境(在底層,這個類使用的是tf.py_function(),這可以讓圖調用任何Python程式碼)。有了TFPyEnvironment類,TF-Agents支援純Python環境和基於TensorFlow環境。更為一般的,TF-Agents支援並提供了純Python和基於TensorFlow的組件(智慧體,接力快取,指標,等等)。

有了Breakout環境,預處理和TensorFlow支援,我們必須創建DQN智慧體,和其它要訓練的組件。下面看看系統架構。

訓練架構

TF-Agents 訓練程式通常分為兩個並行運行的部分,見圖18-13:左邊,driver使用收集策略探索環境選擇動作,並收集軌跡(即,經驗),將軌跡發送給observer,observer將軌跡存儲到接力快取中;右邊,智慧體從接力快取中取出軌跡批次,然後訓練網路。總而言之,左邊的部分探索環境、收集軌跡,右邊的部分學習更新收集策略。

圖18-13 一個典型的TF-Agents訓練架構

這張圖有些疑惑點,回答如下:

  • 為什麼使用多個環境呢?這是為了讓driver並行探索多個環境的複製,發揮CPU、GPU的能力,給訓練演算法提供低關聯的軌跡。
  • 什麼是軌跡?這是從一個時間步向下一個時間步過渡的簡潔表徵,或是一連串連續的從時間步n到時間步n+t的過渡。driver收集的軌跡傳給observer,再將其存入接力快取,接著再被取樣用來訓練。
  • 為什麼需要observer?driver不能直接保存軌跡嗎?事實上,driver可以直接保存軌跡,但這麼做的話,會使得架構不夠靈活。例如,如果不想使用接力快取,該怎麼做呢?如果想用軌跡做一些其它事情,比如計算指標,該怎麼做呢?事實上,observer是使用軌跡作為參數的任意函數。可以用observer將軌跡存入接力快取,或保存為TFRecord文件,或計算指標,或其它事情。另外,可以將多個observer傳給driver,廣播軌跡。

提示:儘管這個架構是最常見的,但是可以盡情自定義,可以更換成自己的組件。事實上,除非是研究新的RL演算法,更適合使用自定義的環境來做自己的任務。要這麼做,需要創建一個自定義類,繼承自tf_agents.environments.py_environment包的PyEnvironment類,並重寫一些方法,比如action_spec()observation_spec()_reset()_step()(見notebook的章節Creating a Custom TF_Agents Environment)。

現在創建好了所有組件:先是深度Q-網路,然後是DQN智慧體(負責創建收集策略),然後是接力快取和observer,一些訓練指標,driver,最後是數據集。有了所有組件之後,先用一些軌跡填充接力快取,然後運行主訓練循環。因此,從創建深度Q-網路開始。

創建深度Q-網路

TF-Agents 庫在tf_agents.networks包和子包中提供了許多網路。我們使用tf_agents.networks.q_network.QNetwork類:

from tf_agents.networks.q_network import QNetwork    preprocessing_layer = keras.layers.Lambda(                            lambda obs: tf.cast(obs, np.float32) / 255.)  conv_layer_params=[(32, (8, 8), 4), (64, (4, 4), 2), (64, (3, 3), 1)]  fc_layer_params=[512]    q_net = QNetwork(      tf_env.observation_spec(),      tf_env.action_spec(),      preprocessing_layers=preprocessing_layer,      conv_layer_params=conv_layer_params,      fc_layer_params=fc_layer_params)

這個QNetwork的輸入是觀察,每個動作輸出一個Q-值,所以必須給出觀察和動作的配置。先是預處理層:一個lambda層將觀察轉換為32位浮點數,並做歸一化(範圍落到0.0和1.0之間)。觀察包含無符號位元組,佔用空間是32位浮點數的四分之一,這就是為什麼不在前面將觀察轉換為32位浮點數;我們要節省接力快取的記憶體空間。接著,網路使用三個卷積層:第一個有32個8 × 8過濾器,步長是4,第二個有64個4 × 4過濾器,步長是2,第三個層有64個 3 × 3的過濾器,步長為1。最後,使用一個有512個神經元的緊密層,然後是一個有4個神經元的緊密輸出層,輸出是Q-值(每個動作一個Q-值)。所有卷積層和除了輸出層的緊密層使用ReLU激活函數(可以通過設置參數activation_fn改變)。輸出層不使用激活函數。

QNetwork的底層包含兩個部分:一個處理觀察的編碼網路,和一個輸出Q-值的輸出層。TF-Agent的EncodingNetwork類實現了多種智慧體都使用了的神經網路架構(見圖18-14)。

圖18-14 編碼網路架構

可能有一個或多個輸入。例如,如果每個觀察包括感測器數據加上攝影機圖片,就有兩個輸入。每個輸入可能需要一些預處理步驟,你可以通過preprocessing_layers參數指定Keras層列表,每個輸入有一個預處理層,網路會將層應用到每個對應的輸入上(如果輸入需要多個預處理層,可以傳入一個完整模型,因為Keras模型也可以用作層)。如果有兩個或更多輸入,必須通過參數preprocessing_combiner傳入其它層,將預處理層的輸出合併成一個輸出。

然後,編碼層會順序應用一列卷積層,只要指定參數conv_layer_params。這是一個包含3個元組的列表(每個卷積層一個元組),指明過濾器的數量、核大小,步長。卷積層之後,如果設置參數fc_layer_params,編碼網路可以應用一串緊密層:參數fc_layer_params是一個列表,包含每個緊密層的神經元數。另外,通過參數dropout_layer_params,還可以傳入dropout率列表(每個緊密層一個),給每個緊密成設置dropout。QNetwork將編碼網路的輸出傳入給緊密輸出層(每個動作一個神經元)。

筆記:QNetwork類非常靈活,可以創建許多不同的架構,如果需要更多的靈活性,還以通過創建自己的類:擴展類tf_agents.networks.Network,像常規自定義Keras層一樣實現。tf_agents.networks.Network類是keras.layers.Layer類的子類,前者添加了一些智慧體需要的功能,比如創建網路的淺複製(即,只複製架構,不複製權重)。例如,DQNAgent使用這個功能創建在線模型。

有了DQN,接下來創建DQN智慧體。

創建DQN智慧體

利用tf_agents​.agents包和它的子包,TF-Agents庫實現了多種類型的智慧體。我們使用類tf_agents.agents​.dqn.dqn_agent.DqnAgent

from tf_agents.agents.dqn.dqn_agent import DqnAgent    train_step = tf.Variable(0)  update_period = 4 # train the model every 4 steps  optimizer = keras.optimizers.RMSprop(lr=2.5e-4, rho=0.95, momentum=0.0,                                       epsilon=0.00001, centered=True)  epsilon_fn = keras.optimizers.schedules.PolynomialDecay(      initial_learning_rate=1.0, # initial ε      decay_steps=250000 // update_period, # <=> 1,000,000 ALE frames      end_learning_rate=0.01) # final ε  agent = DqnAgent(tf_env.time_step_spec(),                   tf_env.action_spec(),                   q_network=q_net,                   optimizer=optimizer,                   target_update_period=2000, # <=> 32,000 ALE frames                   td_errors_loss_fn=keras.losses.Huber(reduction="none"),                   gamma=0.99, # discount factor                   train_step_counter=train_step,                   epsilon_greedy=lambda: epsilon_fn(train_step))  agent.initialize()

逐行看下程式碼:

  • 首先創建計算訓練步驟數的變數。
  • 然後創建優化器,使用2015 DQN論文相同的超參數。
  • 接著,創建對象PolynomialDecay,根據當前的訓練步驟(用於降低學習率,也可以是其它值),用於計算ε-貪婪收集策略的ε值。在100萬 ALE 幀內(等於250000步驟,因為跳幀周期等於4),將ε值從1降到0.01(也是2015 DQN論文的用值)。另外,每隔4步(即,16個ALE幀),所以ε值是在62500個訓練步內下降的。
  • 然後創建DQNAgent,傳入時間步和動作配置、QNetwork、優化器、目標模型更新間的訓練步驟數、損失函數、衰減率、變數train_step、返回ε值的函數(不接受參數,這就是為什麼使用匿名函數傳入train_step的原因)。注意,損失函數對每個實例返回一個誤差,不是平均誤差,所以要設置reduction="none"
  • 最後,啟動智慧體。

然後,創建接力快取和observer。

創建接力快取和observer

TF-Agents 庫在tf_agents.replay_buffers包實現了多種接力快取。一些是用純Python寫的(模組名開頭是py_),其它是基於TensorFlow的(開頭是tf_)。我們使用tf_agents.replay_buffers.tf_uniform_replay_buffer包追蹤的TFUniformReplayBuffer類。它實現了高效均勻取樣的接力快取:

from tf_agents.replay_buffers import tf_uniform_replay_buffer    replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(      data_spec=agent.collect_data_spec,      batch_size=tf_env.batch_size,      max_length=1000000)

看一下這些參數:

data_spec

  • 數據的配置會存儲在接力快取中。DQN智慧體知道收集數據什麼樣,通過屬性collect_data_spec做數據配置。

batch_size

  • 軌跡數量添加到每個步驟。在這個例子中,軌跡數是1,因為driver每個步驟執行一個動作收集一個軌跡。如果環境是一個批次化的環境(環境在每個時間步接收批次動作,返回批次觀察),則driver必須在每個時間步保存批次的軌跡。因為使用的是TensorFlow接力快取,需要知道批次大小(創建計算圖)。批次化環境的一個例子是ParallelPyEnvironment(出自包tf_agents.environments.parallel_py_environment):用獨立進程並行運行多個環境(對於相同的動作和觀察配置,進程可以不同),每個步驟接收批次化的動作,並在環境中執行(每個環境一個動作),然後返回所有觀察結果。

max_length

  • 接力快取的最大大小。我們創建一個可以存儲100萬個軌跡的接力快取(和2015 DQN論文一樣)。這需要不少記憶體。

提示:當存儲兩個連續的軌跡,它們包含兩個連續的觀察,每個觀察有四個幀(因為包裝器是FrameStack4),但是第二個觀察中的三個幀是多餘的(第一個觀察中已經存在了)。換句話說,使用的記憶體大小是必須的四倍。要避免這個問題,可以使用包tf_agents.replay_buffers.py_hashed_replay_bufferPyHashedReplayBuffer:它能沿著觀察的最後一個軸對存儲的軌跡去重。

現在創建向接力快取寫入軌跡的observer。observer就是一個接收軌跡參數的函數(或是調用對象),所以可以直接使用方法add_method()(綁定replay_buffer對象)作為observer:

replay_buffer_observer = replay_buffer.add_batch

如果想創建自己的observer,可以一個包含參數trajectory的函數。如果必須有狀態,可以寫一個包含方法__call__(self, trajectory)的類。例如,下面是一個每次調用,計數器都會加1的observer(除了軌跡表示周期間的邊界,不算成一步),每隔100次累加,顯示總數(rend=""保證展示的計數器處於一條線)。

class ShowProgress:      def __init__(self, total):          self.counter = 0          self.total = total      def __call__(self, trajectory):          if not trajectory.is_boundary():              self.counter += 1          if self.counter % 100 == 0:              print("r{}/{}".format(self.counter, self.total), end="")

接下來創建一些訓練指標。

創建訓練指標

TF-Agents庫再tf_agents.metrics包中實現了幾個RL指標,一些是基於純Python的,一些是基於TensorFlow的。創建一些指標統計周期數、步驟數、周期的平均數、平均周期長度:

from tf_agents.metrics import tf_metrics    train_metrics = [      tf_metrics.NumberOfEpisodes(),      tf_metrics.EnvironmentSteps(),      tf_metrics.AverageReturnMetric(),      tf_metrics.AverageEpisodeLengthMetric(),  ]

筆記:訓練或實現策略時,對獎勵做衰減是合理的,這是為了平衡當前獎勵與未來獎勵的平衡。但是,當周期結束時,可以通過對所有未衰減的獎勵求和來做評估。出於這個原因,AverageReturnMetric計算了每個周期未衰減獎勵的和,並追蹤平均值。

任何時候,可以調用result()方法獲取指標(例如,train_metrics[0].result())。或者,可以調用log_metrics(train_metrics)記錄所有指標(這個函數位於tf_agents.eval.metric_utils包):

>>> from tf_agents.eval.metric_utils import log_metrics  >>> import logging  >>> logging.get_logger().set_level(logging.INFO)  >>> log_metrics(train_metrics)  [...]  NumberOfEpisodes = 0  EnvironmentSteps = 0  AverageReturn = 0.0  AverageEpisodeLength = 0.0

接下來創建收集driver。

創建收集driver

正如圖18-13,driver是使用給定策略探索環境的對象,收集經驗,並廣播給observer。在每一步,發生的事情如下:

  • driver將當前時間步傳給收集策略,收集策略使用時間步選擇動作,並返回包含動作的動作步對象。
  • driver然後將動作傳給環境,環境返回下一個時間步。
  • 最後,driver創建一個軌跡對象表示過渡,並廣播給所有觀察。

一些策略,比如RNN策略,是有狀態的:策略根據給定的時間步和內部狀態選擇動作。有狀態策略在動作步返回自己的狀態,driver會在下一個時間步將這個狀態返回給策略。另外,driver將策略狀態保存到軌跡中(在欄位policy_info中):當智慧體取樣一條軌跡,它必須設置策略的狀態設為取樣時間步時的狀態。

另外,就像前面討論的,環境可能是批次化的環境,這種情況下,driver將批次化的時間步傳給策略(即,時間步對象包含批次觀察、批次步驟類型、批次獎勵、批次衰減,這四個批次的大小相同)。driver還傳遞前一批次的策略狀態。然後,策略返回去批次動作步,包含著批次動作和批次策略狀態。最後,driver創建批次化軌跡(即,軌跡包含批次步驟類型、批次觀察、批次動作、批次獎勵,更一般地,每個軌跡屬性一個批次,所有批次大小相同)。

有兩個主要的driver類:DynamicStepDriverDynamicEpisodeDriver。第一個收集給定數量步驟的經驗,第二個收集給定數量周期數的經驗。我們想收集每個訓練迭代的四個步驟的經驗(正如2015 DQN論文),所以創建一個DynamicStepDriver

from tf_agents.drivers.dynamic_step_driver import DynamicStepDriver    collect_driver = DynamicStepDriver(      tf_env,      agent.collect_policy,      observers=[replay_buffer_observer] + training_metrics,      num_steps=update_period) # collect 4 steps for each training iteration

傳入環境、智慧體的收集策略、observer列表(包括接力快取observer和訓練指標),最後是要運行的步驟數(這個例子中是4)。現在可以調用方法run()來運行,但最好先用純隨機策略收集的經驗先填充接力快取。要這麼做,可以使用類RandomTFPolicy創建第二個driver,運行20000步這個策略(等於80000個模擬幀,正如2015 DQN論文)。可以用ShowProgress observer展示進展:

from tf_agents.policies.random_tf_policy import RandomTFPolicy    initial_collect_policy = RandomTFPolicy(tf_env.time_step_spec(),                                          tf_env.action_spec())  init_driver = DynamicStepDriver(      tf_env,      initial_collect_policy,      observers=[replay_buffer.add_batch, ShowProgress(20000)],      num_steps=20000) # <=> 80,000 ALE frames  final_time_step, final_policy_state = init_driver.run()

快要能運行訓練循環了。只需要最後一個組件:數據集。

創建數據集

要從接力快取取樣批次的軌跡,可以調用get_next()方法。這返回了軌跡的批次,還返回了含有樣本id和取樣概率的BufferInfo對象(可能對有些演算法有用,比如PER)。例如,下面的程式碼取樣了一個包含兩條軌跡的批次(子周期),每個包含三個連續步。這些子周期見圖18-15(每行包含一個周期的三個連續步):

>>> trajectories, buffer_info = replay_buffer.get_next(  ...     sample_batch_size=2, num_steps=3)  ...  >>> trajectories._fields  ('step_type', 'observation', 'action', 'policy_info',   'next_step_type', 'reward', 'discount')  >>> trajectories.observation.shape  TensorShape([2, 3, 84, 84, 4])  >>> trajectories.step_type.numpy()  array([[1, 1, 1],         [1, 1, 1]], dtype=int32)

trajectories對象是一個命名元組,有7個欄位。每個欄位包含一個張量,前兩個維度是2和3(因為有兩條軌跡,每個三個時間步)。這解釋了為什麼observation欄位的形狀是 [2, 3, 84, 84, 4] :這是兩條軌跡,每條軌跡三個時間步,每步的觀察是84 × 84 × 4。相似的,step_type張量的形狀是 [2, 3] :在這個例子中,兩條軌跡包含三個連續步驟,步驟是在周期的中部,(類型是1, 1, 1)。在第二條軌跡中,看不到第一個觀察中左下方的球,在接下來的兩個觀察中,球消失了,所以智慧體會死,但周期不會馬上結束,因為還剩幾條命。

圖18-15 包含三個連續步驟的兩條軌跡

每條軌跡是連續時間步和動作步的簡潔表徵,初衷是為了避免繁瑣,怎麼做呢?見圖18-16,過渡n由時間步n、動作步n、時間步n+1組成,而過渡n+1由時間步n+1、動作步n+1、時間步n+2。如果將這兩個過渡直接存入接力快取,時間步n+1是重複的。為了避免重複,nth軌跡步只包括時間步n的類型和觀察(不是獎勵和衰減),不包括時間步n+1的觀察(但是,不包括下一個時間步類型的複製)。

圖18-16 軌跡,過渡,時間步和動作步

因此,如果有批次軌跡,每個軌跡有t+1步驟(從時間步n到時間步n+t),包含從時間步n到時間步n+t的所有數據,但沒有獎勵和時間步n的衰減(但包括時間步n+t+1的獎勵和衰減)。這表示t過渡(n 到 n + 1, n + 1 到 n + 2, …, n + t – 1 到 n + t)

模組tf_agents.trajectories.trajectory中的函數to_transition()將批次化的軌跡轉變為包含批次time_stepaction_stepnext_time_step的列表。注意,第二個維度是2,而不是3,這是因為t + 1個時間步之間有t個過渡:

>>> from tf_agents.trajectories.trajectory import to_transition  >>> time_steps, action_steps, next_time_steps = to_transition(trajectories)  >>> time_steps.observation.shape  TensorShape([2, 2, 84, 84, 4]) # 3 time steps = 2 transitions

筆記:取樣的軌跡可能會將兩個(或多個)周期重疊!這種情況下,會包含邊界過渡,意味著過渡的step_type等於2(結束),next_step_type等於0(開始)。當然,TF-Agents可以妥善處理這些軌跡(例如,通過在碰到邊界時重新設置策略狀態)。軌跡的方法is_boundary()返回只是每一步是否是邊界的張量。

對於主訓練循環,不使用get_next(),而是用tf.data.Dataset。這樣,就能藉助Data API的高效(並行計算和預提取)。要這麼做,可以調用接力快取的as_dataset()方法:

dataset = replay_buffer.as_dataset(      sample_batch_size=64,      num_steps=2,      num_parallel_calls=3).prefetch(3)

在每個訓練步驟,提取包含64條軌跡的批次(和2015 DQN論文一樣),每條軌跡有兩步(即,2 步 = 1個完整過渡,包括下一步的觀察)。這個數據集能並行處理三條軌跡,預提取三條軌跡。

筆記:對於策略演算法,比如策略梯度,每個經驗只需取樣一次,訓練完就可以丟掉。在這個例子中,你還可以使用一個接力快取,但使用接力快取的gather_all()方法,在每個訓練迭代獲取軌跡張量,訓練完,再動過clear()方法清空接力快取。

有了所有組件之後,就可以訓練模型了。

創建訓練循環

要加速訓練,將主函數轉換為TensorFlow函數。可以使用函數tf_agents.utils.common.function(),它包裝了tf.function(),還有一些其它選項:

from tf_agents.utils.common import function    collect_driver.run = function(collect_driver.run)  agent.train = function(agent.train)

寫一個小函數,可以n_iterations次運行主訓練循環:

def train_agent(n_iterations):      time_step = None      policy_state = agent.collect_policy.get_initial_state(tf_env.batch_size)      iterator = iter(dataset)      for iteration in range(n_iterations):          time_step, policy_state = collect_driver.run(time_step, policy_state)          trajectories, buffer_info = next(iterator)          train_loss = agent.train(trajectories)          print("r{} loss:{:.5f}".format(              iteration, train_loss.loss.numpy()), end="")          if iteration % 1000 == 0:              log_metrics(train_metrics)

這個函數先向收集策略詢問初始狀態(給定環境批次大小,這個例子中是1)。因為策略是無狀態的,返回的是空元組(所以可以寫成policy_state = ())。然後,創建一個數據集的迭代器,並運行訓練循環。在每個迭代,調用driver的run()方法,傳入當前的時間步(最初是None)和當前的策略狀態。運行收集策略,收集四步的經驗,將收集到的軌跡廣播給接力快取和指標。然後,從數據集取樣一個批次軌跡,傳給智慧體的train()方法。返回對象train_loss,可能根據智慧體的類型有變動。接著,展示迭代數和訓練損失,每隔1000次迭代,輸出所有指標的日誌。現在可以調用train_agent()做一些迭代,智慧體就能逐漸學會玩Breakout了。

train_agent(10000000)

訓練需要大量算力和極大的耐心(根據硬體,可能需要幾個小時甚至幾天),可能還需要用不同的隨機種子多次運行,以得到更好的結果,但是訓練完成後,智慧體在玩Breakout就比人厲害了。你還可以在其它Atari遊戲上訓練這個DQN智慧體:智慧體對於大多數動作遊戲都可以超越人的表現,但是智慧體對長故事線遊戲不擅長。

流行RL演算法概覽

本章結束前,快速瀏覽一些流行的RL演算法:

Actor-Critic 演算法

  • 將策略梯度和深度Q-網路結合而成RL演算法族。Actor-Critic智慧體包含兩個神經網路:一個策略網路和一個DQN。用智慧體的經驗正常訓練DQN。與常規PG相比,策略網路的學習有所不同:智慧體(actor)依賴DQN(critic)估計的動作值。就像運動員(智慧體)在教練(DQN)的幫助下學習。

非同步優勢Actor-Critic 演算法(A3C)

  • 這是DeepMind在2016年推出的重要的Actor-Critic 演算法的變體,其中多個智慧體並行學習,探索環境的不同複製。每隔一段間隔,每個智慧體非同步更新主網權重,然後從網路拉取最新權重。每個智慧體都對網路產生共現,也從其它智慧體學習。另外,DQN不估計Q-值,而是估計每個動作的優勢,這樣可以穩定訓練。

優勢Actor-Critic 演算法(A2C)

  • A3C演算法的變體,去除了非同步。所有模型更新是同步的,所以梯度更新傾向於大批次,可以讓模型更好地利用GPU。

軟Actor-Critic 演算法(SAC)

  • Tuomas Haarnoja 和其它UC Berkeley研究員在2018年提出的Actor-Critic變體。這個演算法不僅學習獎勵,還最大化其動作的熵。換句話說,在儘可能獲取更多獎勵的同時,盡量不可預測。這樣可以鼓勵智慧體探索環境,可以加速訓練。在DQN的估計不好時,可以避免重複執行相同的動作。這個演算法取樣非常高效(與前面的演算法相反,前者取樣慢)。TF-Agents中有SAC。

近似策略優化(PPO)

  • 基於A2C的演算法,它能裁剪函數的損失,避免過量權重更新(會導致訓練不穩定)。PPO是信任區域策略優化(TRPO)的簡化版本,作者是John Schulman和其它OpenAI研究員。OpenAI在2019年四月弄了個大新聞,他們用基於PPO的OpenAI Five打敗了多人遊戲Dota2的世界冠軍。TF-Agents中有PPO。

基於好奇探索

  • RL演算法中反覆出現的問題是獎勵過於稀疏,這使得學習太慢且低效。Deepak Pathak和其它UC Berkeley的研究員提出了解決方法:忽略獎勵,讓智慧體極度好奇地探索環境?獎勵變為了智慧體的一部分,而不是來自環境。相似的,讓孩子變得更好奇,比純粹的獎勵孩子,能取得更好的結果。怎麼實現呢?智慧體不斷地預測動作的結果,並探索結果不匹配預測的環境。換句話說,智慧體想得到驚喜。如果結果是可預測的(枯燥),智慧體就去其它地方。但是,如果結果不可預測,智慧體發現無法控制結果,也會變得無聊。只用好奇心,作者成功地訓練智慧體玩電子遊戲:即使智慧體失敗不會受懲罰,遊戲也會結束,智慧體是玩膩了。

這一章學習了許多主題:策略梯度、馬爾科夫鏈、馬爾科夫決策過程、Q-學習、近似Q-學習、深度Q-學習及其變體(固定Q-值目標、雙DQN、對決DQN、優先經驗接力)。還討論了如何使用TF-Agents訓練智慧體,最後瀏覽了一些流行的演算法。強化學習是一個龐大且令人興奮的領域,每天都有新主意和新演算法冒出來,希望這章能激發你的好奇心!

練習

  1. 如何定義強化學習?它與傳統的監督和非監督學習有什麼不同?
  2. 你能想到什麼本章沒有提到過的強化學習的應用?環境是什麼?智慧體是什麼?什麼是可能的動作,什麼是獎勵?
  3. 什麼是衰減率?如果你修改了衰減率那最優策略會變化嗎?
  4. 如何測量強化學習智慧體的表現?
  5. 什麼是信用分配問題?它怎麼出現的?怎麼解決?
  6. 使用接力快取的目的是什麼?
  7. 什麼是off策略 RL 演算法?
  8. 使用策略梯度處理 OpenAI gym 的「LunarLander-v2」 環境。需要安裝Box2D依賴(python3 -m pip install gym[box2d])。
  9. 用任何可行的演算法,使用TF-Agents訓練可以達到人類水平的可以玩SpaceInvaders-v4的智慧體。
  10. 如果你有大約 100 美元備用,你可以購買 Raspberry Pi 3 再加上一些便宜的機器人組件,在 Pi 上安裝 TensorFlow,然後讓我們嗨起來~!舉個例子,看看 Lukas Biewald 的這個有趣的帖子,或者看看 GoPiGo 或 BrickPi。從簡單目標開始,比如讓機器人轉向最亮的角度(如果有光感測器)或最近的物體(如果有聲吶感測器),並移動。然後可以使用深度學習:比如,如果機器人有攝影機,可以實現目標檢測演算法,檢測人並向人移動。還可以利用RL演算法讓智慧體自己學習使用馬達達到目的。

參考答案見附錄 A。


(第二部分:深度學習) 第10章 使用Keras搭建人工神經網路 第11章 訓練深度神經網路 第12章 使用TensorFlow自定義模型並訓練 第13章 使用TensorFlow載入和預處理數據 第14章 使用卷積神經網路實現深度電腦視覺 第15章 使用RNN和CNN處理序列 第16章 使用RNN和注意力機制進行自然語言處理 第17章 使用自編碼器和GAN做表徵學習和生成式學習 第18章 強化學習 [第19章 規模化訓練和部署TensorFlow模型]