­

使用』推土距離『構建強悍的WGAN

讀者讀到此處時或許會有一個感觸,網絡訓練的目的是讓網絡在接收輸入數據後,它輸出的結果在給定衡量標準上變得越來越好,由此「衡量標準」設計的好壞對網絡訓練最終結果產生至關重要的作用。

回想上一節,當我們把N張數據圖片輸入到網絡後,網絡會輸出一個含有N個分量的向量,接着我們先構造一個含有N個1的向量,然後判斷網絡得出的向量與構造的含有N個1的向量是否足夠「接近」。

算法判斷兩個向量是否接近的標準是「交叉熵」,也就是

其中

對應構造的含有N個1的向量中對應的分量,也就是無論i取什麼值都有:

則是網絡接收第i張圖片後輸出其為真實圖片對應的概率。當輸入圖片比較複雜時,使用交叉熵來衡量輸出結果的好壞在數學上有嚴重缺陷,簡單的說交叉熵不能夠精確的衡量網絡是否已經有效的識別出圖片特徵,這裡我們介紹另一種衡量方法叫「推土距離」。推土距離的定義如下,假設地面兩處位置上有兩個形狀不同的土堆,如下圖所示:

,P和Q分佈表示兩處土堆,每個長條方塊可以看做是一個小沙丘,你的任務是使用推土機將P中某個沙丘上的土搬到另一個沙丘,使得最后土堆P的形狀和Q的形狀一模一樣。顯然沙土的搬運方法有很多種,一種搬運法如圖下圖所示:

上圖,箭頭表示把沙土從箭頭起始的沙丘搬運到箭頭所指向的沙丘,當然還可以有另外的搬運法,如下圖所示:

如圖17-7所示,將土堆從箭頭起始的沙丘搬運到箭頭指向的沙丘,所得結果也能使土堆P向土堆Q轉換,但如果我們考慮到搬運的成本,如果將搬運土堆的重量乘以土堆移動的距離作為一次搬運成本,那麼不難看第一章圖所示的搬運法比1第二張圖所示的搬運法更節省。

所謂搬圖距離就是所有可行的搬土方法中能實現成本最小的那種搬運方法,使用W(P,Q)來標記。不難看出P和Q其實可以對應兩種不同的概率分佈,因此推土距離本質上就是將給定概率分佈P轉換成概率分佈Q,並且要求轉換所產生的成本要儘可能小。我們可以通過下圖對「推土距離」進行更形象的理解:

上圖中,在P和Q之間對應一個二維矩陣,每一行對應將土堆P對應沙丘中的沙土暈倒Q中對應列所示沙丘的距離,方塊的顏色越深表示表示運送沙土的數量越多,使用符號

來表示上圖所示矩陣,注意到它的每一行所有元素加總對應P中所在沙丘的含土量,每一列對應Q中相應沙丘的含土量,因此使用

表示將土堆中Xp對應沙丘運送到Xq對應沙丘的土量,使用

表示兩個沙丘的距離,那麼一個搬運方案就可以使用公式

來表示。而推土距離就是所有可行方案中擁有最小成本那種,使用

來表示,其中符號

表示所有可行搬運方案的集合,推土距離是數學最優化領域中非常複雜的難題。接下來我們看看WGAN網絡的數學原理,我們就可以使用搬圖距離來衡量網絡輸出結果的好壞,算法將使用下面公式來描述Discriminator網絡的損失函數:

要說明該公式能表示G,D之間的推土距離需要相當複雜的推導,在此我們暫時忽略。公式看起來似乎很複雜,讀者不必要被它嚇到,它要做的事情很簡單。在17.1.1節中,如果圖形來自於數據集,那麼算法就構造全是1的向量,如果圖像來自生成者網絡,那麼算法就 構造全是0的向量。

根據上面公式我們對算法做一些小修改,如果圖像來自生成者網絡,那麼構造分量全是-1的分量,這意味着算法將訓練Discriminator網絡,使得它接收N張來自數據集的圖片,輸出的N個結果的平均值要儘可能大。

在公式中還有一個約束條件需要注意,那就是:

滿足該條件的函數必須具備如下性質:

也就是說如果把Discriminator網絡看做一個函數,那麼網絡輸出數據的特性必須滿足上面公式。但是在實踐上我們無法直接構造一個網絡使得它的特性滿足上面公式,因此算法使用一種便宜之計就是將Discriminator網絡內部參數的值限定在區間(-1,1)。「偏移之計」的做法其實並不能讓鑒別者網絡滿足約束條件,只不過它能讓算法取得較好的結果,在後面會給出更好的處理方法。接下來我們看看WGAN網絡的實現,首先我們要加載訓練所需的圖片數據:

