如何構建一個生產環境的推薦系統
1.概述
前面介紹過什麼是推薦系統,以及推薦系統中的用例,比如基於用戶的協同過濾來構建推薦系統。今天給大家介紹如何構建一個生產環境的推薦系統。
2.內容
現在互聯網上的內容很多,我們可能每天都會接受來自不同消息。例如,電商網站、閱讀部落格、各類新聞文章等。但是,這些消息並不是所有的內容你都感興趣,可能你只對技術部落格感興趣,或者某些新聞感興趣等等。而這麼內容如何去滿足用戶的需求呢?我們需要一個精準的解決方案來簡化用戶的發現過程。
2.1 推薦系統的作用是啥?
簡而言之,推薦系統就是一個發現用戶喜好的系統。系統從數據中學習並向用戶提供有效的建議。如果用戶沒有特意搜索某項物品,則系統會自動將該項帶出。這樣看起很神奇,比如,你在電商網站上瀏覽過某個品牌的鞋子,當你在用一些社交軟體、短影片軟體、影片軟體時,你會驚奇的發現在你所使用的這些軟體中,會給你推薦你剛剛在電商網站上瀏覽的過的鞋子。
其實,這得益於推薦系統的過濾功能。我們來看看一張簡圖,如下圖所示:
從上圖中,我們可以簡單的總結出,整個數據流程如下:
- 數據來源:負責提供數據來源,比如用戶在電商網站、新聞、影片等上的用戶行為,作為推薦訓練的數據來源;
- 數據採集:用戶產生了數據,我們需要將這些數據進行收集,比如SDK埋點採集、Nginx上報、爬蟲等方式來獲取數據;
- 數據存儲:獲取這些數據後,需要對這些數據進行分類存儲、清洗等,比如大數據裡面用的最多的HDFS,或者構建數據倉庫Hive表等;
- 推薦系統:數據分類、清洗後好,有了推薦系統需要的數據,然後使用推薦系統中的各種模型、比如協同過濾、內容過濾、相似過濾、用戶矩陣等,來訓練這些用戶數據,得到訓練結果;
- 目標用戶:通過推薦系統,對用戶數據進行訓練後,得出訓練結果,將這些結果,推薦給目標用戶。
2.2 依賴準備
我們使用Python來夠構建推薦系統模型,需要依賴如下的Python依賴包:
pip install numpy pip install scipy pip install pandas pip install jupyter pip install requests
這裡為簡化Python的依賴環境,推薦使用Anaconda3。這裡面集成了很多Python的依賴庫,不用我們在額外去關注Python的環境準備。
接著,我們載入數據源,程式碼如下:
import pandas as pd import numpy as np df = pd.read_csv('resource/events.csv') df.shape print(df.head())
結果如下:
使用df.head()會列印數據前5行數據:
- timestamp:時間戳
- visitorid:用戶ID
- event:事件類型
- itemid:物品ID
- transactionid:事務ID
使用如下程式碼,查看事件類型有哪些:
print(df.event.unique())
結果如下:
從上圖可知,類型有三種,分別是:view、addtocart、transaction。
為了簡化起見,以transaction類型為例子。程式碼如下所示:
trans = df[df['event'] == 'transaction'] trans.shape print(trans.head())
結果如下圖所示:
接著,我們來看看用戶和物品的相關數據,程式碼如下:
visitors = trans['visitorid'].unique() items = trans['itemid'].unique() print(visitors.shape) print(items.shape)
我們可以獲得11719個去重用戶和12025個去重物品。
構建一個簡單而有效的推薦系統的經驗法則是在不損失精準度的情況下減少數據的樣本。這意味著,你只能為每個用戶獲取大約50個最新的事務樣本,並且我們仍然可以得到期望中的結果。
程式碼如下所示:
trans2 = trans.groupby(['visitorid']).head(50) print(trans2.shape)
真實場景中,用戶ID和物品ID是一個海量數字,人為很難記住,比如如下程式碼:
trans2['visitors'] = trans2['visitorid'].apply(lambda x : np.argwhere(visitors == x)[0][0]) trans2['items'] = trans2['itemid'].apply(lambda x : np.argwhere(items == x)[0][0]) print(trans2)
結果如下圖所示:
2.3 構建矩陣
2.3.1 構建用戶-物品矩陣
從上面的程式碼執行的結果來看,目前樣本數據中有11719個去重用戶和12025個去重物品,因此,我們接下來構建一個稀疏矩陣。需要用到如下Python依賴:
from scipy.sparse import csr_matrix
實現程式碼如下所示:
occurences = csr_matrix((visitors.shape[0], items.shape[0]), dtype='int8') def set_occurences(visitor, item): occurences[visitor, item] += 1 trans2.apply(lambda row: set_occurences(row['visitors'], row['items']), axis=1) print(occurences)
結果如下所示:


