業(yè)務(wù)實(shí)戰(zhàn)場景(十四)推薦系統(tǒng)

思維導(dǎo)圖

思維導(dǎo)圖.png

系列總目錄


推薦系統(tǒng)簡介

1. 推薦系統(tǒng)分類

推薦系統(tǒng)分類.png
1.1 根據(jù)實(shí)時性分類
  1. 離線推薦
  2. 實(shí)時推薦
1.2 推薦原則分類
  1. 基于相似度的推薦
  2. 基于知識的推薦
  3. 基于模型的推薦
1.3 是否個性化分類
  1. 基于統(tǒng)計(jì)的推薦
  2. 個性化推薦
1.4 數(shù)據(jù)源分類
  1. 基于人口統(tǒng)計(jì)學(xué)的推薦
  2. 基于內(nèi)容的推薦
  3. 基于協(xié)同過濾的推薦

2. 推薦系統(tǒng)算法

2.1 基于人口學(xué)統(tǒng)計(jì)
  • 判斷用戶a的年齡段跟用戶c相似辜王,而用戶a喜歡物品A情龄,那給用戶c推薦物品A


    基于人口學(xué)統(tǒng)計(jì).png
2.2 基于內(nèi)容推薦
  • 電影A和電影C的類型都是愛情宰掉,動作判斷這兩個電影相似逞壁,那用戶a喜歡電影A則給用戶a推薦電影C


    基于內(nèi)容推薦.png
2.3 基于協(xié)同過濾
  • 基于內(nèi)容推薦主要是利于用戶評價過的物品特征故痊,而協(xié)同過濾還可以根據(jù)其他用戶的評價。協(xié)同過濾的優(yōu)勢在于不受限與內(nèi)容質(zhì)量判斷,當(dāng)物品內(nèi)容難以獲得,協(xié)同過濾還是有用武之地
  1. 基于近鄰的協(xié)同過濾
    1.1 基于用戶協(xié)同過濾(User-CF)


    基于用戶協(xié)同過濾.png

    1.2 基于物品協(xié)同過濾(Item-CF)


    基于物品協(xié)同過濾.png
  2. 基于模型的協(xié)同過濾
    2.1 奇異值分解(SVD)
    2.2 潛在語義分析(LSA)
    2.3 支撐向量機(jī)(SVM)

2.4 混合推薦
  • 實(shí)際網(wǎng)站的推薦系統(tǒng)往往都不是單純只采用了某一種推薦的機(jī)制和策略伍茄,往往是將多個方法混合在一起,從而達(dá)到更好的推薦效果

機(jī)器學(xué)習(xí)基礎(chǔ)

1. 定義

  • 機(jī)器學(xué)習(xí)是一門人工智能的科學(xué)施逾,該領(lǐng)域的主要研究對象是人工智能敷矫,特別是如何在經(jīng)驗(yàn)學(xué)習(xí)中改善具體算法的性能
  • 機(jī)器學(xué)習(xí)是對能通過經(jīng)驗(yàn)自動改進(jìn)的計(jì)算機(jī)算法的研究
  • 機(jī)器學(xué)習(xí)是用數(shù)據(jù)或以往的經(jīng)驗(yàn),以此優(yōu)化計(jì)算機(jī)程序的性能標(biāo)準(zhǔn)
  • 深度學(xué)習(xí)是機(jī)器學(xué)習(xí)的一個重要分支汉额,推薦算法是機(jī)器學(xué)習(xí)的一個重要應(yīng)用

2. 過程

過程.png

3.1 分類--有監(jiān)督學(xué)習(xí)

  • 有監(jiān)督學(xué)習(xí)是提供數(shù)據(jù)并提供相應(yīng)結(jié)果的機(jī)器學(xué)習(xí)過程
3.1.1 分類算法
  • 輸出被限制有限的離散數(shù)值曹仗,比如根據(jù)房屋特性判斷某個房屋是否出售,這里是否就是離散數(shù)組
  • 分類模型包含分類學(xué)習(xí)過程蠕搜,學(xué)習(xí)過程利用學(xué)習(xí)方法學(xué)習(xí)一個分類器怎茫,分類過程是對已獲取的分類器對新輸入的進(jìn)行分類。分類器性能評估:
  1. 召回率: 所有實(shí)際為正類的數(shù)據(jù)中讥脐,被正確預(yù)測找出的比例
  2. 精確率: 所有實(shí)際為正類(關(guān)注的類)的數(shù)據(jù)中遭居,預(yù)測正確比例
3.1.2 分類模型
  • 輸出被限制連續(xù)數(shù)值啼器,比如根據(jù)房屋特性判斷某個房屋出售概率
  1. KNN
  2. 決策樹
  3. 邏輯斯蒂回歸
3.1.3 回歸算法
  • 輸出連續(xù)的數(shù)值旬渠,比如根據(jù)房屋特性判斷某個房屋出售概率,這里概率就是連續(xù)數(shù)值
3.1.4 回歸模型算法
  • 輸入到輸出的映射函數(shù)
  1. 線性回歸
  2. 非線性回歸
  3. 最小二乘法:推薦算法中ALS介紹了最小二乘法端壳,后面篇幅會有具體代碼舉例
