文章推薦系統(tǒng) | 六汽抚、構(gòu)建離線(xiàn)用戶(hù)畫(huà)像

推薦閱讀:
文章推薦系統(tǒng) | 一、推薦流程設(shè)計(jì)
文章推薦系統(tǒng) | 二伯病、同步業(yè)務(wù)數(shù)據(jù)
文章推薦系統(tǒng) | 三造烁、收集用戶(hù)行為數(shù)據(jù)
文章推薦系統(tǒng) | 四、構(gòu)建離線(xiàn)文章畫(huà)像
文章推薦系統(tǒng) | 五午笛、計(jì)算文章相似度

前面我們完成了文章畫(huà)像的構(gòu)建以及文章相似度的計(jì)算惭蟋,接下來(lái),我們就要實(shí)現(xiàn)用戶(hù)畫(huà)像的構(gòu)建了药磺。用戶(hù)畫(huà)像往往是大型網(wǎng)站的重要模塊告组,基于用戶(hù)畫(huà)像不僅可以實(shí)現(xiàn)個(gè)性化推薦,還可以實(shí)現(xiàn)用戶(hù)分群癌佩、精準(zhǔn)推送木缝、精準(zhǔn)營(yíng)銷(xiāo)以及用戶(hù)行為預(yù)測(cè)便锨、商業(yè)化轉(zhuǎn)化分析等,為商業(yè)決策提供數(shù)據(jù)支持我碟。通常用戶(hù)畫(huà)像包括用戶(hù)屬性信息(性別放案、年齡、出生日期等)怎囚、用戶(hù)行為信息(瀏覽卿叽、收藏、點(diǎn)贊等)以及環(huán)境信息(時(shí)間恳守、地理位置等)考婴。

處理用戶(hù)行為數(shù)據(jù)

在數(shù)據(jù)準(zhǔn)備階段,我們通過(guò) Flume 已經(jīng)可以將用戶(hù)行為數(shù)據(jù)收集到 Hive 的 user_action 表的 HDFS 路徑中催烘,先來(lái)看一下這些數(shù)據(jù)長(zhǎng)什么樣子沥阱,我們讀取當(dāng)天的用戶(hù)行為數(shù)據(jù),注意讀取之前要先關(guān)聯(lián)分區(qū)

_day = time.strftime("%Y-%m-%d", time.localtime())
_localions = '/user/hive/warehouse/profile.db/user_action/' + _day
if fs.exists(_localions):
    # 如果有該文件直接關(guān)聯(lián)伊群,捕獲關(guān)聯(lián)重復(fù)異常
    try:
        self.spark.sql("alter table user_action add partition (dt='%s') location '%s'" % (_day, _localions))
    except Exception as e:
        pass

    self.spark.sql("use profile")
    user_action = self.spark.sql("select actionTime, readTime, channelId, param.articleId, param.algorithmCombine, param.action, param.userId from user_action where dt>=" + _day)

user_action 結(jié)果如下所示

可以發(fā)現(xiàn)考杉,上面的一條記錄代表用戶(hù)對(duì)文章的一次行為,但通常我們需要查詢(xún)某個(gè)用戶(hù)對(duì)某篇文章的所有行為舰始,所以崇棠,我們要將這里用戶(hù)對(duì)文章的多條行為數(shù)據(jù)合并為一條,其中包括用戶(hù)對(duì)文章的所有行為丸卷。我們需要新建一個(gè) Hive 表 user_article_basic枕稀,這張表包括了用戶(hù) ID、文章 ID谜嫉、是否曝光萎坷、是否點(diǎn)擊、閱讀時(shí)間等等沐兰,隨后我們將處理好的用戶(hù)行為數(shù)據(jù)存儲(chǔ)到此表中

create table user_article_basic
(
    user_id     BIGINT comment "userID",
    action_time STRING comment "user actions time",
    article_id  BIGINT comment "articleid",
    channel_id  INT comment "channel_id",
    shared      BOOLEAN comment "is shared",
    clicked     BOOLEAN comment "is clicked",
    collected   BOOLEAN comment "is collected",
    exposure    BOOLEAN comment "is exposured",
    read_time   STRING comment "reading time"
)
    COMMENT "user_article_basic"
    CLUSTERED by (user_id) into 2 buckets
    STORED as textfile
    LOCATION '/user/hive/warehouse/profile.db/user_article_basic';

遍歷每一條原始用戶(hù)行為數(shù)據(jù)哆档,判斷用戶(hù)對(duì)文章的行為,在 user_action_basic 中將該用戶(hù)與該文章對(duì)應(yīng)的行為設(shè)置為 True

