運用計算圖搭建卷積神經網路(CNN)

  • 2019 年 11 月 21 日
  • 筆記

文章作者:張覺非 360

編輯整理:Hoh Xil

內容來源:作者授權

出品社區:DataFun

註:歡迎轉載,轉載請註明出處

「您否認這座由花與礦物構成的永恆的藥房,專為治療被稱為人類的永恆的患者?!」 「我既不否認藥房,也不否認患者。我否認的是醫生。」 ——《巴黎聖母院》

幹嘛引這麼兩句?眾明公自行體味。由於加入了矩陣計算的運算元,我們的計算圖框架( VectorSlow )現在該改叫 MatrixSlow 了。也許哪一天我們會將它改進成 TensorSlow ,但總之 Slow 這個特性我們是不會放棄的。

在之前的文章中,我們介紹了計算圖和自動求導的原理及實現(這裡),用 MatrixSlow 搭建了一些可以被納入「非全連接神經網路」範疇的若干模型(這裡),還搭建了一些深度不一的多層全連接神經網路,並用動畫展示了它們的訓練過程(這裡)。現在,我們用 MatrixSlow 搭建卷積神經網路(CNN)並用來識別 MNIST 。關於 CNN 的介紹,可見這裡:

https://zhuanlan.zhihu.com/p/25249694

作者對本文程式碼保留後續修改的權利,完整程式碼請見碼云:

https://gitee.com/zhangjuefei/computing_graph_demo

▌1. 卷積

我們首先實現卷積運算元,程式碼如下(node.py):

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py

class Convolve(Node):      """      以第二個父節點的值為卷積核,對第一個父節點的值做二維離散卷積      """      def __init__(self, *parents):          assert len(parents) == 2          Node.__init__(self, *parents)            self.padded = None        def compute(self):            data = self.parents[0].value  # 輸入特徵圖          kernel = self.parents[1].value  # 卷積核            w, h = data.shape  # 輸入特徵圖的寬和高          kw, kh = kernel.shape  # 卷積核尺寸          hkw, hkh = int(kw / 2), int(kh / 2)  # 卷積核長寬的一半            # 補齊數據邊緣          pw, ph = tuple(np.add(data.shape, np.multiply((hkw, hkh), 2)))          self.padded = np.mat(np.zeros((pw, ph)))          self.padded[hkw:hkw + w, hkh:hkh + h] = data            self.value = np.mat(np.zeros((w, h)))            # 二維離散卷積          for i in np.arange(hkw, hkw + w):              for j in np.arange(hkh, hkh + h):                  self.value[i - hkw, j - hkh] = np.sum(                      np.multiply(self.padded[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh], kernel))        def get_jacobi(self, parent):            data = self.parents[0].value  # 輸入特徵圖          kernel = self.parents[1].value  # 卷積核            w, h = data.shape  # 輸入特徵圖的寬和高          kw, kh = kernel.shape  # 卷積核尺寸          hkw, hkh = int(kw / 2), int(kh / 2)  # 卷積核長寬的一半            # 補齊數據邊緣          pw, ph = tuple(np.add(data.shape, np.multiply((hkw, hkh), 2)))            jacobi = []          mask = np.mat(np.zeros((pw, ph)))          if parent is self.parents[0]:              for i in np.arange(hkw, hkw + w):                  for j in np.arange(hkh, hkh + h):                      mask *= 0                      mask[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh] = kernel                      jacobi.append(mask[hkw:hkw+w, hkh:hkh+h].A1)          elif parent is self.parents[1]:              for i in np.arange(hkw, hkw + w):                  for j in np.arange(hkh, hkh + h):                      jacobi.append(self.padded[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh].A1)          else:              raise Exception("You're not my father")            return np.mat(jacobi)

