推薦閱讀:
文章推薦系統(tǒng) | 一渴语、推薦流程設(shè)計(jì)
文章推薦系統(tǒng) | 二、同步業(yè)務(wù)數(shù)據(jù)
文章推薦系統(tǒng) | 三毅贮、收集用戶行為數(shù)據(jù)
文章推薦系統(tǒng) | 四办悟、構(gòu)建離線文章畫像
文章推薦系統(tǒng) | 五、計(jì)算文章相似度
文章推薦系統(tǒng) | 六滩褥、構(gòu)建離線用戶畫像
文章推薦系統(tǒng) | 七病蛉、構(gòu)建離線文章特征和用戶特征
文章推薦系統(tǒng) | 八、基于模型的離線召回
文章推薦系統(tǒng) | 九瑰煎、基于內(nèi)容的離線及在線召回
文章推薦系統(tǒng) | 十铺然、基于熱門文章和新文章的在線召回
文章推薦系統(tǒng) | 十一、基于 LR 模型的離線排序
文章推薦系統(tǒng) | 十二酒甸、基于 FTRL 模型的在線排序
上圖是 Wide&Deep 模型的網(wǎng)絡(luò)結(jié)構(gòu)魄健,深度學(xué)習(xí)可以通過嵌入(Embedding)表達(dá)出更精準(zhǔn)的用戶興趣及物品特征,不僅能減少人工特征工程的工作量插勤,還能提高模型的泛化能力沽瘦,使得用戶行為預(yù)估更加準(zhǔn)確。Wide&Deep 模型適合高維稀疏特征的推薦場(chǎng)景农尖,兼有稀疏特征的可解釋性和深模型的泛化能力析恋。通常將類別特征做 Embedding 學(xué)習(xí),再將 Embedding 稠密特征輸入深模型中盛卡。Wide 部分的輸入特征包括:類別特征和離散化的數(shù)值特征助隧,Deep部分的輸入特征包括:數(shù)值特征和 Embedding 后的類別特征。其中滑沧,Wide 部分使用 FTRL + L1并村;Deep 部分使用 AdaGrad,并且兩側(cè)是一起聯(lián)合進(jìn)行訓(xùn)練的滓技。
離線訓(xùn)練
TensorFlow 實(shí)現(xiàn)了很多深度模型哩牍,其中就包括 Wide&Deep,API 接口為 tf.estimator.DNNLinearCombinedClassifier
殖属,我們可以直接使用姐叁。在上篇文章中已經(jīng)實(shí)現(xiàn)了將訓(xùn)練數(shù)據(jù)寫入 TFRecord 文件,在這里可以直接讀取
@staticmethod
def read_ctr_records():
dataset = tf.data.TFRecordDataset(["./train_ctr_201905.tfrecords"])
dataset = dataset.map(parse_tfrecords)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.repeat(10000)
return dataset.make_one_shot_iterator().get_next()
解析每個(gè)樣本,將 TFRecord 中序列化的 feature 列外潜,解析成 channel_id (1), article_vector (100), user_weights (10), article_weights (10)
def parse_tfrecords(example):
features = {
"label": tf.FixedLenFeature([], tf.int64),
"feature": tf.FixedLenFeature([], tf.string)
}
parsed_features = tf.parse_single_example(example, features)
feature = tf.decode_raw(parsed_features['feature'], tf.float64)
feature = tf.reshape(tf.cast(feature, tf.float32), [1, 121])
# 特征順序 1 channel_id, 100 article_vector, 10 user_weights, 10 article_weights
# 1 channel_id類別型特征原环, 100維文章向量求平均值當(dāng)連續(xù)特征,10維用戶權(quán)重求平均值當(dāng)連續(xù)特征
channel_id = tf.cast(tf.slice(feature, [0, 0], [1, 1]), tf.int32)
vector = tf.reduce_sum(tf.slice(feature, [0, 1], [1, 100]), axis=1, keep_dims=True)
user_weights = tf.reduce_sum(tf.slice(feature, [0, 101], [1, 10]), axis=1, keep_dims=True)
article_weights = tf.reduce_sum(tf.slice(feature, [0, 111], [1, 10]), axis=1, keep_dims=True)
label = tf.reshape(tf.cast(parsed_features['label'], tf.float32), [1, 1])
# 構(gòu)造字典 名稱-tensor
FEATURE_COLUMNS = ['channel_id', 'vector', 'user_weigths', 'article_weights']
tensor_list = [channel_id, vector, user_weights, article_weights]
feature_dict = dict(zip(FEATURE_COLUMNS, tensor_list))
return feature_dict, label
指定輸入特征的數(shù)據(jù)類型处窥,并定義 Wide&Deep 模型 model
# 離散類型
channel_id = tf.feature_column.categorical_column_with_identity('channel_id', num_buckets=25)
# 連續(xù)類型
vector = tf.feature_column.numeric_column('vector')
user_weigths = tf.feature_column.numeric_column('user_weigths')
article_weights = tf.feature_column.numeric_column('article_weights')
wide_columns = [channel_id]
# embedding_column用來表示類別型的變量
deep_columns = [tf.feature_column.embedding_column(channel_id, dimension=25),
vector, user_weigths, article_weights]
estimator = tf.estimator.DNNLinearCombinedClassifier(model_dir="./ckpt/wide_and_deep",
linear_feature_columns=wide_columns,
dnn_feature_columns=deep_columns,
dnn_hidden_units=[1024, 512, 256])
通過調(diào)用 read_ctr_records() 方法嘱吗,來讀取 TFRecod 文件中的訓(xùn)練數(shù)據(jù),并設(shè)置訓(xùn)練步長滔驾,對(duì)定義好的 FTRL 模型進(jìn)行訓(xùn)練及預(yù)估
model.train(read_ctr_records, steps=1000)
result = model.evaluate(read_ctr_records)
可以用上一次模型的參數(shù)作為當(dāng)前模型的初始化參數(shù)谒麦,訓(xùn)練完成后,通常會(huì)進(jìn)行離線指標(biāo)分析哆致,若符合預(yù)期即可導(dǎo)出模型
columns = wide_columns + deep_columns
feature_spec = tf.feature_column.make_parse_example_spec(columns)
serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(feature_spec)
model.export_savedmodel("./serving_model/wdl/", serving_input_receiver_fn)
TFServing 部署
安裝
docker pull tensorflow/serving
啟動(dòng)
docker run -p 8501:8501 -p 8500:8500 --mount type=bind,source=/root/toutiao_project/reco_sys/server/models/serving_model/wdl,target=/models/wdl -e MODEL_NAME=wdl -t tensorflow/serving
- -p 8501:8501 為端口映射(-p 主機(jī)端口 : docker 容器程序)
- TFServing 使用 8501 端口對(duì)外提供 HTTP 服務(wù)绕德,使用8500對(duì)外提供 gRPC 服務(wù),這里同時(shí)開放了兩個(gè)端口的使用
- --mount type=bind,source=/home/ubuntu/detectedmodel/wdl,target=/models/wdl 為文件映射摊阀,將主機(jī)(source)的模型文件映射到 docker 容器程序(target)的位置耻蛇,以便 TFServing 使用模型,target 參數(shù)為 /models/模型名稱
- -e MODEL_NAME= wdl 設(shè)置了一個(gè)環(huán)境變量胞此,名為 MODEL_NAME臣咖,此變量被 TFServing 讀取,用來按名字尋找模型漱牵,與上面 target 參數(shù)中的模型名稱對(duì)應(yīng)
- -t 為 TFServing 創(chuàng)建一個(gè)偽終端夺蛇,供程序運(yùn)行
- tensorflow/serving 為鏡像名稱
在線排序
通常在線排序是根據(jù)用戶實(shí)時(shí)的推薦請(qǐng)求,對(duì)召回結(jié)果進(jìn)行 CTR 預(yù)估酣胀,進(jìn)而計(jì)算出排序結(jié)果并返回刁赦。我們需要根據(jù)召回結(jié)果構(gòu)造測(cè)試集,其中每個(gè)測(cè)試樣本包括用戶特征和文章特征闻镶。首先截型,根據(jù)用戶 ID 和頻道 ID 讀取用戶特征(用戶在每個(gè)頻道的特征不同,所以是分頻道存儲(chǔ)的)
try:
user_feature = eval(hbu.get_table_row('ctr_feature_user',
'{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode()))
except Exception as e:
user_feature = []
再根據(jù)用戶 ID 讀取召回結(jié)果
recall_set = read_hbase_recall('cb_recall',
'recall:user:{}'.format(temp.user_id).encode(),
'als:{}'.format(temp.channel_id).encode())
接著儒溉,遍歷召回結(jié)果,獲取文章特征发钝,并將用戶特征合并顿涣,構(gòu)建樣本
examples = []
for article_id in recall_set:
try:
article_feature = eval(hbu.get_table_row('ctr_feature_article',
'{}'.format(article_id).encode(),
'article:{}'.format(article_id).encode()))
except Exception as e:
article_feature = []
if not article_feature:
article_feature = [0.0] * 111
channel_id = int(article_feature[0])
# 計(jì)算后面若干向量的平均值
vector = np.mean(article_feature[11:])
# 用戶權(quán)重特征
user_feature = np.mean(user_feature)
# 文章權(quán)重特征
article_feature = np.mean(article_feature[1:11])
# 構(gòu)建example
example = tf.train.Example(features=tf.train.Features(feature={
"channel_id": tf.train.Feature(int64_list=tf.train.Int64List(value=[channel_id])),
"vector": tf.train.Feature(float_list=tf.train.FloatList(value=[vector])),
'user_weigths': tf.train.Feature(float_list=tf.train.FloatList(value=[user_feature])),
'article_weights': tf.train.Feature(float_list=tf.train.FloatList(value=[article_feature])),
}))
examples.append(example)
調(diào)用 TFServing 的模型服務(wù),獲取排序結(jié)果
with grpc.insecure_channel("127.0.0.1:8500") as channel:
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
request = classification_pb2.ClassificationRequest()
# 構(gòu)造請(qǐng)求酝豪,指定模型名稱涛碑,指定輸入樣本
request.model_spec.name = 'wdl'
request.input.example_list.examples.extend(examples)
# 發(fā)送請(qǐng)求,獲取排序結(jié)果
response = stub.Classify(request, 10.0)
這樣孵淘,我們就實(shí)現(xiàn)了 Wide&Deep 模型的離線訓(xùn)練和 TFServing 模型部署以及在線排序服務(wù)的調(diào)用蒲障。使用這種方式,線上服務(wù)需要將特征發(fā)送給TF Serving,這不可避免引入了網(wǎng)絡(luò) IO揉阎,給帶寬和預(yù)估時(shí)延帶來壓力庄撮。可以通過并發(fā)請(qǐng)求毙籽,召回多個(gè)召回結(jié)果集合洞斯,然后并發(fā)請(qǐng)求 TF Serving 模型服務(wù),這樣可以有效降低整體預(yù)估時(shí)延坑赡。還可以通過特征 ID 化烙如,將字符串類型的特征名哈希到 64 位整型空間,這樣有效減少傳輸?shù)臄?shù)據(jù)量毅否,降低使用的帶寬亚铁。
模型同步
實(shí)際環(huán)境中,我們可能還要經(jīng)常將離線訓(xùn)練好的模型同步到線上服務(wù)機(jī)器螟加,大致同步過程如下:
- 同步前徘溢,檢查模型 md5 文件,只有該文件更新了仰迁,才需要同步
- 同步時(shí)甸昏,隨機(jī)鏈接 HTTPFS 機(jī)器并限制下載速度
- 同步后,校驗(yàn)?zāi)P臀募?md5 值并備份舊模型
同步過程中徐许,需要處理發(fā)生錯(cuò)誤或者超時(shí)的情況施蜜,可以設(shè)定觸發(fā)報(bào)警或重試機(jī)制。通常模型的同步時(shí)間都在分鐘級(jí)別雌隅。
參考
https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(學(xué)習(xí)資源已保存至網(wǎng)盤翻默, 提取碼:eakp)