­

客戶流失?來看看大廠如何基於spark+機器學習構建千萬數據規模上的用戶留存模型 ⛵

💡 作者:韓信子@ShowMeAI
📘 大數據技術 ◉ 技能提升系列//www.showmeai.tech/tutorials/84
📘 行業名企應用系列//www.showmeai.tech/tutorials/63
📘 本文地址//www.showmeai.tech/article-detail/296
📢 聲明:版權所有,轉載請聯繫平台與作者並註明出處
📢 收藏ShowMeAI查看更多精彩內容

💡 背景

Sparkify 是一個音樂流媒體平台,用戶可以獲取部分免費音樂資源,也有不少用戶開啟了會員訂閱計劃(參考QQ音樂),在Sparkify中享受優質音樂內容。

用戶可以隨時對自己的會員訂閱計劃降級甚至取消,而當下極其內卷和競爭激烈的大環境下,獲取新客的成本非常高,因此維護現有用戶並確保他們長期會員訂閱至關重要。同時因為我們有很多用戶在平台的歷史使用記錄,基於這些數據支撐去挖掘客戶傾向,訂製合理的業務策略,也更加有保障和數據支撐。

但現在稍大一些的互聯網公司,數據動輒成百上千萬,我們要在這麼巨大的數據規模下完成挖掘與建模,又要藉助各種處理海量數據的大數據平台。在本文中ShowMeAI將結合 Sparkify 的業務場景和海量數據,講解基於 Spark 的客戶流失建模預測案例。

本文涉及到大數據處理分析及機器學習建模相關內容,ShowMeAI為這些內容製作了詳細的教程與工具速查手冊,大家可以通過如下內容展開學習或者回顧相關知識。

💡 數據

本文用到的 Sparkify 數據有3個大小的數據規格,大家可以根據自己的計算資源情況,選擇合適的大小,本文程式碼都兼容和匹配,對應的數據大家可以通過ShowMeAI的百度網盤地址獲取。

🏆 實戰數據集下載(百度網盤):公眾號『ShowMeAI研究中心』回復『實戰』,或者點擊 這裡 獲取本文 [9] Spark 海量數據上的用戶留存分析挖掘與建模sparkify 用戶流失數據集

⭐ ShowMeAI官方GitHub//github.com/ShowMeAI-Hub

  • mini_sparkify_event_data.json: 最小的數據子集 (125 MB)
  • medium-sparkify-event-data.json: 中型大小數據子集 (237 MB)
  • sparkify_event_data.json: 全量數據 (12 GB)

💡 探索性數據分析(EDA)

在進行建模之前,我們首先要深入了解我們的數據,這可以幫助我們更有針對性地構建特徵和選擇模型。也就是ShowMeAI之前提到過的「探索性數據分析(EDA)」的過程。

① 導入工具庫

# 基礎數據處理與繪圖
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import requests
from datetime import datetime
# spark相關
from pyspark.sql import SparkSession
from pyspark.sql import Window, Row
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, FloatType

② 初步數據探索

Sparkify 數據集中,每一個用戶的行為都被記錄成了一條帶有時間戳的操作記錄,包括用戶註銷、播放歌曲、點讚歌曲和降級訂閱計劃等。

# 初始化spark session
spark_session = SparkSession.builder \
                .master("local") \
                .appName("sparkify") \
                .getOrCreate()

# 載入數據與持久化
src = "data/mini_sparkify_event_data.json"
df = spark_session.read.json(src)
# 構建視圖(方便查詢)
df.createOrReplaceTempView("sparkify_table")
df.persist()

# 查看前5行數據
df . limit(5) . toPandas()

用全量數據集(12GB)做EDA可能會消耗大量的資源且很慢,所以這個過程我們選擇小子集(128MB)來完成,如果取樣方式合理,小子集上的數據分布能很大程度體現全量數據上的分布特性。

對於中小數據集上的EDA大家可以參考ShowMeAI分享過的自動化數據分析工具,可以更快捷地獲取一些數據資訊與分析結論。

📌 基礎數據維度資訊

# 查看數據維度資訊
print(f'數據集有 {len(df.columns)} 列')
print(f'數據集有 {df.count()} 行')

結果顯示有 18 列 和 286500 行。

實際這份小子集中只有 225 個唯一用戶 ID,這意味著平均每個客戶與平台有 286500/225≈1200 多個交互操作。

📌 欄位資訊

# 查看欄位資訊
df . printSchema()

我們通過上述命令查看數據欄位資訊,輸出結果如下,包含欄位名和類型等:

|-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

我們獲取的一些初步資訊如下:

  • 字元串類型的欄位包括 song, artist, genderlevel
  • 一些時間和ID類的欄位特徵 ts(時間戳),registration(時間戳),pageuserId
  • 可能作用不太大的一些欄位 firstName , lastName , method , status , userAgentauth等(等待進一步挖掘)

📌 時間跨度資訊

# 排序
df = df . sort('ts', ascending= False)
# 獲取最大最小時間戳
df . select(F . max(df . ts), F . min(df . ts)) . show()
# //www.programiz.com/python-programming/datetime/timestamp-datetime
# 轉換為日期
print("Min date =", datetime.fromtimestamp(1538352117000 / 1000))
print("Max date =", datetime.fromtimestamp(1543799476000 / 1000))
# 最早註冊時間
df.select(F.min(df.registration)).show()
print("Min register =", datetime.fromtimestamp(1521380675000 / 1000))

📌 欄位分布

# 統計欄位的不同取值數量
cols = df.columns
n_unique = []

for col in cols:
    n_unique.append(df.select(col).distinct().count())

