AI+醫療:使用神經網路進行醫學影像識別分析 ⛵

💡 作者:韓信子@ShowMeAI
📘 電腦視覺實戰系列//www.showmeai.tech/tutorials/46
📘 行業名企應用系列//www.showmeai.tech/tutorials/63
📘 本文地址//www.showmeai.tech/article-detail/298
📢 聲明:版權所有,轉載請聯繫平台與作者並註明出處
📢 收藏ShowMeAI查看更多精彩內容

💡 深度學習+醫療科技

近年高速發展的人工智慧技術應用到了各個垂直領域,比如把深度學習應用於各種醫學診斷,效果顯著甚至在某些方面甚至超過了人類專家。典型的 CV 最新技術已經應用於阿爾茨海默病的分類、肺癌檢測、視網膜疾病檢測等醫學成像任務中。

💡 影像分割

影像分割是將影像按照內容物切分為不同組的過程,它定位出了影像中的對象和邊界。語義分割是像素級別的識別,我們在很多領域的典型應用,背後的技術支撐都是影像分割演算法,比如:醫學影像、無人駕駛可行駛區域檢測、背景虛化等。

本文涉及到的深度學習基礎知識,及電腦視覺詳細知識,推薦大家閱讀ShowMeAI的教程專欄:

💡 語義分割典型網路 U-Net

U-Net 是一種卷積網路架構,用於快速、精確地分割生物醫學影像。

關於語義分割的各類演算法原理及優缺點對比(包括U-Net),ShowMeAI 在過往文章 📘 深度學習與CV教程(14) | 影像分割 (FCN,SegNet,U-Net,PSPNet,DeepLab,RefineNet) 中有詳細詳解。

U-Net 的結構如下圖所示:


在 U-Net 中,與其他所有卷積神經網路一樣,它由卷積和最大池化等層次組成。

  • U-Net 簡單地將編碼器的特徵圖拼接至每個階段解碼器的上取樣特徵圖,從而形成一個梯形結構。該網路非常類似於 Ladder Network 類型的架構。
  • 通過跳躍 拼接 連接的架構,在每個階段都允許解碼器學習在編碼器池化中丟失的相關特徵。
  • 上取樣採用轉置卷積。

💡 使用 U-Net 進行肺部影像分割

我們這裡使用到的數據集是 🏆 蒙哥馬利縣 X 射線醫學數據集。 該數據集由肺部的各種 X 射線影像以及每個 X 射線的左肺和右肺的分段影像的影像組成。大家也可以直接通過ShowMeAI的百度網盤鏈接下載此數據集。

🏆 實戰數據集下載(百度網盤):公眾號『ShowMeAI研究中心』回復『實戰』,或者點擊 這裡 獲取本文 [10] 使用神經網路進行肺部醫學影像識別與分析masked montgomery county x-ray set 肺部醫學影像數據集

ShowMeAI官方GitHub//github.com/ShowMeAI-Hub

① 工具庫導入&環境設置

首先導入我們本次使用到的工具庫。

# 導入工具庫
import os
import numpy as np
import cv2
from glob import glob
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision

② 數據讀取

接下來我們完成數據讀取部分,這裡讀取的內容包括影像和蒙版(mask,即和圖片同樣大小的標籤)。我們會調整維度大小,以便可以作為 U-Net 的輸入。

# 讀取X射線影像
def imageread(path,width=512,height=512):
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (width, height))
    x = x/255.0
    x = x.astype(np.float32)
    return x

# 讀取標籤蒙版
def maskread(path_l, path_r,width=512,height=512):
    x_l = cv2.imread(path_l, cv2.IMREAD_GRAYSCALE)
    x_r = cv2.imread(path_r, cv2.IMREAD_GRAYSCALE)
    x = x_l + x_r
    x = cv2.resize(x, (width, height))
    x = x/np.max(x)
    x = x > 0.5
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)
    return x

③ 數據切分

我們要對模型的效果進行有效評估,所以接下來我們進行數據劃分,我們把全部數據分為訓練集、驗證集和測試集。具體程式碼如下:

