推薦閱讀:
文章推薦系統(tǒng) | 一、推薦流程設(shè)計(jì)
文章推薦系統(tǒng) | 二伯病、同步業(yè)務(wù)數(shù)據(jù)
文章推薦系統(tǒng) | 三造烁、收集用戶(hù)行為數(shù)據(jù)
文章推薦系統(tǒng) | 四、構(gòu)建離線(xiàn)文章畫(huà)像
文章推薦系統(tǒng) | 五午笛、計(jì)算文章相似度
前面我們完成了文章畫(huà)像的構(gòu)建以及文章相似度的計(jì)算惭蟋,接下來(lái),我們就要實(shí)現(xiàn)用戶(hù)畫(huà)像的構(gòu)建了药磺。用戶(hù)畫(huà)像往往是大型網(wǎng)站的重要模塊告组,基于用戶(hù)畫(huà)像不僅可以實(shí)現(xiàn)個(gè)性化推薦,還可以實(shí)現(xiàn)用戶(hù)分群癌佩、精準(zhǔn)推送木缝、精準(zhǔn)營(yíng)銷(xiāo)以及用戶(hù)行為預(yù)測(cè)便锨、商業(yè)化轉(zhuǎn)化分析等,為商業(yè)決策提供數(shù)據(jù)支持我碟。通常用戶(hù)畫(huà)像包括用戶(hù)屬性信息(性別放案、年齡、出生日期等)怎囚、用戶(hù)行為信息(瀏覽卿叽、收藏、點(diǎn)贊等)以及環(huán)境信息(時(shí)間恳守、地理位置等)考婴。
處理用戶(hù)行為數(shù)據(jù)
在數(shù)據(jù)準(zhǔn)備階段,我們通過(guò) Flume 已經(jīng)可以將用戶(hù)行為數(shù)據(jù)收集到 Hive 的 user_action 表的 HDFS 路徑中催烘,先來(lái)看一下這些數(shù)據(jù)長(zhǎng)什么樣子沥阱,我們讀取當(dāng)天的用戶(hù)行為數(shù)據(jù),注意讀取之前要先關(guān)聯(lián)分區(qū)
_day = time.strftime("%Y-%m-%d", time.localtime())
_localions = '/user/hive/warehouse/profile.db/user_action/' + _day
if fs.exists(_localions):
# 如果有該文件直接關(guān)聯(lián)伊群,捕獲關(guān)聯(lián)重復(fù)異常
try:
self.spark.sql("alter table user_action add partition (dt='%s') location '%s'" % (_day, _localions))
except Exception as e:
pass
self.spark.sql("use profile")
user_action = self.spark.sql("select actionTime, readTime, channelId, param.articleId, param.algorithmCombine, param.action, param.userId from user_action where dt>=" + _day)
user_action
結(jié)果如下所示
可以發(fā)現(xiàn)考杉,上面的一條記錄代表用戶(hù)對(duì)文章的一次行為,但通常我們需要查詢(xún)某個(gè)用戶(hù)對(duì)某篇文章的所有行為舰始,所以崇棠,我們要將這里用戶(hù)對(duì)文章的多條行為數(shù)據(jù)合并為一條,其中包括用戶(hù)對(duì)文章的所有行為丸卷。我們需要新建一個(gè) Hive 表 user_article_basic枕稀,這張表包括了用戶(hù) ID、文章 ID谜嫉、是否曝光萎坷、是否點(diǎn)擊、閱讀時(shí)間等等沐兰,隨后我們將處理好的用戶(hù)行為數(shù)據(jù)存儲(chǔ)到此表中
create table user_article_basic
(
user_id BIGINT comment "userID",
action_time STRING comment "user actions time",
article_id BIGINT comment "articleid",
channel_id INT comment "channel_id",
shared BOOLEAN comment "is shared",
clicked BOOLEAN comment "is clicked",
collected BOOLEAN comment "is collected",
exposure BOOLEAN comment "is exposured",
read_time STRING comment "reading time"
)
COMMENT "user_article_basic"
CLUSTERED by (user_id) into 2 buckets
STORED as textfile
LOCATION '/user/hive/warehouse/profile.db/user_article_basic';
遍歷每一條原始用戶(hù)行為數(shù)據(jù)哆档,判斷用戶(hù)對(duì)文章的行為,在 user_action_basic 中將該用戶(hù)與該文章對(duì)應(yīng)的行為設(shè)置為 True
if user_action.collect():
def _generate(row):
_list = []
if row.action == 'exposure':
for article_id in eval(row.articleId):
# ["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]
_list.append(
[row.userId, row.actionTime, article_id, row.channelId, False, False, False, True, row.readTime])
return _list
else:
class Temp(object):
shared = False
clicked = False
collected = False
read_time = ""
_tp = Temp()
if row.action == 'click':
_tp.clicked = True
elif row.action == 'share':
_tp.shared = True
elif row.action == 'collect':
_tp.collected = True
elif row.action == 'read':
_tp.clicked = True
_list.append(
[row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,
True, row.readTime])
return _list
user_action_basic = user_action.rdd.flatMap(_generate)
user_action_basic = user_action_basic.toDF(
["user_id", "action_time", "article_id", "channel_id", "shared", "clicked", "collected", "exposure",
"read_time"])
user_action_basic
結(jié)果如下所示住闯,這里的一條記錄包括了某個(gè)用戶(hù)對(duì)某篇文章的所有行為
由于 Hive 目前還不支持 pyspark 的原子性操作瓜浸,所以 user_article_basic 表的用戶(hù)行為數(shù)據(jù)只能全量更新(實(shí)際場(chǎng)景中可以選擇其他語(yǔ)言或數(shù)據(jù)庫(kù)實(shí)現(xiàn))。這里寞秃,我們需要將當(dāng)天的用戶(hù)行為與 user_action_basic 的歷史用戶(hù)行為進(jìn)行合并
old_data = uup.spark.sql("select * from user_article_basic")
new_data = old_data.unionAll(user_action_basic)
合并后又會(huì)產(chǎn)生一個(gè)新的問(wèn)題斟叼,那就是用戶(hù) ID 和文章 ID 可能重復(fù),因?yàn)榻裉炷硞€(gè)用戶(hù)對(duì)某篇文章的記錄可能在歷史數(shù)據(jù)中也存在春寿,而 unionAll()
方法并沒(méi)有去重朗涩,這里我們可以按照用戶(hù) ID 和文章 ID 進(jìn)行分組,利用 max()
方法得到 action_time, channel_id, shared, clicked, collected, exposure, read_time 即可绑改,去重后直接存儲(chǔ)到 user_article_basic 表中
new_data.registerTempTable("temptable")
self.spark.sql('''insert overwrite table user_article_basic select user_id, max(action_time) as action_time,
article_id, max(channel_id) as channel_id, max(shared) as shared, max(clicked) as clicked,
max(collected) as collected, max(exposure) as exposure, max(read_time) as read_time from temptable
group by user_id, article_id''')
表 user_article_basic 結(jié)果如下所示
計(jì)算用戶(hù)畫(huà)像
我們選擇將用戶(hù)畫(huà)像存儲(chǔ)在 Hbase 中谢床,因?yàn)?Hbase 支持原子性操作和快速讀取兄一,并且 Hive 也可以通過(guò)創(chuàng)建外部表關(guān)聯(lián)到 Hbase,進(jìn)行離線(xiàn)分析识腿,如果要?jiǎng)h除 Hive 外部表的話(huà)出革,對(duì) Hbase 也沒(méi)有影響。首先渡讼,在 Hbase 中創(chuàng)建用戶(hù)畫(huà)像表
create 'user_profile', 'basic','partial','env'
在 Hive 中創(chuàng)建 Hbase 外部表骂束,注意字段類(lèi)型設(shè)置為 map
create external table user_profile_hbase
(
user_id STRING comment "userID",
information MAP<STRING, DOUBLE> comment "user basic information",
article_partial MAP<STRING, DOUBLE> comment "article partial",
env MAP<STRING, INT> comment "user env"
)
COMMENT "user profile table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,basic:,partial:,env:")
TBLPROPERTIES ("hbase.table.name" = "user_profile");
創(chuàng)建外部表之后,還需要導(dǎo)入一些依賴(lài)包
cp -r /root/bigdata/hbase/lib/hbase-*.jar /root/bigdata/spark/jars/
cp -r /root/bigdata/hive/lib/h*.jar /root/bigdata/spark/jars/
接下來(lái)成箫,讀取處理好的用戶(hù)行為數(shù)據(jù)展箱,由于日志中的 channel_id 有可能是來(lái)自于推薦頻道(0),而不是文章真實(shí)的頻道蹬昌,所以這里要將 channel_id 列刪除
spark.sql("use profile")
user_article_basic = spark.sql("select * from user_article_basic").drop('channel_id')
通過(guò)文章 ID混驰,將用戶(hù)行為數(shù)據(jù)與文章畫(huà)像數(shù)據(jù)進(jìn)行連接,從而得到文章頻道 ID 和文章主題詞
spark.sql('use article')
article_topic = spark.sql("select article_id, channel_id, topics from article_profile")
user_article_topic = user_article_basic.join(article_topic, how='left', on=['article_id'])
user_article_topic
結(jié)果如下圖所示皂贩,其中 topics 列即為文章主題詞列表栖榨,如 ['補(bǔ)碼', '字符串', '李白', ...]
接下來(lái),我們需要計(jì)算每一個(gè)主題詞對(duì)于用戶(hù)的權(quán)重明刷,所以需要將 topics 列中的每個(gè)主題詞都拆分為單獨(dú)的一條記錄婴栽。可以利用 Spark 的 explode()
方法辈末,達(dá)到類(lèi)似“爆炸”的效果
import pyspark.sql.functions as F
user_article_topic = user_topic.withColumn('topic', F.explode('topics')).drop('topics')
user_article_topic
如下圖所示
我們通過(guò)用戶(hù)對(duì)哪些文章發(fā)生了行為以及該文章有哪些主題詞居夹,計(jì)算出了用戶(hù)對(duì)哪些主題詞發(fā)生了行為。這樣本冲,我們就可以根據(jù)用戶(hù)對(duì)主題詞的行為來(lái)計(jì)算主題詞對(duì)用戶(hù)的權(quán)重,并且將這些主題詞作為用戶(hù)的標(biāo)簽劫扒。那么檬洞,用戶(hù)標(biāo)簽權(quán)重的計(jì)算公式為:用戶(hù)標(biāo)簽權(quán)重 =(用戶(hù)行為分值之和)x 時(shí)間衰減。其中沟饥,時(shí)間衰減公式為:時(shí)間衰減系數(shù) = 1 / (log(t) + 1)添怔,其中 t 為發(fā)生行為的時(shí)間距離當(dāng)前時(shí)間的大小
不同的用戶(hù)行為對(duì)應(yīng)不同的權(quán)重,如下所示
用戶(hù)行為 | 分值 |
---|---|
閱讀時(shí)間(<1000) | 1 |
閱讀時(shí)間(>=1000) | 2 |
收藏 | 2 |
分享 | 3 |
點(diǎn)擊 | 5 |
計(jì)算用戶(hù)標(biāo)簽及權(quán)重贤旷,并存儲(chǔ)到 Hbase 中 user_profile 表的 partial 列族中广料。注意,這里我們將頻道 ID 和標(biāo)簽一起作為 partial 列族的鍵存儲(chǔ)幼驶,這樣我們就方便查詢(xún)不同頻道的標(biāo)簽及權(quán)重了
def compute_user_label_weights(partitions):
""" 計(jì)算用戶(hù)標(biāo)簽權(quán)重
"""
action_weight = {
"read_min": 1,
"read_middle": 2,
"collect": 2,
"share": 3,
"click": 5
}
from datetime import datetime
import numpy as np
# 循環(huán)處理每個(gè)用戶(hù)對(duì)應(yīng)的每個(gè)主題詞
for row in partitions:
# 計(jì)算時(shí)間衰減系數(shù)
t = datetime.now() - datetime.strptime(row.action_time, '%Y-%m-%d %H:%M:%S')
alpha = 1 / (np.log(t.days + 1) + 1)
if row.read_time == '':
read_t = 0
else:
read_t = int(row.read_time)
# 計(jì)算閱讀時(shí)間的行為分?jǐn)?shù)
read_score = action_weight['read_middle'] if read_t > 1000 else action_weight['read_min']
# 計(jì)算各種行為的權(quán)重和并乘以時(shí)間衰減系數(shù)
weights = alpha * (row.shared * action_weight['share'] + row.clicked * action_weight['click'] +
row.collected * action_weight['collect'] + read_score)
# 更新到user_profilehbase表
with pool.connection() as conn:
table = conn.table('user_profile')
table.put('user:{}'.format(row.user_id).encode(),
{'partial:{}:{}'.format(row.channel_id, row.topic).encode(): json.dumps(
weights).encode()})
conn.close()
user_topic.foreachPartition(compute_user_label_weights)
在 Hive 中查詢(xún)用戶(hù)標(biāo)簽及權(quán)重
hive> select * from user_profile_hbase limit 1;
OK
user:1 {"birthday":0.0,"gender":null} {"18:##":0.25704484358604845,"18:&#":0.25704484358604845,"18:+++":0.23934588700996243,"18:+++++":0.23934588700996243,"18:AAA":0.2747964402379244,"18:Animal":0.2747964402379244,"18:Author":0.2747964402379244,"18:BASE":0.23934588700996243,"18:BBQ":0.23934588700996243,"18:Blueprint":1.6487786414275463,"18:Code":0.23934588700996243,"18:DIR......
接下來(lái),要將用戶(hù)屬性信息加入到用戶(hù)畫(huà)像中。讀取用戶(hù)基礎(chǔ)信息湿痢,存儲(chǔ)到用戶(hù)畫(huà)像表的 basic 列族即可
def update_user_info():
"""
更新用戶(hù)畫(huà)像的屬性信息
:return:
"""
spark.sql("use toutiao")
user_basic = spark.sql("select user_id, gender, birthday from user_profile")
def udapte_user_basic(partition):
import happybase
# 用于讀取hbase緩存結(jié)果配置
pool = happybase.ConnectionPool(size=10, host='172.17.0.134', port=9090)
for row in partition:
from datetime import date
age = 0
if row.birthday != 'null':
born = datetime.strptime(row.birthday, '%Y-%m-%d')
today = date.today()
age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))
with pool.connection() as conn:
table = conn.table('user_profile')
table.put('user:{}'.format(row.user_id).encode(),
{'basic:gender'.encode(): json.dumps(row.gender).encode()})
table.put('user:{}'.format(row.user_id).encode(),
{'basic:birthday'.encode(): json.dumps(age).encode()})
conn.close()
user_basic.foreachPartition(udapte_user_basic)
到這里,我們的用戶(hù)畫(huà)像就計(jì)算完成了畅铭。
Apscheduler 定時(shí)更新
定義更新用戶(hù)畫(huà)像方法,首先處理用戶(hù)行為日志勃蜘,拆分文章主題詞硕噩,接著計(jì)算用戶(hù)標(biāo)簽的權(quán)重,最后再將用戶(hù)屬性信息加入到用戶(hù)畫(huà)像中
def update_user_profile():
"""
定時(shí)更新用戶(hù)畫(huà)像的邏輯
:return:
"""
up = UpdateUserProfile()
if up.update_user_action_basic():
up.update_user_label()
up.update_user_info()
在 Apscheduler 中添加定時(shí)更新用戶(hù)畫(huà)像任務(wù)缭贡,設(shè)定每隔 2 個(gè)小時(shí)更新一次
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
# 創(chuàng)建scheduler炉擅,多進(jìn)程執(zhí)行
executors = {
'default': ProcessPoolExecutor(3)
}
scheduler = BlockingScheduler(executors=executors)
# 添加一個(gè)定時(shí)運(yùn)行文章畫(huà)像更新的任務(wù), 每隔1個(gè)小時(shí)運(yùn)行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一個(gè)定時(shí)運(yùn)行用戶(hù)畫(huà)像更新的任務(wù)阳惹, 每隔2個(gè)小時(shí)運(yùn)行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)
scheduler.start()
另外說(shuō)一下谍失,在實(shí)際場(chǎng)景中,用戶(hù)畫(huà)像往往是非常復(fù)雜的穆端,下面是電商場(chǎng)景的用戶(hù)畫(huà)像袱贮,可以了解一下。
參考
https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(學(xué)習(xí)資源已保存至網(wǎng)盤(pán)体啰, 提取碼:eakp)