pd.DataFrame(data={'col':cols, 'n_unique':n_unique}).sort_values('n_unique', ascending=False)

結果如下,ID類的屬性有最多的取值,其他的欄位屬性相對集中。

📌 類別型取值分布

我們來看看上面分析的尾部,分布比較集中的類別型欄位的取值有哪些。

 # method
df . select(['method']) . distinct() . show()
# level
df.select(['level']).distinct().show()
 # status
df . select(['status']) . distinct() . show()
# gender
df.select(['gender']).distinct().show()
 # auth
df . select(['auth']) . distinct() . show()

我們再看看取值中等和較多的欄位

 # page
df . select(['page']) . distinct() . show()
# userAgent
df.select(['userAgent']).distinct().show()
# artist
df.select(['artist']).distinct().show()
# song
df.select(['song']).distinct().show()

③ 缺失值分析

我們首先剔除掉userId為空的數據記錄,總共刪除了 8,346 行。

no_userId = df . where(df . userId == "")
no_userId . count()
no_userId . limit(10) . toPandas()
# 構建無userId缺失數據的視圖
df = df . where(df . userId != "")
df . createOrReplaceTempView("sparkify_table")

我們再統計一下其他欄位的缺失狀況

# 類別型欄位
general_string_type = ['auth', 'firstName', 'gender', 'lastName', 'level', 'location', 'method', 'page', 'userAgent', 'userId']
for col in general_string_type:
    null_vals = df.select(col).where(df[col].isNull()).count()
    print(f'{col}: {null_vals}')

# 數值型欄位
numerical_cols = ['itemInSession', 'length', 'registration', 'sessionId', 'status', 'ts']
for col in numerical_cols:
    null_vals = df.select(col).where(df[col] == np.nan).count()
    print(f'{col}: {null_vals}')
# 直接統計缺失值並輸出資訊
# Reference 
# //sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/

def make_missing_bool_index(c):
    '''
    Generates boolean index to check missing value/NULL values
    @param c (string) - string of column of dataframe
    returns boolean index created
    '''
    # removed checking these 2 since they would flag some incorrect rows, e.g. the song "None More Black" would be flagged
    # col(c).contains('None') | \
    # col(c).contains('NULL') | \

    bool_index = (F.col(c) == "") | \
    F.col(c).isNull() | \
    F.isnan(c)
    return bool_index

missing_count = [F.count(F.when(make_missing_bool_index(c), c)).alias(c)
                    for c in df.columns]

df.select(missing_count).toPandas()

④ EDA洞察&結論

由於我們的數據是基於各種有時間戳的交易來組織的,以事件為基礎(基於 “頁 “列),我們需要執行額外的特徵工程來訂製我們的數據以適應我們的機器學習模型。

📌 目標&問題

  • 用戶流失是什麼意思?是指取消訂閱嗎?

📌 重要欄位列

  • ts – 時間戳,在以下場景有用
    • 訂閱與取消之間的時間點資訊
    • 構建「聽歌的平均時間」特徵
    • 構建「聽歌之間的時間間隔」特徵
    • 基於時間戳構建數據樣本,比如選定用戶流失前的3個月或6個月
  • registration – 時間戳 – 用於識別交易的範圍
  • page – 用戶正在參與的事件
    • 本身並無用處
    • 需要進一步特徵工程,從頁面類型中提取資訊,或結合時間戳等資訊
  • userId
    • 本身並無用處
    • 基於用戶分組完成統計特徵

📌 配合特徵工程有用的欄位列

  • song – 歌名,可用於構建類似下述的特徵:
    • 用戶聽的不同歌曲數量
    • 用戶聽同一首歌的次數
  • artist– 歌手,可用於構建類似下述的特徵:
    • 每個用戶收聽的歌手數量
  • 因為是明文的歌名,我們甚至可以通過外部API補充資訊構建特徵:
    • 用戶收聽的音樂類型(並觀察類型是否影響流失率)。
  • gender – 性別
    • 不同性別的人可能有不同的音樂偏好。
  • level – 等級
    • 區分用戶是免費的還是付費的
  • location – 地區
    • 地域差別

📌 無用欄位列(我們會直接刪除)

  • firstNamelastName – 名字一般在模型中很難直接給到資訊。
  • method – 僅僅有PUT或GET取值,是網路請求類型,作用不大。
  • status– 僅僅是API響應,例如200/404,作用不大。
  • userAgent–指定用戶使用的瀏覽器類型
    • 有可能不同瀏覽器代表的用戶群體有差別,這個可以進一步調研
  • auth – 登入登出等資訊,作用不大

💡 數據處理

① 定義流失

我們的 page功能有 22 個獨特的標籤,代表用戶點擊或訪問的頁面,結合上面的數據分析大家可以看到頁面包括關於登錄註冊等。

可以幫助我們定義流失的頁面是 Cancellation Confirmation,表示 免費 和 付費 用戶均存在流媒體平台。

# 定義流失用戶
is_churn = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn("churn", is_churn(df.page))
df.createOrReplaceTempView("sparkify_table")

