七披坏、Azkaban企業(yè)應(yīng)用實例

一、業(yè)務(wù)場景

在廣告追蹤系統(tǒng)中盐数,我們通過提供SDK給用戶,把各種各樣的用戶數(shù)據(jù)采集到我們的服務(wù)器中伞梯,然后通過MR計算玫氢,統(tǒng)計各種輸出。在本文中谜诫,筆者將抽取其中一種業(yè)務(wù)場景:計算用戶留存和付費LTV漾峡。

為了計算以上兩個指標(biāo),需要采集三類數(shù)據(jù):賬戶的激活喻旷、在線生逸、付費記錄。其中用戶留存和付費LTV的計算過程如下:

1且预、用戶留存:把用戶今天在線的數(shù)據(jù)槽袄,與一個月內(nèi)的用戶激活數(shù)據(jù)做對比,找出今天在線的用戶锋谐,是在那天激活的,并計算出差別的天數(shù),這就是用戶留存的計算方法传睹。

2蹲诀、付費LTV:找出今天哪些用戶付費了,把這些用戶三热,與一個月內(nèi)的用戶激活數(shù)據(jù)做對比鼓择,找出今天付費的用戶,是在那天激活的就漾,并計算出差別的天數(shù)呐能,然后把今天付費的總額,除以差別的天數(shù)从藤,得出付費LTV催跪。

出于對公司數(shù)據(jù)安全考慮锁蠕,這里不會貼出任何數(shù)據(jù)和計算代碼,只會把與Azkaban相關(guān)的job信息和思路寫出來懊蒸,讀者可以作為參考荣倾。

二、處理思路

1骑丸、原始的用戶數(shù)據(jù)是混合在一起的舌仍,都放在按天分區(qū)的hdfs的指定目錄下,這樣通危,我們就需要寫一個作為數(shù)據(jù)清洗的MR類铸豁,把原始日志中的在線,激活菊碟,付費三類數(shù)據(jù)分別輸出到獨立的文件中节芥。這在hadoopMR中可以通過輸出文件后綴的方式進行區(qū)分。

2逆害、完成第一步后头镊,我們需要把三類數(shù)據(jù)分別進行統(tǒng)計,比如按照appid進行統(tǒng)計魄幕,幣別需要轉(zhuǎn)換相艇,激活時間需要從時間戳轉(zhuǎn)換為日期等步驟。

3纯陨、第三步就需要把這三類數(shù)據(jù)分別入庫到hive中坛芽,供后面的hiveSQL進行join操作。

4翼抠、把在線數(shù)據(jù)與激活數(shù)據(jù)做join咙轩,得出用戶留存;把付費數(shù)據(jù)與激活數(shù)據(jù)做join阴颖,得出付費LTV臭墨。這兩類數(shù)據(jù)計算完成后,需要入庫到新的表中膘盖。

5胧弛、最后在kylin中進行計算,用戶就可以在kylin中查詢統(tǒng)計結(jié)果了侠畔。

總的數(shù)據(jù)處理流程如下:

Total Task

三结缚、具體job編寫

1、logStat.job:數(shù)據(jù)拆分

type=hadoopJava

job.extend=true

force.output.overwrite=true

mapred.mapper.new-api=true

mapred.reducer.new-api=true

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

tmpjars=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/ad-tracker-mr2-1.0.0-SNAPSHOT-jar-with-dependencies.jar