3.1.5 監(jiān)督學(xué)習(xí)模型評估策略
  • 監(jiān)督學(xué)習(xí)三要素: 選出合適的模型告丢,選出合適的評價模型策略,選出具體算法
  1. 模型:總結(jié)內(nèi)在規(guī)律用數(shù)學(xué)函數(shù)表示
    1.1 模型評估
    1.1.1 損失函數(shù): 用來衡量模型測誤差大小损谦,評價損失就叫經(jīng)驗(yàn)風(fēng)險
    1.1.2 過擬合和欠擬合: 欠擬合可能是訓(xùn)練集太小岖免,沒有很好的捕獲到信息岳颇,比如誤認(rèn)為綠色的就是樹葉。 過擬合就是學(xué)習(xí)的太徹底颅湘,把很多噪聲都學(xué)進(jìn)去了话侧,比如誤認(rèn)為樹葉必要是橢圓形的。 所以模型的復(fù)雜度要適中
  2. 算法: 選取最優(yōu)模型的具體方法
  3. 策略:選取最優(yōu)模型評價準(zhǔn)則
  • 一般步驟
  1. 得到一個有限的數(shù)據(jù)集合闯参,確定包含所有學(xué)習(xí)模型集合
  2. 確定模型選擇準(zhǔn)則----學(xué)習(xí)策略
  3. 實(shí)現(xiàn)求解最優(yōu)模型算法---學(xué)習(xí)算法
  4. 通過學(xué)習(xí)算法選擇最優(yōu)模型瞻鹏,得到最優(yōu)模型進(jìn)行預(yù)測分析
3.1.6 反饋常見處理
反饋常見處理.png

3.2 分類--無監(jiān)督學(xué)習(xí)

  • 無監(jiān)督學(xué)習(xí)是提供數(shù)據(jù)不提供相應(yīng)結(jié)果的機(jī)器學(xué)習(xí)過程,核心應(yīng)該是密度估計(jì)和聚類分析鹿寨,比如google新聞內(nèi)容會分組新博,按照不同主題呈現(xiàn)給用戶
  • 有監(jiān)督學(xué)習(xí)使用的是有類別的數(shù)據(jù)
  • 除了聚類還有降維, 需要使用降維的原因是數(shù)據(jù)壓縮,數(shù)據(jù)壓縮不僅可以對數(shù)據(jù)進(jìn)行壓縮脚草,使得數(shù)據(jù)占用較少的內(nèi)存或硬盤空間赫悄,還能對學(xué)習(xí)算法進(jìn)行加速
聚類模型-k均值
  • 如果能對顧客數(shù)據(jù)(消費(fèi)金額及購物時間段等)進(jìn)行聚類,那么輸出的類別將是家庭主婦或者上班族等馏慨,顧客將被表示為不同的類別埂淮,這樣就可以針對不同的類別實(shí)施不同的銷售策略
  • 實(shí)現(xiàn)步驟
  1. 原始數(shù)據(jù)集合


    原始數(shù)據(jù)集合.png
  2. 選擇K個隨機(jī)的點(diǎn),稱為聚類中心(cluster centroids)熏纯,K就是“K-均值”中的K同诫,表示的是樣本要進(jìn)行分類的數(shù)目,在本例中K=2樟澜。我們隨機(jī)地選擇連個聚類中心误窖,分別用紅色的叉和藍(lán)色的叉表示


    第一步.png
  3. 對于數(shù)據(jù)集中的每一個數(shù)據(jù),按照距離聚類中心點(diǎn)的距離秩贰,將其與距離最近的中心點(diǎn)關(guān)聯(lián)起來霹俺,組成一個類。如下圖所示毒费,與紅色的聚類中心距離近的點(diǎn)被分為紅色的類丙唧,與藍(lán)色的聚類中心距離近的點(diǎn)被分為藍(lán)色的類


    第二步.png

    3.計(jì)算每一個類中樣本的平均值,將該類的聚類中心移動到平均值的位置觅玻。如下圖所示想际,聚類中心進(jìn)行了相應(yīng)的移動


    第三步.png

    4.重復(fù)步驟②,將樣本進(jìn)行重新分類溪厘,如下圖所示:
    第四步.png

    5.重復(fù)步驟③胡本,再次移動聚類中心


    第五步.png
  4. 重復(fù)步驟②,將樣本進(jìn)行重分類


    第六步.png
  5. 依次類推畸悬,重復(fù)步驟②③侧甫,一直迭代,直到聚類中心不在變化

3.3 推薦協(xié)同過濾推薦

  • 基于內(nèi)容推薦主要是用戶評價過的物品特征,協(xié)同推薦還可以利用其它用戶評價過的物品內(nèi)容
  • 基于協(xié)同推薦解決物品內(nèi)容難以獲得披粟,可以基于其它用戶推薦
  • 協(xié)同推薦可以推薦差異很大的物品咒锻,但是又有內(nèi)在聯(lián)系
