案例
aggregateByKey算子其實相當于是針對不同“key”數(shù)據(jù)做一個map+reduce規(guī)約的操作。
舉一個簡單的在生產(chǎn)環(huán)境中的一段代碼
有一些整理好的日志字段,經(jīng)過處理得到了RDD類型為(String,(String,String))的List格式結(jié)果查刻,其中各個String代表的是:(用戶名,(訪問時間,訪問頁面url))
同一個用戶可能在不同的時間訪問了不同或相同的頁面,為了合并同一個用戶的訪問行為岖赋,寫了下面這段代碼,用到aggregateByKey瓮孙。
val data = sc.parallelize(
List(
("13909029812",("20170507","http://www.baidu.com")),("18089376778",("20170401","http://www.google.com")),("18089376778",("20170508","http://www.taobao.com")),("13909029812",("20170507","http://www.51cto.com"))
)
)
data.aggregateByKey(scala.collection.mutable.Set[(String, String)](), 200)((set, item) => {
set += item
}, (set1, set2) => set1 union set2).mapValues(x => x.toIterable).collect
結(jié)果:
res12: Array[(String, Iterable[(String, String)])] = Array((18089376778,Set((20170401,http://www.google.com), (20170508,http://www.taobao.com))), (13909029812,Set((20170507,http://www.51cto.com), (20170507,http://www.baidu.com))))
分解分析:##
aggregateByKey(參數(shù)1)(參數(shù)2贾节,參數(shù)3)
過程:對于data的某個key,參數(shù)1為初始化值衷畦,在參數(shù)2的函數(shù)中,初始值和該key的每一個value傳入函數(shù)進行操作知牌,所有返回的結(jié)果在參數(shù)3中進行規(guī)約祈争。
- 參數(shù)1
scala.collection.mutable.Set[(String, String)]()
new 了一個空的set集合,做為初始值
參數(shù)2
(set, item) => {
set += item
}
一個類似于map的映射函數(shù)角寸,將該key的每一個value(在本案例之是(訪問時間菩混,訪問url))作為item,將其放入set中并返回扁藕。
可知某個key的所有value都會返回一個含有該value的set參數(shù)3
(set1, set2) => set1 union set2
該key的所有value得到的set進行union規(guī)約沮峡。并返回
最終結(jié)果:得到了每一個用戶在所有時間的訪問url的行為信息。