機器學習建模高級用法!構建企業級AI建模流水線 ⛵

💡 作者:韓信子@ShowMeAI
📘 機器學習實戰系列: //www.showmeai.tech/tutorials/41
📘 本文地址//www.showmeai.tech/article-detail/287
📢 聲明:版權所有,轉載請聯繫平台與作者並註明出處
📢 收藏ShowMeAI查看更多精彩內容

機器學習與流水線(pipeline)簡介

我們知道機器學習應用過程包含很多步驟,如圖所示『標準機器學習應用流程』,有數據預處理、特徵工程、模型訓練、模型迭代優化、部署預估等環節。

在簡單分析與建模時,可以對每個板塊進行單獨的構建和應用。但在企業級應用中,我們更希望機器學習項目中的不同環節有序地構建成工作流(pipeline),這樣不同流程步驟更易於理解、可重現、也可以防止數據泄漏等問題。

常用的機器學習建模工具,比如 Scikit-Learn,它的高級功能就覆蓋了 pipeline,包含轉換器、模型和其他模塊等。

關於 Scikit-Learn 的應用方法可以參考ShowMeAI 📘機器學習實戰教程 中的文章 📘SKLearn最全應用指南,也可以前往 Scikit-Learn 速查表 獲取高密度的知識點清單。

但是,SKLearn 的簡易用法下,如果我們把外部工具庫,比如處理數據樣本不均衡的 imblearn合併到 pipeline 中,卻可能出現不兼容問題,比如有如下報錯:

TypeError: All intermediate steps should be transformers and implement fit and transform or be the string 『passthrough』 『SMOTE()』 (type <class 『imblearn.over_sampling._smote.base.SMOTE』>) doesn』t

本文以『客戶流失』為例,講解如何構建 SKLearn 流水線,具體地說包含:

  • 構建一個流水線(pipeline) ,會覆蓋到 Scikit-Learn、 imblearn 和 feature-engine 工具的應用
  • 在編碼步驟(例如 one-hot 編碼)之後提取特徵
  • 構建特徵重要度圖

最終解決方案如下圖所示:在一個管道中組合來自不同包的多個模塊。

我們下面的方案流程,覆蓋了上述的不同環節:

  • 步驟 ①:數據預處理:數據清洗
  • 步驟 ②:特徵工程:數值型和類別型特徵處理
  • 步驟 ③:樣本處理:類別非均衡處理
  • 步驟 ④:邏輯回歸、xgboost、隨機森林 及 投票集成
  • 步驟 ⑤:超參數調優與特徵重要度分析

💡 步驟0:準備和加載數據

我們先導入所需的工具庫。

# 數據處理與繪圖
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Sklearn工具庫
from sklearn.model_selection import train_test_split, RandomizedSearchCV, RepeatedStratifiedKFold, cross_validate

# pipeline流水線相關
from sklearn import set_config
from sklearn.pipeline import make_pipeline, Pipeline
from imblearn.pipeline import Pipeline as imbPipeline
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler

# 常數列、缺失列、重複列 等處理
from feature_engine.selection import DropFeatures, DropConstantFeatures, DropDuplicateFeatures

# 非均衡處理、樣本採樣
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

# 建模模型
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.inspection import permutation_importance
from scipy.stats import loguniform

# 流水線可視化
set_config(display="diagram")

如果你之前沒有聽說過 imblearn 和 feature-engine 工具包,我們做一個簡單的說明:

  • 📘Imblearn 可以處理類別不平衡的分類問題,內置不同的採樣策略
  • 📘feature-engine 用於特徵列的處理(常數列、缺失列、重複列 等)

數據集:報紙訂閱用戶流失

我們這裡用到的數據集來自 Kaggle 比賽 Newspaper churn。數據集包括15856條現在或曾經訂閱該報紙的個人記錄。

🏆 實戰數據集下載(百度網盤):公眾號『ShowMeAI研究中心』回復『實戰』,或者點擊 這裡 獲取本文 [14] 機器學習建模應用流水線 pipelineNewspaper churn 數據集

ShowMeAI官方GitHub//github.com/ShowMeAI-Hub

數據集包含人口統計信息,如代表家庭收入的HH信息、房屋所有權、小孩信息、種族、居住年份、年齡範圍、語言;地理信息如地址、州、市、縣和郵政編碼。另外,用戶選擇的訂閱期長,以及與之相關的收費數據。該數據集還包括用戶的來源渠道。最後會有字段表徵客戶是否仍然是我們的訂戶(是否流失)。