3.3.1 基于近鄰協(xié)同推薦(類似分類,是否推薦)
是否推薦.png
  • 基于用戶協(xié)同: 根據(jù)所有用戶對物品偏好守屉,找出口味相同鄰居惑艇,并根據(jù)近鄰?fù)扑]拇泛,可以用K-近鄰算法,基于K個鄰居推薦
  • 基于用戶推薦與人口統(tǒng)計(jì)差別:基于人口統(tǒng)計(jì)學(xué)只考慮用戶本身特性兢卵,基于用戶協(xié)同過濾考慮用戶歷史偏好
  • 基于物品協(xié)同推薦,類似基于用戶協(xié)同過濾秽荤,使用所有用戶對物品偏好柠横,發(fā)現(xiàn)物品與物品相似度窃款,根據(jù)用戶歷史偏好進(jìn)行推薦
  • 基于物品協(xié)同推薦與基于內(nèi)容推薦差別: 基于內(nèi)容推薦是基于物品特征本身,協(xié)同推薦則會考慮歷史
  • 使用場景: 基于物品協(xié)同場景是在web站點(diǎn)中晨继,物品遠(yuǎn)小于用戶紊扬,所以物品會穩(wěn)定些唉擂。基于用戶協(xié)同: 新聞推薦中新聞數(shù)量大于用戶數(shù)量腹缩,這時候用戶會穩(wěn)定些
  • knn算法舉例:
  1. 首先獲取訓(xùn)練集藏鹊,每行數(shù)據(jù)包含多個特征和分類標(biāo)簽


    訓(xùn)練集.png
  2. 輸入沒有標(biāo)簽但有多個特征的新數(shù)據(jù)


    輸入無標(biāo)簽.png
  3. 將新數(shù)據(jù)的每個特征與樣本中每條數(shù)據(jù)對應(yīng)的特征進(jìn)行比較盘寡,然后提取出樣本中與新數(shù)據(jù)最相似的K條數(shù)據(jù)


    對比選擇相似數(shù)據(jù).png

    結(jié)果.png
  4. 如何得到相似度:可以用歐式距離求解, 將上述訓(xùn)練集中的數(shù)據(jù)特征用來對應(yīng)A或B的坐標(biāo)宴抚,即大眼睛甫煞、高鼻梁、細(xì)腰常潮、... 對應(yīng) a1,a2喊式。萧朝。。献联,大眼睛何址、高鼻梁都是字符用爪,這怎么進(jìn)行計(jì)算呢? 將字符型數(shù)據(jù)轉(zhuǎn)化為數(shù)值型數(shù)據(jù)以及其它對數(shù)據(jù)的預(yù)處理操作也是機(jī)器學(xué)習(xí)中的關(guān)鍵步驟,可以將眼睛的大小級別設(shè)為1诸衔,2颇玷,3個等級亚隙,3表示為大眼睛,1表示為小眼睛诊霹,鼻梁渣淳、身高等特征同理
    具體化.png
3.3.2 基于模型協(xié)同推薦(類似回歸)
  • 基于樣本的用戶偏好入愧,訓(xùn)練一個推薦模型,根據(jù)用戶實(shí)時偏好怔蚌,進(jìn)行新物品預(yù)測,計(jì)算得分
  • 與基于近鄰區(qū)別: 基于近鄰是使用已有用戶偏好椅野,通過近鄰數(shù)據(jù)預(yù)測對新數(shù)據(jù)偏好竟闪,類似分類杖狼〉基于模型是訓(xùn)練模型,根據(jù)模型預(yù)測暗挑,類似回歸
  • 訓(xùn)練模型可以使用LFM訓(xùn)練隱語義: 協(xié)同過濾非常依賴歷史數(shù)據(jù)炸裆,而歷史數(shù)據(jù)一般是稀疏的鲜屏,這就需要降維處理洛史,分解矩陣之后得到用戶和物品的隱藏特征
  • 矩陣因式分解,分解出需要訓(xùn)練的模型土思,得到LFM, 并得到隱藏特征f1 f2忆嗜。捆毫。绩卤。江醇, SPARK ML中有ALS算法解決模型的數(shù)據(jù)表達(dá)陶夜,里面加入平方損失函數(shù)赖晶,并加入正則化遏插,防止過擬合


    矩陣因式分解.png

    提取f1_f2.png
  • 模型的求解: 最小交替二乘法ALS或者隨機(jī)梯度下降算法
  • ALS:由于模擬的矩陣P Q都位置那就先固定一個P0, 通過損失函數(shù)求出Q胳嘲,這是典型的最小二乘法問題扣草,然后反過來固定Q0求出P辰妙,如此交替直到達(dá)到誤差滿足閾值密浑,后面篇幅會有具體代碼舉例

電影推薦項(xiàng)目

1. 架構(gòu)