user_window = Window \
    .partitionBy('userId') \
    .orderBy(F.desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

# manually define schema
# //stackoverflow.com/questions/40517553/pyspark-valueerror-some-of-types-cannot-be-determined-after-inferring
tmp_row = spark_local.sparkContext.parallelize(Row(second_row)).toDF(schema=df.schema)
df.where(df.userId == 100001).union(tmp_row).withColumn('pre_churn', F.sum('churn').over(user_window)).limit(5).toPandas()

df = df.withColumn('preChurn', F.sum('churn').over(user_window))
df.createOrReplaceTempView("sparkify_table")

對用戶流失情況做簡單分析

spark_local.sql('''
    SELECT SUM(churn)
        FROM sparkify_table
        GROUP BY userId
''').toPandas().value_counts()

在我們取樣出來的小數據集中:有225 個用戶, 23%(52 個用戶)流失 。

② 特徵工程

關於特徵工程可以參考ShowMeAI的以下文章詳解

本文中所使用到的特徵工程如下:

  • ① 歌曲和歌手相關: uniqueSongs, uniqueArtists, uniqueSongArtist.
  • ② 用戶服務時長: dayServiceLen(註冊到上次與網站互動之間的天數)
  • ③ 用戶行為統計: countListen(收聽次數), countSession(session數量), lengthListen(聽的總時長)
  • ④ 使用②和③的組合 lengthListenPerDay, countListenPerDay, sessionPerDay
  • ⑤ 針對一些統計值(countListen , countSession, 和 lengthListen等)計算的差異度。

📌 清理數據

# 清理數據
def clean_data(df):
    '''
    Cleans raw dataframe to:
    i. sort values
    ii. remove null userId rows
    @param df: raw spark dataframe
    returns updated spark dataframe
    '''
    # sort values
    df = df.sort('ts', ascending=False)
    # remove null userIds
    df = df.where(df.userId != "")
    return df

📌 定義用戶流失標籤

# 定義用戶流失
def define_churn(df):
    '''
    Define churn
    @param df - spark dataframe
    returns updated spark dataframe
    '''
    # define churn as cancellation confirmation
    is_churn = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
    df = df.withColumn("churn", is_churn(df.page))
    return df

📌 清理臟數據

有一部分用戶在流失之後,還有一些數據資訊,這可能是時間戳的問題,我們把這部分數據清理掉

# 清理臟數據
def remove_post_churn_rows(df, spark, sql_table):
    '''
    Remove post-churn rows
    @param df - spark dataframe
    @param spark - SparkSession instance
    @param sql_table - string representing name of sql table
    returns updated spark dataframe
    '''
    # define window function to mark non-churn related rows
    user_window = Window \
        .partitionBy('userId') \
        .orderBy(F.desc('ts')) \
        .rangeBetween(Window.unboundedPreceding, 0)
    df = df.withColumn('preChurn', F.sum('churn').over(user_window))
    # remove rows for userIds which are marked as churn but have a timestamp after the 'Cancellation Confirmation' page
    # define GROUP BY and merge against larger df
    churn_df = spark.sql(f'''
        SELECT
            userId AS tmpId,
            MAX(churn) AS tmpChurn
        FROM {sql_table}
        GROUP BY userId
    ''')
    df = df.join(churn_df, df.userId == churn_df.tmpId, "left")
    # remove instances where churned userIds have transctions post Cancellation Confirmation
    df = df.where(~((df.preChurn == 0) & (df.tmpChurn == 1)))
    # remove tmp rows
    df = df.drop('tmpId', 'tmpChurn')
    return df

📌 時間特徵

def prelim_feature_eng(df):
    '''
    Feature engineer columns:
    i timeSinceRegister
    ii. columns representing time scope of entry
    @param df: raw spark dataframe
    returns updated spark dataframe
    '''
    # create new column representing time since registration (ms)
    time_since_register = F.col('ts') - F.col('registration')
    df = df.withColumn("timeSinceRegister", time_since_register)
    
    # create 3 new columns representing when row data relates to
    mth_3 = 60 * 60 * 24 * 90
    mth_6 = 60 * 60 * 24 * 180
    mth_12 = 60 * 60 * 24 * 365
    mth_3_f = F.udf(lambda x : 1 if x / 1000 <= mth_3 else 0, IntegerType())
    mth_6_f = F.udf(lambda x : 1 if x / 1000 <= mth_6 else 0, IntegerType())
    mth_12_f = F.udf(lambda x : 1 if x / 1000 <= mth_12 else 0, IntegerType())
    df = df.withColumn("month3", mth_3_f(df.timeSinceRegister))\
        .withColumn("month6", mth_6_f(df.timeSinceRegister))\
        .withColumn("month12", mth_12_f(df.timeSinceRegister))
    return df

📌 統計&組合特徵

def melt_data(df, spark, sql_table):
    '''
    Melts data to show entries on a user basis for the following columns:
    - userId
    - gender
    - level
    - location
    - uniqueSongs
    - uniqueArtists
    - dayServiceLen
    - countListen1H,
    - countSession1H,
    - lengthListen1H,
    - countListen2H,
    - countSession2H,
    - lengthListen2H
    - churn
    @param df - spark dataframe
    @param spark - SparkSession instance
    @param sql_table - string representing name of sql table
    returns updated spark datafraem
    '''
    melt1 = spark.sql(f'''
    SELECT  userId, 
            MIN(gender) AS gender,
            MIN(level) AS level,
            MAX(location) AS location,
            COUNT(DISTINCT(song)) AS uniqueSongs,
            COUNT(DISTINCT(artist)) AS uniqueArtists,
            COUNT(DISTINCT(song, artist)) AS uniqueSongArtist,
            MAX(Churn) AS churn
        FROM {sql_table}
        GROUP BY userId
    ''')
    melt2 = spark.sql(f'''
    WITH sparkify_table_upt AS (
        SELECT * FROM {sql_table}
            WHERE page = "NextSong"
    ),
    msServiceTable AS (
        SELECT userId, 
            MAX(ts) - MIN(ts) AS msServiceLen,
            MIN(ts) + (MAX(ts) - MIN(ts)) / 2 AS midTs
        FROM sparkify_table_upt
        GROUP BY userId
    ),
    earlyHalfTable AS (
        SELECT  a.userId,
                COUNT(1) AS countListen1H,
                COUNT(DISTINCT(a.sessionId)) AS countSession1H,
                SUM(a.length) AS lengthListen1H
            FROM sparkify_table_upt AS a
            LEFT JOIN msServiceTable AS b ON b.userId = a.userId
            WHERE a.ts < b.midTs
            GROUP BY a.userId
    ),
    lateHalfTable AS (
        SELECT  a.userId,
                COUNT(1) AS countListen2H,
                COUNT(DISTINCT(a.sessionId)) AS countSession2H,
                SUM(a.length) AS lengthListen2H
            FROM sparkify_table_upt AS a
            LEFT JOIN msServiceTable AS b ON b.userId = a.userId
            WHERE a.ts >= b.midTs
            GROUP BY a.userId
    ),
    concatTable AS (
        SELECT m.userId AS tmpUserId,
                milisecToDay(msServiceLen) AS dayServiceLen,
                countListen1H + countListen2H AS countListen,
                countSession1H + countSession2H AS countSession,
                lengthListen1H + lengthListen2H AS lengthListen,
                countListen2H - countListen1H AS countListenDiff,
                countSession2H - countSession1H AS countSessionDiff,
                lengthListen2H - lengthListen1H AS lengthListenDiff
            FROM msServiceTable as m
            LEFT JOIN earlyHalfTable as e ON e.userId = m.userId
            LEFT JOIN lateHalfTable AS l ON l.userId = m.userId
    )
    SELECT *,
        lengthListen / dayServiceLen AS lengthListenPerDay,
        countListen / dayServiceLen AS countListenPerDay,
        countSession / dayServiceLen AS sessionPerDay,
        lengthListen / countListen AS lengthPerListen,
        lengthListen / countSession AS lengthPerSession
        FROM concatTable
    
    ''')
    melt_concat = melt1.join(melt2, melt1.userId == melt2.tmpUserId, "Left")
    melt_concat = melt_concat.drop('tmpUserId')
    return melt_concat

📌 位置資訊

def location_feature_eng(df, census):
    '''
    Create 2 new columns from location -> Region and Division
    @param df: raw spark dataframe
    @param census: csv file containing location mapping based on state code
    returns updated spark dataframe
    '''
    # some census data contains two states, for simplicity, selecting last location
    map_region = F.udf(lambda x: census.loc[census['State Code'] == x[-2:], 'Region'].iloc[0], StringType())
    map_division = F.udf(lambda x: census.loc[census['State Code'] == x[-2:], 'Division'].iloc[0], StringType())

    df = df.withColumn("region", map_region(df.location))\
        .withColumn("division", map_division(df.location))
    return df

📌 組織數據&特徵流水線

# 讀數據
df_train = spark_session.read.json(src)
# 剔除無用欄位
df_train = df_train.drop('firstName', 'lastName', 'method', 'status', 'userAgent', 'auth')
# 清理數據
df_train = clean_data(df_train)
df_train = define_churn(df_train)
df_train.createOrReplaceTempView("table")
# 清除臟數據
df_train = remove_post_churn_rows(df_train, spark_local, "table")
# 基礎特徵
df_train = prelim_feature_eng(df_train)
# 更新表
df_train.createOrReplaceTempView("table")
# 添加更多特徵
df_melt = melt_data(df_train, spark_local, "table")
df_melt = location_feature_eng(df_melt, census)

📌 查看數據特徵

pd_melt = df_melt . toPandas()
pd_melt . describe()

💡 進一步數據探索

① 流失率

predictor = pd_melt['churn'].value_counts()

print(predictor)

plt.title('Churn distribution')
predictor.plot.pie(autopct='%.0f%%')
plt.show()

② 數值vs類別型特徵

label = 'churn'
categorical = ['gender', 'level' , 'location', 'region', 'division']
numerical = ['uniqueSongs', 'uniqueArtists', 'uniqueSongArtist', 'dayServiceLen', \
               'countListen', 'countSession', 'lengthListen', 'countListenDiff', 'countSessionDiff',\
               'lengthListenDiff', 'lengthListenPerDay', 'countListenPerDay',\
               'sessionPerDay', 'lengthPerListen', 'lengthPerSession']

plt.title('Distribution of numerical/categorical features')
plt.pie([len(categorical), len(numerical)], labels=['categorical', 'numerical'], autopct='%.0f%%')
plt.show()

在我們所有的特徵中,25% 是類別型的。

③ 數值型特徵分布

📌 數值特徵&流失分布

def plot_distribution(df, hue, filter_col=None, bins='auto'):
    '''
    Plots distribution of numerical columns
    By default, exclude object, datetime, timedelta and bool types and only consider numerical columns
    @param df (DataFrame) - dataset
    @param hue (str) - column of dataset to apply hue (useful for classification)
    @param filter_col (array) - optional argument, features to be included in plot
    @param bins (int) - defaults to auto for seaborn, sets number of bins of histograms
    '''
    if filter_col == None:
        filter_col = df.select_dtypes(exclude=['object', 'datetime', 'timedelta', 'bool']).columns
    num_cols = len(list(filter_col))
    width = 3
    height = num_cols // width if num_cols % width == 0 else num_cols // width + 1
    plt.figure(figsize=(18, height * 3))
    for i, col in zip(range(num_cols), filter_col):
        plt.subplot(height, width, i + 1)
        plt.xlabel(col)
        plt.ylabel('Count')
        plt.title(f'Distribution of {col}')
        sns.histplot(df, x=col, hue=hue, element="step", stat="count", common_norm=False, bins=bins)
    plt.tight_layout()
    plt.show()
  
  # 繪製數值型特徵分布圖
  plot_distribution(pd_melt, 'churn', filter_col=numerical)

我們的數值型特徵上可以看出:

  • 流失與非流失用戶都有右偏傾向的分布
  • dayServiceLen欄位有最明顯的流失客戶和非流失客戶分布差異。

📌 數值型特徵相關度

# 定義數值型特徵
numerical_churn = numerical + ['churn']
# 計算相關性
corr_data = pd_melt[numerical_churn].corr()

# 繪製熱力圖顯示相關性
plt.figure(figsize=(16,16))
plt.title('Heat map of correlation for all variables')
matrix = np.triu(corr_data)
sns.heatmap(corr_data, cmap='Blues', annot=True, mask=matrix)
plt.show()
  • 我們從熱力圖上沒有看到有數值型特徵流失標籤列有明顯的高相關性。
  • 有幾組特徵,uniqueArtists、uniqueSongArtist、countListen、countSession和lengthListen,它們之間有非常高的相關性。如果大家使用線性模型,可以考慮做特徵選擇,我們後續使用非線性模型的話,可以考慮保留。

④ 類別型特徵的分布

def plot_cat_distribution(data, colname):
    '''
    Plots barplot for categorical columns and piechart showing proportions of churned vs non-churned customers
    @param - data (panas dataframe)
    @param - colname (str) - column of dataframe referenced
    '''
    # //www.statology.org/seaborn-stacked-bar-plot/
    plt.figure(figsize=(15,5))
    ax1 = plt.subplot(1, 3, 1)
    tmp = data.copy()
    tmp['count'] = 1
    x = tmp.groupby([colname, 'churn']).count().reset_index()[[colname, 'churn','count']]
    # churn index 0, 1 doesn't relate to No, Yes, relates to pivoted index only
    x = x.pivot(index='churn', columns=colname).transpose().reset_index().drop('level_0', axis=1)
    x = x.fillna(0)

    plt.title(f'Distribution of {colname}')
    plt.ylabel('Count')
    x.plot.bar(x=colname, stacked=True, ax=ax1, color=['green', 'lightgreen'])

    ax2 = plt.subplot(1, 3, 2)
    plt.title(f'Proportion of {colname} for churned customers')
    plt.pie(x['Yes'], labels=x[colname], autopct='%.0f%%')

    plt.subplot(1, 3, 3)
    plt.title(f'Proportion of {colname} for non-churned customers')
    plt.pie(x['No'], labels=x[colname], autopct='%.0f%%')

    plt.tight_layout()
    plt.show()

    x.index.rename('index', inplace=True)
    print(x)
    tmp_sum = x[['No','Yes']].sum(axis=1)
    x['No'] = x['No'] / tmp_sum
    x['Yes'] = x['Yes'] / tmp_sum
    print(x)
    print(tmp_sum / tmp_sum.sum())


tmp_pd_melt = pd_melt.copy()
tmp_pd_melt['churn'] = tmp_pd_melt['churn'].apply(lambda x: 'Yes' if x == 1 else 'No')

📌 性別&流失分布

plot_cat_distribution(tmp_pd_melt, 'gender')

流失客戶的男性比例更高。

📌 等級&流失分布

plot_cat_distribution(tmp_pd_melt, 'level')

免費和付費客戶的流失比例幾乎沒有差異(差2%),雖然圖上表明付費客戶流失的可能性稍小一點,但這個特徵在建模過程中可能作用不大。

📌 地區&流失分布

plot_cat_distribution(tmp_pd_melt, 'region')

圖上可以看出地區有一些差異,南部地區的流失要嚴重一些,相比之下北部地區的流失用戶少一些。

可以進一步對地區細化和繪圖

plot_cat_distribution(tmp_pd_melt, 'division')

📌 類別型特徵取值數量分布

def cardinality_plot(df, filter_col=None):
    '''
    Input list of categorical variables to filter
    Default is None where it would only consider columns which have type 'Object'
    @param df (DataFrame) - dataset
    @param filter_col (array) - optional argument to specify columns we want to filter
    '''
    if filter_col == None:
        filter_col = df.select_dtypes(include='object').columns
    num_unique = []
    for col in filter_col:
        num_unique.append(len(df[col].unique()))
    plt.bar(list(filter_col), num_unique)
    plt.title('Number of unique categorical variables')
    plt.xlabel('Column name')
    plt.ylabel('Num unique')
    plt.xticks(rotation=90)
    plt.yticks([0, 1, 2, 3, 4])
    plt.show()
    return pd.Series(num_unique, index=filter_col).sort_values(ascending=False)

cardinality_plot(pd_melt, categorical)

直接看最喜歡的location,取值數量有點太多了,我們可以考慮用粗粒度的地理位置資訊,可能區分能力會強一些。

下述部分,我們會使用spark進行特徵工程&大數據建模與調優,相關內容可以閱讀ShowMeAI的以下文章,我們對它的用法做了詳細的講解

💡 建模優化

我們先對數值型特徵做一點小小的數據變換(這裡用到的是log變換),這樣我們的原始數值型特徵分布可以得到一定程度的校正。

def log_transform(df, columns):
    '''
    Log trasform columns in dataframe
    @df - spark dataframe
    @columns - array of string of column names to be log transformed
    returns updated spark dataframe
    '''
    log_transform_func = F.udf(lambda x: np.log10(x + 1), FloatType())
    for col in columns:
        df = df.withColumn(col, log_transform_func(df[col]))
    return df

# 數值型特徵log變換
df_melt = log_transform(df_melt, numerical)

① 數據切分

接下來我們把數據集拆分為 60:20:20 的3部分,分別用於訓練、驗證和測試。

df_melt_copy = df_melt . withColumn("label", df_melt . churn)
rest, test = df_melt_copy.randomSplit([0.8, 0.2], seed=42)
train, val = rest.randomSplit([0.75, 0.25], seed=42)

② 建模流水線

# 導入工具庫
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler, OneHotEncoder, StringIndexer 
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.metrics import roc_curve, precision_recall_curve, confusion_matrix, ConfusionMatrixDisplay

import re

# 數值型特徵處理流水線
numerical_assembler = VectorAssembler(inputCols=numerical, outputCol="numericalFeatures")
standardise = StandardScaler(inputCol="numericalFeatures", outputCol="standardNumFeatures", withStd=True, withMean=True)
minmax = MinMaxScaler(inputCol="standardNumFeatures", outputCol="minmaxNumFeatures")

# 類別型特徵處理流水線
inputCols = ['gender', 'level', 'region', 'division']
outputColsIndexer = [x + 'SI' for x in inputCols]
indexer = StringIndexer(inputCols = inputCols, outputCols=outputColsIndexer)
outputColsOH = [x + 'OH' for x in inputCols]
onehot = OneHotEncoder(inputCols=outputColsIndexer, outputCols=outputColsOH)
categorical_assembler = VectorAssembler(inputCols=outputColsOH, outputCol="categoricalFeatures")

# 組合兩類特徵
total_assembler = VectorAssembler(inputCols=['minmaxNumFeatures', 'categoricalFeatures'], outputCol='features')
pipeline = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler])
# 運行流水線對數據進行處理
pipeline . fit(train) . transform(train) . head()