import numpy as np  import os  from keras.datasets import cifar10  def  load_cifar10(label):#加載keras代碼庫自帶的cifar數據集,裏面是各種物體的圖片       (x_train, y_train), (x_test, y_test) = cifar10.load_data()       train_mask = [y[0] == label for y in y_train] #將給定標籤的圖片挑選出來       test_mask = [y[0] == label for y in y_test]       x_data = np.concatenate([x_train[train_mask], x_test[test_mask]] )       y_data = np.concatenate([y_train[train_mask], y_test[test_mask]])       x_data = (x_data.astype('float32') - 127.5) / 127.5       return (x_data, y_data)  CIFAR_HORSE_LABEL = 7 #圖片類別由標籤值對應,7對應所有馬的圖片  (x_train, y_train) = load_cifar10(CIFAR_HORSE_LABEL)#加載所有馬圖片  import matplotlib.pyplot as plt  plt.imshow((x_train[150, :,:, :] + 1) / 2)  

代碼將keras庫附帶的數據集cifar加載到內存,該數據集對應了多種物品的的圖片,每種特定物品使用標籤值就行區分,代碼中使用的標籤值7對應所有馬的圖片,後面實現的WGAN將專門使用馬的圖片來訓練,因此訓練結束後網絡會學會如何繪製馬的圖片,上面代碼運行後所得結果如下圖所示:

接下來構造生成者和鑒別者網絡並將其拼接成一個整體:

import glob  import imageio  import matplotlib.pyplot as plt  import numpy as np  import os  import PIL  from tensorflow.keras import layers  import time  from IPython import display  BUFFER_SIZE = 6000  BATCH_SIZE = 256  EPOCHS = 12000  # 批量化和打亂數據  train_dataset = tf.data.Dataset.from_tensor_slices(x_train).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)  class Model(tf.keras.Model):      def  __init__(self):          super(Model, self).__init__()          self.model_name = "Model"          self.model_layers = []      def  call(self, x):          x = tf.convert_to_tensor(x, dtype = tf.float32)          for layer in self.model_layers:              x = layer(x)          return x  class  Generator(Model):      def  __init__(self):          super(Generator, self).__init__()          self.model_name = "generator"          self.generator_layers = []          self.generator_layers.append(tf.keras.layers.Dense(4*4*128, use_bias = False))          self.generator_layers.append(tf.keras.layers.BatchNormalization(momentum = 0.8))          self.generator_layers.append(tf.keras.layers.LeakyReLU())          self.generator_layers.append(tf.keras.layers.Reshape((4, 4, 128)))          self.generator_layers.append(tf.keras.layers.UpSampling2D())          self.generator_layers.append(tf.keras.layers.Conv2D(128, (5, 5),strides = (1,1),                                                                padding = 'same',                                                                use_bias = False))          self.generator_layers.append(tf.keras.layers.BatchNormalization())          self.generator_layers.append(tf.keras.layers.LeakyReLU())          self.generator_layers.append(tf.keras.layers.UpSampling2D()) #upSampling2D將數據通過複製的方式擴大一倍          self.generator_layers.append(tf.keras.layers.Conv2D(64, (5,5), strides = (1,1),padding = 'same',                                                                use_bias = False))          self.generator_layers.append(tf.keras.layers.BatchNormalization(momentum = 0.8))          self.generator_layers.append(tf.keras.layers.LeakyReLU())          self.generator_layers.append(tf.keras.layers.UpSampling2D())          self.generator_layers.append(tf.keras.layers.Conv2DTranspose(32, (5,5), strides = (1,1),                                                                padding = 'same',                                                                use_bias = False))          self.generator_layers.append(tf.keras.layers.BatchNormalization())          self.generator_layers.append(tf.keras.layers.LeakyReLU())          self.generator_layers.append(tf.keras.layers.Conv2DTranspose(3, (5,5), strides = (1,1),                                                                padding = 'same',                                                                use_bias = False,                                                                activation = 'tanh'))          self.model_layers = self.generator_layers #最終輸出數據的規格為(32,32,3)      def  create_variables(self, z_dim):          x =  np.random.normal(0, 1, (1, z_dim))          x = self.call(x)  class Discriminator(Model):      def __init__(self):#鑒別者網絡卷積層的規格為(32, 64,128, 128)          super(Discriminator, self).__init__()          self.model_name = "discriminator"          self.discriminator_layers = []          self.discriminator_layers.append(tf.keras.layers.Conv2D(32, (5,5), strides = (2,2),                                                                   padding = 'same'))          self.discriminator_layers.append(tf.keras.layers.LeakyReLU())          self.discriminator_layers.append(tf.keras.layers.Conv2D(64, (5,5), strides = (2,2),                                                                   padding = 'same'))          self.discriminator_layers.append(tf.keras.layers.LeakyReLU())          self.discriminator_layers.append(tf.keras.layers.Dropout(0.3))          self.discriminator_layers.append(tf.keras.layers.Conv2D(128, (5,5), strides = (2,2),                                                           padding = 'same'))          self.discriminator_layers.append(tf.keras.layers.LeakyReLU())          self.discriminator_layers.append(tf.keras.layers.Conv2D(128, (5,5), strides = (1,1),                                                           padding = 'same'))          self.discriminator_layers.append(tf.keras.layers.LeakyReLU())          self.discriminator_layers.append(tf.keras.layers.Flatten())          self.discriminator_layers.append(tf.keras.layers.Dense(1, activation = "tanh"))          self.model_layers = self.discriminator_layers      def  create_variables(self): #必須要調用一次call網絡才會實例化          x = np.expand_dims(x_train[200, :,:,:], axis = 0)          self.call(x)  