1.1 大數(shù)據(jù)處理流程
  • 1.1.1 實(shí)時計(jì)算
  1. 用戶接口: 網(wǎng)站或者APP, 前端可以通過埋點(diǎn)產(chǎn)生數(shù)據(jù)
  2. 后端服務(wù)器: SpringBoot項(xiàng)目懒构,通過打log形式產(chǎn)生數(shù)據(jù)
  3. 日志文件: 后端服務(wù)器集群部署胆剧,所以有可能一個后端服務(wù)有多份日志文件
  4. 日志采集:每收集一分鐘秩霍,或者一點(diǎn)數(shù)據(jù)就放入文件,然后可以轉(zhuǎn)移到flume中辕近,或者直接通過定制api打入flume中移宅,可以配置flume寫入kafka中
  5. 數(shù)據(jù)總線: 通常由kafka等來的消息漏峰,實(shí)時數(shù)據(jù),實(shí)時log, 寫入kafka, 再由Flink等實(shí)時處理讀取
  6. 實(shí)時計(jì)算: Flink等倔喂,可以封裝大量業(yè)務(wù)靖苇,甚至進(jìn)行機(jī)器學(xué)習(xí)贤壁,智能推薦等
  7. 數(shù)據(jù)存儲:計(jì)算完存儲進(jìn)數(shù)據(jù)庫
  8. 數(shù)據(jù)可視化:大屏展示等


    實(shí)時計(jì)算.jpg
  • 1.1.2 離線數(shù)倉
  1. 用戶接口: 網(wǎng)站或者APP, 前端可以通過埋點(diǎn)產(chǎn)生數(shù)據(jù)
  2. 后端服務(wù)器: SpringBoot項(xiàng)目脾拆,通過打log形式產(chǎn)生數(shù)據(jù)
  3. 日志文件: 后端服務(wù)器集群部署名船,所以有可能一個后端服務(wù)有多份日志文件
  4. 日志采集:可以用python腳本渠驼,或者自己寫java服務(wù),利用定時任務(wù)疯趟,將當(dāng)天所有數(shù)據(jù)采集起來谋梭,用文件日志轉(zhuǎn)移到flume agent監(jiān)控的目錄瓮床,然后flume agent可以sink到HDFS, flume agent是單個jvm進(jìn)程
  5. 日志存儲: 可以存儲在Hadoop上用于大數(shù)據(jù)分析
  6. 日志清洗: 可以用Azkaban來進(jìn)行定時調(diào)度隘庄,可以用corn定時工具調(diào)度丑掺,將HDFS文件寫入另外個HDFS文件
  7. 數(shù)據(jù)加載: 將清洗后的HDFS文件放入HIVE表中,HIVE表分區(qū)兼丰,每個分區(qū)存一天數(shù)據(jù)
  8. 數(shù)據(jù)倉庫: 數(shù)倉處理
  9. 數(shù)據(jù)計(jì)算: Spark計(jì)算
  10. 數(shù)據(jù)存儲:計(jì)算完存儲進(jìn)數(shù)據(jù)庫
  11. 數(shù)據(jù)可視化:大屏展示等


    離線數(shù)據(jù)倉庫.jpg
1.2 系統(tǒng)模塊設(shè)計(jì)
  • 項(xiàng)目分為 實(shí)時推薦服務(wù)鳍征,離線推薦服務(wù)艳丛,離線統(tǒng)計(jì)服務(wù)氮双,內(nèi)容檢索服務(wù)其中各個服務(wù)細(xì)分又分為基于內(nèi)容,基于協(xié)同砰粹,基于模型的推薦


    系統(tǒng)模塊設(shè)計(jì).png
1.3 項(xiàng)目系統(tǒng)架構(gòu)
  • 離線部分: Azkaban調(diào)度系統(tǒng)將每日的HDF定時S進(jìn)行清洗加載,Spark離線統(tǒng)計(jì)服務(wù)進(jìn)行離線統(tǒng)計(jì)計(jì)算饭入,Spark Ml lib機(jī)器學(xué)習(xí)中ALS實(shí)現(xiàn)離線推薦
  • 在線部分: 從綜合業(yè)務(wù)服務(wù)谐丢,一般是spring boot服務(wù)乾忱,使用flume采集到kafka, 使用spark stream(Flink可以)進(jìn)行實(shí)時推薦來補(bǔ)充離線推薦的信息滯后性
  • 近線部分: 離線窄瘟,在線都有可能直接寫業(yè)務(wù)數(shù)據(jù)庫趟卸,綜合業(yè)務(wù)服務(wù)可以從業(yè)務(wù)數(shù)據(jù)庫锄列,ES邻邮,redis讀取數(shù)據(jù)


    項(xiàng)目系統(tǒng)架構(gòu).png

    更詳細(xì)架構(gòu).jpg

2. 統(tǒng)計(jì)推薦

  • Azkaban定時調(diào)度筒严,更新電影均分,個數(shù)等數(shù)據(jù)到mongodb


    統(tǒng)計(jì)推薦.png

3. 離線推薦

  • 用ALS算法訓(xùn)練隱語義模型紫岩,對應(yīng)機(jī)器學(xué)習(xí)有監(jiān)督學(xué)習(xí)中回歸模型算法泉蝌,當(dāng)然新注冊用戶可能會有冷啟動問題勋陪,可以讓用戶自己填標(biāo)簽然后推薦
  • 計(jì)算用戶推薦矩陣
  • 計(jì)算電影相似度矩陣
  • 也是Azkaban定時調(diào)度


    離線推薦.png
代碼實(shí)戰(zhàn)
  • 由于本人是JAVA出身诅愚,實(shí)例代碼是Scala語言劫映,很多地方打了Debug才知道具體含義泳赋,這里可以多Debug下祖今,里面的集合.collect下可以得到結(jié)果
  • 具體步驟:
  1. Azkaban定時調(diào)度
  2. 創(chuàng)建一個SparkSession
  3. 從mongodb加載數(shù)據(jù)
  4. 從rating數(shù)據(jù)中提取所有的uid和mid千诬,并去重
  5. 訓(xùn)練隱語義模型, 使用spark ml lib的ALS算法
  6. 基于用戶和電影的隱特征,計(jì)算預(yù)測評分邪驮,得到用戶的推薦列表耕捞,計(jì)算user和movie的笛卡爾積俺抽,得到一個空評分矩陣
  7. 調(diào)用model的predict方法預(yù)測評分
  8. 過濾出評分大于0的項(xiàng)
  9. 基于電影隱特征磷斧,計(jì)算相似度矩陣,得到電影的相似度列表
  10. 對所有電影兩兩計(jì)算它們的相似度冕末,先做笛卡爾積
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix


// 基于評分?jǐn)?shù)據(jù)的LFM档桃,只需要rating數(shù)據(jù)
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int )

case class MongoConfig(uri:String, db:String)

// 定義一個基準(zhǔn)推薦對象
case class Recommendation( mid: Int, score: Double )

// 定義基于預(yù)測評分的用戶推薦列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )

// 定義基于LFM電影特征向量的電影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )

// 1. Azkaban定時調(diào)度
object OfflineRecommender {

  // 定義表名和常量
  val MONGODB_RATING_COLLECTION = "Rating"

  val USER_RECS = "UserRecs"
  val MOVIE_RECS = "MovieRecs"

  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    println("Start")
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")

    // 2. 創(chuàng)建一個SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))


    // 3. 從mongodb加載數(shù)據(jù)
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map( rating => ( rating.uid, rating.mid, rating.score ) )    // 轉(zhuǎn)化成rdd藻肄,并且去掉時間戳
      .cache()

    // 4. 從rating數(shù)據(jù)中提取所有的uid和mid嘹屯,并去重
    val userRDD = ratingRDD.map(_._1).distinct()
    val movieRDD = ratingRDD.map(_._2).distinct()

    // 5. 訓(xùn)練隱語義模型, 使用spark ml lib的ALS算法
    val trainData = ratingRDD.map( x => Rating(x._1, x._2, x._3) )

    val (rank, iterations, lambda) = (200, 5, 0.1)
    val model = ALS.train(trainData, rank, iterations, lambda)

    // 6. 基于用戶和電影的隱特征州弟,計(jì)算預(yù)測評分婆翔,得到用戶的推薦列表浙滤,計(jì)算user和movie的笛卡爾積气堕,得到一個空評分矩陣
    val userMovies = userRDD.cartesian(movieRDD)

    // 7. 調(diào)用model的predict方法預(yù)測評分
    val preRatings = model.predict(userMovies)

    // 8.  過濾出評分大于0的項(xiàng)
    val userRecs = preRatings
      .filter(_.rating > 0)   
      .map(rating => ( rating.user, (rating.product, rating.rating) ) )
      .groupByKey()
      .map{
        case (uid, recs) => UserRecs( uid, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1, x._2)) )
      }
      .toDF()

    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // 9. 基于電影隱特征,計(jì)算相似度矩陣誓沸,得到電影的相似度列表
    val movieFeatures = model.productFeatures.map{
      case (mid, features) => (mid, new DoubleMatrix(features))
    }

    // 10. 對所有電影兩兩計(jì)算它們的相似度拜隧,先做笛卡爾積
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter{
        // 把自己跟自己的配對過濾掉
        case (a, b) => a._1 != b._1
      }
      .map{
        case (a, b) => {
          val simScore = this.consinSim(a._2, b._2)
          ( a._1, ( b._1, simScore ) )
        }
      }
      // 過濾出相似度大于0.6的
      .filter(_._2._2 > 0.6)   
      .groupByKey()
      .map{
        case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
      }
      .toDF()
    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()

    println("Over")
  }

  // 求向量余弦相似度
  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
    movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
  }

}

4. 實(shí)時推薦

  • 計(jì)算速度要快垦页,結(jié)果可以不是特別精確干奢,有預(yù)先設(shè)計(jì)好的模型
  • 日志觸發(fā)實(shí)時更新,F(xiàn)lume將日志數(shù)據(jù)寫到Kafka辕羽,Spark Stream或者Flink訂閱Kafka的topic, 然后實(shí)時推薦從redis讀取數(shù)據(jù)刁愿,并將結(jié)果寫入Mongodb, 實(shí)時推薦的服務(wù)部署可以類似Flink部署


    實(shí)時推薦.png
  • 基本原理: 用戶最近的口味是相同的
  • 代碼實(shí)踐酌毡,假如用戶對某電影標(biāo)記喜歡: 步驟
  1. 部署可以用實(shí)時計(jì)算那一套
  2. 創(chuàng)建一個SparkSession
  3. 拿到streaming context枷踏,當(dāng)然也可以用Flink
  4. 從Mongodb加載電影相似度矩陣數(shù)據(jù)掰曾,把它廣播出去
  5. 定義kafka連接參數(shù)
    6.通過kafka創(chuàng)建一個DStream
  6. 把原始數(shù)據(jù)UID|MID|SCORE|TIMESTAMP 轉(zhuǎn)換成評分流
  7. 繼續(xù)做流式處理旷坦,核心實(shí)時算法部分
    8.1 從redis里獲取當(dāng)前用戶最近的K次評分秒梅,保存成Array[(mid, score)]
    8.2 從相似度矩陣中取出當(dāng)前電影最相似的N個電影捆蜀,作為備選列表辆它,Array[mid],數(shù)據(jù)來源于離線推薦計(jì)算的相似度
    8.3 對每個備選電影呢蔫,計(jì)算推薦優(yōu)先級片吊,得到當(dāng)前用戶的實(shí)時推薦列表俏脊,Array[(mid, score)]联予,computeMovieScores有具體說明,拿到備選電影和最近評分電影的相似度就可以過濾了季眷,獲取兩個電影之間的相似度也是根據(jù)離線結(jié)果來的
    8.4 把推薦數(shù)據(jù)保存到mongodb
    9.開始接收和處理數(shù)據(jù)

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import kafka.Kafka
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 定義連接助手對象子刮,序列化
object ConnHelper extends Serializable{
  lazy val jedis = new Jedis("localhost")
  lazy val mongoClient = MongoClient( MongoClientURI("mongodb://localhost:27017/recommender") )
}