得到如下結果

Row(userId='10', gender='M', level='paid', location='Laurel, MS', uniqueSongs=629, uniqueArtists=565, uniqueSongArtist=633, churn=0, dayServiceLen=42.43672561645508, countListen=673, countSession=6, lengthListen=166866.37250999993, countListenDiff=-203, countSessionDiff=2, lengthListenDiff=-48180.54478999992, lengthListenPerDay=3932.121766842835, countListenPerDay=15.858904998528928, sessionPerDay=0.14138696878331883, lengthPerListen=247.94408991084686, lengthPerSession=27811.062084999987, region='South', division='East South Central', label=0, numericalFeatures=DenseVector([629.0, 565.0, 633.0, 42.4367, 673.0, 6.0, 166866.3725, -203.0, 2.0, -48180.5448, 3932.1218, 15.8589, 0.1414, 247.9441, 27811.0621]), standardNumFeatures=DenseVector([-0.3973, -0.331, -0.3968, -0.016, -0.3968, -0.6026, -0.3993, -0.6779, 0.6836, -0.6549, -0.3678, -0.3625, -0.1256, -0.1374, 1.1354]), minmaxNumFeatures=DenseVector([0.1053, 0.1587, 0.1034, 0.6957, 0.0838, 0.0392, 0.0835, 0.5701, 0.5, 0.5692, 0.0264, 0.0245, 0.0002, 0.5344, 0.56]), genderSI=0.0, levelSI=1.0, regionSI=0.0, divisionSI=4.0, genderOH=SparseVector(1, {0: 1.0}), levelOH=SparseVector(1, {}), regionOH=SparseVector(3, {0: 1.0}), divisionOH=SparseVector(8, {4: 1.0}), categoricalFeatures=SparseVector(13, {0: 1.0, 2: 1.0, 9: 1.0}), features=DenseVector([0.1053, 0.1587, 0.1034, 0.6957, 0.0838, 0.0392, 0.0835, 0.5701, 0.5, 0.5692, 0.0264, 0.0245, 0.0002, 0.5344, 0.56, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]))

