文章推薦系統(tǒng) | 五砍的、計(jì)算文章相似度

推薦閱讀:
文章推薦系統(tǒng) | 一、推薦流程設(shè)計(jì)
文章推薦系統(tǒng) | 二莺治、同步業(yè)務(wù)數(shù)據(jù)
文章推薦系統(tǒng) | 三廓鞠、收集用戶行為數(shù)據(jù)
文章推薦系統(tǒng) | 四、構(gòu)建離線文章畫(huà)像

在上篇文章中谣旁,我們已經(jīng)完成了離線文章畫(huà)像的構(gòu)建床佳,接下來(lái),我們要為相似文章推薦做準(zhǔn)備榄审,那就是計(jì)算文章之間的相似度砌们。首先,我們要計(jì)算出文章的詞向量,然后利用文章的詞向量來(lái)計(jì)算文章的相似度浪感。

計(jì)算文章詞向量

我們可以通過(guò)大量的歷史文章數(shù)據(jù)昔头,訓(xùn)練文章中每個(gè)詞的詞向量,由于文章數(shù)據(jù)過(guò)多影兽,通常是分頻道進(jìn)行詞向量訓(xùn)練揭斧,即每個(gè)頻道訓(xùn)練一個(gè)詞向量模型,我們包括的頻道如下所示

channel_info = {
            1: "html",
            2: "開(kāi)發(fā)者資訊",
            3: "ios",
            4: "c++",
            5: "android",
            6: "css",
            7: "數(shù)據(jù)庫(kù)",
            8: "區(qū)塊鏈",
            9: "go",
            10: "產(chǎn)品",
            11: "后端",
            12: "linux",
            13: "人工智能",
            14: "php",
            15: "javascript",
            16: "架構(gòu)",
            17: "前端",
            18: "python",
            19: "java",
            20: "算法",
            21: "面試",
            22: "科技動(dòng)態(tài)",
            23: "js",
            24: "設(shè)計(jì)",
            25: "數(shù)碼產(chǎn)品",
        }

接下來(lái)峻堰,分別對(duì)各自頻道內(nèi)的文章進(jìn)行分詞處理讹开,這里先選取 18 號(hào)頻道內(nèi)的所有文章,進(jìn)行分詞處理

spark.sql("use article")
article_data = spark.sql("select * from article_data where channel_id=18")
words_df = article_data.rdd.mapPartitions(segmentation).toDF(['article_id', 'channel_id', 'words'])

def segmentation(partition):
    import os
    import re
    import jieba
    import jieba.analyse
    import jieba.posseg as pseg
    import codecs

    abspath = "/root/words"

    # 結(jié)巴加載用戶詞典
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)

    # 停用詞文本
    stopwords_path = os.path.join(abspath, "stopwords.txt")

    def get_stopwords_list():
        """返回stopwords列表"""
        stopwords_list = [i.strip() for i in codecs.open(stopwords_path).readlines()]
        return stopwords_list

    # 所有的停用詞列表
    stopwords_list = get_stopwords_list()

    # 分詞
    def cut_sentence(sentence):
        """對(duì)切割之后的詞語(yǔ)進(jìn)行過(guò)濾捐名,去除停用詞旦万,保留名詞,英文和自定義詞庫(kù)中的詞桐筏,長(zhǎng)度大于2的詞"""
        # eg:[pair('今天', 't'), pair('有', 'd'), pair('霧', 'n'), pair('霾', 'g')]
        seg_list = pseg.lcut(sentence)
        seg_list = [i for i in seg_list if i.flag not in stopwords_list]
        filtered_words_list = []
        for seg in seg_list:
            if len(seg.word) <= 1:
                continue
            elif seg.flag == "eng":
                if len(seg.word) <= 2:
                    continue
                else:
                    filtered_words_list.append(seg.word)
            elif seg.flag.startswith("n"):
                filtered_words_list.append(seg.word)
            elif seg.flag in ["x", "eng"]:  # 是自定一個(gè)詞語(yǔ)或者是英文單詞
                filtered_words_list.append(seg.word)
        return filtered_words_list

    for row in partition:
        sentence = re.sub("<.*?>", "", row.sentence)    # 替換掉標(biāo)簽數(shù)據(jù)
        words = cut_sentence(sentence)
        yield row.article_id, row.channel_id, words