case class MongoConfig(uri:String, db:String)

// 定義一個基準(zhǔn)推薦對象
case class Recommendation( mid: Int, score: Double )

// 定義基于預(yù)測評分的用戶推薦列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )

// 定義基于LFM電影特征向量的電影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )

// 1. 部署可以用實(shí)時計(jì)算那一套
object StreamingRecommender {

  val MAX_USER_RATINGS_NUM = 20
  val MAX_SIM_MOVIES_NUM = 20
  val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender",
      "kafka.topic" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")

    // 2. 創(chuàng)建一個SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // 3. 拿到streaming context
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(2))    // batch duration

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // 4. 加載電影相似度矩陣數(shù)據(jù),把它廣播出去
    val simMovieMatrix = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_RECS_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRecs]
      .rdd
      .map{ movieRecs => // 為了查詢相似度方便橱赠,轉(zhuǎn)換成map
        (movieRecs.mid, movieRecs.recs.map( x=> (x.mid, x.score) ).toMap )
      }.collectAsMap()

    val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)

    // 5. 定義kafka連接參數(shù)
    val kafkaParam = Map(
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "recommender",
      "auto.offset.reset" -> "latest"
    )
    // 6.通過kafka創(chuàng)建一個DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam )
    )

    // 7. 把原始數(shù)據(jù)UID|MID|SCORE|TIMESTAMP 轉(zhuǎn)換成評分流
    val ratingStream = kafkaStream.map{
      msg =>
        val attr = msg.value().split("\\|")
        ( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
    }

    // 8. 繼續(xù)做流式處理狭姨,核心實(shí)時算法部分
    ratingStream.foreachRDD{
      rdds => rdds.foreach{
        case (uid, mid, score, timestamp) => {
          println("rating data coming! >>>>>>>>>>>>>>>>")

          // 8.1 從redis里獲取當(dāng)前用戶最近的K次評分饼拍,保存成Array[(mid, score)]
          val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )

          // 8.2 從相似度矩陣中取出當(dāng)前電影最相似的N個電影师抄,作為備選列表叨吮,Array[mid]
          val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )

          // 8.3 對每個備選電影挤安,計(jì)算推薦優(yōu)先級丧鸯,得到當(dāng)前用戶的實(shí)時推薦列表丛肢,Array[(mid, score)]
          val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )

          // 8.4 把推薦數(shù)據(jù)保存到mongodb
          saveDataToMongoDB( uid, streamRecs )
        }
      }
    }
    // 9.開始接收和處理數(shù)據(jù)
    ssc.start()

    println(">>>>>>>>>>>>>>> streaming started!")

    ssc.awaitTermination()

  }

  // redis操作返回的是java類蜂怎,為了用map操作需要引入轉(zhuǎn)換類
  import scala.collection.JavaConversions._

  def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
    // 從redis讀取數(shù)據(jù)杠步,用戶評分?jǐn)?shù)據(jù)保存在 uid:UID 為key的隊(duì)列里幽歼,value是 MID:SCORE
    jedis.lrange("uid:" + uid, 0, num-1)
      .map{
        item => // 具體每個評分又是以冒號分隔的兩個值
          val attr = item.split("\\:")
          ( attr(0).trim.toInt, attr(1).trim.toDouble )
      }
      .toArray
  }

  /**
    * 獲取跟當(dāng)前電影做相似的num個電影,作為備選電影
    * @param num       相似電影的數(shù)量
    * @param mid       當(dāng)前電影ID
    * @param uid       當(dāng)前評分用戶ID
    * @param simMovies 相似度矩陣
    * @return          過濾之后的備選電影列表
    */
  def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
                     (implicit mongoConfig: MongoConfig): Array[Int] ={
    // 1. 從相似度矩陣中拿到所有相似的電影
    val allSimMovies = simMovies(mid).toArray

    // 2. 從mongodb中查詢用戶已看過的電影
    val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
      .find( MongoDBObject("uid" -> uid) )
      .toArray
      .map{
        item => item.get("mid").toString.toInt
      }

    // 3. 把看過的過濾,得到輸出列表
    allSimMovies.filter( x=> ! ratingExist.contains(x._1) )
      .sortWith(_._2>_._2)
      .take(num)
      .map(x=>x._1)
  }

  def computeMovieScores(candidateMovies: Array[Int],
                         userRecentlyRatings: Array[(Int, Double)],
                         simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={
    // 定義一個ArrayBuffer诬烹,用于保存每一個備選電影的基礎(chǔ)得分
    val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
    // 定義一個HashMap绞吁,保存每一個備選電影的增強(qiáng)減弱因子
    val increMap = scala.collection.mutable.HashMap[Int, Int]()
    val decreMap = scala.collection.mutable.HashMap[Int, Int]()

    for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){
      // 拿到備選電影和最近評分電影的相似度
      val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )

      if(simScore > 0.7){
        // 計(jì)算備選電影的基礎(chǔ)推薦得分
        scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
        if( userRecentlyRating._2 > 3 ){
          increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
        } else{
          decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
        }
      }
    }
    // 根據(jù)備選電影的mid做groupby掀泳,根據(jù)公式去求最后的推薦評分
    scores.groupBy(_._1).map{
      // groupBy之后得到的數(shù)據(jù) Map( mid -> ArrayBuffer[(mid, score)] )
      case (mid, scoreList) =>
        ( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
    }.toArray.sortWith(_._2>_._2)
  }

  // 獲取兩個電影之間的相似度
  def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int,
    scala.collection.immutable.Map[Int, Double]]): Double ={

    simMovies.get(mid1) match {
      case Some(sims) => sims.get(mid2) match {
        case Some(score) => score
        case None => 0.0
      }
      case None => 0.0
    }
  }

  // 求一個數(shù)的對數(shù)员舵,利用換底公式藕畔,底數(shù)默認(rèn)為10
  def log(m: Int): Double ={
    val N = 10
    math.log(m)/ math.log(N)
  }

  def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={
    // 定義到StreamRecs表的連接
    val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)

    // 如果表中已有uid對應(yīng)的數(shù)據(jù)注服,則刪除
    streamRecsCollection.findAndRemove( MongoDBObject("uid" -> uid) )
    // 將streamRecs數(shù)據(jù)存入表中
    streamRecsCollection.insert( MongoDBObject( "uid"->uid,
      "recs"-> streamRecs.map(x=>MongoDBObject( "mid"->x._1, "score"->x._2 )) ) )
  }

}

