I. Business Scenario
In our ad-tracking system, we hand customers an SDK, collect all kinds of user data onto our servers, and then run MapReduce jobs over it to produce various statistics. This article picks out one of those scenarios: computing user retention and payment LTV.
Computing these two metrics requires three kinds of data: account activations, online activity, and payment records. The two metrics are computed as follows (and written out in symbols right after this list):
1. User retention: compare the users online today against the activation records of the past month, determine on which day each of today's online users was activated, and compute the difference in days.
2. Payment LTV: find the users who paid today, compare them against the past month's activation records, determine on which day each paying user was activated, compute the day difference, and then divide today's total payment amount by that day difference.
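In symbols (our notation, added here for clarity): write O_T for the set of devices online on day T, A_{T-d} for the set of devices activated on day T-d, P_T for the set of devices that paid on day T, and pay_T(u) for the amount device u paid on day T. The two definitions above then read

\mathrm{Retention}(T,d) = \left| O_T \cap A_{T-d} \right|,
\qquad
\mathrm{LTV}(T,d) = \frac{\sum_{u \in P_T \cap A_{T-d}} \mathrm{pay}_T(u)}{d}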
Out of concern for company data security, no data and no computation code appear in this article; only the Azkaban job definitions and the overall approach are written up, which readers can use as a reference.
II. Processing Approach
1骑丸、原始的用戶數(shù)據(jù)是混合在一起的舌仍,都放在按天分區(qū)的hdfs的指定目錄下,這樣通危,我們就需要寫一個作為數(shù)據(jù)清洗的MR類铸豁,把原始日志中的在線,激活菊碟,付費三類數(shù)據(jù)分別輸出到獨立的文件中节芥。這在hadoopMR中可以通過輸出文件后綴的方式進行區(qū)分。
2逆害、完成第一步后头镊,我們需要把三類數(shù)據(jù)分別進行統(tǒng)計,比如按照appid進行統(tǒng)計魄幕,幣別需要轉(zhuǎn)換相艇,激活時間需要從時間戳轉(zhuǎn)換為日期等步驟。
3纯陨、第三步就需要把這三類數(shù)據(jù)分別入庫到hive中坛芽,供后面的hiveSQL進行join操作。
4翼抠、把在線數(shù)據(jù)與激活數(shù)據(jù)做join咙轩,得出用戶留存;把付費數(shù)據(jù)與激活數(shù)據(jù)做join阴颖,得出付費LTV臭墨。這兩類數(shù)據(jù)計算完成后,需要入庫到新的表中膘盖。
5胧弛、最后在kylin中進行計算,用戶就可以在kylin中查詢統(tǒng)計結(jié)果了侠畔。
總的數(shù)據(jù)處理流程如下:
III. Writing the Jobs
1. logStat.job: splitting the data
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
tmpjars=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/ad-tracker-mr2-1.0.0-SNAPSHOT-jar-with-dependencies.jar
input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD-1}/*/input/self-event*
output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat
calculate.date=all
job.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=com.dataeye.tracker.mr.common.SuffixMultipleOutputFormat
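SuffixMultipleOutputFormat is an in-house class, and per the caveat in section I its source is not shown. Purely as an illustration, here is a minimal sketch of how a suffix-splitting output format can be built on the new MR API. Everything in it is an assumption: the key/value types are simplified to Text/NullWritable (the real job emits OutFieldsBaseModel), and the "payload \t SUFFIX" record layout is invented. The file-naming trick, though, is what the downstream *TRACING_ACTIVE_LOG_* globs suggest: the suffix is appended to the standard part-file name.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative sketch only, not the in-house SuffixMultipleOutputFormat.
public class SuffixTextOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(final TaskAttemptContext job) {
        return new RecordWriter<Text, NullWritable>() {
            // One open stream per suffix, created lazily on first use.
            private final Map<String, FSDataOutputStream> streams = new HashMap<>();

            @Override
            public void write(Text key, NullWritable value) throws IOException {
                // Hypothetical record layout: "payload \t SUFFIX".
                String[] parts = key.toString().split("\t");
                String payload = parts[0];
                String suffix = parts.length > 1 ? parts[1] : "UNKNOWN";
                FSDataOutputStream out = streams.get(suffix);
                if (out == null) {
                    // getDefaultWorkFile appends the "extension" directly to
                    // part-r-NNNNN, yielding names like
                    // part-r-00000TRACING_ACTIVE_LOG_ONLINE that match *SUFFIX globs.
                    Path file = getDefaultWorkFile(job, suffix);
                    out = file.getFileSystem(job.getConfiguration()).create(file, false);
                    streams.put(suffix, out);
                }
                out.write(payload.getBytes(StandardCharsets.UTF_8));
                out.write('\n');
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException {
                for (FSDataOutputStream out : streams.values()) {
                    out.close();
                }
            }
        };
    }
}

Appending the suffix through getDefaultWorkFile keeps the files inside the task attempt's work directory, so the normal output-commit protocol (and speculative execution) keeps working.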
2. onlineLogStat.job: cleaning the online data
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_ONLINE
output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtOnlineLogStat
calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}
job.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=org.apache.hadoop.io.Text
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
dependencies=logStat
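Jobs 3 and 4 below follow the same template as this one; only the mapper/reducer classes, the input globs, and the output paths differ.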
3. activeLogStat.job: cleaning the activation data
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_ACTIVE,/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD-1}/00/adtActiveLogStat
output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtActiveLogStat
calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}
job.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=org.apache.hadoop.io.Text
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
dependencies=logStat
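Two details are worth noting here. First, input.path lists not only today's *TRACING_ACTIVE_LOG_ACTIVE split but also the previous day's adtActiveLogStat output, which is how the rolling window of historical activations is carried forward from day to day. Second, step 2 of the processing approach mentioned converting activation timestamps into dates; ActiveLogStatMapper itself is internal, but the conversion, assuming epoch-millisecond timestamps and Beijing time (both assumptions), might look like this:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Illustrative only: turns an epoch-millisecond activation timestamp into the
// yyyy-MM-dd string used as activeDate downstream. The time zone is assumed.
public class ActiveDateFormatter {

    // SimpleDateFormat is not thread-safe, so keep one instance per thread.
    private static final ThreadLocal<SimpleDateFormat> FMT =
            ThreadLocal.withInitial(() -> {
                SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
                f.setTimeZone(TimeZone.getTimeZone("GMT+8")); // assumption: Beijing time
                return f;
            });

    public static String toActiveDate(long epochMillis) {
        return FMT.get().format(new Date(epochMillis));
    }
}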
4. paymentLogStat.job: cleaning the payment data
type=hadoopJava
job.extend=true
force.output.overwrite=true
mapred.mapper.new-api=true
mapred.reducer.new-api=true
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_PAYMENT
output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtPaymentLogStat
calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}
job.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatMapper
mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatMapper
mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatReducer
mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel
mapred.output.key.class=org.apache.hadoop.io.Text
mapred.output.value.class=org.apache.hadoop.io.NullWritable
mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
dependencies=logStat
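Step 2 also mentioned currency conversion, which belongs to this payment-cleaning pass. The real rate source is internal, so the table below is made up; the point is only that amounts get normalized to one currency before the data reaches Hive:

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: normalizes payment amounts to USD with a hard-coded,
// made-up rate table. A production job would load rates from a real source.
public class CurrencyNormalizer {

    private static final Map<String, BigDecimal> TO_USD = new HashMap<>();
    static {
        TO_USD.put("USD", BigDecimal.ONE);
        TO_USD.put("CNY", new BigDecimal("0.15")); // illustrative rate
        TO_USD.put("EUR", new BigDecimal("1.08")); // illustrative rate
    }

    // Unknown currencies fall back to a rate of 1, i.e. they pass through unchanged.
    public static BigDecimal toUsd(String currency, BigDecimal amount) {
        return amount.multiply(TO_USD.getOrDefault(currency, BigDecimal.ONE));
    }
}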
5. onlineHive.job: loading the online data into Hive
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script=res/hive_online.sql
dataPath=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtOnlineLogStat
day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}
dependencies=onlineLogStat
hive_online.sql:
use azkaban;
load data inpath '${dataPath}' overwrite into table adt_logstat_online PARTITION(day_p='${day_p}');
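Note that LOAD DATA INPATH moves the files into the partition's warehouse location rather than copying them, so the adtOnlineLogStat directory is empty once this job has run.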
6. activeHive.job: loading the activation data into Hive
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script=res/hive_active.sql
dataPath=hdfs://de-hdfs/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtActiveLogStat
day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}
dependencies=activeLogStat
hive_active.sql:
use azkaban;
alter table adt_logstat_active_ext set location '${dataPath}';
INSERT OVERWRITE TABLE adt_logstat_active PARTITION (day_p='${day_p}')
SELECT appid, channel, compaign, publisher, site, country, province, city, deviceId, activeDate
FROM adt_logstat_active_ext;
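The activation data deliberately takes a different route, presumably because activeLogStat.job reads the previous day's adtActiveLogStat output back in to maintain the rolling activation window: a LOAD DATA INPATH would move those files away. Pointing the external table adt_logstat_active_ext at the directory and copying the rows over with INSERT ... SELECT leaves the HDFS output in place.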
7. paymentHive.job: loading the payment data into Hive
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script=res/hive_payment.sql
dataPath=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtPaymentLogStat
day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}
dependencies=paymentLogStat
hive_payment.sql:
use azkaban;
load data inpath '${dataPath}' overwrite into table adt_logstat_payment PARTITION(day_p='${day_p}');
8. activeOnlineHive.job: computing user retention
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script=res/hive_active_online.sql
now_day=${DD:YYYY}-${DD:MM}-${DD:DD-1}
bef_day=${DD:YYYY}-${DD:MM}-${DD:DD-3}
dependencies=activeHive,onlineHive
hive_active_online.sql:
use azkaban;
INSERT OVERWRITE TABLE user_retain_roll PARTITION (day_p='${now_day}')
SELECT av.appid AS appid,
       ol.channel AS channel,
       ol.compaign AS compaign,
       ol.publisher AS publisher,
       ol.site AS site,
       av.activeDate AS activeDate,
       count(av.deviceId) AS total
FROM adt_logstat_online AS ol
INNER JOIN adt_logstat_active AS av
        ON ol.deviceId = av.deviceId AND ol.appid = av.appid
WHERE ol.day_p = '${now_day}'
  AND av.activeDate BETWEEN '${bef_day}' AND '${now_day}'
GROUP BY av.appid, ol.channel, ol.compaign, ol.publisher, ol.site, av.activeDate;
9. activePaymentHive.job: computing payment LTV
type=hive
user.to.proxy=azkaban
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
azk.hive.action=execute.query
hive.script=res/hive_active_payment.sql
now_day=${DD:YYYY}-${DD:MM}-${DD:DD-1}
bef_day=${DD:YYYY}-${DD:MM}-${DD:DD-3}
dependencies=activeHive,paymentHive
hive_active_payment.sql:
use azkaban;
INSERT OVERWRITE TABLE user_retain_ltv PARTITION (day_p='${now_day}')
SELECT av.appid AS appid,
       py.channel AS channel,
       py.compaign AS compaign,
       py.publisher AS publisher,
       py.site AS site,
       py.deviceId AS deviceId,
       py.paymentCount AS payment
FROM adt_logstat_payment AS py
INNER JOIN adt_logstat_active AS av
        ON av.deviceId = py.deviceId AND av.appid = py.appid
WHERE py.day_p = '${now_day}'
  AND av.day_p = '${now_day}';
10. kylin.job: the Kylin build
type=hadoopJava
job.extend=false
job.class=com.dataeye.kylin.azkaban.JavaMain
classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*
jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs
kylin.url=http://mysql8:7070/kylin/api
cube.name=user_retain_roll_cube,user_retain_ltv_cube
cube.date=${DD:YYYY}-${DD:MM}-${DD:DD-1}
dependencies=activeOnlineHive,activePaymentHive
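com.dataeye.kylin.azkaban.JavaMain is another in-house class, so the sketch below is only a guess at the kind of thing it does: triggering a segment build for each cube through Kylin's REST API. The /rebuild endpoint and the stock ADMIN/KYLIN account come from Kylin's public documentation, not from this class, and the real job would derive the time range from cube.date rather than the current time:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative only: asks Kylin to build a new segment for each cube.
public class KylinCubeBuilder {

    public static void main(String[] args) throws Exception {
        String api = "http://mysql8:7070/kylin/api";                        // kylin.url
        String[] cubes = {"user_retain_roll_cube", "user_retain_ltv_cube"}; // cube.name
        long end = System.currentTimeMillis(); // the real job would parse cube.date

        for (String cube : cubes) {
            HttpURLConnection conn =
                    (HttpURLConnection) new URL(api + "/cubes/" + cube + "/rebuild").openConnection();
            conn.setRequestMethod("PUT");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json");
            // Kylin uses HTTP basic auth; ADMIN/KYLIN is the stock default account.
            String token = Base64.getEncoder()
                    .encodeToString("ADMIN:KYLIN".getBytes(StandardCharsets.UTF_8));
            conn.setRequestProperty("Authorization", "Basic " + token);

            String body = "{\"startTime\":0,\"endTime\":" + end + ",\"buildType\":\"BUILD\"}";
            try (OutputStream os = conn.getOutputStream()) {
                os.write(body.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println(cube + " -> HTTP " + conn.getResponseCode());
        }
    }
}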
IV. Packaging and Running
Packaging works the same way as before; the final directory structure is shown below (screenshot omitted).
The run result (screenshot omitted).
The execution log can also be inspected (screenshot omitted).