【前言】Combiners和Partitioner都是mapperReduce編程中mapper和reduce的中間步驟,他們的出現(xiàn)給MR計(jì)算的效率以及業(yè)務(wù)功能有很大的提高
Combiners編程的作用:
首先仪媒,Combiners編程其實(shí)本質(zhì)上就是一個(gè)Reduce搓萧,只不過它特殊在其實(shí)map階段的reduce档址。它主要活動(dòng)在mapper之后和reduce之前景东,主要將mapper產(chǎn)生的大量輸出提前先做一次合并或者過濾,以減少傳輸?shù)絩educe的數(shù)據(jù)量捆探,從而堅(jiān)強(qiáng)reduce的壓力然爆,提高效率。
如果不用combiner徐许,那么施蜜,所有的結(jié)果都是reduce完成,效率會(huì)相對(duì)低下雌隅。使用combiner,先完成的map會(huì)在本地聚合缸沃,提升速度恰起。
注意:Combiner的輸入時(shí)mapper的輸出,Combiner 的輸出是reduce的輸入趾牧。Combiner絕不能改變最終的計(jì)算結(jié)果检盼。所以從我的想法來看,Combiner只應(yīng)該用于那種Reduce的輸入key/value與輸出key/value類型完全一致翘单,且不影響最終結(jié)果的場(chǎng)景吨枉。比如累加,最大值等哄芜。
Combiner編程也可以對(duì)數(shù)據(jù)進(jìn)行過濾貌亭,以達(dá)到節(jié)省網(wǎng)路的傳輸提高效率
代碼
//繼承reduce類,實(shí)現(xiàn)reduce方法
public class WCCombiners extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
//define a counter
long counter = 0;
//loop
for(LongWritable l : values){
counter += l.get();
}
//write
context.write(key, new LongWritable(counter));
}
}
Partitioner編程的作用:
將mapper(如果使用了combiner的話就是combiner)輸出的key/value拆分為分片(shard)认臊,每個(gè)reducer對(duì)應(yīng)一個(gè)分片圃庭。默認(rèn)情況下,MR調(diào)用Hashpartitioner類,如果程序員編寫了自己的partition類剧腻,那么就使用自己編寫的partition編程進(jìn)行數(shù)據(jù)分拘央,以達(dá)到map階段的數(shù)據(jù)分區(qū)切片,從而防止reduce階段的數(shù)據(jù)傾斜問題书在,實(shí)現(xiàn)負(fù)載均衡灰伟。
hashpartition的算法(先計(jì)算key的散列值(通常為md5值)。然后通過reducer個(gè)數(shù)執(zhí)行取模運(yùn)算:key.hashCode%(reducer個(gè)數(shù))儒旬。這種方式不僅能夠隨機(jī)地將整個(gè)key空間平均分發(fā)給每個(gè)reducer,同時(shí)也能確保不同mapper產(chǎn)生的相同key能被分發(fā)到同一個(gè)reducer袱箱。)
目的
如果對(duì)數(shù)據(jù)的整體有很好的了解,可以使用自定義Partitioner來達(dá)到reducer的負(fù)載均衡义矛,提高效率发笔。
使用范圍
必須提前知道有多少個(gè)分區(qū)。一般設(shè)置的分區(qū)數(shù)量要比實(shí)際需要的分區(qū)數(shù)量大凉翻,否則會(huì)報(bào)錯(cuò)了讨,當(dāng)然最好相等。
注意
在自定義partitioner時(shí)一定要注意防止數(shù)據(jù)傾斜制轰。
代碼
//partitioner分區(qū)前计,繼承Partitioner復(fù)寫getPartition方法
public static class ProviderPartitioner extends Partitioner<Text, DataBean> {
private static Map<String,Integer> providermap = new HashMap<String,Integer>();
static{
/**
* 假如我們要把電話號(hào)碼用運(yùn)營(yíng)商來分開
* 1:聯(lián)通
* 2:電信
* 3:移動(dòng)
*
* 在真實(shí)項(xiàng)目中,這里可以看成查數(shù)據(jù)庫(kù)
*/
providermap.put("135", 1);
providermap.put("136", 1);
providermap.put("137", 1);
providermap.put("138", 1);
providermap.put("139", 1);
providermap.put("150", 2);
providermap.put("159", 2);
providermap.put("182", 3);
providermap.put("183", 3);
}
//Partitioner編程的輸入?yún)?shù)是map的輸出垃杖,因?yàn)樗趍ap與reduce之間
@Override
public int getPartition(Text key, DataBean value, int numPartitions) {
String account = key.toString();
String sub_acc = account.substring(0,3);
Integer code = providermap.get(sub_acc);
//如果不是三家運(yùn)營(yíng)商男杈,則code設(shè)置為0 表示其他
if(code == null){
code = 0;
}
return code;
}
}