"""載入與切分數據"""
def load_data(path, split=0.1):
    images = sorted(glob(os.path.join(path, "CXR_png", "*.png")))
    masks_l = sorted(glob(os.path.join(path, "ManualMask", "leftMask", "*.png")))
    masks_r = sorted(glob(os.path.join(path, "ManualMask", "rightMask", "*.png")))
    split_size = int(len(images) * split) # 9:1的比例切分
    train_x, val_x = train_test_split(images, test_size=split_size, random_state=42)
    train_y_l, val_y_l = train_test_split(masks_l, test_size=split_size, random_state=42)
    train_y_r, val_y_r = train_test_split(masks_r, test_size=split_size, random_state=42)
    train_x, test_x = train_test_split(train_x, test_size=split_size, random_state=42)
    train_y_l, test_y_l = train_test_split(train_y_l, test_size=split_size, random_state=42)
    train_y_r, test_y_r = train_test_split(train_y_r, test_size=split_size, random_state=42)

    return (train_x, train_y_l, train_y_r), (val_x, val_y_l, val_y_r), (test_x, test_y_l, test_y_r)

④ TensorFlow IO準備

我們會使用到 TensorFlow 進行訓練和預估,我們用 TensorFlow 讀取 numpy array 格式的數據,轉為 TensorFlow 的 tensor 形式,並構建方便以 batch 形態讀取和訓練的 dataset 格式。

# tensor格式轉換
def tf_parse(x, y_l, y_r):
    def _parse(x, y_l, y_r):
        x = x.decode()
        y_l = y_l.decode()
        y_r = y_r.decode()
        x = imageread(x)
        y = maskread(y_l, y_r)
        return x, y
    x, y = tf.numpy_function(_parse, [x, y_l, y_r], [tf.float32, tf.float32])
    x.set_shape([512, 512, 3])
    y.set_shape([512, 512, 1])
    return x, y

# 構建tensorflow dataset
def tf_dataset(X, Y_l, Y_r, batch=8):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y_l, Y_r))
    dataset = dataset.shuffle(buffer_size=200)
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(4)
    return dataset

⑤ U-Net 網路構建

下面我們構建 U-Net 網路。

from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model