5. 基于內(nèi)容推薦

  • 當(dāng)然還有混合推薦溶弟,混合推薦就是把種推薦加權(quán)處理辜御,基于統(tǒng)計(jì)推薦 + 基于離線推薦 + 基于實(shí)時推薦 + 基于內(nèi)容推薦
  • 電影A的相似電影:有相同標(biāo)簽就可以定義為相似擒权,這個需要定義好標(biāo)簽碳抄,可以用戶自定義畫像等
  • 基于UGC的特征提取: 可以用TF-IDF剖效,加入TF-IDF是為了防止熱門標(biāo)簽對推薦結(jié)果影響焰盗,步驟, 可以定時調(diào)度
  1. 適用于用戶喜歡某個電影姨谷,然后找出電影相似度最高的幾個梦湘,推薦給用戶
  2. 創(chuàng)建一個SparkSession
  3. 加載數(shù)據(jù)捌议,并作預(yù)處理
  4. 核心部分: 用TF-IDF從內(nèi)容信息中提取電影特征向量瓣颅,創(chuàng)建一個分詞器譬正,默認(rèn)按空格分詞曾我,這里按照演員分詞
  5. 用分詞器對原始數(shù)據(jù)做轉(zhuǎn)換抒巢,生成新的一列words, 現(xiàn)在對象里面有四個"mid", "name", "genres", "words"是按照空格分詞
  6. 引入HashingTF工具蛉谜,可以把一個詞語序列轉(zhuǎn)化成對應(yīng)的詞頻
  7. 引入IDF工具,可以得到idf模型, 現(xiàn)在對象里面再加了一列叫rawFeatures客燕,把一個詞語genres序列轉(zhuǎn)化成對應(yīng)的詞頻
  8. 訓(xùn)練idf模型也搓,得到每個詞的逆文檔頻率
  9. 用模型對原數(shù)據(jù)進(jìn)行處理还绘,得到文檔中每個詞的tf-idf,作為新的特征向量, 再加了一列features得到逆詞頻
  10. 對所有電影兩兩計(jì)算它們的相似度塘幅,先做笛卡爾積
    10.1 把自己跟自己的配對過濾掉, _1是mid即是電影序號, _2是特征值
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix


// 需要的數(shù)據(jù)源是電影內(nèi)容信息
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
                 shoot: String, language: String, genres: String, actors: String, directors: String)

case class MongoConfig(uri:String, db:String)

// 定義一個基準(zhǔn)推薦對象
case class Recommendation( mid: Int, score: Double )

// 定義電影內(nèi)容信息提取出的特征向量的電影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )

// 1. 適用于 用戶喜歡某個電影电媳,然后找出電影相似度最高的幾個匾乓,推薦給用戶
object ContentRecommender {

  // 定義表名和常量
  val MONGODB_MOVIE_COLLECTION = "Movie"