if user_action.collect():
    def _generate(row):
        _list = []
        if row.action == 'exposure':
            for article_id in eval(row.articleId):
                # ["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]
                _list.append(
                    [row.userId, row.actionTime, article_id, row.channelId, False, False, False, True, row.readTime])
            return _list
        else:
            class Temp(object):
                shared = False
                clicked = False
                collected = False
                read_time = ""

            _tp = Temp()
            if row.action == 'click':
                _tp.clicked = True
            elif row.action == 'share':
                _tp.shared = True
            elif row.action == 'collect':
                _tp.collected = True
            elif row.action == 'read':
                _tp.clicked = True

            _list.append(
                [row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,
                 True, row.readTime])
            return _list

    user_action_basic = user_action.rdd.flatMap(_generate)
    user_action_basic = user_action_basic.toDF(
        ["user_id", "action_time", "article_id", "channel_id", "shared", "clicked", "collected", "exposure",
         "read_time"])

user_action_basic 結(jié)果如下所示住闯,這里的一條記錄包括了某個(gè)用戶(hù)對(duì)某篇文章的所有行為

由于 Hive 目前還不支持 pyspark 的原子性操作瓜浸,所以 user_article_basic 表的用戶(hù)行為數(shù)據(jù)只能全量更新(實(shí)際場(chǎng)景中可以選擇其他語(yǔ)言或數(shù)據(jù)庫(kù)實(shí)現(xiàn))。這里寞秃,我們需要將當(dāng)天的用戶(hù)行為與 user_action_basic 的歷史用戶(hù)行為進(jìn)行合并

old_data = uup.spark.sql("select * from user_article_basic")
new_data = old_data.unionAll(user_action_basic)

合并后又會(huì)產(chǎn)生一個(gè)新的問(wèn)題斟叼,那就是用戶(hù) ID 和文章 ID 可能重復(fù),因?yàn)榻裉炷硞€(gè)用戶(hù)對(duì)某篇文章的記錄可能在歷史數(shù)據(jù)中也存在春寿,而 unionAll() 方法并沒(méi)有去重朗涩,這里我們可以按照用戶(hù) ID 和文章 ID 進(jìn)行分組,利用 max() 方法得到 action_time, channel_id, shared, clicked, collected, exposure, read_time 即可绑改,去重后直接存儲(chǔ)到 user_article_basic 表中

new_data.registerTempTable("temptable")

self.spark.sql('''insert overwrite table user_article_basic select user_id, max(action_time) as action_time, 
        article_id, max(channel_id) as channel_id, max(shared) as shared, max(clicked) as clicked, 
        max(collected) as collected, max(exposure) as exposure, max(read_time) as read_time from temptable 
        group by user_id, article_id''')

表 user_article_basic 結(jié)果如下所示

計(jì)算用戶(hù)畫(huà)像

我們選擇將用戶(hù)畫(huà)像存儲(chǔ)在 Hbase 中谢床,因?yàn)?Hbase 支持原子性操作和快速讀取兄一,并且 Hive 也可以通過(guò)創(chuàng)建外部表關(guān)聯(lián)到 Hbase,進(jìn)行離線(xiàn)分析识腿,如果要?jiǎng)h除 Hive 外部表的話(huà)出革,對(duì) Hbase 也沒(méi)有影響。首先渡讼,在 Hbase 中創(chuàng)建用戶(hù)畫(huà)像表

create 'user_profile', 'basic','partial','env'

在 Hive 中創(chuàng)建 Hbase 外部表骂束,注意字段類(lèi)型設(shè)置為 map

create external table user_profile_hbase
(
    user_id         STRING comment "userID",
    information     MAP<STRING, DOUBLE> comment "user basic information",
    article_partial MAP<STRING, DOUBLE> comment "article partial",
    env             MAP<STRING, INT> comment "user env"
)
    COMMENT "user profile table"
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
        WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,basic:,partial:,env:")
    TBLPROPERTIES ("hbase.table.name" = "user_profile");

創(chuàng)建外部表之后,還需要導(dǎo)入一些依賴(lài)包

cp -r /root/bigdata/hbase/lib/hbase-*.jar /root/bigdata/spark/jars/
cp -r /root/bigdata/hive/lib/h*.jar /root/bigdata/spark/jars/

接下來(lái)成箫,讀取處理好的用戶(hù)行為數(shù)據(jù)展箱,由于日志中的 channel_id 有可能是來(lái)自于推薦頻道(0),而不是文章真實(shí)的頻道蹬昌,所以這里要將 channel_id 列刪除

spark.sql("use profile")
user_article_basic = spark.sql("select * from user_article_basic").drop('channel_id')

通過(guò)文章 ID混驰,將用戶(hù)行為數(shù)據(jù)與文章畫(huà)像數(shù)據(jù)進(jìn)行連接,從而得到文章頻道 ID 和文章主題詞