# 一個卷積塊結構
def conv_block(input, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(input)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x

# 編碼器模組
def encoder_block(input, num_filters):
    x = conv_block(input, num_filters)
    p = MaxPool2D((2, 2))(x)
    return x, p

# 解碼器模組
def decoder_block(input, skip_features, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
    x = Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    return x

# 完整的U-Net
def build_unet(input_shape):
    inputs = Input(input_shape)
    
    # 編碼器部分
    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    b1 = conv_block(p4, 1024)
    
    # 解碼器部分
    d1 = decoder_block(b1, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)
    
    # 輸出
    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(d4)

    model = Model(inputs, outputs, name="U-Net")
    return model

⑥ 評估準則與損失函數

我們針對語義分割場景,編寫評估準則 IoU 的計算方式,並構建 Dice Loss 損失函數以便在醫療場景語義分割下更針對性地訓練學習。

關於IoU、mIoU等評估準則可以查看ShowMeAI的文章 📘 深度學習與CV教程(14) | 影像分割 (FCN,SegNet,U-Net,PSPNet,DeepLab,RefineNet) 做更多了解。

關於Dice Loss損失函數的解釋如下:

📌 Dice 係數

根據 Lee Raymond Dice 命名,是一種集合相似度度量函數,通常用於計算兩個樣本的相似度(值範圍為 \([0, 1]\)):

\[s = \frac{2|X \cap Y|}{|X|+|Y|}
\]

\(|X \cap Y|\)表示 \(X\)\(Y\) 之間的交集;\(|X|\)\(|Y|\) 分別表示 \(X\)\(Y\) 的元素個數。其中,分子中的係數 \(2\),是因為分母存在重複計算 \(X\)\(Y\) 之間的共同元素的原因。

針對,語義分割問題而言,\(X\) 為分割影像標準答案 GT,\(Y\) 為分割影像預測標籤 Pred。

📌 Dice 係數差異函數(Dice loss)

\[s =1- \frac{2|X \cap Y|}{|X|+|Y|}
\]

評估準則與損失函數的程式碼實現如下:

# IoU計算
def iou(y_true, y_pred):
    def f(y_true, y_pred):
        intersection = (y_true * y_pred).sum()
        union = y_true.sum() + y_pred.sum() - intersection
        x = (intersection + 1e-15) / (union + 1e-15)
        x = x.astype(np.float32)
        return x
    return tf.numpy_function(f, [y_true, y_pred], tf.float32)

# Dice Loss定義
smooth = 1e-15
def dice_coef(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

⑦ 超參數設置與模型編譯

接下來在開始模型訓練之前,我們先敲定一些超參數,如下:

  • 批次大型 batch size = 2
  • 學習率 learning rate= 1e-5
  • 迭代輪次 epoch = 30

我們使用 Adam 優化器進行訓練,使用的評估指標包括 Dice 係數、IoU、召回率和精度。

# 超參數
batch_size = 2
lr = 1e-5
epochs = 30
model_path = "models/model.h5"

# 讀取數據
dataset_path = './NLM-MontgomeryCXRSet/MontgomerySet'
(train_x, train_y_l, train_y_r), (val_x, val_y_l, val_y_r), (test_x, test_y_l, test_y_r) = load_data(dataset_path)

# 訓練集與驗證集
train_dataset = tf_dataset(train_x, train_y_l, train_y_r, batch=batch_size)
val_dataset = tf_dataset(val_x, val_y_l, val_y_r, batch=batch_size)

# 構建模型
model = build_unet((512, 512, 3))
# 評估準則
metrics = [dice_coef, iou, Recall(), Precision()]
# 編譯模型
model.compile(loss=dice_loss, optimizer=Adam(lr), metrics=metrics)

可以使用model.summary查看模型結構資訊與參數量:

model . summary()

結果如下圖所示(部分內容截圖,全部模型資訊較長):

⑧ 回調函數&模型訓練

我們在回調函數中設置模型存儲相關設置,學習率調整策略等,之後在數據集上進行訓練。

# 回調函數
callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-8, verbose=1)
        ]

# 模型訓練
history = model.fit(
        train_dataset,
        epochs=epochs,
        validation_data=val_dataset,
        callbacks=callbacks
    )

訓練部分中間資訊如下圖所示。


在訓練模型超過 30 個 epoch 後,保存的模型(驗證損失為 0.10216)相關的評估指標結果如下:

  • dice coef:0.9148
  • iou:0.8441
  • recall:0.9865
  • precision:0.9781
  • val_loss:0.1022
  • val_dice_coef: 0.9002
  • val_iou:0.8198
  • val_recall:0.9629
  • val_precision:0.9577

⑨ 模型載入與新數據預估

我們可以把剛才保存好的模型重新載入入記憶體,並對沒有見過的測試數據集進行預估,程式碼如下:

# 重新載入模型
from tensorflow.keras.utils import CustomObjectScope
with CustomObjectScope({'iou': iou, 'dice_coef': dice_coef, 'dice_loss': dice_loss}):
  model = tf.keras.models.load_model("/content/model.h5")


# 測試集預估
from tqdm import tqdm
import matplotlib.pyplot as plt
ct=0

# 遍歷測試集
for x, y_l, y_r in tqdm(zip(test_x, test_y_l, test_y_r), total=len(test_x)):
    """ Extracing the image name. """
    image_name = x.split("/")[-1]

    # 讀取測試圖片集
    ori_x = cv2.imread(x, cv2.IMREAD_COLOR)
    ori_x = cv2.resize(ori_x, (512, 512))
    x = ori_x/255.0
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=0)

    # 讀取標籤資訊
    ori_y_l = cv2.imread(y_l, cv2.IMREAD_GRAYSCALE)
    ori_y_r = cv2.imread(y_r, cv2.IMREAD_GRAYSCALE)
    ori_y = ori_y_l + ori_y_r
    ori_y = cv2.resize(ori_y, (512, 512))
    ori_y = np.expand_dims(ori_y, axis=-1)  # (512, 512, 1)
    ori_y = np.concatenate([ori_y, ori_y, ori_y], axis=-1)  # (512, 512, 3)

    # 預估
    y_pred = model.predict(x)[0] > 0.5
    y_pred = y_pred.astype(np.int32)
    #plt.imshow(y_pred)

    # 存儲預估結果mask
    save_image_path = "./"+str(ct)+".png"
    ct+=1
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)
    sep_line = np.ones((512, 10, 3)) * 255
    cat_image = np.concatenate([ori_x, sep_line, ori_y, sep_line, y_pred*255], axis=1)
    cv2.imwrite(save_image_path, cat_image)

部分結果可視化:

下面為2個測試樣本的原始影像、原始掩碼(標準答案)和預測掩碼的組合影像:

測試用例的輸入影像(左側)、原始掩碼標籤(中間)、預測掩碼(右側)

參考資料