1.基礎:wordCount
2.三個重要自定義接口:partitioner羹奉、combiner教寂、自定義排序(WritableComparator)
partitioner用于自定義maptask執(zhí)行結果分區(qū)吆玖,按照分區(qū)結果啟動相應數(shù)量reduce紧帕,默認使用對key進行hash的方式分區(qū)挨措。(例子:對手機流量統(tǒng)計同時按照歸屬地進行分區(qū))自定義一個partitioner繼承抽象類:Partitioner然后在job對象中惭缰,設置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class)珠漂,即可在maptask處理數(shù)據時(將數(shù)據寫入緩沖區(qū))對數(shù)據進行自定義分區(qū)晚缩。
combiner用于處理maptask到reduceTask之間的中間結果,Combiner將有相同key的key/value對的value加起來媳危,減少溢寫到磁盤的數(shù)據量荞彼。Combiner會優(yōu)化MapReduce的中間結果,所以它在整個模型中會多次使用待笑。使用combiner要注意不能破壞最終的結果鸣皂,不適用于求平均值這種情況,具體問題具體分析。(例子:結合wordCount即可寞缝,可以發(fā)現(xiàn)每個mapTask執(zhí)行結果變成了類似reduceTask執(zhí)行結果<hello,n(n>1)>)癌压。
自定義排序:比如需要以某個bean作為key并按照bean中的某個屬性進行排序,需對這個bean實現(xiàn)WritableComparable接口荆陆,自定義排序邏輯滩届。(對手機流量統(tǒng)計結果按照總流量大小進行排序輸出)。
3.coding
3.1 兩表join算法的實現(xiàn)
select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
在Reduce端實現(xiàn):
思路:
自定義join后的實體類型infoBean被啼,包含兩個表join后的屬性帜消,外加一個flag標識是哪個表的數(shù)據
mapTask:通過文件名判斷是哪種數(shù)據(order還是product),切分行浓体,賦值給infoBean泡挺,構造k-v鍵值對,key為join條件
reduceTask:可知一個productBean會對應多個orderBean命浴,在reduce階段娄猫,一次讀一組key相同的數(shù)據,通過flag區(qū)分是哪種bean(orderBean必然是一組咳促,productBean則是一個)稚新,對orderBean進行遍歷,將所缺的product數(shù)據由productBean跪腹,set進去即可褂删。
在Map端實現(xiàn):解決數(shù)據傾斜的問題
根據join條件,比如有很大一部分pid分區(qū)后涌入一個reduce冲茸,而其他pid只有少數(shù)屯阀,卻也涌入其他reduce,就會造成數(shù)據量大的reduce處理起來較慢轴术,并發(fā)效率低的情況难衰。解決方式是在Map端實現(xiàn),適用于關聯(lián)表中有小表的情形逗栽;可以將小表分發(fā)到所有的map節(jié)點盖袭,這樣,map節(jié)點就可以在本地對自己所讀到的大表數(shù)據進行join并輸出最終結果彼宠,可以大大提高join操作的并發(fā)度鳄虱,加快處理速度。
思路:
先在mapper類中預先定義好小表凭峡,進行join------引入實際場景中的解決方案:一次加載數(shù)據庫或者用distributedcache
在mapper類中重寫?setup方法拙已,該方法在map任務初始化時調用,在此處將小表讀取放入本地緩存摧冀。在map方法中只讀取大表倍踪,然后將小表和其關聯(lián)即可系宫,無需reduce。
指定需要緩存一個文件到所有的maptask運行節(jié)點工作目錄的方法:
3.2? 找出共同好友(difficult)
思路:
重點:好友關系是單向的建车,如何得到一組數(shù)據扩借,以兩個用戶為key,value為兩者的共同好友癞志。
第一步 :map
讀一行? A:B,C,D,F,E,O
輸出? ? <B,A><C,A><D,A><F,A><E,A><O,A>? ?(B在A的好友列表往枷、C在A的好友列表,D在A的好友列表凄杯。。秉宿。)
第二步:reduce
拿到的數(shù)據比如<C,A><C,B><C,E><C,F><C,G>......
輸出:<C? A,B,E,F,G…….>? ? ?——>C為key戒突,value為,列表中有C作為好友的人描睦,即膊存,A,B,E,F,G…都有C作為好友。
第三步:map忱叭,以第二步輸出作為輸入
讀入一行<C? A,B,E,F,G…….>
輸出<A-B,C><A-E,C><A-F,C>……還要把A,B,E,F,G…….先排序防止A-B和B-A這種情況
從而得到兩兩共同好友C
第四步:reduce
讀入數(shù)據 <A-B,C><A-B,F><A-B,G>.......
輸出: A-B? C,F,G,.....
A-B為key隔崎,收集到key對應的所有value,就是這兩兩的共同好友
3.3? 流量統(tǒng)計
1.對流量日志中的用戶統(tǒng)計總上韵丑、下行流量
map:切分數(shù)據爵卒,對日志進行處理,將電話號碼作為key撵彻,將上行流量钓株、下行流量、總流量生成一個bean作為value陌僵。
reduce:一個key轴合,得到一組bean,然后統(tǒng)計總流量碗短。
2.統(tǒng)計流量且按照流量大小倒序排序
使用第一步的結果作為輸入受葛,使bean作為key,電話號碼作為value偎谁,實現(xiàn)WritableComparable接口总滩,自定義排序方法,這樣就自動排序了搭盾。因為每個bean都是不同的咳秉,所以對于reduce來說,不存在一個key對應一組value鸯隅,所以reduce一次只處理一個k-v澜建,將v作為key向挖,key作v,得到排序后的結果炕舵。
3.根據號碼歸屬地對數(shù)據分區(qū)何之,將數(shù)據寫到不同數(shù)據
實現(xiàn)方法:自定義Partitioner,實現(xiàn)getPartition方法咽筋,然后在客戶端程序定義
job.setPartitionerClass(ProvincePartitioner.class);
自定義partition后溶推,要根據自定義partitioner的邏輯設置相應數(shù)量的reduce task
job.setNumReduceTasks(n);
這樣就可以按照自定義方式對數(shù)據進行分區(qū)處理了
注意:如果reduceTask的數(shù)量>= getPartition的結果數(shù)? ,則會多產生幾個空的輸出文件part-r-000xx
如果? ? 1<reduceTask的數(shù)量<getPartition的結果數(shù) 奸攻,則有一部分分區(qū)數(shù)據無處安放蒜危,會Exception!6媚汀辐赞!
如果 reduceTask的數(shù)量=1,則不管mapTask端輸出多少個分區(qū)文件硝训,最終結果都交給這一個reduceTask响委,最終也就只會產生一個結果文件 part-r-00000
3.4? web日志預處理
需求:對web訪問日志中的各字段識別切分,去除日志中不合法的記錄
定義一個流量數(shù)據的bean
定義一個數(shù)據校驗函數(shù)窖梁,當數(shù)據不全或者數(shù)據中日志的請求狀態(tài)字段為400赘风,設置為非法記錄。
使用mapper對數(shù)據進行處理纵刘,每次讀取一行邀窃,bean為key,當bean不合法彰导,不寫入蛔翅。