协同过滤(Collaborative Filtering)作为推荐系统的核心技术,其核心思想可以用一个简单的日常场景来理解:假设你和几位口味相似的朋友经常一起看电影,当你想选择一部新电影时,很可能会优先考虑那些朋友喜欢但你还未看过的影片。这就是协同过滤最朴素的思想体现——通过群体智慧来辅助个人决策。
从技术角度看,协同过滤主要分为两大类型:
这种方法的实施步骤非常直观:
在实际应用中,我们首先需要构建用户-物品的交互矩阵。以电影推荐为例,这个矩阵的行代表用户,列代表电影,矩阵中的值则是用户对电影的评分(可以是显式评分,也可以是隐式的点击、观看时长等)。
与用户CF相对应,物品CF的流程是:
这两种方法看似相似,但在实际应用中有着明显的性能差异。用户CF更适合用户数量相对稳定、物品更新频繁的场景(如新闻推荐),而物品CF则在物品数量稳定、用户增长快速的场景(如电商平台)表现更好。
余弦相似度通过测量两个向量夹角的余弦值来评估它们的相似度。其数学表达式为:
sim(u,v) = (∑(r_u,i × r_v,i)) / (√(∑r_u,i²) × √(∑r_v,i²))
这个公式的本质是计算两个用户评分向量的夹角余弦值。当两个用户的评分模式完全一致时(无论绝对值高低),余弦相似度为1;完全相反时为-1;无关时为0。
在实际计算中,我们通常会忽略未评分的项目(即视作0),这被称为"非中心化"余弦相似度。这种处理简单高效,但对评分的绝对值敏感。
皮尔逊相关系数是对余弦相似度的改进,它考虑了用户评分均值的差异:
sim(u,v) = ∑[(r_u,i - r̄_u)(r_v,i - r̄_v)] / [√∑(r_u,i - r̄_u)² × √∑(r_v,i - r̄_v)²]
其中r̄_u和r̄_v分别是用户u和v的平均评分。皮尔逊相关系数的取值范围在-1到1之间,能够更好地捕捉评分模式上的相似性,而忽略评分尺度上的差异。
在实际工程中,相似度计算方法的选择需要考虑以下因素:
提示:在实现时,可以通过采样或维度约简技术来优化大规模相似度矩阵的计算效率。
预测用户u对物品i的评分可以通过以下公式计算:
r̂_u,i = r̄_u + [∑sim(u,v)(r_v,i - r̄_v)] / ∑|sim(u,v)|
这个公式的核心思想是:用相似用户的评分偏差(相对于他们自己的平均分)的加权平均来调整目标用户的平均评分。
类似的,基于物品的预测公式为:
r̂_u,i = r̄_i + [∑sim(i,j)(r_u,j - r̄_j)] / ∑|sim(i,j)|
这里使用的是物品相似度,计算思路与用户版本类似。
得到预测评分后,生成推荐列表的标准流程是:
在实际应用中,我们通常会加入一些业务规则:
典型的协同过滤实现需要以下数据准备步骤:
python复制# 读取原始数据
ratings = pd.read_csv('ratings.csv')
# 构建用户-物品矩阵
user_item_matrix = ratings.pivot_table(
index='user_id',
columns='item_id',
values='rating',
fill_value=0
)
# 数据标准化(可选)
# 用户中心化
user_mean = user_item_matrix.mean(axis=1)
user_item_matrix = user_item_matrix.sub(user_mean, axis=0)
使用scipy的优化实现可以大幅提升计算效率:
python复制from scipy.spatial.distance import cosine
from scipy.sparse import csr_matrix
# 转换为稀疏矩阵节省内存
sparse_matrix = csr_matrix(user_item_matrix.values)
# 计算余弦相似度
def sparse_cosine_similarity(matrix):
similarities = 1 - pairwise_distances(matrix, metric='cosine')
return similarities
user_similarity = sparse_cosine_similarity(sparse_matrix)
item_similarity = sparse_cosine_similarity(sparse_matrix.T)
完整的预测函数实现需要考虑边界条件:
python复制def predict_rating(user_id, item_id, user_sim, user_item, k=20):
if user_id not in user_item.index:
return user_item.mean().mean() # 全局平均
# 获取目标用户与其他用户的相似度
sim_scores = user_sim[user_id]
# 获取对该物品有评分的用户
rated_users = user_item[item_id][user_item[item_id] > 0].index
# 取相似度最高的k个有评分的用户
neighbors = sim_scores[rated_users].nlargest(k)
if len(neighbors) == 0:
return user_item.loc[user_id].mean() # 用户平均
# 计算加权平均
numerator = 0
denominator = 0
for neighbor, sim in neighbors.items():
numerator += sim * (user_item.loc[neighbor, item_id] - user_item.loc[neighbor].mean())
denominator += abs(sim)
if denominator == 0:
return user_item.loc[user_id].mean()
user_mean = user_item.loc[user_id].mean()
return user_mean + numerator / denominator
MovieLens数据集包含:
首先进行探索性分析:
python复制# 加载数据
ratings = pd.read_csv('ml-100k/u.data', sep='\t',
names=['user_id','item_id','rating','timestamp'])
movies = pd.read_csv('ml-100k/u.item', sep='|',
encoding='latin-1', header=None)
# 评分分布分析
plt.figure(figsize=(10,5))
sns.countplot(x='rating', data=ratings)
plt.title('Rating Distribution')
plt.show()
# 用户活跃度分析
user_activity = ratings['user_id'].value_counts()
plt.figure(figsize=(10,5))
sns.histplot(user_activity, bins=50)
plt.title('Number of Ratings per User')
plt.show()
使用surprise库进行快速原型开发:
python复制from surprise import Dataset, KNNBasic
from surprise.model_selection import cross_validate
# 加载数据
data = Dataset.load_builtin('ml-100k')
# 配置算法
sim_options = {
'name': 'pearson',
'user_based': True # 使用用户CF
}
algo = KNNBasic(sim_options=sim_options)
# 交叉验证
cross_validate(algo, data, measures=['RMSE', 'MAE'], cv=5, verbose=True)
对特定用户生成推荐:
python复制# 训练完整模型
trainset = data.build_full_trainset()
algo.fit(trainset)
# 获取用户未评分的电影
user_id = '100'
rated_movies = ratings[ratings['user_id'] == user_id]['item_id']
all_movies = ratings['item_id'].unique()
unrated_movies = [m for m in all_movies if m not in rated_movies]
# 预测评分
predictions = [algo.predict(user_id, m) for m in unrated_movies]
# 获取Top-10推荐
top10 = sorted(predictions, key=lambda x: x.est, reverse=True)[:10]
# 显示推荐结果
for pred in top10:
movie_title = movies[movies[0] == int(pred.iid)][1].values[0]
print(f"{movie_title}: {pred.est:.2f}")
当用户/物品规模达到百万级时,精确计算相似度变得不可行。这时可以使用近似算法:
python复制from sklearn.neighbors import LSHForest
# 使用LSH进行近似最近邻搜索
lshf = LSHForest(n_estimators=20)
lshf.fit(user_item_matrix)
# 查询相似用户
_, indices = lshf.kneighbors(user_item_matrix.iloc[user_id:user_id+1], n_neighbors=50)
similar_users = user_item_matrix.index[indices[0]]
在实际生产环境中,数据是不断变化的,我们需要增量更新模型:
用户相似度矩阵的增量更新:
物品相似度矩阵的增量更新:
使用Spark进行大规模协同过滤:
python复制from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
# 初始化Spark
spark = SparkSession.builder.appName("CF").getOrCreate()
# 加载数据
df = spark.read.csv("ratings.csv", header=True, inferSchema=True)
# 训练ALS模型
als = ALS(
maxIter=5,
regParam=0.01,
userCol="user_id",
itemCol="item_id",
ratingCol="rating",
coldStartStrategy="drop"
)
model = als.fit(df)
# 生成推荐
userRecs = model.recommendForAllUsers(10)
结合协同过滤与内容推荐:
python复制def hybrid_recommend(user_id, user_data, cf_model, cb_model, alpha=0.7):
# 协同过滤推荐
cf_recs = cf_model.recommend(user_id, 20)
# 内容推荐
cb_recs = cb_model.recommend(user_data, 20)
# 混合结果
hybrid = {}
for item, score in cf_recs:
hybrid[item] = score * alpha
for item, score in cb_recs:
if item in hybrid:
hybrid[item] += score * (1 - alpha)
else:
hybrid[item] = score * (1 - alpha)
return sorted(hybrid.items(), key=lambda x: x[1], reverse=True)[:10]
使用神经网络学习用户和物品的嵌入表示:
python复制import tensorflow as tf
from tensorflow.keras.layers import Embedding, Flatten, Dot
# 定义模型
def create_model(num_users, num_items, embedding_size=50):
user_input = tf.keras.Input(shape=(1,))
item_input = tf.keras.Input(shape=(1,))
user_embedding = Embedding(num_users, embedding_size)(user_input)
user_vec = Flatten()(user_embedding)
item_embedding = Embedding(num_items, embedding_size)(item_input)
item_vec = Flatten()(item_embedding)
dot = Dot(axes=1)([user_vec, item_vec])
model = tf.keras.Model(inputs=[user_input, item_input], outputs=dot)
model.compile(optimizer='adam', loss='mse')
return model
# 训练模型
model = create_model(num_users, num_items)
model.fit([train_user, train_item], train_rating, epochs=10)
常用的离线评估指标包括:
准确度指标:
排序指标:
实现示例:
python复制from sklearn.metrics import mean_squared_error
import numpy as np
def rmse(y_true, y_pred):
return np.sqrt(mean_squared_error(y_true, y_pred))
def precision_at_k(y_true, y_pred, k=10):
topk = np.argsort(y_pred)[-k:]
hits = sum(1 for item in topk if item in y_true)
return hits / k
设计AB测试的关键点:
示例实现:
python复制import scipy.stats as stats
def ab_test(control_metrics, treatment_metrics):
# 计算均值差异
control_mean = np.mean(control_metrics)
treatment_mean = np.mean(treatment_metrics)
# 计算p-value
_, p_value = stats.ttest_ind(control_metrics, treatment_metrics)
# 计算提升比例
lift = (treatment_mean - control_mean) / control_mean
return {
'control_mean': control_mean,
'treatment_mean': treatment_mean,
'lift': lift,
'p_value': p_value,
'significant': p_value < 0.05
}
解决方案:
应对策略:
提升多样性的方法:
将用户-物品交互建模为图结构,利用GNN捕捉高阶关系:
python复制import torch
import torch_geometric
class GNNRec(torch.nn.Module):
def __init__(self, num_users, num_items, hidden_dim):
super().__init__()
self.user_emb = torch.nn.Embedding(num_users, hidden_dim)
self.item_emb = torch.nn.Embedding(num_items, hidden_dim)
self.conv1 = torch_geometric.nn.GCNConv(hidden_dim, hidden_dim)
self.conv2 = torch_geometric.nn.GCNConv(hidden_dim, hidden_dim)
def forward(self, edge_index):
user_emb = self.user_emb.weight
item_emb = self.item_emb.weight
x = torch.cat([user_emb, item_emb], dim=0)
x = self.conv1(x, edge_index)
x = torch.relu(x)
x = self.conv2(x, edge_index)
return x
将推荐过程建模为马尔可夫决策过程,使用强化学习优化长期收益:
python复制import gym
from stable_baselines3 import PPO
class RecEnv(gym.Env):
def __init__(self, user_data, item_data):
super().__init__()
# 初始化环境
def step(self, action):
# 执行推荐动作
# 返回观察、奖励、是否终止、额外信息
pass
def reset(self):
# 重置环境状态
pass
# 训练RL推荐智能体
env = RecEnv(user_data, item_data)
model = PPO('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=10000)
整合文本、图像、视频等多模态信息:
python复制import transformers
class MultiModalRec(torch.nn.Module):
def __init__(self):
super().__init__()
self.text_encoder = transformers.BertModel.from_pretrained('bert-base-uncased')
self.image_encoder = torchvision.models.resnet50(pretrained=True)
self.fusion = torch.nn.Linear(768+2048, 256)
def forward(self, text, image):
text_emb = self.text_encoder(**text).last_hidden_state.mean(dim=1)
image_emb = self.image_encoder(image)
joint_emb = torch.cat([text_emb, image_emb], dim=1)
return self.fusion(joint_emb)
协同过滤作为推荐系统的基石技术,虽然已有数十年历史,但仍在不断进化。从最初的简单矩阵分解,到如今与深度学习、强化学习等前沿技术的融合,协同过滤始终保持着强大的生命力。理解其核心原理并掌握现代实现方法,对于构建高效的推荐系统至关重要。