③ 初步建模&評估

我們先定義一個模型評估函數,因為是類別非均衡場景,我們這裡覆蓋比較多的評估準則,包括常用的precision、recall以及排序準則auc等。

# 模型評估函數
def evaluate_model(y_trueTrain, y_predTrain, y_trueTest, y_predTest, y_testProba):
    '''
    Wrapper function for evaluating classification results
    '''
    train_acc = accuracy_score(y_trueTrain, y_predTrain)
    test_acc = accuracy_score(y_trueTest, y_predTest)
    fscore = f1_score(y_trueTest, y_predTest, zero_division=0)
    precision = precision_score(y_trueTest, y_predTest, zero_division=0)
    recall = recall_score(y_trueTest, y_predTest, zero_division=0)
    # linear models would not have .predict_proba method so, if fails, append 0 to roc_auc
    try:
        roc_auc = roc_auc_score(y_trueTest, y_testProba)
    except:
        roc_auc = 0
    return {
        'train_acc': train_acc, 
        'test_acc' : test_acc, 
        'fscore': fscore, 
        'precision': precision, 
        'recall': recall, 
        'roc_auc': roc_auc
    }

📌 邏輯回歸

# 定義模型
lr = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)
pipeline_lr = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, lr])

# 擬合
lrModel = pipeline_lr.fit(train)
lr_res_test = lrModel.transform(val).select('label', 'prediction', 'probability').toPandas()
lr_res_train = lrModel.transform(train).select('label', 'prediction', 'probability').toPandas()