words_df 結(jié)果如下所示纸型,words 為分詞后的詞語(yǔ)列表

接著,使用分詞后的所有詞語(yǔ)梅忌,對(duì) Word2Vec 模型進(jìn)行訓(xùn)練并將模型保存到 HDFS狰腌,其中 vectorSize 為詞向量的長(zhǎng)度,minCount 為詞語(yǔ)的最小出現(xiàn)次數(shù)牧氮,windowSize 為訓(xùn)練窗口的大小琼腔,inputCol 為輸入的列名,outputCol 為輸出的列名

from pyspark.ml.feature import Word2Vec

w2v_model = Word2Vec(vectorSize=100, inputCol='words', outputCol='vector', minCount=3)
model = w2v_model.fit(words_df)
model.save("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")

加載訓(xùn)練好的 Word2Vec 模型

from pyspark.ml.feature import Word2VecModel

w2v_model = Word2VecModel.load("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")
vectors = w2v_model.getVectors()

vectors 結(jié)果如下所示踱葛,其中 vector 是訓(xùn)練后的每個(gè)詞的 100 維詞向量丹莲,是 vector 類型格式的,如 [0.2 -0.05 -0.1 ...]

這里尸诽,我們計(jì)算出了所有詞語(yǔ)的詞向量甥材,接下來(lái),還要得到關(guān)鍵詞的詞向量性含,因?yàn)槲覀冃枰ㄟ^(guò)關(guān)鍵詞的詞向量來(lái)計(jì)算文章的詞向量洲赵。那么,首先通過(guò)讀取頻道內(nèi)的文章畫(huà)像來(lái)得到關(guān)鍵詞(實(shí)際場(chǎng)景應(yīng)該只讀取新增文章畫(huà)像)

article_profile = spark.sql("select * from article_profile where channel_id=18")

在文章畫(huà)像表中商蕴,關(guān)鍵詞和權(quán)重是存儲(chǔ)在同一列的叠萍,我們可以利用 LATERAL VIEW explode() 方法,將 map 類型的 keywords 列中的關(guān)鍵詞和權(quán)重轉(zhuǎn)換成單獨(dú)的兩列數(shù)據(jù)

article_profile.registerTempTable('profile')
keyword_weight = spark.sql("select article_id, channel_id, keyword, weight from profile LATERAL VIEW explode(keywords) AS keyword, weight")

keyword_weight 結(jié)果如下所示绪商,keyword 為關(guān)鍵詞苛谷,weight 為對(duì)應(yīng)的權(quán)重

這時(shí)就可以利用關(guān)鍵詞 keyword 列,將文章關(guān)鍵詞 keyword_weight 與詞向量結(jié)果 vectors 進(jìn)行內(nèi)連接格郁,從而得到每個(gè)關(guān)鍵詞的詞向量

keywords_vector = keyword_weight.join(vectors, vectors.word==keyword_weight.keyword, 'inner')

keywords_vector 結(jié)果如下所示腹殿,vector 即對(duì)應(yīng)關(guān)鍵詞的 100 維詞向量

接下來(lái)独悴,將文章每個(gè)關(guān)鍵詞的詞向量加入權(quán)重信息,這里使每個(gè)關(guān)鍵詞的詞向量 = 關(guān)鍵詞的權(quán)重 x 關(guān)鍵詞的詞向量赫蛇,即 weight_vector = weight x vector绵患,注意這里的 vector 為 vector 類型,所以 weight x vector 是權(quán)重和向量的每個(gè)元素相乘悟耘,向量的長(zhǎng)度保持不變