數據預處理與切分

我們先加載數據並進行預處理(例如將所有列名都小寫並將目標變量轉換為布爾值)。

# 讀取數據
data = pd.read_excel("NewspaperChurn new version.xlsx")

#數據預處理
data.columns = [k.lower().replace(" ", "_") for k in data.columns]
data.rename(columns={'subscriber':'churn'}, inplace=True)
data['churn'].replace({'NO':False, 'YES':True}, inplace=True)

# 類型轉換
data[data.select_dtypes(['object']).columns] = data.select_dtypes(['object']).apply(lambda x: x.astype('category'))

# 取出特徵列和標籤列
X = data.drop("churn", axis=1)
y = data["churn"]

# 訓練集驗證集切分
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)

預處理過後的數據應如下所示:

💡 步驟1:數據清洗

我們構建的 pipeline 流程的第一步是『數據清洗』,刪除對預測沒有幫助的列(比如 id 類字段,恆定值字段,或者重複的字段)。

# 步驟1:數據清洗+字段處理
ppl = Pipeline([
    ('drop_columns', DropFeatures(['subscriptionid'])),
    ('drop_constant_values', DropConstantFeatures(tol=1, missing_values='ignore')),
    ('drop_duplicates', DropDuplicateFeatures())
])

上面的代碼創建了一個 pipeline 對象,它包含 3 個步驟:drop_columnsdrop_constant_valuesdrop_duplicates

這些步驟是元組形態的,第一個元素定義了步驟的名稱(如 drop_columns),第二個元素定義了轉換器(如 DropFeatures())。

這些簡單的步驟,大家也可以通過 pandas 之類的外部工具輕鬆完成。 但是,我們在組裝流水線時的想法是在pipeline中集成儘可能多的功能。

💡 步驟2:特徵工程與數據變換

在前面剔除不相關的列之後,我們接下來做一下缺失值處理和特徵工程。 可以看到數據集包含不同類型的列(數值型和類別型 ),我們會針對這兩個類型定義兩個獨立的工作流程。

關於特徵工程,可以查看ShowMeAI 📘機器學習實戰教程 中的文章 📘機器學習特徵工程最全解讀

# 數據處理與特徵工程pipeline

ppl = Pipeline([
    # ① 剔除無關列
    ('drop_columns', DropFeatures(['subscriptionid'])),
    ('drop_constant_values', DropConstantFeatures(tol=1, missing_values='ignore')),
    ('drop_duplicates', DropDuplicateFeatures()),
    
    # ② 缺失值填充與數值/類別型特徵處理
    ('cleaning', ColumnTransformer([
        # 2.1: 數值型字段缺失值填充與幅度縮放
        ('num',make_pipeline(
            SimpleImputer(strategy='mean'),
            MinMaxScaler()),
         make_column_selector(dtype_include='int64')
        ),
        # 2.2:類別型字段缺失值填充與獨熱向量編碼
        ('cat',make_pipeline(
            SimpleImputer(strategy='most_frequent'),
            OneHotEncoder(sparse=False, handle_unknown='ignore')),
         make_column_selector(dtype_include='category')
        )])
    )
])

添加一個名為clearning 的步驟,對應一個 ColumnTransformer 對象。

ColumnTransformer 中,設置了兩個新 pipeline:一個用於處理數值型,一個用於類別型處理。 通過 make_column_selector 函數確保每次選出的字段類型是對的。

這裡使用 dtype_include 參數選擇對應類型的列,這個函數也可以提供列名列表或正則表達式來選擇。

💡 步驟3:類別非均衡處理(數據採樣)

在『用戶流失』和『欺詐識別』這樣的問題場景中,一個非常大的挑戰就是『類別不平衡』——也就是說,流失用戶相對於非流失用戶來說,數量較少。

這裡我們會採用到一個叫做 im``blearn 的工具庫來處理類別非均衡問題,它提供了一系列數據生成與採樣的方法來緩解上述問題。 本次選用 SMOTE 採樣方法來對少的類別樣本進行重採樣。

SMOTE類別非均衡處理

添加 SMOTE 步驟後的 pipeline 如下:

# 總體處理pipeline

