推薦閱讀:
文章推薦系統(tǒng) | 一、推薦流程設(shè)計(jì)
文章推薦系統(tǒng) | 二莺治、同步業(yè)務(wù)數(shù)據(jù)
文章推薦系統(tǒng) | 三廓鞠、收集用戶行為數(shù)據(jù)
文章推薦系統(tǒng) | 四、構(gòu)建離線文章畫(huà)像
在上篇文章中谣旁,我們已經(jīng)完成了離線文章畫(huà)像的構(gòu)建床佳,接下來(lái),我們要為相似文章推薦做準(zhǔn)備榄审,那就是計(jì)算文章之間的相似度砌们。首先,我們要計(jì)算出文章的詞向量,然后利用文章的詞向量來(lái)計(jì)算文章的相似度浪感。
計(jì)算文章詞向量
我們可以通過(guò)大量的歷史文章數(shù)據(jù)昔头,訓(xùn)練文章中每個(gè)詞的詞向量,由于文章數(shù)據(jù)過(guò)多影兽,通常是分頻道進(jìn)行詞向量訓(xùn)練揭斧,即每個(gè)頻道訓(xùn)練一個(gè)詞向量模型,我們包括的頻道如下所示
channel_info = {
1: "html",
2: "開(kāi)發(fā)者資訊",
3: "ios",
4: "c++",
5: "android",
6: "css",
7: "數(shù)據(jù)庫(kù)",
8: "區(qū)塊鏈",
9: "go",
10: "產(chǎn)品",
11: "后端",
12: "linux",
13: "人工智能",
14: "php",
15: "javascript",
16: "架構(gòu)",
17: "前端",
18: "python",
19: "java",
20: "算法",
21: "面試",
22: "科技動(dòng)態(tài)",
23: "js",
24: "設(shè)計(jì)",
25: "數(shù)碼產(chǎn)品",
}
接下來(lái)峻堰,分別對(duì)各自頻道內(nèi)的文章進(jìn)行分詞處理讹开,這里先選取 18 號(hào)頻道內(nèi)的所有文章,進(jìn)行分詞處理
spark.sql("use article")
article_data = spark.sql("select * from article_data where channel_id=18")
words_df = article_data.rdd.mapPartitions(segmentation).toDF(['article_id', 'channel_id', 'words'])
def segmentation(partition):
import os
import re
import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs
abspath = "/root/words"
# 結(jié)巴加載用戶詞典
userDict_path = os.path.join(abspath, "ITKeywords.txt")
jieba.load_userdict(userDict_path)
# 停用詞文本
stopwords_path = os.path.join(abspath, "stopwords.txt")
def get_stopwords_list():
"""返回stopwords列表"""
stopwords_list = [i.strip() for i in codecs.open(stopwords_path).readlines()]
return stopwords_list
# 所有的停用詞列表
stopwords_list = get_stopwords_list()
# 分詞
def cut_sentence(sentence):
"""對(duì)切割之后的詞語(yǔ)進(jìn)行過(guò)濾捐名,去除停用詞旦万,保留名詞,英文和自定義詞庫(kù)中的詞桐筏,長(zhǎng)度大于2的詞"""
# eg:[pair('今天', 't'), pair('有', 'd'), pair('霧', 'n'), pair('霾', 'g')]
seg_list = pseg.lcut(sentence)
seg_list = [i for i in seg_list if i.flag not in stopwords_list]
filtered_words_list = []
for seg in seg_list:
if len(seg.word) <= 1:
continue
elif seg.flag == "eng":
if len(seg.word) <= 2:
continue
else:
filtered_words_list.append(seg.word)
elif seg.flag.startswith("n"):
filtered_words_list.append(seg.word)
elif seg.flag in ["x", "eng"]: # 是自定一個(gè)詞語(yǔ)或者是英文單詞
filtered_words_list.append(seg.word)
return filtered_words_list
for row in partition:
sentence = re.sub("<.*?>", "", row.sentence) # 替換掉標(biāo)簽數(shù)據(jù)
words = cut_sentence(sentence)
yield row.article_id, row.channel_id, words
words_df
結(jié)果如下所示纸型,words 為分詞后的詞語(yǔ)列表
接著,使用分詞后的所有詞語(yǔ)梅忌,對(duì) Word2Vec 模型進(jìn)行訓(xùn)練并將模型保存到 HDFS狰腌,其中 vectorSize 為詞向量的長(zhǎng)度,minCount 為詞語(yǔ)的最小出現(xiàn)次數(shù)牧氮,windowSize 為訓(xùn)練窗口的大小琼腔,inputCol 為輸入的列名,outputCol 為輸出的列名
from pyspark.ml.feature import Word2Vec
w2v_model = Word2Vec(vectorSize=100, inputCol='words', outputCol='vector', minCount=3)
model = w2v_model.fit(words_df)
model.save("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")
加載訓(xùn)練好的 Word2Vec 模型
from pyspark.ml.feature import Word2VecModel
w2v_model = Word2VecModel.load("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")
vectors = w2v_model.getVectors()
vectors
結(jié)果如下所示踱葛,其中 vector 是訓(xùn)練后的每個(gè)詞的 100 維詞向量丹莲,是 vector 類型格式的,如 [0.2 -0.05 -0.1 ...]
這里尸诽,我們計(jì)算出了所有詞語(yǔ)的詞向量甥材,接下來(lái),還要得到關(guān)鍵詞的詞向量性含,因?yàn)槲覀冃枰ㄟ^(guò)關(guān)鍵詞的詞向量來(lái)計(jì)算文章的詞向量洲赵。那么,首先通過(guò)讀取頻道內(nèi)的文章畫(huà)像來(lái)得到關(guān)鍵詞(實(shí)際場(chǎng)景應(yīng)該只讀取新增文章畫(huà)像)
article_profile = spark.sql("select * from article_profile where channel_id=18")
在文章畫(huà)像表中商蕴,關(guān)鍵詞和權(quán)重是存儲(chǔ)在同一列的叠萍,我們可以利用 LATERAL VIEW explode()
方法,將 map 類型的 keywords 列中的關(guān)鍵詞和權(quán)重轉(zhuǎn)換成單獨(dú)的兩列數(shù)據(jù)
article_profile.registerTempTable('profile')
keyword_weight = spark.sql("select article_id, channel_id, keyword, weight from profile LATERAL VIEW explode(keywords) AS keyword, weight")
keyword_weight
結(jié)果如下所示绪商,keyword 為關(guān)鍵詞苛谷,weight 為對(duì)應(yīng)的權(quán)重
這時(shí)就可以利用關(guān)鍵詞 keyword 列,將文章關(guān)鍵詞 keyword_weight
與詞向量結(jié)果 vectors
進(jìn)行內(nèi)連接格郁,從而得到每個(gè)關(guān)鍵詞的詞向量
keywords_vector = keyword_weight.join(vectors, vectors.word==keyword_weight.keyword, 'inner')
keywords_vector
結(jié)果如下所示腹殿,vector 即對(duì)應(yīng)關(guān)鍵詞的 100 維詞向量
接下來(lái)独悴,將文章每個(gè)關(guān)鍵詞的詞向量加入權(quán)重信息,這里使每個(gè)關(guān)鍵詞的詞向量 = 關(guān)鍵詞的權(quán)重 x 關(guān)鍵詞的詞向量赫蛇,即 weight_vector = weight x vector绵患,注意這里的 vector
為 vector 類型,所以 weight x vector 是權(quán)重和向量的每個(gè)元素相乘悟耘,向量的長(zhǎng)度保持不變
def compute_vector(row):
return row.article_id, row.channel_id, row.keyword, row.weight * row.vector
article_keyword_vectors = keywords_vector.rdd.map(compute_vector).toDF(["article_id", "channel_id", "keyword", "weightingVector"])
article_keyword_vectors
結(jié)果如下所示落蝙,weightingVector 即為加入權(quán)重信息后的關(guān)鍵詞的詞向量
再將上面的結(jié)果按照 article_id 進(jìn)行分組,利用 collect_set()
方法暂幼,將一篇文章內(nèi)所有關(guān)鍵詞的詞向量合并為一個(gè)列表
article_keyword_vectors.registerTempTable('temptable')
article_keyword_vectors = spark.sql("select article_id, min(channel_id) channel_id, collect_set(weightingVector) vectors from temptable group by article_id")
article_keyword_vectors
結(jié)果如下所示筏勒,vectors 即為文章內(nèi)所有關(guān)鍵詞向量的列表,如 [[0.6 0.2 ...], [0.1 -0.07 ...], ...]
接下來(lái)旺嬉,利用上面得出的二維列表管行,計(jì)算每篇文章內(nèi)所有關(guān)鍵詞的詞向量的平均值,作為文章的詞向量邪媳。注意捐顷,這里的 vectors
是包含多個(gè)詞向量的列表,詞向量列表的平均值等于其中每個(gè)詞向量的對(duì)應(yīng)元素相加再除以詞向量的個(gè)數(shù)
def compute_avg_vectors(row):
x = 0
for i in row.vectors:
x += i
# 求平均值
return row.article_id, row.channel_id, x / len(row.vectors)
article_vector = article_keyword_vectors.rdd.map(compute_avg_vectors).toDF(['article_id', 'channel_id', 'vector'])
article_vector
結(jié)果如下所示
此時(shí)雨效,article_vector
中的 vector
列還是 vector 類型迅涮,而 Hive 不支持該數(shù)據(jù)類型,所以需要將 vector 類型轉(zhuǎn)成 array 類型(list)
def to_list(row):
return row.article_id, row.channel_id, [float(i) for i in row.vector.toArray()]
article_vector = article_vector.rdd.map(to_list).toDF(['article_id', 'channel_id', 'vector'])
在 Hive 中創(chuàng)建文章詞向量表 article_vector
CREATE TABLE article_vector
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
articlevector ARRAY<DOUBLE> comment "keyword"
);
最后徽龟,將 18 號(hào)頻道內(nèi)的所有文章的詞向量存儲(chǔ)到 Hive 的文章詞向量表 article_vector 中
article_vector.write.insertInto("article_vector")
這樣叮姑,我們就計(jì)算出了 18 號(hào)頻道下每篇文章的詞向量,在實(shí)際場(chǎng)景中据悔,我們還要分別計(jì)算出其他所有頻道下每篇文章的詞向量传透。
計(jì)算文章相似度
前面我們計(jì)算出了文章的詞向量,接下來(lái)就可以根據(jù)文章的詞向量來(lái)計(jì)算文章的相似度了极颓。通常我們會(huì)有幾百萬(wàn)朱盐、幾千萬(wàn)甚至上億規(guī)模的文章數(shù)據(jù),為了優(yōu)化計(jì)算性能菠隆,我們可以只計(jì)算每個(gè)頻道內(nèi)文章之間的相似度托享,因?yàn)橥ǔV挥邢嗤l道的文章關(guān)聯(lián)性較高,而不同頻道之間的文章通常關(guān)聯(lián)性較低浸赫。在每個(gè)頻道內(nèi),我們還可以用聚類或局部敏感哈希對(duì)文章進(jìn)行分桶赃绊,將文章相似度的計(jì)算限制在更小的范圍既峡,只計(jì)算相同分類內(nèi)或相同桶內(nèi)的文章相似度。
- 聚類(Clustering)碧查,對(duì)每個(gè)頻道內(nèi)的文章進(jìn)行聚類运敢,可以使用 KMeans 算法校仑,需要提前設(shè)定好類別個(gè)數(shù) K,聚類算法的時(shí)間復(fù)雜度并不小传惠,也可以使用一些優(yōu)化的聚類算法迄沫,比如二分聚類、層次聚類等卦方。但通常聚類算法也比較耗時(shí)羊瘩,所以通常被使用更多的是局部敏感哈希。
Spark 的 BisectingKMeans 模型訓(xùn)練代碼示例
from pyspark.ml.clustering import BisectingKMeans
bkmeans = BisectingKMeans(k=100, minDivisibleClusterSize=50, featuresCol="articlevector", predictionCol='group')
bkmeans_model = bkmeans.fit(article_vector)
bkmeans_model.save("hdfs://hadoop-master:9000/headlines/models/articleBisKmeans/channel_%d_%s.bkmeans" % (channel_id, channel))
- 局部敏感哈希 LSH(Locality Sensitive Hashing)盼砍,LSH 算法是基于一個(gè)假設(shè)尘吗,如果兩個(gè)文本在原有的數(shù)據(jù)空間是相似的,那么經(jīng)過(guò)哈希函數(shù)轉(zhuǎn)換以后浇坐,它們?nèi)匀痪哂泻芨叩南嗨贫炔谴罚丛较嗨频奈谋驹诠V螅涞较嗤耐皟?nèi)的概率就越高近刘。所以擒贸,我們只需要將目標(biāo)文章進(jìn)行哈希映射并得到其桶號(hào),然后取出該桶內(nèi)的所有文章觉渴,再進(jìn)行線性匹配即可查找到與目標(biāo)文章相鄰的文章介劫。其實(shí) LSH 并不能保證一定能夠查找到與目標(biāo)文章最相鄰的文章,而是在減少需要匹配的文章個(gè)數(shù)的同時(shí)疆拘,保證查找到最近鄰的文章的概率很大蜕猫。
下面我們將使用 LSH 模型來(lái)計(jì)算文章相似度,首先哎迄,讀取 18 號(hào)頻道內(nèi)所有文章的 ID 和詞向量作為訓(xùn)練集
article_vector = spark.sql("select article_id, articlevector from article_vector where channel_id=18")
train = articlevector.select(['article_id', 'articlevector'])
文章詞向量表中的詞向量是被存儲(chǔ)為 array 類型的回右,我們利用 Spark 的 Vectors.dense()
方法,將 array 類型(list)轉(zhuǎn)為 vector 類型
from pyspark.ml.linalg import Vectors
def list_to_vector(row):
return row.article_id, Vectors.dense(row.articlevector)
train = train.rdd.map(list_to_vector).toDF(['article_id', 'articlevector'])
使用訓(xùn)練集 train
對(duì) Spark 的 BucketedRandomProjectionLSH
模型進(jìn)行訓(xùn)練漱挚,其中 inputCol 為輸入特征列翔烁,outputCol 為輸出特征列,numHashTables 為哈希表數(shù)量旨涝,bucketLength 為桶的數(shù)量蹬屹,數(shù)量越多,相同數(shù)據(jù)進(jìn)入到同一個(gè)桶的概率就越高
from pyspark.ml.feature import BucketedRandomProjectionLSH
brp = BucketedRandomProjectionLSH(inputCol='articlevector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)
model = brp.fit(train)
訓(xùn)練好模型后白华,調(diào)用 approxSimilarityJoin()
方法即可計(jì)算數(shù)據(jù)之間的相似度慨默,如 model.approxSimilarityJoin(df1, df2, 2.0, distCol='EuclideanDistance')
就是利用歐幾里得距離作為相似度,計(jì)算在 df1 與 df2 每條數(shù)據(jù)的相似度弧腥,這里我們計(jì)算訓(xùn)練集中所有文章之間的相似度
similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')
similar
結(jié)果如下所示厦取,EuclideanDistance 就是兩篇文章的歐幾里得距離,即相似度
在后面的推薦流程中管搪,會(huì)經(jīng)常查詢文章相似度虾攻,所以出于性能考慮铡买,我們選擇將文章相似度結(jié)果存儲(chǔ)到 Hbase 中。首先創(chuàng)建文章相似度表
create 'article_similar', 'similar'
然后存儲(chǔ)文章相似度結(jié)果
def save_hbase(partition):
import happybase
pool = happybase.ConnectionPool(size=3, host='hadoop-master')
with pool.connection() as conn:
# 建立表連接
table = conn.table('article_similar')
for row in partition:
if row.datasetA.article_id != row.datasetB.article_id:
table.put(str(row.datasetA.article_id).encode(), {"similar:{}".format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})
# 手動(dòng)關(guān)閉所有的連接
conn.close()
similar.foreachPartition(save_hbase)
Apscheduler 定時(shí)更新
將文章相似度計(jì)算加入到文章畫(huà)像更新方法中霎箍,首先合并最近一個(gè)小時(shí)的文章完整信息奇钞,接著計(jì)算 TF-IDF 和 TextRank 權(quán)重,并根據(jù) TF-IDF 和 TextRank 權(quán)重計(jì)算得出關(guān)鍵詞和主題詞漂坏,最后計(jì)算文章的詞向量及文章的相似度
def update_article_profile():
"""
定時(shí)更新文章畫(huà)像及文章相似度
:return:
"""
ua = UpdateArticle()
sentence_df = ua.merge_article_data()
if sentence_df.rdd.collect():
textrank_keywords_df, keywordsIndex = ua.generate_article_label()
article_profile = ua.get_article_profile(textrank_keywords_df, keywordsIndex)
ua.compute_article_similar(article_profile)
參考
https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(學(xué)習(xí)資源已保存至網(wǎng)盤(pán)景埃, 提取碼:eakp)