spark.sql('use article')
article_topic = spark.sql("select article_id, channel_id, topics from article_profile")
user_article_topic = user_article_basic.join(article_topic, how='left', on=['article_id'])

user_article_topic 結(jié)果如下圖所示皂贩,其中 topics 列即為文章主題詞列表栖榨,如 ['補(bǔ)碼', '字符串', '李白', ...]

接下來(lái),我們需要計(jì)算每一個(gè)主題詞對(duì)于用戶(hù)的權(quán)重明刷,所以需要將 topics 列中的每個(gè)主題詞都拆分為單獨(dú)的一條記錄婴栽。可以利用 Spark 的 explode() 方法辈末,達(dá)到類(lèi)似“爆炸”的效果

import pyspark.sql.functions as F

user_article_topic = user_topic.withColumn('topic', F.explode('topics')).drop('topics')

user_article_topic 如下圖所示

我們通過(guò)用戶(hù)對(duì)哪些文章發(fā)生了行為以及該文章有哪些主題詞居夹,計(jì)算出了用戶(hù)對(duì)哪些主題詞發(fā)生了行為。這樣本冲,我們就可以根據(jù)用戶(hù)對(duì)主題詞的行為來(lái)計(jì)算主題詞對(duì)用戶(hù)的權(quán)重,并且將這些主題詞作為用戶(hù)的標(biāo)簽劫扒。那么檬洞,用戶(hù)標(biāo)簽權(quán)重的計(jì)算公式為:用戶(hù)標(biāo)簽權(quán)重 =(用戶(hù)行為分值之和)x 時(shí)間衰減。其中沟饥,時(shí)間衰減公式為:時(shí)間衰減系數(shù) = 1 / (log(t) + 1)添怔,其中 t 為發(fā)生行為的時(shí)間距離當(dāng)前時(shí)間的大小

不同的用戶(hù)行為對(duì)應(yīng)不同的權(quán)重,如下所示

用戶(hù)行為 分值
閱讀時(shí)間(<1000) 1
閱讀時(shí)間(>=1000) 2
收藏 2
分享 3
點(diǎn)擊 5

計(jì)算用戶(hù)標(biāo)簽及權(quán)重贤旷,并存儲(chǔ)到 Hbase 中 user_profile 表的 partial 列族中广料。注意,這里我們將頻道 ID 和標(biāo)簽一起作為 partial 列族的鍵存儲(chǔ)幼驶,這樣我們就方便查詢(xún)不同頻道的標(biāo)簽及權(quán)重了

def compute_user_label_weights(partitions):
    """ 計(jì)算用戶(hù)標(biāo)簽權(quán)重
    """
    action_weight = {
        "read_min": 1,
        "read_middle": 2,
        "collect": 2,
        "share": 3,
        "click": 5
    }

    from datetime import datetime
    import numpy as np
    
    # 循環(huán)處理每個(gè)用戶(hù)對(duì)應(yīng)的每個(gè)主題詞
    for row in partitions:
        # 計(jì)算時(shí)間衰減系數(shù)
        t = datetime.now() - datetime.strptime(row.action_time, '%Y-%m-%d %H:%M:%S')
        alpha = 1 / (np.log(t.days + 1) + 1)
        
        if row.read_time  == '':
            read_t = 0
        else:
            read_t = int(row.read_time)
        
        # 計(jì)算閱讀時(shí)間的行為分?jǐn)?shù)
        read_score = action_weight['read_middle'] if read_t > 1000 else action_weight['read_min']
        
        # 計(jì)算各種行為的權(quán)重和并乘以時(shí)間衰減系數(shù)
        weights = alpha * (row.shared * action_weight['share'] + row.clicked * action_weight['click'] +
                          row.collected * action_weight['collect'] + read_score)
        
        # 更新到user_profilehbase表
        with pool.connection() as conn:
            table = conn.table('user_profile')
            table.put('user:{}'.format(row.user_id).encode(),
                      {'partial:{}:{}'.format(row.channel_id, row.topic).encode(): json.dumps(
                          weights).encode()})
            conn.close()

user_topic.foreachPartition(compute_user_label_weights)

在 Hive 中查詢(xún)用戶(hù)標(biāo)簽及權(quán)重