  val CONTENT_MOVIE_RECS = "ContentMovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")

    // 2. 創(chuàng)建一個SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // 3. 加載數(shù)據(jù)拼缝,并作預(yù)處理
    val movieTagsDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Movie]
      .map(
        // 提取mid咧七,name继阻,genres三項(xiàng)作為原始內(nèi)容特征瘟檩,genres演員是按照|存儲的,分詞器默認(rèn)按照空格做分詞
        x => ( x.mid, x.name, x.genres.map(c=> if(c=='|') ' ' else c) )
      )
      .toDF("mid", "name", "genres")
      .cache()



    // 4. 核心部分: 用TF-IDF從內(nèi)容信息中提取電影特征向量歉嗓,創(chuàng)建一個分詞器鉴分,默認(rèn)按空格分詞
    val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")

    // 5. 用分詞器對原始數(shù)據(jù)做轉(zhuǎn)換志珍,生成新的一列words, 現(xiàn)在對象里面有四個"mid", "name", "genres", "words"是按照空格分詞
    val wordsData = tokenizer.transform(movieTagsDF)

    // 6. 引入HashingTF工具伦糯,可以把一個詞語序列轉(zhuǎn)化成對應(yīng)的詞頻
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(50)
    val featurizedData = hashingTF.transform(wordsData)

    // 7. 引入IDF工具敛纲,可以得到idf模型, 現(xiàn)在對象里面再加了一列叫rawFeatures淤翔,把一個詞語genres序列轉(zhuǎn)化成對應(yīng)的詞頻
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    // 8. 訓(xùn)練idf模型旁壮,得到每個詞的逆文檔頻率
    val idfModel = idf.fit(featurizedData)
    // 9. 用模型對原數(shù)據(jù)進(jìn)行處理抡谐,得到文檔中每個詞的tf-idf,作為新的特征向量, 再加了一列features得到逆詞頻
    val rescaledData = idfModel.transform(featurizedData)


    val movieFeatures = rescaledData.map(
      row => ( row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray )
    )
      .rdd
      .map(
        x => ( x._1, new DoubleMatrix(x._2) )
      )
    movieFeatures.collect().foreach(println)

    // 10. 對所有電影兩兩計(jì)算它們的相似度,先做笛卡爾積
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter{
        // 10.1 把自己跟自己的配對過濾掉, _1是mid即是電影序號, _2是特征值
        case (a, b) => a._1 != b._1
      }
      .map{
        case (a, b) => {
          val simScore = this.consinSim(a._2, b._2)
          ( a._1, ( b._1, simScore ) )
        }
      }
      .filter(_._2._2 > 0.6)    // 過濾出相似度大于0.6的
      .groupByKey()
      .map{
        // items是item集合( b._1, simScore )對象即 電影序號 + 相似度
        case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
      }
      .toDF()
    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", CONTENT_MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  // 求向量余弦相似度
  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
    movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
  }
}

  • 基于協(xié)同物品推薦: 可以用上面舉例的KNN算法

6. 部署之Azkaban離線調(diào)度

  • 大數(shù)據(jù)離線調(diào)度系統(tǒng), 可以按照順序執(zhí)行流程


    Azkaban歷史.png
  • 使用方式:
    1.創(chuàng)建Azkaban-Stat.job,編輯器輸入, 其中***表示包名, xxx表示mac路徑杜秸,運(yùn)行前得先裝spark哈
  1. 將對于jar包和 job文件打包成zip, 然后點(diǎn)upload


    upload.png
  2. 這里可以直接點(diǎn)運(yùn)行诞挨,也可以設(shè)置定時調(diào)度時間
type=command
command=/usr/local/spark/bin/spark-submit --class ***.OfflineRecommender /Users/xxx/Desktop/ideaworkspace/big_data/MovieRecommendSystem/recommender/OfflineRecommender/target/OfflineRecommender-1.0-SNAPSHOT.jar 
  1. mac啟動方式:
1. cd /Users/xxx/Desktop/azkaban/azkaban-solo-server-0.1.0-SNAPSHOT
2. bin/start-solo.sh 
3. 賬號azkaban密碼也是 
4. http://localhost:8081/index

7. 部署之Dataworks + Maxcomputer

  • 主要是阿里云的大數(shù)據(jù)平臺惶傻,Dataworks轉(zhuǎn)移數(shù)據(jù)银室,Maxcomputer計(jì)算励翼,還可以實(shí)現(xiàn)按照順序類似Azkaban的按順序調(diào)度
  • 阿里云平臺可以體驗(yàn)汽抚,阿里平臺也提供人工智能集成造烁,但是公司并沒有使用惭蟋,用了spark ml機(jī)器學(xué)
  • 這里也有AI學(xué)習(xí)告组,AI學(xué)習(xí)天地

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鸿秆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子怎囚,更是在濱河造成了極大的恐慌卿叽,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件考婴,死亡現(xiàn)場離奇詭異,居然都是意外死亡催烘,警方通過查閱死者的電腦和手機(jī)沥阱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來伊群,“玉大人考杉,你說我怎么就攤上這事。” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵撒汉,是天一觀的道長磨总。 經(jīng)常有香客問我,道長弟翘,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮萎坷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘沐兰。我一直安慰自己哆档,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布僧鲁。 她就那樣靜靜地躺著虐呻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪寞秃。 梳的紋絲不亂的頭發(fā)上斟叼,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天,我揣著相機(jī)與錄音春寿,去河邊找鬼朗涩。 笑死,一個胖子當(dāng)著我的面吹牛绑改,可吹牛的內(nèi)容都是我干的谢床。 我是一名探鬼主播兄一,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼识腿!你這毒婦竟也來了出革?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤渡讼,失蹤者是張志新(化名)和其女友劉穎骂束,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體成箫,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡展箱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了蹬昌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片混驰。...
    茶點(diǎn)故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖皂贩,靈堂內(nèi)的尸體忽然破棺而出栖榨,到底是詐尸還是另有隱情,我是刑警寧澤先紫,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布治泥,位于F島的核電站,受9級特大地震影響遮精,放射性物質(zhì)發(fā)生泄漏居夹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一本冲、第九天 我趴在偏房一處隱蔽的房頂上張望准脂。 院中可真熱鬧,春花似錦檬洞、人聲如沸狸膏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽湾戳。三九已至,卻和暖如春广料,著一層夾襖步出監(jiān)牢的瞬間砾脑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工艾杏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留韧衣,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像畅铭,于是被迫代替她去往敵國和親氏淑。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容