Convolve 接受兩個父節點:影像(或叫特徵圖)節點,它是一個二維矩陣;卷積核節點也是一個二維矩陣。Convolve 暫時不支援設置步幅(stride)和填充方式(padding),步幅一律為 1 ,使用補零填充。compute 以第二個父節點的值為濾波器,對第一個父節點的值做二維離散卷積(濾波)。get_jacobi 函數返回當前本節點對特徵圖或卷積核的雅克比矩陣。

▌2. 最大值池化

我們來實現最大值池化運算元,程式碼如下(node.py):

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py

class MaxPooling(Node):      """      最大值池化      """      def __init__(self, parent, size, stride):          Node.__init__(self, parent)            assert isinstance(stride, tuple) and len(stride) == 2          self.stride = stride            assert isinstance(size, tuple) and len(size) == 2          self.size = size            self.flag = None        def compute(self):          data = self.parents[0].value  # 輸入特徵圖          w, h = data.shape  # 輸入特徵圖的寬和高          dim = w * h          sw, sh = self.stride          kw, kh = self.size  # 池化核尺寸          hkw, hkh = int(kw / 2), int(kh / 2)  # 池化核長寬的一半            result = []          flag = []            for i in np.arange(0, w, sw):              row = []              for j in np.arange(0, h, sh):                    # 取池化窗口中的最大值                  top, bottom = max(0, i - hkw), min(w, i + hkw + 1)                  left, right = max(0, j - hkh), min(h, j + hkh + 1)                  window = data[top:bottom, left:right]                  row.append(                      np.max(window)                  )                    # 記錄最大值在原特徵圖中的位置                  pos = np.argmax(window)                  w_width = right - left                  offset_w, offset_h = top + pos // w_width, left + pos % w_width                  offset = offset_w * w + offset_h                  tmp = np.zeros(dim)                  tmp[offset] = 1                  flag.append(tmp)                result.append(row)            self.flag = np.mat(flag)          self.value = np.mat(result)        def get_jacobi(self, parent):            assert parent is self.parents[0] and self.jacobi is not None          return self.flag

MaxPooling 節點接受 size 參數指定池化窗口(池化核)大小,它是一個包含寬和高的 tuple 。MaxPooling 節點還接受 stride 參數,它也是一個 tuple ,指定兩個方向的步幅。compute 方法移動池化窗口,取窗口中的最大值,同時以標誌位的形式記錄下被取的最大值在原特徵圖點的位置,get_jacobi 就以這個標誌位作為雅可比,具體見程式碼。

▌3. 輔助性節點

我們需要一個 reshape 運算元改變數據的形狀(將二維特徵圖轉成一維向量,連接全連接層),程式碼如下(node.py):

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py

class Reshape(Node):      """      改變父節點的值(矩陣)的形狀      """        def __init__(self, parent, shape):          Node.__init__(self, parent)            assert isinstance(shape, tuple) and len(shape) == 2          self.to_shape = shape        def compute(self):          self.value = self.parents[0].value.reshape(self.to_shape)        def get_jacobi(self, parent):            assert parent is self.parents[0]          return np.mat(np.eye(self.dimension()))

卷積部分結束時,數據的形狀是一組多個特徵圖。這時若要連接全連接層,需要將這些特徵圖展平並連接成一個向量。Flatten 節點肩負這個任務,程式碼如下(node.py):

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py

class Flatten(Node):      """      將多個父節點的值連接成向量      """        def compute(self):            assert len(self.parents) > 0            # 將所有負矩陣展平並連接成一個向量          self.value = np.concatenate(              [p.value.flatten() for p in self.parents],              axis=1          ).T        def get_jacobi(self, parent):            assert parent in self.parents            dimensions = [p.dimension() for p in self.parents]  # 各個父節點的元素數量          pos = self.parents.index(parent)  # 當前是第幾個父節點          dimension = parent.dimension()  # 當前父節點的元素數量            assert dimension == dimensions[pos]            jacobi = np.mat(np.zeros((self.dimension(), dimension)))          start_row = int(np.sum(dimensions[:pos]))          jacobi[start_row:start_row + dimension, 0:dimension] = np.eye(dimension)            return jacobi