(0, 0) 1 (1, 1) 1 (1, 37) 1 (1, 72) 1 (1, 108) 1 (1, 130) 1 (1, 131) 1 (1, 132) 1 (1, 133) 1 (1, 162) 1 (1, 163) 1 (1, 164) 1 (2, 2) 1 (3, 3) 1 (3, 161) 1 (4, 4) 1 (4, 40) 1 (5, 5) 1 (5, 6) 1 (5, 18) 1 (5, 19) 1 (5, 54) 1 (5, 101) 1 (5, 111) 1 (5, 113) 1 : : (11695, 383) 1 (11696, 12007) 1 (11696, 12021) 1 (11697, 12008) 1 (11698, 12011) 1 (11699, 1190) 1 (11700, 506) 1 (11701, 11936) 1 (11702, 10796) 1 (11703, 12013) 1 (11704, 12016) 1 (11705, 12017) 1 (11706, 674) 1 (11707, 3653) 1 (11708, 12018) 1 (11709, 12019) 1 (11710, 1330) 1 (11711, 4184) 1 (11712, 3595) 1 (11713, 12023) 1 (11714, 3693) 1 (11715, 5690) 1 (11716, 6280) 1 (11717, 3246) 1 (11718, 2419) 1
View Code
2.3.2 構建物品-物品共生矩陣
構建一個物品與物品矩陣,其中每個元素表示一個用戶購買兩個物品的次數,可以認為是一個共生矩陣。要構建一個共生矩陣,需要將發生矩陣的轉置與自身進行點乘。
cooc = occurences.transpose().dot(occurences) cooc.setdiag(0) print(cooc)
結果如下所示:


(0, 0) 0 (164, 1) 1 (163, 1) 1 (162, 1) 1 (133, 1) 1 (132, 1) 1 (131, 1) 1 (130, 1) 1 (108, 1) 1 (72, 1) 1 (37, 1) 1 (1, 1) 0 (2, 2) 0 (161, 3) 1 (3, 3) 0 (40, 4) 1 (4, 4) 0 (8228, 5) 1 (8197, 5) 1 (8041, 5) 1 (8019, 5) 1 (8014, 5) 1 (8009, 5) 1 (8008, 5) 1 (7985, 5) 1 : : (11997, 12022) 1 (2891, 12022) 1 (12023, 12023) 0 (12024, 12024) 0 (11971, 12024) 1 (11880, 12024) 1 (10726, 12024) 1 (8694, 12024) 1 (4984, 12024) 1 (4770, 12024) 1 (4767, 12024) 1 (4765, 12024) 1 (4739, 12024) 1 (4720, 12024) 1 (4716, 12024) 1 (4715, 12024) 1 (4306, 12024) 1 (2630, 12024) 1 (2133, 12024) 1 (978, 12024) 1 (887, 12024) 1 (851, 12024) 1 (768, 12024) 1 (734, 12024) 1 (220, 12024) 1
View Code
這樣一個稀疏矩陣就構建好了,並使用setdiag函數將對角線設置為0(即忽略第一項的值)。
接下來會用到一個和餘弦相似度的演算法類似的演算法LLR(Log-Likelihood Ratio)。LLR演算法的核心是分析事件的計數,特別是事件同時發生的計數。而我們需要的技術一般包括:
- 兩個事件同時發生的次數(K_11)
- 一個事件發生而另外一個事件沒有發生的次數(K_12、K_21)
- 兩個事件都沒有發生(K_22)
表格表示如下:
事件A | 事件B | |
事件B | A和B同時發生(K_11) | B發生,單A不發生(K_12) |
任何事件但不包含B | A發生,但是B不發生(K_21) | A和B都不發生(K_22) |
通過上述表格描述,我們可以較為簡單的計算LLR的分數,公式如下所示:
LLR=2 sum(k)(H(k)-H(rowSums(k))-H(colSums(k)))
那回到本案例來,實現程式碼如下所示:
def xLogX(x): return x * np.log(x) if x != 0 else 0.0 def entropy(x1, x2=0, x3=0, x4=0): return xLogX(x1 + x2 + x3 + x4) - xLogX(x1) - xLogX(x2) - xLogX(x3) - xLogX(x4) def LLR(k11, k12, k21, k22): rowEntropy = entropy(k11 + k12, k21 + k22) columnEntropy = entropy(k11 + k21, k12 + k22) matrixEntropy = entropy(k11, k12, k21, k22) if rowEntropy + columnEntropy < matrixEntropy: return 0.0 return 2.0 * (rowEntropy + columnEntropy - matrixEntropy) def rootLLR(k11, k12, k21, k22): llr = LLR(k11, k12, k21, k22) sqrt = np.sqrt(llr) if k11 * 1.0 / (k11 + k12) < k21 * 1.0 / (k21 + k22): sqrt = -sqrt return sqrt
程式碼中的K11、K12、K21、K22分別代表的含義如下:
- K11:兩個事件都發送
- K12:事件B發送,而事件A不發生
- K21:事件A發送,而事件B不發生
- K22:事件A和B都不發生
那我們計算的公式,實現的程式碼如下所示:
row_sum = np.sum(cooc, axis=0).A.flatten() column_sum = np.sum(cooc, axis=1).A.flatten() total = np.sum(row_sum, axis=0) pp_score = csr_matrix((cooc.shape[0], cooc.shape[1]), dtype='double') cx = cooc.tocoo() for i,j,v in zip(cx.row, cx.col, cx.data): if v != 0: k11 = v k12 = row_sum[i] - k11 k21 = column_sum[j] - k11 k22 = total - k11 - k12 - k21 pp_score[i,j] = rootLLR(k11, k12, k21, k22)
然後,我們對結果進行排序,讓每一項的最高LLR分數位於每行的第一列,實現程式碼如下所示:
result = np.flip(np.sort(pp_score.A, axis=1), axis=1) result_indices = np.flip(np.argsort(pp_score.A, axis=1), axis=1)
例如我們來看看其中一項結果,程式碼如下:
print(result[8456]) print(result_indices[8456])
結果如下所示:
實際情況中,我們會根據經驗對LLR分數進行一些限制,因此將不重要的指標會進行刪除。
minLLR = 5 indicators = result[:, :50] indicators[indicators < minLLR] = 0.0 indicators_indices = result_indices[:, :50] max_indicator_indices = (indicators==0).argmax(axis=1) max = max_indicator_indices.max() indicators = indicators[:, :max+1] indicators_indices = indicators_indices[:, :max+1]
訓練出結果後,我們可以將其放入到ElasticSearch中進行實時檢索。使用到的Python依賴庫如下:
import requests
import json
這裡使用ElasticSearch的批量更新API,創建一個新的索引,實現程式碼如下:
actions = [] for i in range(indicators.shape[0]): length = indicators[i].nonzero()[0].shape[0] real_indicators = items[indicators_indices[i, :length]].astype("int").tolist() id = items[i] action = { "index" : { "_index" : "items2", "_id" : str(id) } } data = { "id": int(id), "indicators": real_indicators } actions.append(json.dumps(action)) actions.append(json.dumps(data)) if len(actions) == 200: actions_string = "\n".join(actions) + "\n" actions = [] url = "//127.0.0.1:9200/_bulk/" headers = { "Content-Type" : "application/x-ndjson" } requests.post(url, headers=headers, data=actions_string) if len(actions) > 0: actions_string = "\n".join(actions) + "\n" actions = [] url = "//127.0.0.1:9200/_bulk/" headers = { "Content-Type" : "application/x-ndjson" } requests.post(url, headers=headers, data=actions_string)
在瀏覽器中訪問地址//127.0.0.1:9200/items2/_count,結果如下所示:
接下來,我們可以嘗試將訪問地址切換為這個//127.0.0.1:9200/items2/240708,結果如下所示:
3.總結
構建一個面向生產環境的推薦系統並不困難,目前現有的技術組件可以滿足我們構建這樣一個生產環境的推薦系統。比如Hadoop、Hive、HBase、Kafka、ElasticSearch等這些成熟的開源組件來構建我們的生產環境推薦系統。本案例的完整程式碼如下所示:


import pandas as pd import numpy as np from scipy.sparse import csr_matrix import requests import json df = pd.read_csv('resource/events.csv') # print(df.shape) # print(df.head()) # print(df.event.unique()) trans = df[df['event'] == 'transaction'] # print(trans.shape) # print(trans.head()) visitors = trans['visitorid'].unique() items = trans['itemid'].unique() # print(visitors.shape) # print(items.shape) trans2 = trans.groupby(['visitorid']).head(50) # print(trans2.shape) trans2['visitors'] = trans2['visitorid'].apply(lambda x : np.argwhere(visitors == x)[0][0]) trans2['items'] = trans2['itemid'].apply(lambda x : np.argwhere(items == x)[0][0]) # print(trans2) occurences = csr_matrix((visitors.shape[0], items.shape[0]), dtype='int8') def set_occurences(visitor, item): occurences[visitor, item] += 1 trans2.apply(lambda row: set_occurences(row['visitors'], row['items']), axis=1) # print(occurences) cooc = occurences.transpose().dot(occurences) cooc.setdiag(0) # print(cooc) def xLogX(x): return x * np.log(x) if x != 0 else 0.0 def entropy(x1, x2=0, x3=0, x4=0): return xLogX(x1 + x2 + x3 + x4) - xLogX(x1) - xLogX(x2) - xLogX(x3) - xLogX(x4) def LLR(k11, k12, k21, k22): rowEntropy = entropy(k11 + k12, k21 + k22) columnEntropy = entropy(k11 + k21, k12 + k22) matrixEntropy = entropy(k11, k12, k21, k22) if rowEntropy + columnEntropy < matrixEntropy: return 0.0 return 2.0 * (rowEntropy + columnEntropy - matrixEntropy) def rootLLR(k11, k12, k21, k22): llr = LLR(k11, k12, k21, k22) sqrt = np.sqrt(llr) if k11 * 1.0 / (k11 + k12) < k21 * 1.0 / (k21 + k22): sqrt = -sqrt return sqrt row_sum = np.sum(cooc, axis=0).A.flatten() column_sum = np.sum(cooc, axis=1).A.flatten() total = np.sum(row_sum, axis=0) pp_score = csr_matrix((cooc.shape[0], cooc.shape[1]), dtype='double') cx = cooc.tocoo() for i,j,v in zip(cx.row, cx.col, cx.data): if v != 0: k11 = v k12 = row_sum[i] - k11 k21 = column_sum[j] - k11 k22 = total - k11 - k12 - k21 pp_score[i,j] = rootLLR(k11, k12, k21, k22) result = np.flip(np.sort(pp_score.A, axis=1), axis=1) result_indices = np.flip(np.argsort(pp_score.A, axis=1), axis=1) print(result.shape) print(result[8456]) print(result_indices[8456]) minLLR = 5 indicators = result[:, :50] indicators[indicators < minLLR] = 0.0 indicators_indices = result_indices[:, :50] max_indicator_indices = (indicators==0).argmax(axis=1) max = max_indicator_indices.max() indicators = indicators[:, :max+1] indicators_indices = indicators_indices[:, :max+1] actions = [] for i in range(indicators.shape[0]): length = indicators[i].nonzero()[0].shape[0] real_indicators = items[indicators_indices[i, :length]].astype("int").tolist() id = items[i] action = { "index" : { "_index" : "items2", "_id" : str(id) } } data = { "id": int(id), "indicators": real_indicators } actions.append(json.dumps(action)) actions.append(json.dumps(data)) if len(actions) == 200: actions_string = "\n".join(actions) + "\n" actions = [] url = "//127.0.0.1:9200/_bulk/" headers = { "Content-Type" : "application/x-ndjson" } requests.post(url, headers=headers, data=actions_string) if len(actions) > 0: actions_string = "\n".join(actions) + "\n" actions = [] url = "//127.0.0.1:9200/_bulk/" headers = { "Content-Type" : "application/x-ndjson" } requests.post(url, headers=headers, data=actions_string)
View Code
4.結束語
這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,部落客出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買部落客的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學影片。