ppl = Pipeline([
    # ① 剔除無關列
    ('drop_columns', DropFeatures(['subscriptionid'])),
    ('drop_constant_values', DropConstantFeatures(tol=1, missing_values='ignore')),
    ('drop_duplicates', DropDuplicateFeatures()),
    
    # ② 缺失值填充與數值/類別型特徵處理
    ('cleaning', ColumnTransformer([
        # 2.1: 數值型字段缺失值填充與幅度縮放
        ('num',make_pipeline(
            SimpleImputer(strategy='mean'),
            MinMaxScaler()),
         make_column_selector(dtype_include='int64')
        ),
        # 2.2:類別型字段缺失值填充與獨熱向量編碼
        ('cat',make_pipeline(
            SimpleImputer(strategy='most_frequent'),
            OneHotEncoder(sparse=False, handle_unknown='ignore')),
         make_column_selector(dtype_include='category')
        )])
    ),
    # ③ 類別非均衡處理:重採樣
    ('smote', SMOTE())
])

pipeline 特徵校驗

在最終構建集成分類器模型之前,我們查看一下經過 pipeline 處理得到的特徵名稱和其他信息。

pipeline 對象提供了一個名為 get_feature_names_out() 的函數,我們可以通過它獲取特徵名稱。但在使用它之前,我們必須在數據集上擬合。 由於第 ③ 步 SMOTE 處理僅關注我們的標籤 y 數據,我們暫時忽略它並專註於第 ① 和 ② 步。

# 擬合數據,獲取pipeline構建的特徵名稱和信息
ppl_fts = ppl[0:4]
ppl_fts.fit(X_train, y_train)
features = ppl_fts.get_feature_names_out()
pd.Series(features)

結果如下所示:

0                    num__year_of_residence
1                             num__zip_code
2                       num__reward_program
3        cat__hh_income_$  20,000 - $29,999
4        cat__hh_income_$  30,000 - $39,999
                        ...                
12122               cat__source_channel_TMC
12123            cat__source_channel_TeleIn
12124           cat__source_channel_TeleOut
12125               cat__source_channel_VRU
12126          cat__source_channel_iSrvices
Length: 12127, dtype: object

由於獨熱向量編碼,許多帶着 cat_ 開頭(代表 category)的特徵名已被創建。

如果大家想得到上面流程圖一樣的 pipeline 可視化,只需在代碼中做一點小小的修改,在調用 pipeline 對象之前在您的代碼中添加 set_config(display="diagram")

💡 步驟4:構建集成分類器

下一步我們訓練多個模型,並使用功能強大的集成模型(投票分類器)來解決當前問題。

關於這裡使用到的邏輯回歸、隨機森林和 xgboost 模型,大家可以在 ShowMeAI 的 📘圖解機器學習算法教程 中看到詳細的原理講解。

# 邏輯回歸模型
lr = LogisticRegression(warm_start=True, max_iter=400)
# 隨機森林模型
rf = RandomForestClassifier()
# xgboost
xgb = XGBClassifier(tree_method="hist", verbosity=0, silent=True)
# 用投票器進行集成
lr_xgb_rf = VotingClassifier(estimators=[('lr', lr), ('xgb', xgb), ('rf', rf)], 
                             voting='soft')

定義集成模型後,我們也把它集成到我們的 pipeline 中。

# 總體處理pipeline

ppl = imbPipeline([
    # ① 剔除無關列
    ('drop_columns', DropFeatures(['subscriptionid'])),
    ('drop_constant_values', DropConstantFeatures(tol=1, missing_values='ignore')),
    ('drop_duplicates', DropDuplicateFeatures()),
    
    # ② 缺失值填充與數值/類別型特徵處理
    ('cleaning', ColumnTransformer([
        # 2.1: 數值型字段缺失值填充與幅度縮放
        ('num',make_pipeline(
            SimpleImputer(strategy='mean'),
            MinMaxScaler()),
         make_column_selector(dtype_include='int64')
        ),
        # 2.2:類別型字段缺失值填充與獨熱向量編碼
        ('cat',make_pipeline(
            SimpleImputer(strategy='most_frequent'),
            OneHotEncoder(sparse=False, handle_unknown='ignore')),
         make_column_selector(dtype_include='category')
        )])
    ),
    # ③ 類別非均衡處理:重採樣
    ('smote', SMOTE()),
    # ④ 投票器集成
    ('ensemble', lr_xgb_rf)
])

大家可能會注意到,我們在第1行中使用到的 Pipeline 替換成了 imblearn 的 imbPipeline 。這是很關鍵的一個處理,如果我們使用 SKLearn 的 pipeline,在擬合時會出現文初提到的錯誤:

TypeError: All intermediate steps should be transformers and implement fit and transform or be the string 'passthrough' 'SMOTE()' (type <class 'imblearn.over_sampling._smote.base.SMOTE'>) doesn't