▌4. 構造「層」的輔助函數

我們寫幾個構造 CNN 的「層」(layer)的函數,以方便網路的構建,程式碼如下(layer.py):

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/layer.py

from node import *      def conv(feature_maps, input_shape, kernels, kernel_shape, activation):      """      :param feature_maps: 數組,包含多個輸入特徵圖,它們應該是值為同形狀的矩陣的節點      :param input_shape: tuple ,包含輸入特徵圖的形狀(寬和高)      :param kernels: 整數,卷積層的卷積核數量      :param kernel_shape: tuple ,卷積核的形狀(寬和高)      :param activation: 激活函數類型      :return: 數組,包含多個輸出特徵圖,它們是值為同形狀的矩陣的節點      """      outputs = []      for i in range(kernels):            channels = []          for fm in feature_maps:              kernel = Variable(kernel_shape, init=True, trainable=True)              conv = Convolve(fm, kernel)              channels.append(conv)            channles = Add(*channels)          bias = Variable(input_shape, init=True, trainable=True)          affine = Add(channles, bias)            if activation == "ReLU":              outputs.append(ReLU(affine))          elif activation == "Logistic":              outputs.append(Logistic(affine))          else:              outputs.append(affine)        assert len(outputs) == kernels      return outputs

conv 函數接受保存多個輸入特徵圖(或者稱作輸入影像的多個通道)的數組、輸入特徵圖的尺寸、卷積核數、卷積核尺寸、激活函數種類("ReLU" 或 "Logistic" 以及未來會有的其他種類)。conv 的返回值也是一個數組,包含卷積層的多個輸出特徵圖(通道),該數量與卷積核數量一致。

def pooling(feature_maps, kernel_shape, stride):      """      :param feature_maps: 數組,包含多個輸入特徵圖,它們應該是值為同形狀的矩陣的節點      :param kernel_shape: tuple ,池化核(窗口)的形狀(寬和高)      :param stride: tuple ,包含橫向和縱向步幅      :return: 數組,包含多個輸出特徵圖,它們是值為同形狀的矩陣的節點      """      outputs = []      for fm in feature_maps:          outputs.append(MaxPooling(fm, size=kernel_shape, stride=stride))        return outputs

pooling 函數構造一個池化層(目前只支援最大值池化)。該函數接受保存多個輸入特徵圖的數組、池化核(即池化窗口)尺寸、橫向和縱向的步幅。pooling 函數返回一個數組,包含多個輸出特徵圖。

def fc(input, input_size, size, activation):      """      :param input: 輸入向量      :param input_size: 輸入向量的維度      :param size: 神經元個數,即輸出個數(輸出向量的維度)      :param activation: 激活函數類型      :return: 輸出向量      """      weights = Variable((size, input_size), init=True, trainable=True)      bias = Variable((size, 1), init=True, trainable=True)      affine = Add(MatMul(weights, input), bias)        if activation == "ReLU":          return ReLU(affine)      elif activation == "Logistic":          return Logistic(affine)      else:          return affine

fc 函數構造全連接層,接受輸入數量(輸入向量的維數),神經元個數(輸出向量的維數)以及激活函數的種類。

▌5. 構造卷積神經網路

現在我們就可以搭建一個簡單的卷積神經網路了,程式碼如下(test_cnn.py):

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/test_cnn.py