# 評估
lr_results = evaluate_model(lr_res_train['label'],lr_res_train['prediction'],lr_res_test['label'],lr_res_test['prediction'], lr_res_test['probability'].apply(lambda x: x[1]))
lr_results

結果如下

{'train_acc': 0.8456375838926175,
 'test_acc': 0.8780487804878049,
 'fscore': 0.7368421052631579,
 'precision': 0.5833333333333334,
 'recall': 1.0,
 'roc_auc': 0.9579831932773109}

📌 梯度提升樹GBT

# 定義模型
gbt = GBTClassifier()
pipeline_gbt = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, gbt])

# 擬合
gbtModel = pipeline_gbt.fit(train)
gbt_res_test = gbtModel.transform(val).select('label', 'prediction', 'probability').toPandas()
gbt_res_train = gbtModel.transform(train).select('label', 'prediction', 'probability').toPandas()

# 評估
gbt_results = evaluate_model(gbt_res_train['label'],gbt_res_train['prediction'],gbt_res_test['label'],gbt_res_test['prediction'],\
               gbt_res_test['probability'].apply(lambda x: x[1]))
gbt_results

結果如下

{'train_acc': 1.0,
 'test_acc': 0.8048780487804879,
 'fscore': 0.6,
 'precision': 0.46153846153846156,
 'recall': 0.8571428571428571,
 'roc_auc': 0.8193277310924371}