讀者需要注意,在代碼實現中,鑒別者和生成者網絡跟上一節有一些明顯差異,首先鑒別者網絡的卷積層輸出規格變為(32, 64, 128, 128),同時去掉了Dorpout網絡層,生成者網絡使用Upsampling2D來擴展數據規格。 此處需要展開說明Upsampling2D網絡層的操作流程,它的作用與17.1.1節使用的Conv2DTranspose一樣,都是將輸入數據的規格擴大一倍,但做法不同,它僅僅是將輸入二維數組的元素進行複製,具體操作如下:

接下來看看網絡訓練過程的實現,訓練流程與17.1.1節大同小異,但是有幾個要點需要注意: ··· class GAN(): def init(self, z_dim): self.epoch = 0 self.z_dim = z_dim #關鍵向量的維度

     #設置生成者和鑒別者網絡的優化函數       self.discriminator_optimizer = tf.optimizers.Adam(0.0002)       self.generator_optimizer = tf.optimizers.Adam(0.0002)       self.generator = Generator()       self.generator.create_variables(z_dim)       self.discriminator = Discriminator()       self.discriminator.create_variables()       self.seed = tf.random.normal([16, z_dim])       self.d_loss = []       self.d_loss_real = []       self.d_loss_fake = []       self.g_loss = []       self.discriminator_trains = 5       self.image_batch_count = 0   def train_discriminator(self, image_batch):      '''      訓練鑒別師網絡,它的訓練分兩步驟,首先是輸入正確圖片,讓網絡有識別正確圖片的能力。      然後使用生成者網絡構造圖片,並告知鑒別師網絡圖片為假,讓網絡具有識別生成者網絡偽造圖片的能力      '''        with tf.GradientTape(persistent=True, watch_accessed_variables=False) as tape: #只修改鑒別者網絡的內部參數          tape.watch(self.discriminator.trainable_variables)          noise = tf.random.normal([len(image_batch), self.z_dim])          true_logits = self.discriminator(image_batch, training = True)          gen_imgs = self.generator(noise, training = True) #讓生成者網絡根據關鍵向量生成圖片          fake_logits = self.discriminator(gen_imgs, training = True)          d_loss_real = tf.multiply(tf.ones_like(true_logits), true_logits)#根據推土距離將真圖片的標籤設置為1          d_loss_fake = tf.multiply(-tf.ones_like(fake_logits), fake_logits)#將偽造圖片的標籤設置為-1            with tf.GradientTape(watch_accessed_variables=False) as iterploted_tape:              t = tf.random.uniform(shape = (len(image_batch), 1, 1, 1)) #生成[0,1]區間的隨機數              interploted_imgs = tf.add(tf.multiply(1 - t, image_batch), tf.multiply(t, gen_imgs))              iterploted_tape.watch(interploted_imgs)              interploted_loss = self.discriminator(interploted_imgs)          interploted_imgs_grads = iterploted_tape.gradient(interploted_loss, interploted_imgs)          grad_norms = tf.norm(interploted_imgs_grads)          penalty = 10 * tf.reduce_mean((grad_norms - 1) ** 2)          d_loss = d_loss_real + d_loss_fake + penalty      grads = tape.gradient(d_loss , self.discriminator.trainable_variables)      self.discriminator_optimizer.apply_gradients(zip(grads, self.discriminator.trainable_variables)) #改進鑒別者網絡內部參數      self.d_loss.append(d_loss)      self.d_loss_real.append(d_loss_real)      self.d_loss_fake.append(d_loss_fake)     def  train_generator(self, batch_size): #訓練生成者網絡      '''      生成者網絡訓練的目的是讓它生成的圖像儘可能通過鑒別者網絡的審查      '''      with tf.GradientTape(persistent=True,watch_accessed_variables=False) as tape: #只能修改生成者網絡的內部參數不能修改鑒別者網絡的內部參數          tape.watch(self.generator.trainable_variables)          noise = tf.random.normal([batch_size, self.z_dim])          gen_imgs = self.generator(noise, training = True) #生成偽造的圖片          d_logits = self.discriminator(gen_imgs,training = True)          g_loss = tf.multiply(tf.ones_like(d_logits), d_logits)#將標籤設置為1      grads = tape.gradient(g_loss, self.generator.trainable_variables) #調整生成者網絡內部參數使得它生成的圖片儘可能通過鑒別者網絡的識別      self.generator_optimizer.apply_gradients(zip(grads, self.generator.trainable_variables))      self.g_loss.append(g_loss)   def  train_step(self):       train_dataset.shuffle(BUFFER_SIZE)       image_batchs = train_dataset.take(self.discriminator_trains)       for image_batch in image_batchs:#注意先訓練鑒別者網絡5回才訓練生成者網絡一回           self.train_discriminator(image_batch)       self.train_generator(256)   def  train(self, epochs, run_folder):#啟動訓練流程       for  epoch in range(EPOCHS):           start = time.time()           self.epoch = epoch           self.train_step()           if  self.epoch % 10 == 0:              display.clear_output(wait=True)              self.sample_images(run_folder) #將生成者構造的圖像繪製出來              self.save_model(run_folder) #存儲兩個網絡的內部參數              print("time for epoc:{} is {} seconds".format(epoch, time.time() - start))   def  sample_images(self, run_folder): #繪製生成者構建的圖像      predictions = self.generator(self.seed)      predictions = predictions.numpy()      predictions = 0.5 * (predictions + 1)      predictions = np.clip(predictions, 0, 1)      fig = plt.figure(figsize=(4,4))      for i in range(predictions.shape[0]):          plt.subplot(4, 4, i+1)          plt.imshow(predictions[i, :, :, :] )          plt.axis('off')      plt.savefig('/content/drive/My Drive/WGAN_GP/images/sample{:04d}.png'.format(self.epoch))      plt.show()   def  save_model(self, run_folder): #保持網絡內部參數      self.discriminator.save_weights(os.path.join(run_folder, 'discriminator.h5'))      self.generator.save_weights(os.path.join(run_folder, 'generator.h5'))   def  load_model(self, run_folder):      self.discriminator.load_weights(os.path.join(run_folder, 'discriminator.h5'))      self.generator.load_weights(os.path.join(run_folder, 'generator.h5'))  