from sklearn.metrics import accuracy_score    from node import *  from util import mnist  from optimizer import *  from layer import *    print(__file__)  train_x, train_y, test_x, test_y = mnist('./data/MNIST')  test_x = test_x[:1000]  test_y = test_y[:1000]    # train_x = train_x[:10000]  # train_y = train_y[:10000]    img_shape = (28, 28)    img = Variable(img_shape, init=False, trainable=False)  # 佔位符,28x28 的影像  conv1 = conv([img], img_shape, 6, (3, 3), "ReLU")  # 第一卷積層  pooling1 = pooling(conv1, (3, 3), (2, 2))  # 第一池化層    conv2 = conv(pooling1, (14, 14), 6, (3, 3), "ReLU")  # 第二卷積層  pooling2 = pooling(conv2, (3, 3), (2, 2))  # 第二池化層    fc1 = fc(Flatten(*pooling2), 294, 100, "ReLU")  # 第一全連接層  logits = fc(fc1, 100, 10, "None")  # 第二全連接層,無激活函數    # 分類概率  prob = SoftMax(logits)    # 訓練標籤  label = Variable((10, 1), trainable=False)    # 交叉熵損失  loss = CrossEntropyWithSoftMax(logits, label)    # Adam 優化器  optimizer = Adam(default_graph, loss, 0.005, batch_size=64)      # 訓練  print("start training")  for e in range(10):        for i in range(len(train_x)):          img.set_value(np.mat(train_x[i, :]).reshape(28, 28))          label.set_value(np.mat(train_y[i, :]).T)            # 執行一步優化          optimizer.one_step()            # 顯示進度          loss.forward()          percent = int((i + 1) / len(train_x) * 100)          print("".join(["="] * percent) + "> loss:{:.3f} {:d}({:.0f}%)".format(loss.value[0, 0], i + 1, percent))            if i > 1 and (i + 1) % 5000 == 0:                # 在測試集上評估模型正確率              probs = []              losses = []              for j in range(len(test_x)):                  img.set_value(np.mat(test_x[j, :]).reshape(28, 28))                  label.set_value(np.mat(test_y[j, :]).T)                    # 前向傳播計算概率                  prob.forward()                  probs.append(prob.value.A1)                    # 計算損失值                  loss.forward()                  losses.append(loss.value[0, 0])                    # print("test instance: {:d}".format(j))                # 取概率最大的類別為預測類別              pred = np.argmax(np.array(probs), axis=1)              truth = np.argmax(test_y, axis=1)              accuracy = accuracy_score(truth, pred)                default_graph.draw()              print("epoch: {:d}, iter: {:d}, loss: {:.3f}, accuracy: {:.2f}%".format(e + 1, i + 1, np.mean(losses), accuracy * 100))

這個卷積神經網路包含兩個卷積層,第一個卷積層有 6 個卷積核,第 2 個卷積層也有 6 個卷積核。卷積核的尺寸都是3X3。每個卷積層之後是一個 stride 為 2 的最大值池化層,它們將特徵圖尺寸縮小一半。經過第二個最大值池化層後,特徵圖尺寸成為 7X7=49。之後用 Flatten 運算元將 6 個特徵圖展平成 294 維向量,後接全連接層。第一個全連接層的輸出是 100 維向量,第二個全連接層輸出 10 維向量,作為十分類的 logits 。接下來是損失值部分,然後是訓練主循環。我們構造的 CNN 的計算圖如下:

我們構造的 CNN

▌6. 訓練 Sobel 濾波器

在兩年前的博文《卷積神經網路簡介》中,我們從可訓練濾波器的角度引入 CNN 。那時我們舉了一個例子:訓練 Sobel 運算元。我們用 (縱向)Sobel 運算元對萊娜圖做濾波,效果是強化原圖中的縱向邊緣。之後,以原圖為輸入,以 Sobel 運算元濾出來的圖為 target ,構造一個計算圖,對輸入施加 3X3 的濾波,輸出與原圖同尺寸的影像,以輸出影像與 target 影像所有像素的誤差平方和作為損失,將計算圖的隨機卷積核訓練成 Sobel 濾波器,我們看程式碼(test_sobel.py):

https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/lost%20&%20found/test_sobel.py