📌 隨機森林

# 定義模型
rf = RandomForestClassifier()
pipeline_rf = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, rf])

# 擬合
rfModel = pipeline_rf.fit(train)
rf_res_test = rfModel.transform(val).select('label', 'prediction', 'probability').toPandas()
rf_res_train = rfModel.transform(train).select('label', 'prediction', 'probability').toPandas()

# 評估
rf_results = evaluate_model(rf_res_train['label'],rf_res_train['prediction'],rf_res_test['label'],rf_res_test['prediction'], rf_res_test['probability'].apply(lambda x: x[1]))
rf_results

結果如下

{'train_acc': 0.959731543624161,
 'test_acc': 0.8780487804878049,
 'fscore': 0.6666666666666666,
 'precision': 0.625,
 'recall': 0.7142857142857143,
 'roc_auc': 0.9243697478991597}

📌 綜合對比

cv_results = pd.DataFrame(columns=['accuracy_train','accuracy_cv','fscore_cv','precision_cv','recall_cv', 'roc_auc_cv'])
cv_results.loc['LogisticRegression'] = lr_results.values()
cv_results.loc['GradientBoostingTree'] = gbt_results.values()
cv_results.loc['RandomForest'] = rf_results.values()

cv_results.style.apply(lambda x: ["background: lightgreen" if abs(v) == max(x) else "" for v in x], axis = 0)

綜合對比結果如下:

我們在上述建模與評估過程中,綜合對比了訓練集和驗證集的結果。關於評估準則:

  • accuracy通常不是衡量類別非均衡場景下的分類好指標。 極端的情況下,僅預測我們所有的客戶「不流失」就達到 77% 的accuracy。
  • recall衡量我們的正樣本中有多少被模型預估為正樣本,即TP / (TP + FN),我們上述建模過程中,LogisticRegression正確識別所有會流失的客戶。
  • recall還需要結合precision一起看,例如,上述LogisticRegression預估的流失客戶中,只有 58% 真正流失了。 (這意味著如果我們要開展營銷活動來解決客戶流失問題,有42% (1 – 0.58) 的成本會浪費在未流失客戶身上)。
  • 可以使用 fscore 指標來綜合考慮recall和precision。
  • ROC_AUC 衡量我們的真陽性與假陽性率。 我們的 AUC 越高,模型在區分正類和負類方面的性能就越好。

上述指標中,我們優先關注ROC_AUC,其次是 fscore,我們上述指標中LogisticRegression效果良好,下面我們基於它進一步調優。

④ 超參數調優

📌 交叉驗證

我們上面的建模只是敲定了一組超參數,超參數會影響模型的最終效果,我們可以使用spark的CrossValidator進行超參數調優,選出最優的超參數。

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1]) \
    .addGrid(lr.maxIter,[50, 100]) \
    .build()

crossval = CrossValidator(estimator=pipeline_lr,
                         estimatorParamMaps=paramGrid,
                         evaluator=MulticlassClassificationEvaluator(),
                         numFolds=3)

# 交叉驗證調參
cvModel = crossval . fit(rest)
cvModel . avgMetrics

輸出結果如下

[0.8011084544393228,
 0.8222872837788751,
 0.7284659848286738,
 0.7284659848286738]

我們對測試集做評估

# 交叉驗證評估
cv_res_test = cvModel.transform(test).select('label', 'prediction', 'probability').toPandas()
cv_res_train = cvModel.transform(rest).select('label', 'prediction', 'probability').toPandas()
cv_metrics = evaluate_model(cv_res_train['label'],cv_res_train['prediction'],cv_res_test['label'],cv_res_test['prediction'], cv_res_test['probability'].apply(lambda x: x[1]))

cv_metrics
{'train_acc': 0.8894736842105263,
 'test_acc': 0.8571428571428571,
 'fscore': 0.7368421052631577,
 'precision': 0.7,
 'recall': 0.7777777777777778,
 'roc_auc': 0.858974358974359}

📌 最優超參數

cvModel . getEstimatorParamMaps()[np . argmax(cvModel . avgMetrics)]
# 輸出結果
{Param(parent='LogisticRegression_e765de70ec6a', name='regParam', doc='regularization parameter (>= 0).'): 0.0,
 Param(parent='LogisticRegression_e765de70ec6a', name='maxIter', doc='max number of iterations (>= 0).'): 100}

💡 結果評估