gan = GAN(z_dim = 100) gan.train(epochs = EPOCHS, run_folder = 『/content/drive/My Drive/WGAN_GP/checkpoints』) ··· 代碼與上節有幾個重要區別,第一個區別是在train_discriminator和train_generator函數中,代碼將偽造圖片對應的標籤從0改為-1,這種改動意味着鑒別者網絡在對輸入圖片的真實性進行評估。

因此在訓練鑒別者網絡時,將真實圖片的標籤設置為1,將偽造圖片的標籤設置為-1,意味着算法想訓練鑒別者網絡,讓它給真實圖片賦予更高評分,給偽造圖片賦予更低評分,生成者網絡的目的是使得生成的圖片儘可能的獲得鑒別者網絡的高評分。

訓練代碼中還有一個要點在於每次訓練完鑒別者網絡後,需要將網絡內部參數的值剪切到位於區間[-0.01,0.01]之間,這種做法目的是讓鑒別者網絡作為一個函數能滿足損失函數公式,問題在於這種做法與將網絡變成

類型的函數牛馬不相及。

算法作者提出算法時並不知道如何使鑒別者網絡變成給定類型函數,剪切網絡內部參數其實是一種權宜之計,是算法作者「試」出來的一種有效做法,就像愛迪生通過海量「遍歷」從而找到鎢絲作為燈絲那樣,上面代碼運行後生成者網絡生成的馬圖片質量如圖17-10所示:

來自數據集中的真實圖片如下所示:

從生成圖片與數據集圖片比較來看,生成圖片能準確的把握住馬的輪廓形態,皮毛特徵,也就是生成者網絡非常準確的把握住馬的內在關鍵特徵,因此它能學會如何繪製出形象的馬圖片,網絡存在的問題在於,其生成的圖片較為模糊,在下一節我們將研究如何進一步改進WGAN網絡。