在傳統(tǒng)的推薦模型中契邀,簡(jiǎn)單理解推薦算法召回部分的核心原理無非就是將特征以不同形式進(jìn)行組織苍蔬,并按照距離求解算法計(jì)算用戶或物品間的相似度煌恢,那么在開始之前我們需要先了解一些常用的距離求解算法砾层,這些算法在接下來的源碼實(shí)現(xiàn)部分會(huì)用到
場(chǎng)景 | 算法 | 優(yōu)勢(shì) |
---|---|---|
基于客戶需求的推薦 | 分類模型(邏輯回歸糯景、神經(jīng)網(wǎng)絡(luò)) | 對(duì)當(dāng)下用戶行為進(jìn)行意圖識(shí)別 |
基于購(gòu)物籃的推薦 | 關(guān)聯(lián)規(guī)則 | 提升商品活力枢贿,挖掘用戶潛在購(gòu)買力 |
基于物品相似性的推薦 | 基于Item的協(xié)同過濾 | 分析物品潛在相似性殉农,幫組用戶快速找到想要的商品 |
基于用戶相似性的推薦 | 基于User的協(xié)同過濾、KNN | 形成圈子文化局荚,發(fā)現(xiàn)用戶潛在興趣 |
基于內(nèi)容相似性的推薦 | TFIDF超凳、SVD | 算法可解釋性強(qiáng),為用戶提供更多相似商品的選擇 |
市場(chǎng)細(xì)分 | K-means | 物以類聚耀态、人以群分 |
常用推薦算法有以上幾種轮傍,本節(jié)我們以最經(jīng)典的協(xié)調(diào)過濾進(jìn)行講解:
1、距離算法
同現(xiàn)矩陣
- 計(jì)算公式
- 通過案例說明
N(i)與N(j)分別表示喜歡 i 物品的人數(shù)與喜歡 j 物品的人數(shù)茫陆,上述公式大致意思是求解喜歡物品 i 的人中又同時(shí)喜歡物品 j 的占比是多少金麸,比值越大越能說明兩個(gè)物品的關(guān)聯(lián)度高,那么當(dāng)其它用戶去購(gòu)買物品 i 時(shí)將在很大程度上喜歡物品 j 簿盅;不過需要注意的時(shí)如果物品 j 是一個(gè)熱門物品挥下,那么很多人都會(huì)喜歡物品 j揍魂,極端情況下所有喜歡物品 i 的用戶都喜歡物品 j , 那么計(jì)算出的物品 i 與物品 j 就是高度相似的,為了避免熱門物品的影響棚瘟,在分母上對(duì)熱門物品進(jìn)行了懲罰现斋,當(dāng)N(j)很大時(shí),相識(shí)度就會(huì)很低偎蘸。
另外該方式還有另一個(gè)優(yōu)勢(shì)庄蹋,那就是在計(jì)算相似度時(shí)不需要額外收集評(píng)分?jǐn)?shù)據(jù)
歐幾里得距離
- 計(jì)算公式
- 通過案例說明
該方法用來計(jì)算N維空間內(nèi)兩點(diǎn)之間的距離,當(dāng)N為2時(shí)歐幾里得距離即平面內(nèi)兩點(diǎn)之間的距離迷雪,假設(shè)用戶1與用戶2分別對(duì)物品1限书、物品2、物品3與物品4進(jìn)行的評(píng)價(jià):
那么根據(jù)歐幾里得公式將得到兩個(gè)用戶的相似度為:
- 通過sql實(shí)現(xiàn)歐氏距離
with tb1 as (
SELECT u_i as user_id, pid id, score as rating from tab
) ,
tb2 as
(SELECT u_i as user_id, pid id, score as rating from tab )
select title,recommend,sim from (
select sim_tab.*, c.title as recommend from (
sELECT item_i, item_j, 1/(1+sqrt(sum(item_ij))) sim from (
sELECT tb1.id as item_i, tb2.id as item_j, pow(tb1.rating-tb2.rating, 2)as item_ij from tb1 LEFT join tb2 on tb1.user_id=tb2.user_id where tb1.id <> tb2.id
) as tab group by item_i, item_j
) as sim_tab left join app.eqs_merchandise_model as c on sim_tab.item_j=c.id
) as sim_tab2 left join app.eqs_merchandise_model as c on sim_tab2.item_i=c.id order by item_i,sim desc
- 通過spark實(shí)現(xiàn)
def euclidean2(v1: Vector, v2: Vector): Double = {
require(v1.size == v2.size, s"SimilarityAlgorithms:Vector dimensions do not match: Dim(v1)=${v1.size} and Dim(v2)" +
s"=${v2.size}.")
val x = v1.toArray
val y = v2.toArray
euclidean(x, y)
}
def euclidean(x: Array[Double], y: Array[Double]): Double = {
require(x.length == y.length, s"SimilarityAlgorithms:Array length do not match: Len(x)=${x.length} and Len(y)" +
s"=${y.length}.")
math.sqrt(x.zip(y).map(p => p._1 - p._2).map(d => d * d).sum)
}
def euclidean(v1: Vector, v2: Vector): Double = {
val sqdist = Vectors.sqdist(v1, v2)
math.sqrt(sqdist)
}
余弦距離
- 計(jì)算公式
- 通過案例說明
與歐幾里得距離類似章咧,也是用來求解N維空間內(nèi)兩個(gè)點(diǎn)的相似程度倦西,不同的是余弦相似度計(jì)算的是A、B兩個(gè)點(diǎn)與原點(diǎn)構(gòu)成的夾角赁严,夾角越小相似度越大扰柠;同樣以上面的例子進(jìn)行說明計(jì)算用戶1與用戶2的相似度:
- 通過sql實(shí)現(xiàn)余弦相似度計(jì)算
with tb1 as (
SELECT u_i as user_id, pid id, score as rating from tab
) ,
tb2 as
(SELECT u_i as user_id, pid id, score as rating from tab )
select title,recommend,sim from (
select sim_tab.*, c.title as recommend from (
sELECT item_i, item_j, sum(item_ij)/(sqrt(sum(item_i_pow)) * sqrt(sum(item_j_pow))) sim from (
sELECT tb1.id as item_i, tb2.id as item_j, pow(tb1.rating, 2) as item_i_pow, pow(tb2.rating, 2) as item_j_pow, tb1.rating * tb2.rating as item_ij from tb1 LEFT join tb2 on tb1.user_id=tb2.user_id where tb1.id <> tb2.id
) as tab group by item_i, item_j
) as sim_tab left join app.eqs_merchandise_model as c on sim_tab.item_j=c.id
) as sim_tab2 left join app.eqs_merchandise_model as c on sim_tab2.item_i=c.id order by item_i,sim desc
皮爾遜系數(shù)
- 計(jì)算公式
- 通過案例說明
皮爾遜相關(guān)系數(shù)是一個(gè)介于-1和1之間的數(shù),它度量 兩個(gè)一一對(duì)應(yīng)數(shù)列之間的線性相關(guān)程度疼约。也就是說卤档,它表示兩個(gè)數(shù)列中對(duì)應(yīng)數(shù)字一起增大或者一起減小的可能性。它度量數(shù)字一起按比例改變的傾向性程剥,也就是說兩個(gè)數(shù)列中的數(shù)字存在一個(gè)大致的線性關(guān)系劝枣。當(dāng)該傾向性強(qiáng)時(shí),相關(guān)值趨于1倡缠。當(dāng)相關(guān)性很弱時(shí)哨免,相關(guān)值趨于0。在負(fù)相關(guān)的情況下一個(gè)序列的值很高而另一個(gè)序列的值低昙沦,相關(guān)性就低。假設(shè)用戶3喜歡 物品1 和 物品4载荔,評(píng)分分別為1盾饮、3,那么用戶1與用戶3的皮爾遜系數(shù)為:
盡量皮爾遜系數(shù)在很多推薦算法中進(jìn)行了應(yīng)用懒熙,但是它也有一些明顯的不足之處丘损,例如兩個(gè)看過200部相同的電影的用戶,即便他們給出的評(píng)分偶爾不一致工扎,但可能要比兩個(gè)看過相同兩部且評(píng)分一致的用戶更相似徘钥。
- 通過spark實(shí)現(xiàn)
def pearsonCorrelationSimilarity(arr1: Array[Double], arr2: Array[Double]): Double = {
require(arr1.length == arr2.length, s"SimilarityAlgorithms:Array length do not match: Len(x)=${arr1.length} and Len(y)" +
s"=${arr2.length}.")
val sum_vec1 = arr1.sum
val sum_vec2 = arr2.sum
val square_sum_vec1 = arr1.map(x => x * x).sum
val square_sum_vec2 = arr2.map(x => x * x).sum
val zipVec = arr1.zip(arr2)
val product = zipVec.map(x => x._1 * x._2).sum
val numerator = product - (sum_vec1 * sum_vec2 / arr1.length)
val dominator = math.pow((square_sum_vec1 - math.pow(sum_vec1, 2) / arr1.length) * (square_sum_vec2 - math.pow(sum_vec2, 2) / arr2.length), 0.5)
if (dominator == 0) Double.NaN else numerator / (dominator * 1.0)
}
2、召回算法
協(xié)同過濾
基于用戶的協(xié)同過濾
- 找到和當(dāng)前用戶相近的一批用戶
- 這批用戶看過肢娘,但當(dāng)前用戶沒有看過的商品評(píng)分乘以這個(gè)用戶與當(dāng)前用戶的相似度分值呈础,得到當(dāng)前用戶對(duì)新商品的預(yù)測(cè)分
- 將相同新商品預(yù)測(cè)分進(jìn)行累加
- 新商品列表按照預(yù)測(cè)分倒序排列舆驶,取Top推薦給當(dāng)前用戶
通過案例進(jìn)行說明:
User 1 看過 Item 1 和 Item 2,而 User 3 和 User 4 也看過 Item 1 和 Item 2而钞,那么 User 1 和 User 3沙廉、User 4 就是相似用戶。這樣一來臼节,如果 User 3 和 User 4 還分別看過 Item 3 和 Item 4撬陵,我們就可以將 Item 3 和 Item 4 都推薦給 User 1 了。但是這里有個(gè)問題网缝,我們并沒有預(yù)測(cè)User1對(duì) Item 3 和 Item 4的偏好程度巨税,所以也就不清楚User1更喜歡哪個(gè)。
基于上面的表格粉臊,我們可以把Item1到Item4看做是一個(gè)4維的空間向量垢夹,那么,每個(gè)用戶可以認(rèn)為是這4維空間中的一個(gè)向量维费,每一維的向量值就是當(dāng)前用戶對(duì)該物品的評(píng)分果元。n維空間求解向量相似度就可以用到我們上面說的余弦距離公式了。
分別計(jì)算與User1相似度:
預(yù)測(cè)User1對(duì)新商品Item3犀盟、Item4的偏好程度:
Item 3 的推薦打分是:1 * 0.73=0.73(User3對(duì)Item3 的喜好度 * User3 和 User1 的相似度)
Item 4 的推薦打分是:2 * 0.54 = 1.08(User4對(duì)Item4 的喜好度 * User4 和 User1 的相似度)
基于物品的協(xié)同過濾
- 基于用戶物品評(píng)價(jià)矩陣而晒,計(jì)算物品相似度矩陣
- 將物品相似度矩陣與當(dāng)前用戶的物品評(píng)分矩陣相乘
- 將新的矩陣按照預(yù)測(cè)評(píng)分進(jìn)行倒序排列,取Top推薦給當(dāng)前用戶
通過案例進(jìn)行說明:
為了便于大家理解阅畴,這里將上面的表格進(jìn)行了行列對(duì)換倡怎,這里我們假設(shè)以用戶作為維度,物品作為向量贱枣,求解兩個(gè)物品在n維空間中的相似程度监署。
計(jì)算方式和上面計(jì)算相似用戶的步驟是一致的,從上面表格我們大致可以看出Item3與Item1纽哥、Item2都相似钠乏,且User1曾經(jīng)對(duì)Item1、Item2進(jìn)行了評(píng)分春塌,那么User1對(duì)Item3的偏好程度為:
Item3的推薦打分 = Item3與Item1的相似度 * User1對(duì)Item1的評(píng)分 + Item3與Item2的相似度 * User1對(duì)Item2的評(píng)分
這里我們假設(shè)已完成物品相似矩陣的計(jì)算晓避,那么結(jié)合當(dāng)前用戶的評(píng)分列表就可以求出當(dāng)前用戶對(duì)推薦物品的偏好程度:
協(xié)同算法思考
- 用戶行為發(fā)生時(shí)間距離當(dāng)前時(shí)間越近俏拱,越能反應(yīng)用戶的興趣
- 相近兩個(gè)行為更能反應(yīng)出元素之間的相似性
基于以上假設(shè),如何對(duì)協(xié)同過濾公式進(jìn)行優(yōu)化吼句,增加如下時(shí)間衰減因子
1/(1 + α*(data[u][i][time] - data[v][i][time]))
1/(1 + α*(data[u][i][time] - data[u][j][time]))
3锅必、 基于Spark的實(shí)現(xiàn)
下面通過余弦距離計(jì)算物品間相似度,關(guān)于歐式距離惕艳、同現(xiàn)矩陣等方式大家可以嘗試修改步驟3來實(shí)現(xiàn)
/**
* 余弦相似度矩陣計(jì)算.
* T(x,y) = ∑x(i)y(i) / sqrt(∑(x(i)*x(i))) * sqrt(∑(y(i)*y(i)))
*
* MovieLens 【數(shù)據(jù)地址:https://grouplens.org/datasets/movielens/】(1M搞隐、10M驹愚、20M 共三個(gè)數(shù)據(jù)集)
*/
// 1 數(shù)據(jù)準(zhǔn)備
val user_item_df = spark.read.options(Map(("delimiter",","),("header","true"))).csv("/tmp/ml-latest-small/ratings.csv")
val user_ds1 = user_item_df.groupBy("userId").agg(collect_set(concat_ws(":","movieId","rating")).as("item_set"))
// 2 物品:物品,上三角數(shù)據(jù)
val user_ds2 = user_ds1.flatMap { row =>
val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]](1).toArray.sorted
val result = new ArrayBuffer[(String, String, Double, Double)]()
for (i <- 0 to itemlist.length - 2) {
for (j <- i + 1 to itemlist.length - 1) {
result += ((itemlist(i).split(":")(0), itemlist(j).split(":")(0), itemlist(i).split(":")(1).toDouble, itemlist(j).split(":")(1).toDouble))
}
}
result
}.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "scoreI").withColumnRenamed("_4", "scoreJ")
// 3 按照距離公式求解相似度
// x*y = ∑x(i)y(i)
// |x|^2 = ∑(x(i)*x(i))
// |y|^2 = ∑(y(i)*y(i))
// result = x*y / sqrt(|x|^2) * sqrt(|y|^2)
val user_ds3 = user_ds2.
withColumn("cnt", lit(1)).
groupBy("itemidI", "itemidJ").
agg(sum(($"scoreI" * $"scoreJ")).as("sum_xy"),
sum(($"scoreI" * $"scoreI")).as("sum_x"),
sum(($"scoreJ" * $"scoreJ")).as("sum_y")).
withColumn("result", $"sum_xy" / (sqrt($"sum_x") * sqrt($"sum_y")))
// 4 上尔许、下三角合并
val user_ds8 = user_ds3.select("itemidI", "itemidJ", "result").
union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))
val user_prefer_ds2=user_ds8.join(user_item_df, $"itemidI"===$"movieId", "inner")
// 計(jì)算召回的用戶物品得分
val user_prefer_ds3 = user_prefer_ds2.withColumn("score", col("pref") * col("similar")).select("userid", "itemidJ", "score")
// user_prefer_ds3.show()
// 得分匯總
val user_prefer_ds4 = user_prefer_ds3.groupBy("userid", "itemidJ").agg(sum("score").as("score")).withColumnRenamed("itemidJ", "itemid")
// user_prefer_ds4.show()
// 用戶得分排序結(jié)果么鹤,去除用戶已評(píng)分物品
val user_prefer_ds5 = user_prefer_ds4.join(user_prefer_ds1, Seq("userid", "itemid"), "left").where("pref is null")
// user_prefer_ds5.show()
4、 基于python的實(shí)現(xiàn)
import math
user_items = {}
threshold = 5
def process_data(df):
for v in df.itertuples():
user_items.setdefault(v[1], {})
user_items[v[1]].setdefault(v[2], v[-1])
def user_similarity_cos():
W = {}
print(len(user_items.keys()))
c = 0
for user1 in user_items.keys():
#W.setdefault(user1, {})
c += 1
if(c % 2000 == 0):
print(c)
for user2 in user_items.keys():
if user1 == user2:
continue
#W[user1].setdefault(user2, 0)
if user2 in W.keys():
W[user1][user2] = W[user2][user1]
continue
cross_items = user_items[user1].keys() & user_items[user2].keys()
if len(cross_items) < threshold:
continue
#余弦距離
W.setdefault(user1, {})
#W[user1].setdefault(user2, 0)
sum_xy = sum([user_items[user1][v] * user_items[user2][v] for v in cross_items])
W[user1][user2] = sum_xy/(math.sqrt(sum([user_items[user1][v] * user_items[user1][v] for v in cross_items]))*math.sqrt(sum([user_items[user2][v] * user_items[user2][v] for v in cross_items])))
#歐氏距離
#dis = sum([pow(user_items[user1][v] - user_items[user2][v],2) for v in cross_items])
#W[user1][user2] = 1/(1+math.sqrt(dis))
return W
def recommend_cos(user, topN):
res = {}
items = user_items[user].keys()
for u, items_score in user_items.items():
if u == user or u not in user_simi[user].keys():
continue
for i in items_score.keys():
if i not in items:
res.setdefault(i, 0)
res[i] += user_simi[user][u] * items_score[i]
sort_val = dict(sorted(res.items(), key=lambda e: -e[1]))
data = {}
cn = 0
for i, s in sort_val.items():
cn += 1
if cn > topN:
break
data[i] = s
return data
user_simi = user_similarity_cos()
recommend('a38a23d5774a4e57bc8174928bac17d9', 5)
基于spark sql實(shí)現(xiàn)
- 構(gòu)建物品相似度
sql("with tb1 as (SELECT u_i as user_id, pid id, score as rating from tab ) , tb2 as (SELECT u_i as user_id, pid id, score as rating from tab )sELECT item_i, item_j, sum(item_ij)/(sqrt(sum(item_i_pow)) * sqrt(sum(item_j_pow))) sim from (sELECT tb1.id as item_i, tb2.id as item_j, pow(tb1.rating, 2) as item_i_pow, pow(tb2.rating, 2) as item_j_pow, tb1.rating * tb2.rating as item_ij from tb1 LEFT join tb2 on tb1.user_id=tb2.user_id where tb1.id <> tb2.id ) as tab group by item_i, item_j ").createOrReplaceTempView("sim")
- 計(jì)算推薦列表
sql(" select user, item, preference/icn preference,tab.score from(select user, item, sum(preference) preference from( select u_i as user, item_j as item, sum(sim*score) as preference from sim as a left join tab as b on a.item_i == b.pid group by user,item) cc left join tab dd on cc.item ==dd.pid group by user, item ) as c left join tab on tab.u_i==c.user and tab.pid == c.item ").where("score is not null").registerTempTable("rec_tab")