def compute_vector(row):
    return row.article_id, row.channel_id, row.keyword, row.weight * row.vector

article_keyword_vectors = keywords_vector.rdd.map(compute_vector).toDF(["article_id", "channel_id", "keyword", "weightingVector"])

article_keyword_vectors 結(jié)果如下所示落蝙,weightingVector 即為加入權(quán)重信息后的關(guān)鍵詞的詞向量

再將上面的結(jié)果按照 article_id 進(jìn)行分組,利用 collect_set() 方法暂幼,將一篇文章內(nèi)所有關(guān)鍵詞的詞向量合并為一個(gè)列表

article_keyword_vectors.registerTempTable('temptable')
article_keyword_vectors = spark.sql("select article_id, min(channel_id) channel_id, collect_set(weightingVector) vectors from temptable group by article_id")

article_keyword_vectors 結(jié)果如下所示筏勒,vectors 即為文章內(nèi)所有關(guān)鍵詞向量的列表,如 [[0.6 0.2 ...], [0.1 -0.07 ...], ...]

接下來(lái)旺嬉,利用上面得出的二維列表管行,計(jì)算每篇文章內(nèi)所有關(guān)鍵詞的詞向量的平均值,作為文章的詞向量邪媳。注意捐顷,這里的 vectors 是包含多個(gè)詞向量的列表,詞向量列表的平均值等于其中每個(gè)詞向量的對(duì)應(yīng)元素相加再除以詞向量的個(gè)數(shù)

def compute_avg_vectors(row):
    x = 0
    for i in row.vectors:
        x += i
    # 求平均值
    return row.article_id, row.channel_id, x / len(row.vectors)

article_vector = article_keyword_vectors.rdd.map(compute_avg_vectors).toDF(['article_id', 'channel_id', 'vector'])

article_vector 結(jié)果如下所示

此時(shí)雨效,article_vector 中的 vector 列還是 vector 類型迅涮,而 Hive 不支持該數(shù)據(jù)類型,所以需要將 vector 類型轉(zhuǎn)成 array 類型(list)

def to_list(row):
    return row.article_id, row.channel_id, [float(i) for i in row.vector.toArray()]

article_vector = article_vector.rdd.map(to_list).toDF(['article_id', 'channel_id', 'vector'])

在 Hive 中創(chuàng)建文章詞向量表 article_vector

CREATE TABLE article_vector
(
    article_id INT comment "article_id",
    channel_id INT comment "channel_id",
    articlevector ARRAY<DOUBLE> comment "keyword"
);

最后徽龟,將 18 號(hào)頻道內(nèi)的所有文章的詞向量存儲(chǔ)到 Hive 的文章詞向量表 article_vector 中

article_vector.write.insertInto("article_vector")

這樣叮姑,我們就計(jì)算出了 18 號(hào)頻道下每篇文章的詞向量,在實(shí)際場(chǎng)景中据悔,我們還要分別計(jì)算出其他所有頻道下每篇文章的詞向量传透。

計(jì)算文章相似度

前面我們計(jì)算出了文章的詞向量,接下來(lái)就可以根據(jù)文章的詞向量來(lái)計(jì)算文章的相似度了极颓。通常我們會(huì)有幾百萬(wàn)朱盐、幾千萬(wàn)甚至上億規(guī)模的文章數(shù)據(jù),為了優(yōu)化計(jì)算性能菠隆,我們可以只計(jì)算每個(gè)頻道內(nèi)文章之間的相似度托享,因?yàn)橥ǔV挥邢嗤l道的文章關(guān)聯(lián)性較高,而不同頻道之間的文章通常關(guān)聯(lián)性較低浸赫。在每個(gè)頻道內(nèi),我們還可以用聚類或局部敏感哈希對(duì)文章進(jìn)行分桶赃绊,將文章相似度的計(jì)算限制在更小的范圍既峡,只計(jì)算相同分類內(nèi)或相同桶內(nèi)的文章相似度。

  • 聚類(Clustering)碧查,對(duì)每個(gè)頻道內(nèi)的文章進(jìn)行聚類运敢,可以使用 KMeans 算法校仑,需要提前設(shè)定好類別個(gè)數(shù) K,聚類算法的時(shí)間復(fù)雜度并不小传惠,也可以使用一些優(yōu)化的聚類算法迄沫,比如二分聚類、層次聚類等卦方。但通常聚類算法也比較耗時(shí)羊瘩,所以通常被使用更多的是局部敏感哈希。

