需求:
從App啟動(dòng)日志中讀取所需數(shù)據(jù)凄鼻,用來(lái)得到用戶的常用啟動(dòng)時(shí)間點(diǎn),澄狗郑活動(dòng)的地理位置锦庸,并更新到用戶畫(huà)像表中。
背景:
device_id 設(shè)備id
start_time App啟動(dòng)時(shí)間
latitude 上傳緯度
longitude 上傳精度
實(shí)現(xiàn)
原始數(shù)據(jù)
DataFrame數(shù)據(jù)形式是結(jié)構(gòu)化數(shù)據(jù)蒲祈,如下
device_id start_time latitude longitude
0 ad3bb7b86f9ad9ff4786f767094f31f4835cf167 2017-07-26 00:18:55.985 39.69992988280924 116.5083292779666
1 7e98a8401070c6bcb46a9ef32efd0e6a7066a220 2017-07-26 00:04:37.532 39.82559758010746 116.4472136767569
2 865675021801835 2017-07-26 00:19:28.821 0.000000 0.000000
通過(guò)
schemeRdd.rdd變成普通rdd
結(jié)構(gòu)變?yōu)?[(device_id,start_time,latitude,longitude),(device_id,start_time,latitude,longitude),...]
1. 經(jīng)緯度處理成geohash6 合并到一起生成的RDD數(shù)據(jù)結(jié)構(gòu)
rdd.map(lambda x: (x['device_id'], x['start_time'],geohash.encode(x[lat],x[lng]))
得到數(shù)據(jù)結(jié)構(gòu)是:
[('device_id',start_time,geohash6)), (('device_id',start_time,geohash6))]
2. 拆分開(kāi)成2個(gè)rdd 甘萧,分別是時(shí)間和GeoHash的,分別處理
timerdd = rdd.map(lambda x: (x[0],x[1]))
timerdd_group=timerdd.groupByKey()
得到數(shù)據(jù)結(jié)構(gòu)
[('device_id1',['2017-09-01 12:34:00','2017-06-30 01:01:12']),('device_id2',['2017-09-05 11:15:32','2017-07-30 07:01:08'])]
geohashrdd=rdd.map(lambda x: (x[0],x[2]))
geohashrdd_group=geohashrdd.groupByKey()
得到數(shù)據(jù)結(jié)構(gòu)
[('device_id1',['EXXEQ6','OKWQE9','SDEDD1','SDQGW2']),('device_id2',['LLSJW0','YWQDX7'])]
2.1 得到用0-24來(lái)表示的常使用的時(shí)間點(diǎn) 梆掸, 得到最近使用時(shí)間到日期 (注意時(shí)間list也許沒(méi)數(shù)據(jù)扬卷,則設(shè)置默認(rèn)時(shí)間)
result_time_rdd= timerdd_group.mapValues(lambda x: use_time(x)) #該函數(shù)生成一個(gè)元祖 ('often_use_time','last_use_time')
得到結(jié)構(gòu)為
[('device_id1',(12,'2017-09-01')),('device_id2',(9,'2017-06-05')),('device_id3',(0,'1970-01-01'))]
2.2 得到最常去的2個(gè)geohash的值
result_geohash_rdd = geohashrdd_group.mapValues(lambda x: activie_geohash(x) )
得到結(jié)構(gòu)為
[('device_id1',('EWEQ1','SDWQ3'),('device_id2',('UYWIE8','']),('device_id3',['','']]
- 根據(jù)結(jié)果數(shù)據(jù)結(jié)構(gòu),組拼更新SQL酸钦,更新數(shù)據(jù)
結(jié)果2個(gè)rdd
result_time_rdd = [('device_id1',(12,'2017-09-01')),('device_id2',(9,'2017-06-05')),('device_id3',(0,'1970-01-01'))]
result_geohash_rdd = [('device_id1',('EWEQ1','SDWQ3'),('device_id2',('UYWIE8','']),('device_id3',['','']]
通過(guò)cogroup作得到
[('device_id1',([12,'2017-09-01'],['EWEQ1','SDWQ3']),('device_id2',['UYWIE8','POKE5']),('device_id3',['MVBD5','ZCXV4']]
更新到表中
UPDATE recommend.user_tag SET open_app_period=x[1],open_app_last=x[2],active_loc_1=x[3],active_loc_2=x[4] WHERE device_id=%s
結(jié)束語(yǔ)
全文使用RDD的API來(lái)實(shí)現(xiàn)怪得,RDD的API十分強(qiáng)大
目前建議使用DataFrame來(lái)實(shí)現(xiàn)