運用計算圖搭建卷積神經網路(CNN)
- 2019 年 11 月 21 日
- 筆記

文章作者:張覺非 360
編輯整理:Hoh Xil
內容來源:作者授權
出品社區:DataFun
註:歡迎轉載,轉載請註明出處
「您否認這座由花與礦物構成的永恆的藥房,專為治療被稱為人類的永恆的患者?!」 「我既不否認藥房,也不否認患者。我否認的是醫生。」 ——《巴黎聖母院》
幹嘛引這麼兩句?眾明公自行體味。由於加入了矩陣計算的運算元,我們的計算圖框架( VectorSlow )現在該改叫 MatrixSlow 了。也許哪一天我們會將它改進成 TensorSlow ,但總之 Slow 這個特性我們是不會放棄的。
在之前的文章中,我們介紹了計算圖和自動求導的原理及實現(這裡),用 MatrixSlow 搭建了一些可以被納入「非全連接神經網路」範疇的若干模型(這裡),還搭建了一些深度不一的多層全連接神經網路,並用動畫展示了它們的訓練過程(這裡)。現在,我們用 MatrixSlow 搭建卷積神經網路(CNN)並用來識別 MNIST 。關於 CNN 的介紹,可見這裡:
https://zhuanlan.zhihu.com/p/25249694
作者對本文程式碼保留後續修改的權利,完整程式碼請見碼云:
https://gitee.com/zhangjuefei/computing_graph_demo
▌1. 卷積
我們首先實現卷積運算元,程式碼如下(node.py):
https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py
class Convolve(Node): """ 以第二個父節點的值為卷積核,對第一個父節點的值做二維離散卷積 """ def __init__(self, *parents): assert len(parents) == 2 Node.__init__(self, *parents) self.padded = None def compute(self): data = self.parents[0].value # 輸入特徵圖 kernel = self.parents[1].value # 卷積核 w, h = data.shape # 輸入特徵圖的寬和高 kw, kh = kernel.shape # 卷積核尺寸 hkw, hkh = int(kw / 2), int(kh / 2) # 卷積核長寬的一半 # 補齊數據邊緣 pw, ph = tuple(np.add(data.shape, np.multiply((hkw, hkh), 2))) self.padded = np.mat(np.zeros((pw, ph))) self.padded[hkw:hkw + w, hkh:hkh + h] = data self.value = np.mat(np.zeros((w, h))) # 二維離散卷積 for i in np.arange(hkw, hkw + w): for j in np.arange(hkh, hkh + h): self.value[i - hkw, j - hkh] = np.sum( np.multiply(self.padded[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh], kernel)) def get_jacobi(self, parent): data = self.parents[0].value # 輸入特徵圖 kernel = self.parents[1].value # 卷積核 w, h = data.shape # 輸入特徵圖的寬和高 kw, kh = kernel.shape # 卷積核尺寸 hkw, hkh = int(kw / 2), int(kh / 2) # 卷積核長寬的一半 # 補齊數據邊緣 pw, ph = tuple(np.add(data.shape, np.multiply((hkw, hkh), 2))) jacobi = [] mask = np.mat(np.zeros((pw, ph))) if parent is self.parents[0]: for i in np.arange(hkw, hkw + w): for j in np.arange(hkh, hkh + h): mask *= 0 mask[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh] = kernel jacobi.append(mask[hkw:hkw+w, hkh:hkh+h].A1) elif parent is self.parents[1]: for i in np.arange(hkw, hkw + w): for j in np.arange(hkh, hkh + h): jacobi.append(self.padded[i - hkw:i - hkw + kw, j - hkh:j - hkh + kh].A1) else: raise Exception("You're not my father") return np.mat(jacobi)
Convolve 接受兩個父節點:影像(或叫特徵圖)節點,它是一個二維矩陣;卷積核節點也是一個二維矩陣。Convolve 暫時不支援設置步幅(stride)和填充方式(padding),步幅一律為 1 ,使用補零填充。compute 以第二個父節點的值為濾波器,對第一個父節點的值做二維離散卷積(濾波)。get_jacobi 函數返回當前本節點對特徵圖或卷積核的雅克比矩陣。
▌2. 最大值池化
我們來實現最大值池化運算元,程式碼如下(node.py):
https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py
class MaxPooling(Node): """ 最大值池化 """ def __init__(self, parent, size, stride): Node.__init__(self, parent) assert isinstance(stride, tuple) and len(stride) == 2 self.stride = stride assert isinstance(size, tuple) and len(size) == 2 self.size = size self.flag = None def compute(self): data = self.parents[0].value # 輸入特徵圖 w, h = data.shape # 輸入特徵圖的寬和高 dim = w * h sw, sh = self.stride kw, kh = self.size # 池化核尺寸 hkw, hkh = int(kw / 2), int(kh / 2) # 池化核長寬的一半 result = [] flag = [] for i in np.arange(0, w, sw): row = [] for j in np.arange(0, h, sh): # 取池化窗口中的最大值 top, bottom = max(0, i - hkw), min(w, i + hkw + 1) left, right = max(0, j - hkh), min(h, j + hkh + 1) window = data[top:bottom, left:right] row.append( np.max(window) ) # 記錄最大值在原特徵圖中的位置 pos = np.argmax(window) w_width = right - left offset_w, offset_h = top + pos // w_width, left + pos % w_width offset = offset_w * w + offset_h tmp = np.zeros(dim) tmp[offset] = 1 flag.append(tmp) result.append(row) self.flag = np.mat(flag) self.value = np.mat(result) def get_jacobi(self, parent): assert parent is self.parents[0] and self.jacobi is not None return self.flag
MaxPooling 節點接受 size 參數指定池化窗口(池化核)大小,它是一個包含寬和高的 tuple 。MaxPooling 節點還接受 stride 參數,它也是一個 tuple ,指定兩個方向的步幅。compute 方法移動池化窗口,取窗口中的最大值,同時以標誌位的形式記錄下被取的最大值在原特徵圖點的位置,get_jacobi 就以這個標誌位作為雅可比,具體見程式碼。
▌3. 輔助性節點
我們需要一個 reshape 運算元改變數據的形狀(將二維特徵圖轉成一維向量,連接全連接層),程式碼如下(node.py):
https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py
class Reshape(Node): """ 改變父節點的值(矩陣)的形狀 """ def __init__(self, parent, shape): Node.__init__(self, parent) assert isinstance(shape, tuple) and len(shape) == 2 self.to_shape = shape def compute(self): self.value = self.parents[0].value.reshape(self.to_shape) def get_jacobi(self, parent): assert parent is self.parents[0] return np.mat(np.eye(self.dimension()))
卷積部分結束時,數據的形狀是一組多個特徵圖。這時若要連接全連接層,需要將這些特徵圖展平並連接成一個向量。Flatten 節點肩負這個任務,程式碼如下(node.py):
https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/node.py
class Flatten(Node): """ 將多個父節點的值連接成向量 """ def compute(self): assert len(self.parents) > 0 # 將所有負矩陣展平並連接成一個向量 self.value = np.concatenate( [p.value.flatten() for p in self.parents], axis=1 ).T def get_jacobi(self, parent): assert parent in self.parents dimensions = [p.dimension() for p in self.parents] # 各個父節點的元素數量 pos = self.parents.index(parent) # 當前是第幾個父節點 dimension = parent.dimension() # 當前父節點的元素數量 assert dimension == dimensions[pos] jacobi = np.mat(np.zeros((self.dimension(), dimension))) start_row = int(np.sum(dimensions[:pos])) jacobi[start_row:start_row + dimension, 0:dimension] = np.eye(dimension) return jacobi
▌4. 構造「層」的輔助函數
我們寫幾個構造 CNN 的「層」(layer)的函數,以方便網路的構建,程式碼如下(layer.py):
https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/layer.py
from node import * def conv(feature_maps, input_shape, kernels, kernel_shape, activation): """ :param feature_maps: 數組,包含多個輸入特徵圖,它們應該是值為同形狀的矩陣的節點 :param input_shape: tuple ,包含輸入特徵圖的形狀(寬和高) :param kernels: 整數,卷積層的卷積核數量 :param kernel_shape: tuple ,卷積核的形狀(寬和高) :param activation: 激活函數類型 :return: 數組,包含多個輸出特徵圖,它們是值為同形狀的矩陣的節點 """ outputs = [] for i in range(kernels): channels = [] for fm in feature_maps: kernel = Variable(kernel_shape, init=True, trainable=True) conv = Convolve(fm, kernel) channels.append(conv) channles = Add(*channels) bias = Variable(input_shape, init=True, trainable=True) affine = Add(channles, bias) if activation == "ReLU": outputs.append(ReLU(affine)) elif activation == "Logistic": outputs.append(Logistic(affine)) else: outputs.append(affine) assert len(outputs) == kernels return outputs
conv 函數接受保存多個輸入特徵圖(或者稱作輸入影像的多個通道)的數組、輸入特徵圖的尺寸、卷積核數、卷積核尺寸、激活函數種類("ReLU" 或 "Logistic" 以及未來會有的其他種類)。conv 的返回值也是一個數組,包含卷積層的多個輸出特徵圖(通道),該數量與卷積核數量一致。
def pooling(feature_maps, kernel_shape, stride): """ :param feature_maps: 數組,包含多個輸入特徵圖,它們應該是值為同形狀的矩陣的節點 :param kernel_shape: tuple ,池化核(窗口)的形狀(寬和高) :param stride: tuple ,包含橫向和縱向步幅 :return: 數組,包含多個輸出特徵圖,它們是值為同形狀的矩陣的節點 """ outputs = [] for fm in feature_maps: outputs.append(MaxPooling(fm, size=kernel_shape, stride=stride)) return outputs
pooling 函數構造一個池化層(目前只支援最大值池化)。該函數接受保存多個輸入特徵圖的數組、池化核(即池化窗口)尺寸、橫向和縱向的步幅。pooling 函數返回一個數組,包含多個輸出特徵圖。
def fc(input, input_size, size, activation): """ :param input: 輸入向量 :param input_size: 輸入向量的維度 :param size: 神經元個數,即輸出個數(輸出向量的維度) :param activation: 激活函數類型 :return: 輸出向量 """ weights = Variable((size, input_size), init=True, trainable=True) bias = Variable((size, 1), init=True, trainable=True) affine = Add(MatMul(weights, input), bias) if activation == "ReLU": return ReLU(affine) elif activation == "Logistic": return Logistic(affine) else: return affine
fc 函數構造全連接層,接受輸入數量(輸入向量的維數),神經元個數(輸出向量的維數)以及激活函數的種類。
▌5. 構造卷積神經網路
現在我們就可以搭建一個簡單的卷積神經網路了,程式碼如下(test_cnn.py):
https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/test_cnn.py
from sklearn.metrics import accuracy_score from node import * from util import mnist from optimizer import * from layer import * print(__file__) train_x, train_y, test_x, test_y = mnist('./data/MNIST') test_x = test_x[:1000] test_y = test_y[:1000] # train_x = train_x[:10000] # train_y = train_y[:10000] img_shape = (28, 28) img = Variable(img_shape, init=False, trainable=False) # 佔位符,28x28 的影像 conv1 = conv([img], img_shape, 6, (3, 3), "ReLU") # 第一卷積層 pooling1 = pooling(conv1, (3, 3), (2, 2)) # 第一池化層 conv2 = conv(pooling1, (14, 14), 6, (3, 3), "ReLU") # 第二卷積層 pooling2 = pooling(conv2, (3, 3), (2, 2)) # 第二池化層 fc1 = fc(Flatten(*pooling2), 294, 100, "ReLU") # 第一全連接層 logits = fc(fc1, 100, 10, "None") # 第二全連接層,無激活函數 # 分類概率 prob = SoftMax(logits) # 訓練標籤 label = Variable((10, 1), trainable=False) # 交叉熵損失 loss = CrossEntropyWithSoftMax(logits, label) # Adam 優化器 optimizer = Adam(default_graph, loss, 0.005, batch_size=64) # 訓練 print("start training") for e in range(10): for i in range(len(train_x)): img.set_value(np.mat(train_x[i, :]).reshape(28, 28)) label.set_value(np.mat(train_y[i, :]).T) # 執行一步優化 optimizer.one_step() # 顯示進度 loss.forward() percent = int((i + 1) / len(train_x) * 100) print("".join(["="] * percent) + "> loss:{:.3f} {:d}({:.0f}%)".format(loss.value[0, 0], i + 1, percent)) if i > 1 and (i + 1) % 5000 == 0: # 在測試集上評估模型正確率 probs = [] losses = [] for j in range(len(test_x)): img.set_value(np.mat(test_x[j, :]).reshape(28, 28)) label.set_value(np.mat(test_y[j, :]).T) # 前向傳播計算概率 prob.forward() probs.append(prob.value.A1) # 計算損失值 loss.forward() losses.append(loss.value[0, 0]) # print("test instance: {:d}".format(j)) # 取概率最大的類別為預測類別 pred = np.argmax(np.array(probs), axis=1) truth = np.argmax(test_y, axis=1) accuracy = accuracy_score(truth, pred) default_graph.draw() print("epoch: {:d}, iter: {:d}, loss: {:.3f}, accuracy: {:.2f}%".format(e + 1, i + 1, np.mean(losses), accuracy * 100))
這個卷積神經網路包含兩個卷積層,第一個卷積層有 6 個卷積核,第 2 個卷積層也有 6 個卷積核。卷積核的尺寸都是3X3。每個卷積層之後是一個 stride 為 2 的最大值池化層,它們將特徵圖尺寸縮小一半。經過第二個最大值池化層後,特徵圖尺寸成為 7X7=49。之後用 Flatten 運算元將 6 個特徵圖展平成 294 維向量,後接全連接層。第一個全連接層的輸出是 100 維向量,第二個全連接層輸出 10 維向量,作為十分類的 logits 。接下來是損失值部分,然後是訓練主循環。我們構造的 CNN 的計算圖如下:

我們構造的 CNN
▌6. 訓練 Sobel 濾波器
在兩年前的博文《卷積神經網路簡介》中,我們從可訓練濾波器的角度引入 CNN 。那時我們舉了一個例子:訓練 Sobel 運算元。我們用 (縱向)Sobel 運算元對萊娜圖做濾波,效果是強化原圖中的縱向邊緣。之後,以原圖為輸入,以 Sobel 運算元濾出來的圖為 target ,構造一個計算圖,對輸入施加 3X3 的濾波,輸出與原圖同尺寸的影像,以輸出影像與 target 影像所有像素的誤差平方和作為損失,將計算圖的隨機卷積核訓練成 Sobel 濾波器,我們看程式碼(test_sobel.py):
https://gitee.com/zhangjuefei/computing_graph_demo/blob/master/lost%20&%20found/test_sobel.py
import skimage, os import numpy as np from node import * from optimizer import * from scipy.ndimage.filters import convolve import matplotlib.pyplot as plt # Sobel 濾波器 filter = np.mat([ [1., 0., -1.], [2., 0., -2.], [1., 0., -1.]]) # 影像尺寸 w, h = 128, 128 # 搭建計算圖 input_img = Variable((w, h), init=False, trainable=False) # 輸入影像佔位變數 target_img = Variable((w, h), init=False, trainable=False) # target 影像佔位變數 kernel = Variable((3, 3), init=True, trainable=True) # 被訓練的卷積核 # 對輸入影像施加濾波 conv_img = Convolve(input_img, kernel) # 將輸入影像和 target 影像展成向量 target_img_flat = Reshape(target_img, shape=(w * h, 1)) conv_img_flat = Reshape(conv_img, shape=(w * h, 1)) # 常數(矩陣)[[-1]] minus = Variable((1, 1), init=False, trainable=False) minus.set_value(np.mat(-1)) # 常數(矩陣):影像總像素數的倒數 n = Variable((1, 1), init=False, trainable=False) n.set_value(np.mat(1.0 / (w * h))) # 損失值:輸出影像與 target 影像的平均像素平方誤差 loss = MatMul(Dot( Add(target_img_flat, MatMul(conv_img_flat, minus)), Add(target_img_flat, MatMul(conv_img_flat, minus)) ), n) # RMSProp 優化器 optimizer = Adam(default_graph, loss, 0.06, batch_size=1) # 讀取 lena 圖,將 rgb 影像轉成單通道灰度圖,並 resize 成指定大小 img = skimage.transform.resize( skimage.color.rgb2gray( skimage.io.imread("lena.png") ), (w, h) ) # 製作目標影像:對原影像施加 Sobel 濾波器 sobel = np.mat([[1,0,-1],[2,0,-2],[1,0,-1]]) target = np.zeros((w, h)) convolve(input=img, output=target, weights=filter, mode="constant", cval=0.0) # 保存原圖和經過 Sobel 濾波的影像 skimage.io.imsave("origin.png", np.minimum(np.maximum(img, 0.0), 1.0)) skimage.io.imsave("target.png", np.minimum(np.maximum(target, 0.0), 1.0)) # 以原圖為輸入,以經過 Sobel 濾波的影像為 target input_img.set_value(np.mat(img)) target_img.set_value(np.mat(target)) i = 0 for e in range(1000): # 一次迭代 optimizer.one_step() # 計算損失值 loss.forward() print("pic:{:s},loss:{:.6f}".format(p, loss.value[0, 0])) # 保存當前卷積核對輸入影像做濾波的結果 fname = "{:d}.png".format(i) if os.path.exists(fname): os.remove(fname) skimage.io.imsave(fname, np.minimum(np.maximum(conv_img.value, 0.0), 1.0)) i += 1
迭代進行到 302 次時,損失值已經不怎麼下降了,我們手動終止了進程。看看每次迭代輸出的影像與 target 影像:

輸出影像與 target 影像的比較
可見,輸出影像已經很接近 Sobel 濾波的 target 影像了。這時,被訓練的 kernel 節點的值是:

被訓練的卷積核節點(kernel)的值
卷積核已經被訓練得比較接近 Sobel 運算元了。
作者介紹
張覺非,本科畢業於復旦大學,碩士畢業於中國科學院大學,先後任職於新浪微博、阿里,目前就職於奇虎360,任機器學習技術專家。
對作者感興趣的小夥伴,歡迎點擊文末閱讀原文,與作者交流。
——END——
文章推薦:
點下「在看」,給文章蓋個戳吧!?