我們的 ROC_AUC 從 95.7 下降到 85.9。 這並不奇怪,因為我懷疑 95.7 的結果是由於過度擬合造成的。

{'train_acc': 0.8894736842105263,
 'test_acc': 0.8571428571428571,
 'fscore': 0.7368421052631577,
 'precision': 0.7,
 'recall': 0.7777777777777778,
 'roc_auc': 0.858974358974359}

最好的參數是 regParam為 0 和 maxIter100 個。

① 混淆矩陣

我們定一個函數來繪製一下混淆矩陣(即對正負樣本和預估結果劃分4個象限進行評估)。

def plot_confusion_matrix(y_true, y_pred, title):
    '''
    Plots confusion matrix
    @param y_true - array of actual labels
    @param y_pred - array of predictions
    @title title - string of title
    '''
    conf_matrix = confusion_matrix(y_true, y_pred)
    matrix_display = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=["No Churn", "Churn"])
    matrix_display.plot(cmap='Greens')
    # adding title - //github.com/scikit-learn/scikit-learn/discussions/20690
    matrix_display.ax_.set_title(title)
    plt.grid(False)
    plt.show()

    # Calculating recall (sensitivity), precision and specificity
    tn = conf_matrix[0][0]
    tp = conf_matrix[1][1]
    fn = conf_matrix[1][0]
    fp = conf_matrix[0][1]
    print(f'True Positive Rate/Recall/Sensitivity: {round(tp/(tp+fn), 6)}')
    # basically inverse of TPR
    print(f'False Positive Rate/(1 - Specificity): {round(fp/(tn+fp), 6)}')
    print(f'Precision                            : {round(tp/(tp+fp), 6)}')

# 繪製混淆矩陣
plot_confusion_matrix(cv_res_test['label'], cv_res_test['prediction'], "Confusion matrix at 50% threshold (default)")

查看下面的混淆矩陣,用0.5的默認概率閾值能夠正確預測 77.78% 的流失客戶 (7/(7+2)),也具有 70% 的不錯的precision (7/(7+3))

② ROC_AUC 曲線

# 預測概率
test_proba = cv_res_test['probability'] . apply(lambda x: x[1])

# fpr = false positive rate
# tpr = true positive rate
fpr, tpr, _ = roc_curve(cv_res_test['label'], test_proba)

# 繪圖
plt.figure(figsize=(10,8))
plt.title('ROC AUC Curve for customer churn')
plt.xlabel('False Positive Rate (FPR)')
plt.ylabel('True Postive Rate (FPR) / Recall')
plt.plot(fpr, tpr, marker='.', label='LR')
plt.plot([0, 1], [0, 1])
plt.show()

下面的 ROC AUC 曲線清楚地顯示了召回率(真陽性率)和假陽性率之間的權衡。

③ PR 曲線

lr_precision, lr_recall, _ = precision_recall_curve(cv_res_test['label'], test_proba)
# 繪製PR曲線
plt.figure(figsize=(10,8))
plt.title('Recall/Precision curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.plot(lr_recall, lr_precision, marker='.', label='LR')
plt.axhline(y=cv_metrics['precision'], color='r')
plt.axvline(x=cv_metrics['recall'], color='r')
plt.show()

下面的召回/精度圖中的交點代表了我們調整後的LogisticRegression模型的召回-精度。默認的50%的決策閾值得出了77.8%/70%的召回率-精確度的權衡。

通過調整我們的決策閾值,我們可以訂製我們想要的召回/精確率。

💡 總結&業務思考

我們可以調整我們的決策(概率)閾值,以獲得一個最滿意的召回率或精確度。比如在我們的場景下,使用了0.72的閾值取代默認的0.5,結果是在召回率沒有下降的基礎上,提升了精度。

現實中,召回率和精確度之間肯定會有權衡,特別是當我們在比較大的數據集上建模應用時。

def classify_custom_threshold(y_true, y_pred_proba, threshold=0.5):
    '''
    Identifies custom threshold and plots confusion matrix
    @y_true - array of actual labels
    @y_pred_proba - array of probabilities of predictions
    @threshold - decision threshold which is defaulted to 50%
    '''
    y_pred = y_pred_proba >= threshold
    plot_confusion_matrix(y_true, y_pred, f'Confusion matrix at {round(threshold * 100, 1)}% decision threshold')
    
classify_custom_threshold(cv_res_test['label'], test_proba, 0.72)

我們還需要與業務管理人員積極溝通,了解他們更有傾向性的指標(更看重precision還是recall):

  • 優先考慮recall意味著我們能判斷出大部分實際流失的客戶,但這可能會降低精度,就像我們之前提到的,這可能會導致成本增加。
  • 我們當前的結果已經很不錯了,如果業務負責人想追求更高的召回率,並願意為此花費一些成本,我們可以降低決策(概率)門檻。

舉例來說,在我們當前的例子中,如果我們將決策判定概率從0.5降低到0.25,可以把召回率提升到88.9%,但隨之發生變化的是精度降低到47%。

lr_precision, lr_recall, _ = precision_recall_curve(cv_res_test['label'], test_proba)

plt.figure(figsize=(10,8))
plt.title('Recall/Precision curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.plot(lr_recall, lr_precision, marker='.', label='LR')
plt.axhline(y=cv_metrics['precision'], color='r', alpha=0.3)
plt.axvline(x=cv_metrics['recall'], color='r', alpha=0.3)
plt.axhline(y=0.470588, color='r')
plt.axvline(x=0.888889, color='r')
plt.show()

classify_custom_threshold(cv_res_test['label'], test_proba, 0.25)

參考資料