推薦系統實踐 0x06 基於鄰域的算法(1)
基於鄰域的算法(1)
基於鄰域的算法主要分為兩類,一類是基於用戶的協同過濾算法,另一類是基於物品的協同過濾算法。我們首先介紹基於用戶的協同過濾算法。
基於用戶的協同過濾算法(UserCF)
基於用戶的協同過濾算法是最古老的算法了,它標誌着推薦系統的誕生。當一個用戶甲需要個性化推薦時,首先找到那些跟他興趣相似的用戶,然後把那些用戶喜歡的,甲沒有聽說過的物品推薦給用戶甲,那麼這種方式就叫做基於用戶的協同過濾算法。
那麼,這個算法包含兩個步驟:
- 找到和目標用戶興趣相似的用戶集合。
- 找到這個集合中的用戶喜歡的,且目標用戶沒有聽說過的物品推薦給目標用戶。
我們用用戶行為的相似度來表示興趣的相似度。對於用戶\(u\)和用戶\(v\),\(N(u)\)和\(N(v)\)表示各自有過正反饋的物品集合。那麼我們用Jaccard公式表示用戶\(u\)和用戶\(v\)之間的興趣相似度。
\]
另外也可以通過餘弦相似度進行計算
\]
餘弦相似度的計算代碼為
def UserSimilarity(train):
W = dict()
for u in train.keys():
for v in train.keys():
if u == v:
continue
W[u][v] = len(train[u] & train[v])
W[u][v] /= math.sqrt(len(train[u]) * len(train[v]) * 1.0)
return W
如果這樣去計算的話,在用戶非常大的時候回非常耗時,因為很多用戶之間並沒有對相同的物品產生過行為,算法也把時間浪費在計算用戶興趣相似度上。那麼我們可以對公式分子部分交集不為空的部分。
建立物品到用戶的倒排表,對於每個物品都保存對該物品產生過行為的用戶列表。
def UserSimilarity(train):
# build inverse table for item_users
item_users = dict()
for u, items in train.items():
for i in items.keys():
if i not in item_users:
item_users[i] = set()
item_users[i].add(u)
#calculate co-rated items between users
C = dict()
N = dict()
for i, users in item_users.items():
for u in users:
N[u] += 1
for v in users:
if u == v:
continue
C[u][v] += 1
# calculate finial similarity matrix W
W = dict()
for u, related_users in C.items():
for v, cuv in related_users.items():
W[u][v] = cuv / math.sqrt(N[u] * N[v])
return W
有了其他用戶的對某個物品\(i\)感興趣的評分,那麼根據相似度可以計算出用戶\(u\)對物品\(i\)的感興趣評分為:
\]
其中\(S(u,K)\)是與用戶\(u\)最相似的K個用戶。因為使用的是單一行為的隱反饋數據,所以所有的評分都為1。
我們用上一篇介紹的MovieLens數據集,以及以前介紹的評測方式來把代碼串起來,總體代碼:
import random
import math
import time
from tqdm import tqdm
def timmer(func):
def wrapper(*args, **kwargs):
start_time = time.time()
res = func(*args, **kwargs)
stop_time = time.time()
print('Func %s, run time: %s' %
(func.__name__, stop_time - start_time))
return res
return wrapper
class Dataset():
def __init__(self, fp):
# fp: data file path
self.data = self.loadData(fp)
@timmer
def loadData(self, fp):
data = []
for l in open(fp):
data.append(tuple(map(int, l.strip().split('::')[:2])))
return data
@timmer
def splitData(self, M, k, seed=1):
'''
:params: data, 加載的所有(user, item)數據條目
:params: M, 劃分的數目,最後需要取M折的平均
:params: k, 本次是第幾次劃分,k~[0, M)
:params: seed, random的種子數,對於不同的k應設置成一樣的
:return: train, test
'''
train, test = [], []
random.seed(seed)
for user, item in self.data:
# 這裡與書中的不一致,本人認為取M-1較為合理,因randint是左右都覆蓋的
if random.randint(0, M - 1) == k:
test.append((user, item))
else:
train.append((user, item))
# 處理成字典的形式,user->set(items)
def convert_dict(data):
data_dict = {}
for user, item in data:
if user not in data_dict:
data_dict[user] = set()
data_dict[user].add(item)
data_dict = {k: list(data_dict[k]) for k in data_dict}
return data_dict
return convert_dict(train), convert_dict(test)
class Metric():
def __init__(self, train, test, GetRecommendation):
'''
:params: train, 訓練數據
:params: test, 測試數據
:params: GetRecommendation, 為某個用戶獲取推薦物品的接口函數
'''
self.train = train
self.test = test
self.GetRecommendation = GetRecommendation
self.recs = self.getRec()
# 為test中的每個用戶進行推薦
def getRec(self):
recs = {}
for user in self.test:
rank = self.GetRecommendation(user)
recs[user] = rank
return recs
# 定義精確率指標計算方式
def precision(self):
all, hit = 0, 0
for user in self.test:
test_items = set(self.test[user])
rank = self.recs[user]
for item, score in rank:
if item in test_items:
hit += 1
all += len(rank)
return round(hit / all * 100, 2)
# 定義召回率指標計算方式
def recall(self):
all, hit = 0, 0
for user in self.test:
test_items = set(self.test[user])
rank = self.recs[user]
for item, score in rank:
if item in test_items:
hit += 1
all += len(test_items)
return round(hit / all * 100, 2)
# 定義覆蓋率指標計算方式
def coverage(self):
all_item, recom_item = set(), set()
for user in self.test:
for item in self.train[user]:
all_item.add(item)
rank = self.recs[user]
for item, score in rank:
recom_item.add(item)
return round(len(recom_item) / len(all_item) * 100, 2)
# 定義新穎度指標計算方式
def popularity(self):
# 計算物品的流行度
item_pop = {}
for user in self.train:
for item in self.train[user]:
if item not in item_pop:
item_pop[item] = 0
item_pop[item] += 1
num, pop = 0, 0
for user in self.test:
rank = self.recs[user]
for item, score in rank:
# 取對數,防止因長尾問題帶來的被流行物品所主導
pop += math.log(1 + item_pop[item])
num += 1
return round(pop / num, 6)
def eval(self):
metric = {
'Precision': self.precision(),
'Recall': self.recall(),
'Coverage': self.coverage(),
'Popularity': self.popularity()
}
print('Metric:', metric)
return metric
# 1. 隨機推薦
def Random(train, K, N):
'''
:params: train, 訓練數據集
:params: K, 可忽略
:params: N, 超參數,設置取TopN推薦物品數目
:return: GetRecommendation,推薦接口函數
'''
items = {}
for user in train:
for item in train[user]:
items[item] = 1
def GetRecommendation(user):
# 隨機推薦N個未見過的
user_items = set(train[user])
rec_items = {k: items[k] for k in items if k not in user_items}
rec_items = list(rec_items.items())
random.shuffle(rec_items)
return rec_items[:N]
return GetRecommendation
# 2. 熱門推薦
def MostPopular(train, K, N):
'''
:params: train, 訓練數據集
:params: K, 可忽略
:params: N, 超參數,設置取TopN推薦物品數目
:return: GetRecommendation, 推薦接口函數
'''
items = {}
for user in train:
for item in train[user]:
if item not in items:
items[item] = 0
items[item] += 1
def GetRecommendation(user):
# 隨機推薦N個沒見過的最熱門的
user_items = set(train[user])
rec_items = {k: items[k] for k in items if k not in user_items}
rec_items = list(
sorted(rec_items.items(), key=lambda x: x[1], reverse=True))
return rec_items[:N]
return GetRecommendation
# 3. 基於用戶餘弦相似度的推薦
def UserCF(train, K, N):
'''
:params: train, 訓練數據集
:params: K, 超參數,設置取TopK相似用戶數目
:params: N, 超參數,設置取TopN推薦物品數目
:return: GetRecommendation, 推薦接口函數
'''
# 計算item->user的倒排索引
item_users = {}
for user in train:
for item in train[user]:
if item not in item_users:
item_users[item] = []
item_users[item].append(user)
# 計算用戶相似度矩陣
sim = {}
num = {}
for item in item_users:
users = item_users[item]
for i in range(len(users)):
u = users[i]
if u not in num:
num[u] = 0
num[u] += 1
if u not in sim:
sim[u] = {}
for j in range(len(users)):
if j == i: continue
v = users[j]
if v not in sim[u]:
sim[u][v] = 0
sim[u][v] += 1
for u in sim:
for v in sim[u]:
sim[u][v] /= math.sqrt(num[u] * num[v])
# 按照相似度排序
sorted_user_sim = {k: list(sorted(v.items(), \
key=lambda x: x[1], reverse=True)) \
for k, v in sim.items()}
# 獲取接口函數
def GetRecommendation(user):
items = {}
seen_items = set(train[user])
for u, _ in sorted_user_sim[user][:K]:
for item in train[u]:
# 要去掉用戶見過的
if item not in seen_items:
if item not in items:
items[item] = 0
items[item] += sim[user][u]
recs = list(sorted(items.items(), key=lambda x: x[1],
reverse=True))[:N]
return recs
return GetRecommendation
# 4. 基於改進的用戶餘弦相似度的推薦
def UserIIF(train, K, N):
'''
:params: train, 訓練數據集
:params: K, 超參數,設置取TopK相似用戶數目
:params: N, 超參數,設置取TopN推薦物品數目
:return: GetRecommendation, 推薦接口函數
'''
# 計算item->user的倒排索引
item_users = {}
for user in train:
for item in train[user]:
if item not in item_users:
item_users[item] = []
item_users[item].append(user)
# 計算用戶相似度矩陣
sim = {}
num = {}
for item in item_users:
users = item_users[item]
for i in range(len(users)):
u = users[i]
if u not in num:
num[u] = 0
num[u] += 1
if u not in sim:
sim[u] = {}
for j in range(len(users)):
if j == i: continue
v = users[j]
if v not in sim[u]:
sim[u][v] = 0
# 相比UserCF,主要是改進了這裡
sim[u][v] += 1 / math.log(1 + len(users))
for u in sim:
for v in sim[u]:
sim[u][v] /= math.sqrt(num[u] * num[v])
# 按照相似度排序
sorted_user_sim = {k: list(sorted(v.items(), \
key=lambda x: x[1], reverse=True)) \
for k, v in sim.items()}
# 獲取接口函數
def GetRecommendation(user):
items = {}
seen_items = set(train[user])
for u, _ in sorted_user_sim[user][:K]:
for item in train[u]:
# 要去掉用戶見過的
if item not in seen_items:
if item not in items:
items[item] = 0
items[item] += sim[user][u]
recs = list(sorted(items.items(), key=lambda x: x[1],
reverse=True))[:N]
return recs
return GetRecommendation
class Experiment():
def __init__(self, M, K, N, fp='./ml-1m/ratings.dat',
rt='UserCF'):
'''
:params: M, 進行多少次實驗
:params: K, TopK相似用戶的個數
:params: N, TopN推薦物品的個數
:params: fp, 數據文件路徑
:params: rt, 推薦算法類型
'''
self.M = M
self.K = K
self.N = N
self.fp = fp
self.rt = rt
self.alg = {'Random': Random, 'MostPopular': MostPopular, \
'UserCF': UserCF, 'UserIIF': UserIIF}
# 定義單次實驗
@timmer
def worker(self, train, test):
'''
:params: train, 訓練數據集
:params: test, 測試數據集
:return: 各指標的值
'''
getRecommendation = self.alg[self.rt](train, self.K, self.N)
metric = Metric(train, test, getRecommendation)
return metric.eval()
# 多次實驗取平均
@timmer
def run(self):
metrics = {'Precision': 0, 'Recall': 0, 'Coverage': 0, 'Popularity': 0}
dataset = Dataset(self.fp)
for ii in range(self.M):
train, test = dataset.splitData(self.M, ii)
print('Experiment {}:'.format(ii))
metric = self.worker(train, test)
metrics = {k: metrics[k] + metric[k] for k in metrics}
metrics = {k: metrics[k] / self.M for k in metrics}
print('Average Result (M={}, K={}, N={}): {}'.format(\
self.M, self.K, self.N, metrics))
# 1. random實驗
M, N = 8, 10
K = 0 # 為保持一致而設置,隨便填一個值
random_exp = Experiment(M, K, N, rt='Random')
random_exp.run()
# 2. MostPopular實驗
M, N = 8, 10
K = 0 # 為保持一致而設置,隨便填一個值
mp_exp = Experiment(M, K, N, rt='MostPopular')
mp_exp.run()
# 3. UserCF實驗
M, N = 8, 10
for K in [5, 10, 20, 40, 80, 160]:
cf_exp = Experiment(M, K, N, rt='UserCF')
cf_exp.run()
# 4. UserIIF實驗
M, N = 8, 10
K = 80 # 與書中保持一致
iif_exp = Experiment(M, K, N, rt='UserIIF')
iif_exp.run()