Spark 的 BisectingKMeans 模型訓(xùn)練代碼示例

from pyspark.ml.clustering import BisectingKMeans

bkmeans = BisectingKMeans(k=100, minDivisibleClusterSize=50, featuresCol="articlevector", predictionCol='group')
bkmeans_model = bkmeans.fit(article_vector)
bkmeans_model.save("hdfs://hadoop-master:9000/headlines/models/articleBisKmeans/channel_%d_%s.bkmeans" % (channel_id, channel))
  • 局部敏感哈希 LSH(Locality Sensitive Hashing)盼砍,LSH 算法是基于一個(gè)假設(shè)尘吗,如果兩個(gè)文本在原有的數(shù)據(jù)空間是相似的,那么經(jīng)過(guò)哈希函數(shù)轉(zhuǎn)換以后浇坐,它們?nèi)匀痪哂泻芨叩南嗨贫炔谴罚丛较嗨频奈谋驹诠V螅涞较嗤耐皟?nèi)的概率就越高近刘。所以擒贸,我們只需要將目標(biāo)文章進(jìn)行哈希映射并得到其桶號(hào),然后取出該桶內(nèi)的所有文章觉渴,再進(jìn)行線性匹配即可查找到與目標(biāo)文章相鄰的文章介劫。其實(shí) LSH 并不能保證一定能夠查找到與目標(biāo)文章最相鄰的文章,而是在減少需要匹配的文章個(gè)數(shù)的同時(shí)疆拘,保證查找到最近鄰的文章的概率很大蜕猫。

下面我們將使用 LSH 模型來(lái)計(jì)算文章相似度,首先哎迄,讀取 18 號(hào)頻道內(nèi)所有文章的 ID 和詞向量作為訓(xùn)練集

article_vector = spark.sql("select article_id, articlevector from article_vector where channel_id=18")
train = articlevector.select(['article_id', 'articlevector'])

文章詞向量表中的詞向量是被存儲(chǔ)為 array 類型的回右,我們利用 Spark 的 Vectors.dense() 方法,將 array 類型(list)轉(zhuǎn)為 vector 類型

from pyspark.ml.linalg import Vectors

def list_to_vector(row):
    return row.article_id, Vectors.dense(row.articlevector)

train = train.rdd.map(list_to_vector).toDF(['article_id', 'articlevector'])

使用訓(xùn)練集 train 對(duì) Spark 的 BucketedRandomProjectionLSH 模型進(jìn)行訓(xùn)練漱挚,其中 inputCol 為輸入特征列翔烁,outputCol 為輸出特征列,numHashTables 為哈希表數(shù)量旨涝,bucketLength 為桶的數(shù)量蹬屹,數(shù)量越多,相同數(shù)據(jù)進(jìn)入到同一個(gè)桶的概率就越高

from pyspark.ml.feature import BucketedRandomProjectionLSH