到這一步,我們就把基本的 pipeline 流程構建好了。

💡 步驟5:超參數調整和特徵重要性

超參數調優

我們構建的整條建模流水線中,很多組件都有超參數可以調整,這些超參數會影響最終的模型效果。對 pipeline 如何進行超參數調優呢,我們選用隨機搜索 RandomizedSearchCV 對超參數進行調優,代碼如下。

關於搜索調參的詳細原理知識,大家可以查看 ShowMeAI 在文章 📘網絡優化: 超參數調優、正則化、批歸一化和程序框架 中的介紹。

大家特別注意代碼中的命名規則。

# 超參數調優
params = {
    'ensemble__lr__solver': ['newton-cg', 'lbfgs', 'liblinear'],
    'ensemble__lr__penalty': ['none', 'l1', 'l2', 'elasticnet'],
    'ensemble__lr__C': loguniform(1e-5, 100),
    'ensemble__xgb__learning_rate': [0.1],
    'ensemble__xgb__max_depth': [7, 10, 15, 20],
    'ensemble__xgb__min_child_weight': [10, 15, 20, 25],
    'ensemble__xgb__colsample_bytree': [0.8, 0.9, 1],
    'ensemble__xgb__n_estimators': [300, 400, 500, 600],
    'ensemble__xgb__reg_alpha': [0.5, 0.2, 1],
    'ensemble__xgb__reg_lambda': [2, 3, 5],
    'ensemble__xgb__gamma': [1, 2, 3],
    'ensemble__rf__max_depth': [7, 10, 15, 20],
    'ensemble__rf__min_samples_leaf': [1, 2, 4],
    'ensemble__rf__min_samples_split': [2, 5, 10],
    'ensemble__rf__n_estimators': [300, 400, 500, 600],
}

# 隨機搜索調參
rsf = RepeatedStratifiedKFold(random_state=42)
clf = RandomizedSearchCV(ppl, params,scoring='roc_auc', verbose=2, cv=rsf)
clf.fit(X_train, y_train)

# 輸出信息
print("Best Score: ", clf.best_score_)
print("Best Params: ", clf.best_params_)
print("AUC:", roc_auc_score(y_val, clf.predict(X_val)))

解釋一下上面代碼中的超參數命名:

  • 第一個參數( ensemble__ ):我們的 VotingClassifier 的名稱
  • 第二個參數( lr__ ):我們集成中使用的模型的名稱
  • 第三個參數( solver ):模型相關超參數的名稱

因為這裡是類別不平衡場景,我們使用重複分層 k-fold ( RepeatedStratifiedKFold)。

超參數調優這一步也不是必要的,在簡單的場景下,大家可以直接使用默認參數,或者在定義模型的時候敲定超參數。

特徵重要度圖

為了不讓我們的模型成為黑箱模型,我們希望對模型做一些解釋,其中最重要的是歸因分析,我們希望了解哪些特徵是重要的,這裡我們對特徵重要度進行繪製。

# //inria.github.io/scikit-learn-mooc/python_scripts/dev_features_importance.html
# 繪製特徵重要度
def plot_feature_importances(perm_importance_result, feat_name):
    """ bar plot the feature importance """
    fig, ax = plt.subplots()


    indices = perm_importance_result['importances_mean'].argsort()
    plt.barh(range(len(indices)),
             perm_importance_result['importances_mean'][indices],
             xerr=perm_importance_result['importances_std'][indices])
    ax.set_yticks(range(len(indices)))
    ax.set_title("Permutation importance")
    
    tmp = np.array(feat_name)
    _ = ax.set_yticklabels(tmp[indices])


# 獲取特徵名稱
ppl_fts = ppl[0:4]
ppl_fts.fit(X_train, y_train)
features = ppl_fts.get_feature_names_out()


# 用亂序法進行特徵重要度計算和排列,以及繪圖
perm_importance_result_train = permutation_importance(clf, X_train, y_train, random_state=42)
plot_feature_importances(perm_importance_result_train, features)

上述代碼運行後的結果圖如下,我們可以看到特徵 hh_income 在預測中佔主導地位。 由於這個特徵其實是可以排序的(比如 30-40k 比 150-175k 要小),我們可以使用不同的編碼方式(比如使用 LabelEncoding 標籤編碼)。

以上就是完整的機器學習流水線構建過程,大家可以看到,pipeline 可以把不同的環節集成在一起,一次性運行與調優,代碼和流程都更為簡潔緊湊,效率也更高。

參考資料