思維導(dǎo)圖
系列總目錄
推薦系統(tǒng)簡介
1. 推薦系統(tǒng)分類
1.1 根據(jù)實(shí)時性分類
- 離線推薦
- 實(shí)時推薦
1.2 推薦原則分類
- 基于相似度的推薦
- 基于知識的推薦
- 基于模型的推薦
1.3 是否個性化分類
- 基于統(tǒng)計(jì)的推薦
- 個性化推薦
1.4 數(shù)據(jù)源分類
- 基于人口統(tǒng)計(jì)學(xué)的推薦
- 基于內(nèi)容的推薦
- 基于協(xié)同過濾的推薦
2. 推薦系統(tǒng)算法
2.1 基于人口學(xué)統(tǒng)計(jì)
-
判斷用戶a的年齡段跟用戶c相似辜王,而用戶a喜歡物品A情龄,那給用戶c推薦物品A
2.2 基于內(nèi)容推薦
-
電影A和電影C的類型都是愛情宰掉,動作判斷這兩個電影相似逞壁,那用戶a喜歡電影A則給用戶a推薦電影C
2.3 基于協(xié)同過濾
- 基于內(nèi)容推薦主要是利于用戶評價過的物品特征故痊,而協(xié)同過濾還可以根據(jù)其他用戶的評價。協(xié)同過濾的優(yōu)勢在于不受限與內(nèi)容質(zhì)量判斷,當(dāng)物品內(nèi)容難以獲得,協(xié)同過濾還是有用武之地
-
基于近鄰的協(xié)同過濾
1.1 基于用戶協(xié)同過濾(User-CF)
1.2 基于物品協(xié)同過濾(Item-CF)
基于模型的協(xié)同過濾
2.1 奇異值分解(SVD)
2.2 潛在語義分析(LSA)
2.3 支撐向量機(jī)(SVM)
2.4 混合推薦
- 實(shí)際網(wǎng)站的推薦系統(tǒng)往往都不是單純只采用了某一種推薦的機(jī)制和策略伍茄,往往是將多個方法混合在一起,從而達(dá)到更好的推薦效果
機(jī)器學(xué)習(xí)基礎(chǔ)
1. 定義
- 機(jī)器學(xué)習(xí)是一門人工智能的科學(xué)施逾,該領(lǐng)域的主要研究對象是人工智能敷矫,特別是如何在經(jīng)驗(yàn)學(xué)習(xí)中改善具體算法的性能
- 機(jī)器學(xué)習(xí)是對能通過經(jīng)驗(yàn)自動改進(jìn)的計(jì)算機(jī)算法的研究
- 機(jī)器學(xué)習(xí)是用數(shù)據(jù)或以往的經(jīng)驗(yàn),以此優(yōu)化計(jì)算機(jī)程序的性能標(biāo)準(zhǔn)
- 深度學(xué)習(xí)是機(jī)器學(xué)習(xí)的一個重要分支汉额,推薦算法是機(jī)器學(xué)習(xí)的一個重要應(yīng)用
2. 過程
3.1 分類--有監(jiān)督學(xué)習(xí)
- 有監(jiān)督學(xué)習(xí)是提供數(shù)據(jù)并提供相應(yīng)結(jié)果的機(jī)器學(xué)習(xí)過程
3.1.1 分類算法
- 輸出被限制有限的離散數(shù)值曹仗,比如根據(jù)房屋特性判斷某個房屋是否出售,這里是否就是離散數(shù)組
- 分類模型包含分類學(xué)習(xí)過程蠕搜,學(xué)習(xí)過程利用學(xué)習(xí)方法學(xué)習(xí)一個分類器怎茫,分類過程是對已獲取的分類器對新輸入的進(jìn)行分類。分類器性能評估:
- 召回率: 所有實(shí)際為正類的數(shù)據(jù)中讥脐,被正確預(yù)測找出的比例
- 精確率: 所有實(shí)際為正類(關(guān)注的類)的數(shù)據(jù)中遭居,預(yù)測正確比例
3.1.2 分類模型
- 輸出被限制連續(xù)數(shù)值啼器,比如根據(jù)房屋特性判斷某個房屋出售概率
- KNN
- 決策樹
- 邏輯斯蒂回歸
3.1.3 回歸算法
- 輸出連續(xù)的數(shù)值旬渠,比如根據(jù)房屋特性判斷某個房屋出售概率,這里概率就是連續(xù)數(shù)值
3.1.4 回歸模型算法
- 輸入到輸出的映射函數(shù)
- 線性回歸
- 非線性回歸
- 最小二乘法:推薦算法中ALS介紹了最小二乘法端壳,后面篇幅會有具體代碼舉例
3.1.5 監(jiān)督學(xué)習(xí)模型評估策略
- 監(jiān)督學(xué)習(xí)三要素: 選出合適的模型告丢,選出合適的評價模型策略,選出具體算法
- 模型:總結(jié)內(nèi)在規(guī)律用數(shù)學(xué)函數(shù)表示
1.1 模型評估
1.1.1 損失函數(shù): 用來衡量模型測誤差大小损谦,評價損失就叫經(jīng)驗(yàn)風(fēng)險
1.1.2 過擬合和欠擬合: 欠擬合可能是訓(xùn)練集太小岖免,沒有很好的捕獲到信息岳颇,比如誤認(rèn)為綠色的就是樹葉。 過擬合就是學(xué)習(xí)的太徹底颅湘,把很多噪聲都學(xué)進(jìn)去了话侧,比如誤認(rèn)為樹葉必要是橢圓形的。 所以模型的復(fù)雜度要適中 - 算法: 選取最優(yōu)模型的具體方法
- 策略:選取最優(yōu)模型評價準(zhǔn)則
- 一般步驟
- 得到一個有限的數(shù)據(jù)集合闯参,確定包含所有學(xué)習(xí)模型集合
- 確定模型選擇準(zhǔn)則----學(xué)習(xí)策略
- 實(shí)現(xiàn)求解最優(yōu)模型算法---學(xué)習(xí)算法
- 通過學(xué)習(xí)算法選擇最優(yōu)模型瞻鹏,得到最優(yōu)模型進(jìn)行預(yù)測分析
3.1.6 反饋常見處理
3.2 分類--無監(jiān)督學(xué)習(xí)
- 無監(jiān)督學(xué)習(xí)是提供數(shù)據(jù)不提供相應(yīng)結(jié)果的機(jī)器學(xué)習(xí)過程,核心應(yīng)該是密度估計(jì)和聚類分析鹿寨,比如google新聞內(nèi)容會分組新博,按照不同主題呈現(xiàn)給用戶
- 有監(jiān)督學(xué)習(xí)使用的是有類別的數(shù)據(jù)
- 除了聚類還有降維, 需要使用降維的原因是數(shù)據(jù)壓縮,數(shù)據(jù)壓縮不僅可以對數(shù)據(jù)進(jìn)行壓縮脚草,使得數(shù)據(jù)占用較少的內(nèi)存或硬盤空間赫悄,還能對學(xué)習(xí)算法進(jìn)行加速
聚類模型-k均值
- 如果能對顧客數(shù)據(jù)(消費(fèi)金額及購物時間段等)進(jìn)行聚類,那么輸出的類別將是家庭主婦或者上班族等馏慨,顧客將被表示為不同的類別埂淮,這樣就可以針對不同的類別實(shí)施不同的銷售策略
- 實(shí)現(xiàn)步驟
-
原始數(shù)據(jù)集合
-
選擇K個隨機(jī)的點(diǎn),稱為聚類中心(cluster centroids)熏纯,K就是“K-均值”中的K同诫,表示的是樣本要進(jìn)行分類的數(shù)目,在本例中K=2樟澜。我們隨機(jī)地選擇連個聚類中心误窖,分別用紅色的叉和藍(lán)色的叉表示
-
對于數(shù)據(jù)集中的每一個數(shù)據(jù),按照距離聚類中心點(diǎn)的距離秩贰,將其與距離最近的中心點(diǎn)關(guān)聯(lián)起來霹俺,組成一個類。如下圖所示毒费,與紅色的聚類中心距離近的點(diǎn)被分為紅色的類丙唧,與藍(lán)色的聚類中心距離近的點(diǎn)被分為藍(lán)色的類
3.計(jì)算每一個類中樣本的平均值,將該類的聚類中心移動到平均值的位置觅玻。如下圖所示想际,聚類中心進(jìn)行了相應(yīng)的移動
4.重復(fù)步驟②,將樣本進(jìn)行重新分類溪厘,如下圖所示:
5.重復(fù)步驟③胡本,再次移動聚類中心
-
重復(fù)步驟②,將樣本進(jìn)行重分類
- 依次類推畸悬,重復(fù)步驟②③侧甫,一直迭代,直到聚類中心不在變化
3.3 推薦協(xié)同過濾推薦
- 基于內(nèi)容推薦主要是用戶評價過的物品特征,協(xié)同推薦還可以利用其它用戶評價過的物品內(nèi)容
- 基于協(xié)同推薦解決物品內(nèi)容難以獲得披粟,可以基于其它用戶推薦
- 協(xié)同推薦可以推薦差異很大的物品咒锻,但是又有內(nèi)在聯(lián)系
3.3.1 基于近鄰協(xié)同推薦(類似分類,是否推薦)
- 基于用戶協(xié)同: 根據(jù)所有用戶對物品偏好守屉,找出口味相同鄰居惑艇,并根據(jù)近鄰?fù)扑]拇泛,可以用K-近鄰算法,基于K個鄰居推薦
- 基于用戶推薦與人口統(tǒng)計(jì)差別:基于人口統(tǒng)計(jì)學(xué)只考慮用戶本身特性兢卵,基于用戶協(xié)同過濾考慮用戶歷史偏好
- 基于物品協(xié)同推薦,類似基于用戶協(xié)同過濾秽荤,使用所有用戶對物品偏好柠横,發(fā)現(xiàn)物品與物品相似度窃款,根據(jù)用戶歷史偏好進(jìn)行推薦
- 基于物品協(xié)同推薦與基于內(nèi)容推薦差別: 基于內(nèi)容推薦是基于物品特征本身,協(xié)同推薦則會考慮歷史
- 使用場景: 基于物品協(xié)同場景是在web站點(diǎn)中晨继,物品遠(yuǎn)小于用戶紊扬,所以物品會穩(wěn)定些唉擂。基于用戶協(xié)同: 新聞推薦中新聞數(shù)量大于用戶數(shù)量腹缩,這時候用戶會穩(wěn)定些
- knn算法舉例:
-
首先獲取訓(xùn)練集藏鹊,每行數(shù)據(jù)包含多個特征和分類標(biāo)簽
-
輸入沒有標(biāo)簽但有多個特征的新數(shù)據(jù)
-
將新數(shù)據(jù)的每個特征與樣本中每條數(shù)據(jù)對應(yīng)的特征進(jìn)行比較盘寡,然后提取出樣本中與新數(shù)據(jù)最相似的K條數(shù)據(jù)
- 如何得到相似度:可以用歐式距離求解, 將上述訓(xùn)練集中的數(shù)據(jù)特征用來對應(yīng)A或B的坐標(biāo)宴抚,即大眼睛甫煞、高鼻梁、細(xì)腰常潮、... 對應(yīng) a1,a2喊式。萧朝。。献联,大眼睛何址、高鼻梁都是字符用爪,這怎么進(jìn)行計(jì)算呢? 將字符型數(shù)據(jù)轉(zhuǎn)化為數(shù)值型數(shù)據(jù)以及其它對數(shù)據(jù)的預(yù)處理操作也是機(jī)器學(xué)習(xí)中的關(guān)鍵步驟,可以將眼睛的大小級別設(shè)為1诸衔,2颇玷,3個等級亚隙,3表示為大眼睛,1表示為小眼睛诊霹,鼻梁渣淳、身高等特征同理
3.3.2 基于模型協(xié)同推薦(類似回歸)
- 基于樣本的用戶偏好入愧,訓(xùn)練一個推薦模型,根據(jù)用戶實(shí)時偏好怔蚌,進(jìn)行新物品預(yù)測,計(jì)算得分
- 與基于近鄰區(qū)別: 基于近鄰是使用已有用戶偏好椅野,通過近鄰數(shù)據(jù)預(yù)測對新數(shù)據(jù)偏好竟闪,類似分類杖狼〉基于模型是訓(xùn)練模型,根據(jù)模型預(yù)測暗挑,類似回歸
- 訓(xùn)練模型可以使用LFM訓(xùn)練隱語義: 協(xié)同過濾非常依賴歷史數(shù)據(jù)炸裆,而歷史數(shù)據(jù)一般是稀疏的鲜屏,這就需要降維處理洛史,分解矩陣之后得到用戶和物品的隱藏特征
-
矩陣因式分解,分解出需要訓(xùn)練的模型土思,得到LFM, 并得到隱藏特征f1 f2忆嗜。捆毫。绩卤。江醇, SPARK ML中有ALS算法解決模型的數(shù)據(jù)表達(dá)陶夜,里面加入平方損失函數(shù)赖晶,并加入正則化遏插,防止過擬合
- 模型的求解: 最小交替二乘法ALS或者隨機(jī)梯度下降算法
- ALS:由于模擬的矩陣P Q都位置那就先固定一個P0, 通過損失函數(shù)求出Q胳嘲,這是典型的最小二乘法問題扣草,然后反過來固定Q0求出P辰妙,如此交替直到達(dá)到誤差滿足閾值密浑,后面篇幅會有具體代碼舉例
電影推薦項(xiàng)目
- 該項(xiàng)目是尚硅谷的尚硅谷機(jī)器學(xué)習(xí)和推薦系統(tǒng)項(xiàng)目實(shí)戰(zhàn)教程(初學(xué)者零基礎(chǔ)快速入門), 資料可以在blibli對應(yīng)評論里面找打, 包含項(xiàng)目代碼街图,資料等
1. 架構(gòu)
1.1 大數(shù)據(jù)處理流程
- 1.1.1 實(shí)時計(jì)算
- 用戶接口: 網(wǎng)站或者APP, 前端可以通過埋點(diǎn)產(chǎn)生數(shù)據(jù)
- 后端服務(wù)器: SpringBoot項(xiàng)目懒构,通過打log形式產(chǎn)生數(shù)據(jù)
- 日志文件: 后端服務(wù)器集群部署胆剧,所以有可能一個后端服務(wù)有多份日志文件
- 日志采集:每收集一分鐘秩霍,或者一點(diǎn)數(shù)據(jù)就放入文件,然后可以轉(zhuǎn)移到flume中辕近,或者直接通過定制api打入flume中移宅,可以配置flume寫入kafka中
- 數(shù)據(jù)總線: 通常由kafka等來的消息漏峰,實(shí)時數(shù)據(jù),實(shí)時log, 寫入kafka, 再由Flink等實(shí)時處理讀取
- 實(shí)時計(jì)算: Flink等倔喂,可以封裝大量業(yè)務(wù)靖苇,甚至進(jìn)行機(jī)器學(xué)習(xí)贤壁,智能推薦等
- 數(shù)據(jù)存儲:計(jì)算完存儲進(jìn)數(shù)據(jù)庫
-
數(shù)據(jù)可視化:大屏展示等
- 1.1.2 離線數(shù)倉
- 用戶接口: 網(wǎng)站或者APP, 前端可以通過埋點(diǎn)產(chǎn)生數(shù)據(jù)
- 后端服務(wù)器: SpringBoot項(xiàng)目脾拆,通過打log形式產(chǎn)生數(shù)據(jù)
- 日志文件: 后端服務(wù)器集群部署名船,所以有可能一個后端服務(wù)有多份日志文件
- 日志采集:可以用python腳本渠驼,或者自己寫java服務(wù),利用定時任務(wù)疯趟,將當(dāng)天所有數(shù)據(jù)采集起來谋梭,用文件日志轉(zhuǎn)移到flume agent監(jiān)控的目錄瓮床,然后flume agent可以sink到HDFS, flume agent是單個jvm進(jìn)程
- 日志存儲: 可以存儲在Hadoop上用于大數(shù)據(jù)分析
- 日志清洗: 可以用Azkaban來進(jìn)行定時調(diào)度隘庄,可以用corn定時工具調(diào)度丑掺,將HDFS文件寫入另外個HDFS文件
- 數(shù)據(jù)加載: 將清洗后的HDFS文件放入HIVE表中,HIVE表分區(qū)兼丰,每個分區(qū)存一天數(shù)據(jù)
- 數(shù)據(jù)倉庫: 數(shù)倉處理
- 數(shù)據(jù)計(jì)算: Spark計(jì)算
- 數(shù)據(jù)存儲:計(jì)算完存儲進(jìn)數(shù)據(jù)庫
-
數(shù)據(jù)可視化:大屏展示等
1.2 系統(tǒng)模塊設(shè)計(jì)
-
項(xiàng)目分為 實(shí)時推薦服務(wù)鳍征,離線推薦服務(wù)艳丛,離線統(tǒng)計(jì)服務(wù)氮双,內(nèi)容檢索服務(wù)其中各個服務(wù)細(xì)分又分為基于內(nèi)容,基于協(xié)同砰粹,基于模型的推薦
1.3 項(xiàng)目系統(tǒng)架構(gòu)
- 離線部分: Azkaban調(diào)度系統(tǒng)將每日的HDF定時S進(jìn)行清洗加載,Spark離線統(tǒng)計(jì)服務(wù)進(jìn)行離線統(tǒng)計(jì)計(jì)算饭入,Spark Ml lib機(jī)器學(xué)習(xí)中ALS實(shí)現(xiàn)離線推薦
- 在線部分: 從綜合業(yè)務(wù)服務(wù)谐丢,一般是spring boot服務(wù)乾忱,使用flume采集到kafka, 使用spark stream(Flink可以)進(jìn)行實(shí)時推薦來補(bǔ)充離線推薦的信息滯后性
-
近線部分: 離線窄瘟,在線都有可能直接寫業(yè)務(wù)數(shù)據(jù)庫趟卸,綜合業(yè)務(wù)服務(wù)可以從業(yè)務(wù)數(shù)據(jù)庫锄列,ES邻邮,redis讀取數(shù)據(jù)
2. 統(tǒng)計(jì)推薦
-
Azkaban定時調(diào)度筒严,更新電影均分,個數(shù)等數(shù)據(jù)到mongodb
3. 離線推薦
- 用ALS算法訓(xùn)練隱語義模型紫岩,對應(yīng)機(jī)器學(xué)習(xí)有監(jiān)督學(xué)習(xí)中回歸模型算法泉蝌,當(dāng)然新注冊用戶可能會有冷啟動問題勋陪,可以讓用戶自己填標(biāo)簽然后推薦
- 計(jì)算用戶推薦矩陣
- 計(jì)算電影相似度矩陣
-
也是Azkaban定時調(diào)度
代碼實(shí)戰(zhàn)
- 由于本人是JAVA出身诅愚,實(shí)例代碼是Scala語言劫映,很多地方打了Debug才知道具體含義泳赋,這里可以多Debug下祖今,里面的集合.collect下可以得到結(jié)果
- 具體步驟:
- Azkaban定時調(diào)度
- 創(chuàng)建一個SparkSession
- 從mongodb加載數(shù)據(jù)
- 從rating數(shù)據(jù)中提取所有的uid和mid千诬,并去重
- 訓(xùn)練隱語義模型, 使用spark ml lib的ALS算法
- 基于用戶和電影的隱特征,計(jì)算預(yù)測評分邪驮,得到用戶的推薦列表耕捞,計(jì)算user和movie的笛卡爾積俺抽,得到一個空評分矩陣
- 調(diào)用model的predict方法預(yù)測評分
- 過濾出評分大于0的項(xiàng)
- 基于電影隱特征磷斧,計(jì)算相似度矩陣,得到電影的相似度列表
- 對所有電影兩兩計(jì)算它們的相似度冕末,先做笛卡爾積
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
// 基于評分?jǐn)?shù)據(jù)的LFM档桃,只需要rating數(shù)據(jù)
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int )
case class MongoConfig(uri:String, db:String)
// 定義一個基準(zhǔn)推薦對象
case class Recommendation( mid: Int, score: Double )
// 定義基于預(yù)測評分的用戶推薦列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )
// 定義基于LFM電影特征向量的電影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
// 1. Azkaban定時調(diào)度
object OfflineRecommender {
// 定義表名和常量
val MONGODB_RATING_COLLECTION = "Rating"
val USER_RECS = "UserRecs"
val MOVIE_RECS = "MovieRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
println("Start")
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 2. 創(chuàng)建一個SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 3. 從mongodb加載數(shù)據(jù)
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRating]
.rdd
.map( rating => ( rating.uid, rating.mid, rating.score ) ) // 轉(zhuǎn)化成rdd藻肄,并且去掉時間戳
.cache()
// 4. 從rating數(shù)據(jù)中提取所有的uid和mid嘹屯,并去重
val userRDD = ratingRDD.map(_._1).distinct()
val movieRDD = ratingRDD.map(_._2).distinct()
// 5. 訓(xùn)練隱語義模型, 使用spark ml lib的ALS算法
val trainData = ratingRDD.map( x => Rating(x._1, x._2, x._3) )
val (rank, iterations, lambda) = (200, 5, 0.1)
val model = ALS.train(trainData, rank, iterations, lambda)
// 6. 基于用戶和電影的隱特征州弟,計(jì)算預(yù)測評分婆翔,得到用戶的推薦列表浙滤,計(jì)算user和movie的笛卡爾積气堕,得到一個空評分矩陣
val userMovies = userRDD.cartesian(movieRDD)
// 7. 調(diào)用model的predict方法預(yù)測評分
val preRatings = model.predict(userMovies)
// 8. 過濾出評分大于0的項(xiàng)
val userRecs = preRatings
.filter(_.rating > 0)
.map(rating => ( rating.user, (rating.product, rating.rating) ) )
.groupByKey()
.map{
case (uid, recs) => UserRecs( uid, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1, x._2)) )
}
.toDF()
userRecs.write
.option("uri", mongoConfig.uri)
.option("collection", USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
// 9. 基于電影隱特征,計(jì)算相似度矩陣誓沸,得到電影的相似度列表
val movieFeatures = model.productFeatures.map{
case (mid, features) => (mid, new DoubleMatrix(features))
}
// 10. 對所有電影兩兩計(jì)算它們的相似度拜隧,先做笛卡爾積
val movieRecs = movieFeatures.cartesian(movieFeatures)
.filter{
// 把自己跟自己的配對過濾掉
case (a, b) => a._1 != b._1
}
.map{
case (a, b) => {
val simScore = this.consinSim(a._2, b._2)
( a._1, ( b._1, simScore ) )
}
}
// 過濾出相似度大于0.6的
.filter(_._2._2 > 0.6)
.groupByKey()
.map{
case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
}
.toDF()
movieRecs.write
.option("uri", mongoConfig.uri)
.option("collection", MOVIE_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
println("Over")
}
// 求向量余弦相似度
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
}
}
4. 實(shí)時推薦
- 計(jì)算速度要快垦页,結(jié)果可以不是特別精確干奢,有預(yù)先設(shè)計(jì)好的模型
-
日志觸發(fā)實(shí)時更新,F(xiàn)lume將日志數(shù)據(jù)寫到Kafka辕羽,Spark Stream或者Flink訂閱Kafka的topic, 然后實(shí)時推薦從redis讀取數(shù)據(jù)刁愿,并將結(jié)果寫入Mongodb, 實(shí)時推薦的服務(wù)部署可以類似Flink部署
- 基本原理: 用戶最近的口味是相同的
- 代碼實(shí)踐酌毡,假如用戶對某電影標(biāo)記喜歡: 步驟
- 部署可以用實(shí)時計(jì)算那一套
- 創(chuàng)建一個SparkSession
- 拿到streaming context枷踏,當(dāng)然也可以用Flink
- 從Mongodb加載電影相似度矩陣數(shù)據(jù)掰曾,把它廣播出去
- 定義kafka連接參數(shù)
6.通過kafka創(chuàng)建一個DStream - 把原始數(shù)據(jù)UID|MID|SCORE|TIMESTAMP 轉(zhuǎn)換成評分流
- 繼續(xù)做流式處理旷坦,核心實(shí)時算法部分
8.1 從redis里獲取當(dāng)前用戶最近的K次評分秒梅,保存成Array[(mid, score)]
8.2 從相似度矩陣中取出當(dāng)前電影最相似的N個電影捆蜀,作為備選列表辆它,Array[mid],數(shù)據(jù)來源于離線推薦計(jì)算的相似度
8.3 對每個備選電影呢蔫,計(jì)算推薦優(yōu)先級片吊,得到當(dāng)前用戶的實(shí)時推薦列表俏脊,Array[(mid, score)]联予,computeMovieScores有具體說明,拿到備選電影和最近評分電影的相似度就可以過濾了季眷,獲取兩個電影之間的相似度也是根據(jù)離線結(jié)果來的
8.4 把推薦數(shù)據(jù)保存到mongodb
9.開始接收和處理數(shù)據(jù)
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import kafka.Kafka
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 定義連接助手對象子刮,序列化
object ConnHelper extends Serializable{
lazy val jedis = new Jedis("localhost")
lazy val mongoClient = MongoClient( MongoClientURI("mongodb://localhost:27017/recommender") )
}
case class MongoConfig(uri:String, db:String)
// 定義一個基準(zhǔn)推薦對象
case class Recommendation( mid: Int, score: Double )
// 定義基于預(yù)測評分的用戶推薦列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )
// 定義基于LFM電影特征向量的電影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
// 1. 部署可以用實(shí)時計(jì)算那一套
object StreamingRecommender {
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_MOVIES_NUM = 20
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender",
"kafka.topic" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")
// 2. 創(chuàng)建一個SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 3. 拿到streaming context
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(2)) // batch duration
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 4. 加載電影相似度矩陣數(shù)據(jù),把它廣播出去
val simMovieMatrix = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_RECS_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRecs]
.rdd
.map{ movieRecs => // 為了查詢相似度方便橱赠,轉(zhuǎn)換成map
(movieRecs.mid, movieRecs.recs.map( x=> (x.mid, x.score) ).toMap )
}.collectAsMap()
val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)
// 5. 定義kafka連接參數(shù)
val kafkaParam = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "recommender",
"auto.offset.reset" -> "latest"
)
// 6.通過kafka創(chuàng)建一個DStream
val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam )
)
// 7. 把原始數(shù)據(jù)UID|MID|SCORE|TIMESTAMP 轉(zhuǎn)換成評分流
val ratingStream = kafkaStream.map{
msg =>
val attr = msg.value().split("\\|")
( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
}
// 8. 繼續(xù)做流式處理狭姨,核心實(shí)時算法部分
ratingStream.foreachRDD{
rdds => rdds.foreach{
case (uid, mid, score, timestamp) => {
println("rating data coming! >>>>>>>>>>>>>>>>")
// 8.1 從redis里獲取當(dāng)前用戶最近的K次評分饼拍,保存成Array[(mid, score)]
val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )
// 8.2 從相似度矩陣中取出當(dāng)前電影最相似的N個電影师抄,作為備選列表叨吮,Array[mid]
val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )
// 8.3 對每個備選電影挤安,計(jì)算推薦優(yōu)先級丧鸯,得到當(dāng)前用戶的實(shí)時推薦列表丛肢,Array[(mid, score)]
val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )
// 8.4 把推薦數(shù)據(jù)保存到mongodb
saveDataToMongoDB( uid, streamRecs )
}
}
}
// 9.開始接收和處理數(shù)據(jù)
ssc.start()
println(">>>>>>>>>>>>>>> streaming started!")
ssc.awaitTermination()
}
// redis操作返回的是java類蜂怎,為了用map操作需要引入轉(zhuǎn)換類
import scala.collection.JavaConversions._
def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
// 從redis讀取數(shù)據(jù)杠步,用戶評分?jǐn)?shù)據(jù)保存在 uid:UID 為key的隊(duì)列里幽歼,value是 MID:SCORE
jedis.lrange("uid:" + uid, 0, num-1)
.map{
item => // 具體每個評分又是以冒號分隔的兩個值
val attr = item.split("\\:")
( attr(0).trim.toInt, attr(1).trim.toDouble )
}
.toArray
}
/**
* 獲取跟當(dāng)前電影做相似的num個電影,作為備選電影
* @param num 相似電影的數(shù)量
* @param mid 當(dāng)前電影ID
* @param uid 當(dāng)前評分用戶ID
* @param simMovies 相似度矩陣
* @return 過濾之后的備選電影列表
*/
def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
(implicit mongoConfig: MongoConfig): Array[Int] ={
// 1. 從相似度矩陣中拿到所有相似的電影
val allSimMovies = simMovies(mid).toArray
// 2. 從mongodb中查詢用戶已看過的電影
val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
.find( MongoDBObject("uid" -> uid) )
.toArray
.map{
item => item.get("mid").toString.toInt
}
// 3. 把看過的過濾,得到輸出列表
allSimMovies.filter( x=> ! ratingExist.contains(x._1) )
.sortWith(_._2>_._2)
.take(num)
.map(x=>x._1)
}
def computeMovieScores(candidateMovies: Array[Int],
userRecentlyRatings: Array[(Int, Double)],
simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={
// 定義一個ArrayBuffer诬烹,用于保存每一個備選電影的基礎(chǔ)得分
val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
// 定義一個HashMap绞吁,保存每一個備選電影的增強(qiáng)減弱因子
val increMap = scala.collection.mutable.HashMap[Int, Int]()
val decreMap = scala.collection.mutable.HashMap[Int, Int]()
for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){
// 拿到備選電影和最近評分電影的相似度
val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )
if(simScore > 0.7){
// 計(jì)算備選電影的基礎(chǔ)推薦得分
scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
if( userRecentlyRating._2 > 3 ){
increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
} else{
decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
}
}
}
// 根據(jù)備選電影的mid做groupby掀泳,根據(jù)公式去求最后的推薦評分
scores.groupBy(_._1).map{
// groupBy之后得到的數(shù)據(jù) Map( mid -> ArrayBuffer[(mid, score)] )
case (mid, scoreList) =>
( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
}.toArray.sortWith(_._2>_._2)
}
// 獲取兩個電影之間的相似度
def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int,
scala.collection.immutable.Map[Int, Double]]): Double ={
simMovies.get(mid1) match {
case Some(sims) => sims.get(mid2) match {
case Some(score) => score
case None => 0.0
}
case None => 0.0
}
}
// 求一個數(shù)的對數(shù)员舵,利用換底公式藕畔,底數(shù)默認(rèn)為10
def log(m: Int): Double ={
val N = 10
math.log(m)/ math.log(N)
}
def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={
// 定義到StreamRecs表的連接
val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
// 如果表中已有uid對應(yīng)的數(shù)據(jù)注服,則刪除
streamRecsCollection.findAndRemove( MongoDBObject("uid" -> uid) )
// 將streamRecs數(shù)據(jù)存入表中
streamRecsCollection.insert( MongoDBObject( "uid"->uid,
"recs"-> streamRecs.map(x=>MongoDBObject( "mid"->x._1, "score"->x._2 )) ) )
}
}
5. 基于內(nèi)容推薦
- 當(dāng)然還有混合推薦溶弟,混合推薦就是把種推薦加權(quán)處理辜御,基于統(tǒng)計(jì)推薦 + 基于離線推薦 + 基于實(shí)時推薦 + 基于內(nèi)容推薦
- 電影A的相似電影:有相同標(biāo)簽就可以定義為相似擒权,這個需要定義好標(biāo)簽碳抄,可以用戶自定義畫像等
- 基于UGC的特征提取: 可以用TF-IDF剖效,加入TF-IDF是為了防止熱門標(biāo)簽對推薦結(jié)果影響焰盗,步驟, 可以定時調(diào)度
- 適用于用戶喜歡某個電影姨谷,然后找出電影相似度最高的幾個梦湘,推薦給用戶
- 創(chuàng)建一個SparkSession
- 加載數(shù)據(jù)捌议,并作預(yù)處理
- 核心部分: 用TF-IDF從內(nèi)容信息中提取電影特征向量瓣颅,創(chuàng)建一個分詞器譬正,默認(rèn)按空格分詞曾我,這里按照演員分詞
- 用分詞器對原始數(shù)據(jù)做轉(zhuǎn)換抒巢,生成新的一列words, 現(xiàn)在對象里面有四個"mid", "name", "genres", "words"是按照空格分詞
- 引入HashingTF工具蛉谜,可以把一個詞語序列轉(zhuǎn)化成對應(yīng)的詞頻
- 引入IDF工具,可以得到idf模型, 現(xiàn)在對象里面再加了一列叫rawFeatures客燕,把一個詞語genres序列轉(zhuǎn)化成對應(yīng)的詞頻
- 訓(xùn)練idf模型也搓,得到每個詞的逆文檔頻率
- 用模型對原數(shù)據(jù)進(jìn)行處理还绘,得到文檔中每個詞的tf-idf,作為新的特征向量, 再加了一列features得到逆詞頻
- 對所有電影兩兩計(jì)算它們的相似度塘幅,先做笛卡爾積
10.1 把自己跟自己的配對過濾掉, _1是mid即是電影序號, _2是特征值
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
// 需要的數(shù)據(jù)源是電影內(nèi)容信息
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
shoot: String, language: String, genres: String, actors: String, directors: String)
case class MongoConfig(uri:String, db:String)
// 定義一個基準(zhǔn)推薦對象
case class Recommendation( mid: Int, score: Double )
// 定義電影內(nèi)容信息提取出的特征向量的電影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
// 1. 適用于 用戶喜歡某個電影电媳,然后找出電影相似度最高的幾個匾乓,推薦給用戶
object ContentRecommender {
// 定義表名和常量
val MONGODB_MOVIE_COLLECTION = "Movie"
val CONTENT_MOVIE_RECS = "ContentMovieRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 2. 創(chuàng)建一個SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 3. 加載數(shù)據(jù)拼缝,并作預(yù)處理
val movieTagsDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Movie]
.map(
// 提取mid咧七,name继阻,genres三項(xiàng)作為原始內(nèi)容特征瘟檩,genres演員是按照|存儲的,分詞器默認(rèn)按照空格做分詞
x => ( x.mid, x.name, x.genres.map(c=> if(c=='|') ' ' else c) )
)
.toDF("mid", "name", "genres")
.cache()
// 4. 核心部分: 用TF-IDF從內(nèi)容信息中提取電影特征向量歉嗓,創(chuàng)建一個分詞器鉴分,默認(rèn)按空格分詞
val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")
// 5. 用分詞器對原始數(shù)據(jù)做轉(zhuǎn)換志珍,生成新的一列words, 現(xiàn)在對象里面有四個"mid", "name", "genres", "words"是按照空格分詞
val wordsData = tokenizer.transform(movieTagsDF)
// 6. 引入HashingTF工具伦糯,可以把一個詞語序列轉(zhuǎn)化成對應(yīng)的詞頻
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(50)
val featurizedData = hashingTF.transform(wordsData)
// 7. 引入IDF工具敛纲,可以得到idf模型, 現(xiàn)在對象里面再加了一列叫rawFeatures淤翔,把一個詞語genres序列轉(zhuǎn)化成對應(yīng)的詞頻
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// 8. 訓(xùn)練idf模型旁壮,得到每個詞的逆文檔頻率
val idfModel = idf.fit(featurizedData)
// 9. 用模型對原數(shù)據(jù)進(jìn)行處理抡谐,得到文檔中每個詞的tf-idf,作為新的特征向量, 再加了一列features得到逆詞頻
val rescaledData = idfModel.transform(featurizedData)
val movieFeatures = rescaledData.map(
row => ( row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray )
)
.rdd
.map(
x => ( x._1, new DoubleMatrix(x._2) )
)
movieFeatures.collect().foreach(println)
// 10. 對所有電影兩兩計(jì)算它們的相似度,先做笛卡爾積
val movieRecs = movieFeatures.cartesian(movieFeatures)
.filter{
// 10.1 把自己跟自己的配對過濾掉, _1是mid即是電影序號, _2是特征值
case (a, b) => a._1 != b._1
}
.map{
case (a, b) => {
val simScore = this.consinSim(a._2, b._2)
( a._1, ( b._1, simScore ) )
}
}
.filter(_._2._2 > 0.6) // 過濾出相似度大于0.6的
.groupByKey()
.map{
// items是item集合( b._1, simScore )對象即 電影序號 + 相似度
case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
}
.toDF()
movieRecs.write
.option("uri", mongoConfig.uri)
.option("collection", CONTENT_MOVIE_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
// 求向量余弦相似度
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
}
}
- 基于協(xié)同物品推薦: 可以用上面舉例的KNN算法
6. 部署之Azkaban離線調(diào)度
-
大數(shù)據(jù)離線調(diào)度系統(tǒng), 可以按照順序執(zhí)行流程
- 使用方式:
1.創(chuàng)建Azkaban-Stat.job,編輯器輸入, 其中***表示包名, xxx表示mac路徑杜秸,運(yùn)行前得先裝spark哈
-
將對于jar包和 job文件打包成zip, 然后點(diǎn)upload
- 這里可以直接點(diǎn)運(yùn)行诞挨,也可以設(shè)置定時調(diào)度時間
type=command
command=/usr/local/spark/bin/spark-submit --class ***.OfflineRecommender /Users/xxx/Desktop/ideaworkspace/big_data/MovieRecommendSystem/recommender/OfflineRecommender/target/OfflineRecommender-1.0-SNAPSHOT.jar
- mac啟動方式:
1. cd /Users/xxx/Desktop/azkaban/azkaban-solo-server-0.1.0-SNAPSHOT
2. bin/start-solo.sh
3. 賬號azkaban密碼也是
4. http://localhost:8081/index
7. 部署之Dataworks + Maxcomputer
- 主要是阿里云的大數(shù)據(jù)平臺惶傻,Dataworks轉(zhuǎn)移數(shù)據(jù)银室,Maxcomputer計(jì)算励翼,還可以實(shí)現(xiàn)按照順序類似Azkaban的按順序調(diào)度
- 阿里云平臺可以體驗(yàn)汽抚,阿里平臺也提供人工智能集成造烁,但是公司并沒有使用惭蟋,用了spark ml機(jī)器學(xué)
- 這里也有AI學(xué)習(xí)告组,AI學(xué)習(xí)天地
參考
- 機(jī)器學(xué)習(xí)思維導(dǎo)圖
- 有監(jiān)督和無監(jiān)督學(xué)習(xí)都各有哪些有名的算法和深度學(xué)習(xí)?
- 螞蟻金服 ZSearch 在向量檢索上的探索
- milvus向量數(shù)據(jù)庫文檔
- 一文入門Facebook開源向量檢索框架Faiss
- opencv3與tensorflow的關(guān)系
- mongodb與mysql區(qū)別
- 日志收集組件flume和logstash對比
- ELK系列-使用flume日志收集
- 如何使用flume采集日志到kafka中
- 快速學(xué)習(xí)-電影推薦系統(tǒng)設(shè)計(jì)(實(shí)時推薦模塊)
- 項(xiàng)目體系架構(gòu)設(shè)計(jì)——基于Spark平臺的協(xié)同過濾實(shí)時電影推薦系統(tǒng)項(xiàng)目系列博客(四)
- PyODPS 基本操作 | 學(xué)習(xí)筆記
- PAI 平臺搭建企業(yè)級個性化推薦系統(tǒng) 最佳實(shí)踐
- 機(jī)器學(xué)習(xí)2—KNN算法(原理驼卖、實(shí)現(xiàn)氨肌、實(shí)例)
- 入門機(jī)器學(xué)習(xí)(十五)--無監(jiān)督學(xué)習(xí)(K均值)
- 無監(jiān)督學(xué)習(xí)——K-均值聚類算法對未標(biāo)注數(shù)據(jù)分組
- Flume數(shù)據(jù)采集工具之a(chǎn)gent
- Flume案例二:實(shí)時監(jiān)控單個追加文件(tail -f 日志)(exec source)
- macos安裝spark
- azkaban詳細(xì)使用教程
- Azkaban中的一些坑
- MongoDB使用場景總結(jié)
- MongoDB 在評論中臺的實(shí)踐