brp = BucketedRandomProjectionLSH(inputCol='articlevector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)
model = brp.fit(train)

訓(xùn)練好模型后白华,調(diào)用 approxSimilarityJoin() 方法即可計(jì)算數(shù)據(jù)之間的相似度慨默,如 model.approxSimilarityJoin(df1, df2, 2.0, distCol='EuclideanDistance') 就是利用歐幾里得距離作為相似度,計(jì)算在 df1 與 df2 每條數(shù)據(jù)的相似度弧腥,這里我們計(jì)算訓(xùn)練集中所有文章之間的相似度

similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')

similar 結(jié)果如下所示厦取,EuclideanDistance 就是兩篇文章的歐幾里得距離,即相似度

在后面的推薦流程中管搪,會(huì)經(jīng)常查詢文章相似度虾攻,所以出于性能考慮铡买,我們選擇將文章相似度結(jié)果存儲(chǔ)到 Hbase 中。首先創(chuàng)建文章相似度表

create 'article_similar', 'similar'

然后存儲(chǔ)文章相似度結(jié)果

def save_hbase(partition):
    import happybase
    pool = happybase.ConnectionPool(size=3, host='hadoop-master')

    with pool.connection() as conn:
        # 建立表連接
        table = conn.table('article_similar')
        for row in partition:
            if row.datasetA.article_id != row.datasetB.article_id:
                table.put(str(row.datasetA.article_id).encode(), {"similar:{}".format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})
                
        # 手動(dòng)關(guān)閉所有的連接
        conn.close()

similar.foreachPartition(save_hbase)

Apscheduler 定時(shí)更新

將文章相似度計(jì)算加入到文章畫(huà)像更新方法中霎箍,首先合并最近一個(gè)小時(shí)的文章完整信息奇钞,接著計(jì)算 TF-IDF 和 TextRank 權(quán)重,并根據(jù) TF-IDF 和 TextRank 權(quán)重計(jì)算得出關(guān)鍵詞和主題詞漂坏,最后計(jì)算文章的詞向量及文章的相似度

def update_article_profile():
    """
    定時(shí)更新文章畫(huà)像及文章相似度
    :return:
    """
    ua = UpdateArticle()
    sentence_df = ua.merge_article_data()
    if sentence_df.rdd.collect():
        textrank_keywords_df, keywordsIndex = ua.generate_article_label()
        article_profile = ua.get_article_profile(textrank_keywords_df, keywordsIndex)
        ua.compute_article_similar(article_profile)

參考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(學(xué)習(xí)資源已保存至網(wǎng)盤(pán)景埃, 提取碼:eakp)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市樊拓,隨后出現(xiàn)的幾起案子纠亚,更是在濱河造成了極大的恐慌,老刑警劉巖筋夏,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蒂胞,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡条篷,警方通過(guò)查閱死者的電腦和手機(jī)骗随,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)赴叹,“玉大人鸿染,你說(shuō)我怎么就攤上這事∑蚯桑” “怎么了涨椒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)绽媒。 經(jīng)常有香客問(wèn)我蚕冬,道長(zhǎng),這世上最難降的妖魔是什么是辕? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任囤热,我火速辦了婚禮,結(jié)果婚禮上获三,老公的妹妹穿的比我還像新娘旁蔼。我一直安慰自己,他們只是感情好疙教,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布棺聊。 她就那樣靜靜地躺著,像睡著了一般贞谓。 火紅的嫁衣襯著肌膚如雪躺屁。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,031評(píng)論 1 285
  • 那天经宏,我揣著相機(jī)與錄音犀暑,去河邊找鬼。 笑死烁兰,一個(gè)胖子當(dāng)著我的面吹牛耐亏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播沪斟,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼广辰,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了主之?” 一聲冷哼從身側(cè)響起择吊,我...
    開(kāi)封第一講書(shū)人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎槽奕,沒(méi)想到半個(gè)月后几睛,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡粤攒,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年所森,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片夯接。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡焕济,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出盔几,到底是詐尸還是另有隱情晴弃,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布逊拍,位于F島的核電站上鞠,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏顺献。R本人自食惡果不足惜旗国,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望注整。 院中可真熱鬧能曾,春花似錦、人聲如沸肿轨。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)椒袍。三九已至驼唱,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間驹暑,已是汗流浹背玫恳。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工辨赐, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人京办。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓掀序,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親惭婿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子不恭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容