input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD-1}/*/input/self-event*

output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat

calculate.date=all


job.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatMapper


mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatMapper

mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.LogStatReducer


mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel


mapred.output.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.output.value.class=org.apache.hadoop.io.NullWritable


mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat

mapreduce.outputformat.class=com.dataeye.tracker.mr.common.SuffixMultipleOutputFormat

2软棺、onlineLogStat.job:在線數(shù)據(jù)清洗

type=hadoopJava

job.extend=true

force.output.overwrite=true

mapred.mapper.new-api=true

mapred.reducer.new-api=true

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs


input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_ONLINE

output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtOnlineLogStat

calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}


job.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatMapper


mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatMapper

mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.OnlineLogStatReducer


mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel


mapred.output.key.class=org.apache.hadoop.io.Text

mapred.output.value.class=org.apache.hadoop.io.NullWritable


mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat

mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

dependencies=logStat

3红竭、activeLogStat.job:激活數(shù)據(jù)清洗

type=hadoopJava

job.extend=true

force.output.overwrite=true

mapred.mapper.new-api=true

mapred.reducer.new-api=true

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs


input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_ACTIVE,/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD-1}/00/adtActiveLogStat

output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtActiveLogStat

calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}


job.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatMapper


mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatMapper

mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.ActiveLogStatReducer


mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel


mapred.output.key.class=org.apache.hadoop.io.Text

mapred.output.value.class=org.apache.hadoop.io.NullWritable


mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat

mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

dependencies=logStat

4、paymentLogStat.job:付費數(shù)據(jù)清洗

type=hadoopJava

job.extend=true

force.output.overwrite=true

mapred.mapper.new-api=true

mapred.reducer.new-api=true

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs


input.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/TrackingLogStat/*TRACING_ACTIVE_LOG_PAYMENT

output.path=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtPaymentLogStat

calculate.date=${DD:YYYY}-${DD:MM}-${DD:DD}


job.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatMapper


mapreduce.map.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatMapper

mapreduce.reduce.class=com.dataeye.tracker.mr.mapred.actionpay.PaymentLogStatReducer


mapred.mapoutput.key.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel

mapred.mapoutput.value.class=com.dataeye.tracker.mr.common.OutFieldsBaseModel


mapred.output.key.class=org.apache.hadoop.io.Text

mapred.output.value.class=org.apache.hadoop.io.NullWritable


mapreduce.inputformat.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat

mapreduce.outputformat.class=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

dependencies=logStat

5、onlineHive.job:在線數(shù)據(jù)入庫

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_online.sql

dataPath=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtOnlineLogStat

day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}

dependencies=onlineLogStat

hive_online.sql:

use azkaban;

load data inpath '${dataPath}' overwrite into table adt_logstat_online PARTITION(day_p='${day_p}');

6茵宪、activeHive.job:激活數(shù)據(jù)入庫

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_active.sql

dataPath=hdfs://de-hdfs/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtActiveLogStat

day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}

dependencies=activeLogStat

hive_active.sql


use azkaban;

alter table adt_logstat_active_ext set location '${dataPath}';

INSERT overwrite TABLE adt_logstat_active PARTITION (day_p='${day_p}') SELECT appid,channel,compaign,publisher,site,country,province,city,deviceId,activeDate FROM adt_logstat_active_ext;

7最冰、paymentHive.job:付費數(shù)據(jù)入庫

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_payment.sql

dataPath=/user/tracking/event_logserver/${DD:YYYY}/${DD:MM}/${DD:DD}/00/adtPaymentLogStat

day_p=${DD:YYYY}-${DD:MM}-${DD:DD-1}

dependencies=paymentLogStat

hive_payment.sql

use azkaban;

load data inpath '${dataPath}' overwrite into table adt_logstat_payment PARTITION(day_p='${day_p}');

8、activeOnlineHive.job:用戶留存統(tǒng)計

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_active_online.sql

now_day=${DD:YYYY}-${DD:MM}-${DD:DD-1}

bef_day=${DD:YYYY}-${DD:MM}-${DD:DD-3}

dependencies=activeHive,onlineHive

hive_active_online.sql

use azkaban;

INSERT overwrite TABLE user_retain_roll PARTITION (day_p='${now_day}') SELECT av.appid as appid, ol.channel as channel,ol.compaign as compaign,ol.publisher as publisher,ol.site as site, count(av.deviceId) AS total FROM adt_logstat_online AS ol INNER JOIN adt_logstat_active AS av ON ol.deviceId = av.deviceId and ol.appid = av.appid WHERE ol.day_p = '${now_day}' AND av.activeDate BETWEEN '${bef_day}' AND '${now_day}' GROUP BY av.appid, ol.channel,ol.compaign,ol.publisher,ol.site,av.activeDate;

9稀火、activePaymentHive.job:付費LTV統(tǒng)計

type=hive

user.to.proxy=azkaban

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs

azk.hive.action=execute.query

hive.script = res/hive_active_payment.sql

now_day=${DD:YYYY}-${DD:MM}-${DD:DD-1}

bef_day=${DD:YYYY}-${DD:MM}-${DD:DD-3}

dependencies=activeHive,paymentHive

hive_active_payment.sql

use azkaban;

INSERT overwrite TABLE user_retain_ltv PARTITION (day_p='${now_day}') select  av.appid as appid, py.channel as channel,py.compaign as compaign,py.publisher as publisher,py.site as site, py.deviceId as deviceId, py.paymentCount AS payment from adt_logstat_payment as py  inner join adt_logstat_active as av on av.deviceId = py.deviceId and av.appid = py.appid where py.day_p = '${now_day}' and av.day_p = '${now_day}';

10暖哨、kylin.job:kylin計算

type=hadoopJava

job.extend=false

job.class=com.dataeye.kylin.azkaban.JavaMain

classpath=./lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/lib/*,/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/extlib/*

jvm.args=-Dlog4j.log.dir=/home/hadoop2/azkaban/azkaban-solo-server-3.0.0/logs


kylin.url=http://mysql8:7070/kylin/api

cube.name=user_retain_roll_cube,user_retain_ltv_cube

cube.date=${DD:YYYY}-${DD:MM}-${DD:DD-1}

dependencies=activeOnlineHive,activePaymentHive

四、打包運行

打包過程與之前一致凰狞,最終的目錄結(jié)構(gòu)如下:

目錄結(jié)構(gòu)

運行結(jié)果如下:

Success Task

可以查看執(zhí)行日志:

Success Task Detail
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末篇裁,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子赡若,更是在濱河造成了極大的恐慌达布,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,835評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件逾冬,死亡現(xiàn)場離奇詭異黍聂,居然都是意外死亡,警方通過查閱死者的電腦和手機身腻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,900評論 2 383
  • 文/潘曉璐 我一進店門分冈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人霸株,你說我怎么就攤上這事〖牵” “怎么了去件?”我有些...
    開封第一講書人閱讀 156,481評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長扰路。 經(jīng)常有香客問我尤溜,道長,這世上最難降的妖魔是什么汗唱? 我笑而不...
    開封第一講書人閱讀 56,303評論 1 282
  • 正文 為了忘掉前任宫莱,我火速辦了婚禮,結(jié)果婚禮上哩罪,老公的妹妹穿的比我還像新娘授霸。我一直安慰自己,他們只是感情好际插,可當(dāng)我...
    茶點故事閱讀 65,375評論 5 384
  • 文/花漫 我一把揭開白布碘耳。 她就那樣靜靜地躺著,像睡著了一般框弛。 火紅的嫁衣襯著肌膚如雪辛辨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,729評論 1 289
  • 那天,我揣著相機與錄音斗搞,去河邊找鬼指攒。 笑死,一個胖子當(dāng)著我的面吹牛僻焚,可吹牛的內(nèi)容都是我干的允悦。 我是一名探鬼主播,決...
    沈念sama閱讀 38,877評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼溅呢,長吁一口氣:“原來是場噩夢啊……” “哼澡屡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起咐旧,我...
    開封第一講書人閱讀 37,633評論 0 266
  • 序言:老撾萬榮一對情侶失蹤驶鹉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后铣墨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體室埋,經(jīng)...
    沈念sama閱讀 44,088評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,443評論 2 326
  • 正文 我和宋清朗相戀三年伊约,在試婚紗的時候發(fā)現(xiàn)自己被綠了姚淆。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,563評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡屡律,死狀恐怖腌逢,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情超埋,我是刑警寧澤搏讶,帶...
    沈念sama閱讀 34,251評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站霍殴,受9級特大地震影響媒惕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜来庭,卻給世界環(huán)境...
    茶點故事閱讀 39,827評論 3 312
  • 文/蒙蒙 一妒蔚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧月弛,春花似錦肴盏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,712評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至佛寿,卻和暖如春幌墓,著一層夾襖步出監(jiān)牢的瞬間但壮,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,943評論 1 264
  • 我被黑心中介騙來泰國打工常侣, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蜡饵,地道東北人。 一個月前我還...
    沈念sama閱讀 46,240評論 2 360
  • 正文 我出身青樓胳施,卻偏偏與公主長得像溯祸,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子舞肆,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,435評論 2 348

推薦閱讀更多精彩內(nèi)容