import skimage, os  import numpy as np    from node import *  from optimizer import *  from scipy.ndimage.filters import convolve  import matplotlib.pyplot as plt    # Sobel 濾波器  filter = np.mat([          [1.,  0.,   -1.],          [2.,  0.,   -2.],          [1.,  0.,   -1.]])    # 影像尺寸  w, h = 128, 128    # 搭建計算圖  input_img = Variable((w, h), init=False, trainable=False)  # 輸入影像佔位變數  target_img = Variable((w, h), init=False, trainable=False)  # target 影像佔位變數  kernel = Variable((3, 3), init=True, trainable=True)  # 被訓練的卷積核    # 對輸入影像施加濾波  conv_img = Convolve(input_img, kernel)    # 將輸入影像和 target 影像展成向量  target_img_flat = Reshape(target_img, shape=(w * h, 1))  conv_img_flat = Reshape(conv_img, shape=(w * h, 1))    # 常數(矩陣)[[-1]]  minus = Variable((1, 1), init=False, trainable=False)  minus.set_value(np.mat(-1))    # 常數(矩陣):影像總像素數的倒數  n = Variable((1, 1), init=False, trainable=False)  n.set_value(np.mat(1.0 / (w * h)))    # 損失值:輸出影像與 target 影像的平均像素平方誤差  loss = MatMul(Dot(          Add(target_img_flat, MatMul(conv_img_flat, minus)),          Add(target_img_flat, MatMul(conv_img_flat, minus))      ), n)    # RMSProp 優化器  optimizer = Adam(default_graph, loss, 0.06, batch_size=1)    # 讀取 lena 圖,將 rgb 影像轉成單通道灰度圖,並 resize 成指定大小  img = skimage.transform.resize(                      skimage.color.rgb2gray(                              skimage.io.imread("lena.png")                              ),                      (w, h)                      )    # 製作目標影像:對原影像施加 Sobel 濾波器  sobel = np.mat([[1,0,-1],[2,0,-2],[1,0,-1]])  target =  np.zeros((w, h))  convolve(input=img, output=target, weights=filter, mode="constant", cval=0.0)    # 保存原圖和經過 Sobel 濾波的影像  skimage.io.imsave("origin.png", np.minimum(np.maximum(img, 0.0), 1.0))  skimage.io.imsave("target.png", np.minimum(np.maximum(target, 0.0), 1.0))    # 以原圖為輸入,以經過 Sobel 濾波的影像為 target  input_img.set_value(np.mat(img))  target_img.set_value(np.mat(target))    i = 0  for e in range(1000):        # 一次迭代      optimizer.one_step()        # 計算損失值      loss.forward()      print("pic:{:s},loss:{:.6f}".format(p, loss.value[0, 0]))        # 保存當前卷積核對輸入影像做濾波的結果      fname = "{:d}.png".format(i)      if os.path.exists(fname):          os.remove(fname)      skimage.io.imsave(fname, np.minimum(np.maximum(conv_img.value, 0.0), 1.0))      i += 1

迭代進行到 302 次時,損失值已經不怎麼下降了,我們手動終止了進程。看看每次迭代輸出的影像與 target 影像:

輸出影像與 target 影像的比較

可見,輸出影像已經很接近 Sobel 濾波的 target 影像了。這時,被訓練的 kernel 節點的值是:

被訓練的卷積核節點(kernel)的值

卷積核已經被訓練得比較接近 Sobel 運算元了。

作者介紹

張覺非,本科畢業於復旦大學,碩士畢業於中國科學院大學,先後任職於新浪微博、阿里,目前就職於奇虎360,任機器學習技術專家。

對作者感興趣的小夥伴,歡迎點擊文末閱讀原文,與作者交流。

——END——

文章推薦:

深度神經網路的梯度消失(動畫)

計算圖反向傳播的原理及實現

從線性模型到神經網路

點下「在看」,給文章蓋個戳吧!?