hive> select * from user_profile_hbase limit 1;
OK
user:1  {"birthday":0.0,"gender":null}  {"18:##":0.25704484358604845,"18:&#":0.25704484358604845,"18:+++":0.23934588700996243,"18:+++++":0.23934588700996243,"18:AAA":0.2747964402379244,"18:Animal":0.2747964402379244,"18:Author":0.2747964402379244,"18:BASE":0.23934588700996243,"18:BBQ":0.23934588700996243,"18:Blueprint":1.6487786414275463,"18:Code":0.23934588700996243,"18:DIR......

接下來(lái),要將用戶(hù)屬性信息加入到用戶(hù)畫(huà)像中。讀取用戶(hù)基礎(chǔ)信息湿痢,存儲(chǔ)到用戶(hù)畫(huà)像表的 basic 列族即可

def update_user_info():
    """
    更新用戶(hù)畫(huà)像的屬性信息
    :return:
    """
    spark.sql("use toutiao")
    user_basic = spark.sql("select user_id, gender, birthday from user_profile")

    def udapte_user_basic(partition):

        import happybase
        #  用于讀取hbase緩存結(jié)果配置
        pool = happybase.ConnectionPool(size=10, host='172.17.0.134', port=9090)
        for row in partition:
            from datetime import date
            age = 0
            if row.birthday != 'null':
                born = datetime.strptime(row.birthday, '%Y-%m-%d')
                today = date.today()
                age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))

            with pool.connection() as conn:
                table = conn.table('user_profile')
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:gender'.encode(): json.dumps(row.gender).encode()})
                table.put('user:{}'.format(row.user_id).encode(),
                          {'basic:birthday'.encode(): json.dumps(age).encode()})
                conn.close()

    user_basic.foreachPartition(udapte_user_basic)

到這里,我們的用戶(hù)畫(huà)像就計(jì)算完成了畅铭。

Apscheduler 定時(shí)更新

定義更新用戶(hù)畫(huà)像方法,首先處理用戶(hù)行為日志勃蜘,拆分文章主題詞硕噩,接著計(jì)算用戶(hù)標(biāo)簽的權(quán)重,最后再將用戶(hù)屬性信息加入到用戶(hù)畫(huà)像中

def update_user_profile():
    """
    定時(shí)更新用戶(hù)畫(huà)像的邏輯
    :return:
    """
    up = UpdateUserProfile()
    if up.update_user_action_basic():
        up.update_user_label()
        up.update_user_info()

在 Apscheduler 中添加定時(shí)更新用戶(hù)畫(huà)像任務(wù)缭贡,設(shè)定每隔 2 個(gè)小時(shí)更新一次

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor

# 創(chuàng)建scheduler炉擅,多進(jìn)程執(zhí)行
executors = {
    'default': ProcessPoolExecutor(3)
}

scheduler = BlockingScheduler(executors=executors)

# 添加一個(gè)定時(shí)運(yùn)行文章畫(huà)像更新的任務(wù), 每隔1個(gè)小時(shí)運(yùn)行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一個(gè)定時(shí)運(yùn)行用戶(hù)畫(huà)像更新的任務(wù)阳惹, 每隔2個(gè)小時(shí)運(yùn)行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)

scheduler.start()

另外說(shuō)一下谍失,在實(shí)際場(chǎng)景中,用戶(hù)畫(huà)像往往是非常復(fù)雜的穆端,下面是電商場(chǎng)景的用戶(hù)畫(huà)像袱贮,可以了解一下。

參考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(學(xué)習(xí)資源已保存至網(wǎng)盤(pán)体啰, 提取碼:eakp)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末攒巍,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子荒勇,更是在濱河造成了極大的恐慌柒莉,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沽翔,死亡現(xiàn)場(chǎng)離奇詭異兢孝,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)仅偎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)跨蟹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人橘沥,你說(shuō)我怎么就攤上這事窗轩。” “怎么了座咆?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵痢艺,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我介陶,道長(zhǎng)堤舒,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任哺呜,我火速辦了婚禮舌缤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己友驮,他們只是感情好漂羊,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著卸留,像睡著了一般走越。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上耻瑟,一...
    開(kāi)封第一講書(shū)人閱讀 51,688評(píng)論 1 305
  • 那天旨指,我揣著相機(jī)與錄音,去河邊找鬼喳整。 笑死谆构,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的框都。 我是一名探鬼主播搬素,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼魏保!你這毒婦竟也來(lái)了熬尺?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤谓罗,失蹤者是張志新(化名)和其女友劉穎粱哼,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體檩咱,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡揭措,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了刻蚯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绊含。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖炊汹,靈堂內(nèi)的尸體忽然破棺而出艺挪,到底是詐尸還是另有隱情,我是刑警寧澤兵扬,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站口蝠,受9級(jí)特大地震影響器钟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜妙蔗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一傲霸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦昙啄、人聲如沸穆役。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)耿币。三九已至,卻和暖如春韧拒,著一層夾襖步出監(jiān)牢的瞬間淹接,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工叛溢, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留塑悼,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓楷掉,卻偏偏與公主長(zhǎng)得像厢蒜,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子烹植,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容