3.1 MapReduce工作流程
1)流程示意圖
2)流程詳解
上面的流程是整個mapreduce最全工作流程匠题,但是shuffle過程只是從第7步開始到第16步結束啊央,具體shuffle過程詳解眶诈,如下:
1)maptask收集我們的map()方法輸出的kv對涨醋,放到內(nèi)存緩沖區(qū)中
2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合并成大的溢出文件
4)在溢出過程中逝撬,及合并的過程中东帅,都要調(diào)用partitioner進行分區(qū)和針對key進行排序
5)reducetask根據(jù)自己的分區(qū)號,去各個maptask機器上取相應的結果分區(qū)數(shù)據(jù)
6)reducetask會取到同一個分區(qū)的來自不同maptask的結果文件球拦,reducetask會將這些文件再進行合并(歸并排序)
7)合并成大文件后靠闭,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group坎炼,調(diào)用用戶自定義的reduce()方法)
3)注意
Shuffle中的緩沖區(qū)大小會影響到mapreduce程序的執(zhí)行效率愧膀,原則上說,緩沖區(qū)越大谣光,磁盤io的次數(shù)越少檩淋,執(zhí)行速度就越快。
緩沖區(qū)的大小可以通過參數(shù)調(diào)整萄金,參數(shù):io.sort.mb ?默認100M蟀悦。
3.2 InputFormat數(shù)據(jù)輸入
3.2.1 Job提交流程和切片源碼詳解
1)job提交流程源碼詳解
waitForCompletion()
submit();
// 1建立連接
connect();
// 1)創(chuàng)建提交job的代理
new Cluster(getConfiguration());
// (1)判斷是本地yarn還是遠程
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)獲取jobid ,并創(chuàng)建job路徑
JobID jobId = submitClient.getNewJobID();
// 3)拷貝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)計算切片氧敢,生成切片規(guī)劃文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向Stag路徑寫xml配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交job,返回提交狀態(tài)
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2)FileInputFormat源碼解析(input.getSplits(job))
(1)找到你數(shù)據(jù)存儲的目錄日戈。
(2)開始遍歷處理(規(guī)劃切片)目錄下的每一個文件
(3)遍歷第一個文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默認情況下,切片大小=blocksize
d)開始切孙乖,形成第1個切片:ss.txt—0:128M 第2個切片ss.txt—128:256M 第3個切片ss.txt—256M:300M(每次切片時浙炼,都要判斷切完剩下的部分是否大于塊的1.1倍,不大于1.1倍就劃分一塊切片)
e)將切片信息寫到一個切片規(guī)劃文件中
f)整個切片的核心過程在getSplit()方法中完成唯袄。
g)數(shù)據(jù)切片只是在邏輯上對輸入數(shù)據(jù)進行分片弯屈,并不會再磁盤上將其切分成分片進行存儲。InputSplit只記錄了分片的元數(shù)據(jù)信息恋拷,比如起始位置资厉、長度以及所在的節(jié)點列表等。
h)注意:block是HDFS物理上存儲的數(shù)據(jù)蔬顾,切片是對數(shù)據(jù)邏輯上的劃分宴偿。
(4)提交切片規(guī)劃文件到y(tǒng)arn上,yarn上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計算開啟maptask個數(shù)阎抒。
3.2.2 FileInputFormat切片機制
1)FileInputFormat中默認的切片機制:
(1)簡單地按照文件的內(nèi)容長度進行切片
(2)切片大小酪我,默認等于block大小
(3)切片時不考慮數(shù)據(jù)集整體,而是逐個針對每一個文件單獨切片
比如待處理數(shù)據(jù)有兩個文件:
file1.txt ???320M
file2.txt ???10M
經(jīng)過FileInputFormat的切片機制運算后且叁,形成的切片信息如下:??
file1.txt.split1-- ?0~128
file1.txt.split2-- ?128~256
file1.txt.split3-- ?256~320
file2.txt.split1-- ?0~10M
2)FileInputFormat切片大小的參數(shù)配置
通過分析源碼都哭,在FileInputFormat的279行中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個值來運算決定
mapreduce.input.fileinputformat.split.minsize=1 默認值為1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue
因此,默認情況下欺矫,切片大小=blocksize纱新。
maxsize(切片最大值):參數(shù)如果調(diào)得比blocksize小,則會讓切片變小穆趴,而且就等于配置的這個參數(shù)的值脸爱。
minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大未妹。
3)獲取切片信息API
// 根據(jù)文件類型獲取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 獲取切片的文件名稱
String name = inputSplit.getPath().getName();
3.2.3 CombineTextInputFormat切片機制
關于大量小文件的優(yōu)化策略
1)默認情況下TextInputformat對任務的切片機制是按文件規(guī)劃切片簿废,不管文件多小,都會是一個單獨的切片络它,都會交給一個maptask族檬,這樣如果有大量小文件,就會產(chǎn)生大量的maptask化戳,處理效率極其低下单料。
2)優(yōu)化策略
(1)最好的辦法,在數(shù)據(jù)處理系統(tǒng)的最前端(預處理/采集)点楼,將小文件先合并成大文件扫尖,再上傳到HDFS做后續(xù)分析。
(2)補救措施:如果已經(jīng)是大量小文件在HDFS中了掠廓,可以使用另一種InputFormat來做切片(CombineTextInputFormat)换怖,它的切片邏輯跟TextInputFormat不同:它可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣却盘,多個小文件就可以交給一個maptask狰域。
(3)優(yōu)先滿足最小切片大小,不超過最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
3)具體實現(xiàn)步驟
// ?如果不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
4)案例實操
3.2.4 InputFormat接口實現(xiàn)類
MapReduce任務的輸入文件一般是存儲在HDFS里面黄橘。輸入的文件格式包括:基于行的日志文件、二進制格式文件等屈溉。這些文件一般會很大塞关,達到數(shù)十GB,甚至更大子巾。那么MapReduce是如何讀取這些數(shù)據(jù)的呢帆赢?下面我們首先學習InputFormat接口。
InputFormat常見的接口實現(xiàn)類包括:TextInputFormat线梗、KeyValueTextInputFormat椰于、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等仪搔。
1)TextInputFormat
TextInputFormat是默認的InputFormat瘾婿。每條記錄是一行輸入。鍵K是LongWritable類型,存儲該行在整個文件中的字節(jié)偏移量偏陪。值是這行的內(nèi)容抢呆,不包括任何行終止符(換行符和回車符)。
以下是一個示例笛谦,比如抱虐,一個分片包含了如下4條文本記錄。
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
每條記錄表示為以下鍵/值對:
(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(75,From the real demand for more close to the enterprise)
很明顯饥脑,鍵并不是行號恳邀。一般情況下,很難取得行號灶轰,因為文件按字節(jié)而不是按行切分為分片谣沸,計算方式:字符+空格+1個偏移位=數(shù)
2)KeyValueTextInputFormat
每一行均為一條記錄,被分隔符分割為key框往,value鳄抒。可以通過在驅(qū)動類中設置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");來設定分隔符椰弊。默認分隔符是tab(\t)许溅。
job.setInputFormatClass(KeyValueTextInputFormat.class);
以下是一個示例,輸入是一個包含4條記錄的分片秉版。其中——>表示一個(水平方向的)制表符贤重。
line1 ——>Rich learning form
line2 ——>Intelligent learning engine
line3 ——>Learning more convenient
line4 ——>From the real demand for more close to the enterprise
每條記錄表示為以下鍵/值對:
(Rich,learning form)
(Intelligent清焕,learning engine)
(Learning并蝗,more convenient)
(From the,real demand for more close to the enterprise)
?此時的鍵是每行排在制表符之前的Text序列秸妥。
?3)NLineInputFormat
如果使用NlineInputFormat滚停,代表每個map進程處理的InputSplit不再按block塊去劃分,而是按NlineInputFormat指定的行數(shù)N來劃分粥惧。即輸入文件的總行數(shù)/N=切片數(shù)(20)键畴,如果不整除,切片數(shù)=商+1突雪。
以下是一個示例起惕,仍然以上面的4行輸入為例。
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise
?例如咏删,如果N是2惹想,則每個輸入分片包含兩行。開啟2個maptask督函。
(0,Rich learning form)
(19,Intelligent learning engine)
另一個?mapper 則收到后兩行:
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)
????????這里的鍵和值與TextInputFormat生成的一樣嘀粱。
3.2.5 自定義InputFormat
1)概述
(1)自定義一個類繼承FileInputFormat激挪。
(2)改寫RecordReader,實現(xiàn)一次讀取一個完整文件封裝為KV草穆。
(3)在輸出時使用SequenceFileOutPutFormat輸出合并文件灌灾。
2)案例實操
詳見7.4小文件處理(自定義InputFormat)。
3.3 MapTask工作機制
3.3.1 并行度決定機制
1)問題引出
maptask的并行度決定map階段的任務處理并發(fā)度悲柱,進而影響到整個job的處理速度锋喜。那么,mapTask并行任務是否越多越好呢豌鸡?
2)MapTask并行度決定機制
一個job的map階段MapTask并行度(個數(shù))嘿般,由客戶端提交job時的切片個數(shù)決定。
3.3.2 MapTask工作機制
(1)Read階段:Map Task通過用戶編寫的RecordReader涯冠,從輸入InputSplit中解析出一個個key/value炉奴。
(2)Map階段:該節(jié)點主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value蛇更。
(3)Collect收集階段:在用戶編寫map()函數(shù)中瞻赶,當數(shù)據(jù)處理完成后,一般會調(diào)用OutputCollector.collect()輸出結果派任。在該函數(shù)內(nèi)部砸逊,它會將生成的key/value分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中掌逛。
(4)Spill階段:即“溢寫”师逸,當環(huán)形緩沖區(qū)滿后,MapReduce會將數(shù)據(jù)寫到本地磁盤上豆混,生成一個臨時文件篓像。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前皿伺,先要對數(shù)據(jù)進行一次本地排序员辩,并在必要時對數(shù)據(jù)進行合并、壓縮等操作鸵鸥。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進行排序屈暗,排序方式是,先按照分區(qū)編號partition進行排序脂男,然后按照key進行排序。這樣种呐,經(jīng)過排序后宰翅,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序爽室。
步驟2:按照分區(qū)編號由小到大依次將每個分區(qū)中的數(shù)據(jù)寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數(shù))中汁讼。如果用戶設置了Combiner裂明,則寫入文件之前沈撞,對每個分區(qū)中的數(shù)據(jù)進行一次聚集操作。
步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結構SpillRecord中,其中每個分區(qū)的元信息包括在臨時文件中的偏移量惦费、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當前內(nèi)存索引大小超過1MB隘世,則將內(nèi)存索引寫到文件output/spillN.out.index中栈雳。
(5)Combine階段:當所有數(shù)據(jù)處理完成后,MapTask對所有臨時文件進行一次合并蝉娜,以確保最終只會生成一個數(shù)據(jù)文件唱较。
當所有數(shù)據(jù)處理完后,MapTask會將所有臨時文件合并成一個大文件召川,并保存到文件output/file.out中南缓,同時生成相應的索引文件output/file.out.index。
在進行文件合并過程中荧呐,MapTask以分區(qū)為單位進行合并汉形。對于某個分區(qū),它將采用多輪遞歸合并的方式倍阐。每輪合并io.sort.factor(默認100)個文件概疆,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后收捣,重復以上過程届案,直到最終得到一個大文件。
讓每個MapTask最終只生成一個數(shù)據(jù)文件罢艾,可避免同時打開大量文件和同時讀取大量小文件產(chǎn)生的隨機讀取帶來的開銷楣颠。
3.4 Shuffle機制
3.4.1 Shuffle機制
Mapreduce確保每個reducer的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle咐蚯。
3.4.2 Partition分區(qū)
0)問題引出:要求將統(tǒng)計結果按照條件輸出到不同文件中(分區(qū))童漩。比如:將統(tǒng)計結果按照手機歸屬地不同省份輸出到不同文件中(分區(qū))
1)默認partition分區(qū)
public class HashPartitioner<K, V> extends Partitioner<K, V> {
??public int getPartition(K key, V value, int numReduceTasks) {
????return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
??}
}
默認分區(qū)是根據(jù)key的hashCode對reduceTasks個數(shù)取模得到的。用戶沒法控制哪個key存儲到哪個分區(qū)春锋。
&是為了不出現(xiàn)負數(shù)矫膨,也可以用??hashcode()%num + num來代替,但是位運算性能比較高期奔。
2)自定義Partitioner步驟
(1)自定義類繼承Partitioner侧馅,重寫getPartition()方法
public?class?ProvincePartitioner extends?Partitioner<Text, FlowBean> {
@Override
public?int?getPartition(Text key, FlowBean value, int?numPartitions) {
// 1 獲取電話號碼的前三位
String preNum = key.toString().substring(0, 3);
partition = 4;
// 2 判斷是哪個省
if?("136".equals(preNum)) {
partition = 0;
}else?if?("137".equals(preNum)) {
partition = 1;
}else?if?("138".equals(preNum)) {
partition = 2;
}else?if?("139".equals(preNum)) {
partition = 3;
}
return?partition;
}
}
(2)在job驅(qū)動中,設置自定義partitioner:?
job.setPartitionerClass(CustomPartitioner.class);
(3)自定義partition后呐萌,要根據(jù)自定義partitioner的邏輯設置相應數(shù)量的reduce task
job.setNumReduceTasks(5);
3)注意:
如果reduceTask的數(shù)量> getPartition的結果數(shù)馁痴,則會多產(chǎn)生幾個空的輸出文件part-r-000xx;
如果1<reduceTask的數(shù)量<getPartition的結果數(shù)肺孤,則有一部分分區(qū)數(shù)據(jù)無處安放罗晕,會Exception济欢;
如果reduceTask的數(shù)量=1,則不管mapTask端輸出多少個分區(qū)文件小渊,最終結果都交給這一個reduceTask法褥,最終也就只會產(chǎn)生一個結果文件?part-r-00000;
例如:假設自定義分區(qū)數(shù)為5酬屉,則
(1)job.setNumReduceTasks(1);會正常運行半等,只不過會產(chǎn)生一個輸出文件
(2)job.setNumReduceTasks(2);會報錯
(3)job.setNumReduceTasks(6);大于5,程序會正常運行梆惯,會產(chǎn)生空文件
4)案例實操
詳見7.2.2 需求2:將統(tǒng)計結果按照手機歸屬地不同省份輸出到不同文件中(Partitioner)
詳見7.1.2 需求2:把單詞按照ASCII碼奇偶分區(qū)(Partitioner)
3.4.3 WritableComparable排序
排序是MapReduce框架中最重要的操作之一酱鸭。Map Task和Reduce Task均會對數(shù)據(jù)(按照key)進行排序。該操作屬于Hadoop的默認行為垛吗。任何應用程序中的數(shù)據(jù)均會被排序凹髓,而不管邏輯上是否需要。默認排序是按照字典順序排序怯屉,且實現(xiàn)該排序的方法是快速排序蔚舀。
對于Map Task,它會將處理的結果暫時放到一個緩沖區(qū)中锨络,當緩沖區(qū)使用率達到一定閾值后赌躺,再對緩沖區(qū)中的數(shù)據(jù)進行一次排序,并將這些有序數(shù)據(jù)寫到磁盤上羡儿,而當數(shù)據(jù)處理完畢后礼患,它會對磁盤上所有文件進行一次合并,以將這些文件合并成一個大的有序文件掠归。
對于Reduce Task缅叠,它從每個Map Task上遠程拷貝相應的數(shù)據(jù)文件,如果文件大小超過一定閾值虏冻,則放到磁盤上肤粱,否則放到內(nèi)存中。如果磁盤上文件數(shù)目達到一定閾值厨相,則進行一次合并以生成一個更大文件领曼;如果內(nèi)存中文件大小或者數(shù)目超過一定閾值,則進行一次合并后將數(shù)據(jù)寫到磁盤上蛮穿。當所有數(shù)據(jù)拷貝完畢后庶骄,Reduce Task統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進行一次合并。
每個階段的默認排序
1)排序的分類:
(1)部分排序:
MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序践磅。保證輸出的每個文件內(nèi)部排序瓢姻。
(2)全排序:
如何用Hadoop產(chǎn)生一個全局排序的文件?最簡單的方法是使用一個分區(qū)音诈。但該方法在處理大型文件時效率極低幻碱,因為一臺機器必須處理所有輸出文件,從而完全喪失了MapReduce所提供的并行架構细溅。
替代方案:首先創(chuàng)建一系列排好序的文件褥傍;其次,串聯(lián)這些文件喇聊;最后恍风,生成一個全局排序的文件。主要思路是使用一個分區(qū)來描述輸出的全局排序誓篱。例如:可以為上述文件創(chuàng)建3個分區(qū)朋贬,在第一分區(qū)中,記錄的單詞首字母a-g窜骄,第二分區(qū)記錄單詞首字母h-n, 第三分區(qū)記錄單詞首字母o-z锦募。
(3)輔助排序:(GroupingComparator分組)
Mapreduce框架在記錄到達reducer之前按鍵對記錄排序,但鍵所對應的值并沒有被排序邻遏。甚至在不同的執(zhí)行輪次中糠亩,這些值的排序也不固定,因為它們來自不同的map任務且這些map任務在不同輪次中完成時間各不相同准验。一般來說赎线,大多數(shù)MapReduce程序會避免讓reduce函數(shù)依賴于值的排序。但是糊饱,有時也需要通過特定的方法對鍵進行排序和分組等以實現(xiàn)對值的排序垂寥。
(4)二次排序:
在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序另锋。
2)自定義排序WritableComparable
(1)原理分析
bean對象實現(xiàn)WritableComparable接口重寫compareTo方法滞项,就可以實現(xiàn)排序
@Override
public?int?compareTo(FlowBean o) {
// 倒序排列,從大到小
return?this.sumFlow > o.getSumFlow() ? -1 : 1;
}
(2)案例實操
詳見7.2.3 需求3:將統(tǒng)計結果按照總流量倒序排序(排序)
詳見7.2.4 需求4:不同省份輸出文件內(nèi)部排序(部分排序)
3.4.4 GroupingComparator分組(輔助排序)
1)對reduce階段的數(shù)據(jù)根據(jù)某一個或幾個字段進行分組砰蠢。
2)案例實操
詳見7.3 求出每一個訂單中最貴的商品(GroupingComparator)
3.4.5 合并
0)在分布式的架構中蓖扑,分布式文件系統(tǒng)HDFS,和分布式運算程序編程框架mapreduce台舱。
HDFS:不怕大文件律杠,怕很多小文件
mapreduce :怕數(shù)據(jù)傾斜
那么mapreduce是如果解決多個小文件的問題呢?
mapreduce關于大量小文件的優(yōu)化策略
(1)?默認情況下竞惋,TextInputFormat對任務的切片機制是按照文件規(guī)劃切片柜去,不管有多少個小文件,都會是單獨的切片拆宛,都會交給一個maptask嗓奢,這樣,如果有大量的小文件
就會產(chǎn)生大量的maptask浑厚,處理效率極端底下
(2)優(yōu)化策略
最好的方法:在數(shù)據(jù)處理的最前端(預處理股耽、采集)根盒,就將小文件合并成大文件,在上傳到HDFS做后續(xù)的分析
補救措施:如果已經(jīng)是大量的小文件在HDFS中了物蝙,可以使用另一種inputformat來做切片(CombineFileInputformat)炎滞,它的切片邏輯跟TextInputformat
注:CombineTextInputFormat是CombineFileInputformat的子類
不同:
它可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣诬乞,多個小文件就可以交給一個maptask了
//如果不設置InputFormat册赛,它默認的用的是TextInputFormat.class
/*CombineTextInputFormat為系統(tǒng)自帶的組件類
?* setMinInputSplitSize 中的2048是表示n個小文件之和不能小于2048
?* setMaxInputSplitSize 中的4096當滿足setMinInputSplitSize中的2048情況下 ?在滿足n+1個小文件之和不能大于4096
?*/
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMinInputSplitSize(job, 2048);
CombineTextInputFormat.setMaxInputSplitSize(job, 4096);
1)輸入數(shù)據(jù):準備5個小文件
2)實現(xiàn)過程
(1)不做任何處理,運行需求1中的wordcount程序震嫉,觀察切片個數(shù)為5
(2)在WordcountDriver中增加如下代碼森瘪,運行程序,并觀察運行的切片個數(shù)為1
// 如果不設置InputFormat票堵,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4*1024*1024);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2*1024*1024);// 2m
注:在看number of splits時扼睬,和最大值(MaxSplitSize)有關、總體規(guī)律就是和低于最大值是一片换衬、高于最大值1.5倍+痰驱,則為兩片;高于最大值2倍以上則向下取整瞳浦,比如文件大小65MB担映,切片最大值為4MB,那么切片為16個.總體來說,切片差值不超過1個叫潦,不影響整體性能
6)自定義Combiner實現(xiàn)步驟:
(1)自定義一個combiner繼承Reducer蝇完,重寫reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
????????// 1 匯總操作
int count = 0;
for(IntWritable v :values){
count = v.get();
}
????????// 2 寫出
context.write(key, new IntWritable(count));
}
}
(2)在job驅(qū)動類中設置:??
job.setCombinerClass(WordcountCombiner.class);
3.5 ReduceTask工作機制
1)設置ReduceTask并行度(個數(shù))
reducetask的并行度同樣影響整個job的執(zhí)行并發(fā)度和執(zhí)行效率,但與maptask的并發(fā)數(shù)由切片數(shù)決定不同矗蕊,Reducetask數(shù)量的決定是可以直接手動設置:
//默認值是1短蜕,手動設置為4
job.setNumReduceTasks(4);
2)注意
(1)reducetask=0 ,表示沒有reduce階段傻咖,輸出文件個數(shù)和map個數(shù)一致朋魔。
(2)reducetask默認值就是1,所以輸出文件個數(shù)為一個卿操。
(3)如果數(shù)據(jù)分布不均勻警检,就有可能在reduce階段產(chǎn)生數(shù)據(jù)傾斜
(4)reducetask數(shù)量并不是任意設置,還要考慮業(yè)務邏輯需求害淤,有些情況下扇雕,需要計算全局匯總結果,就只能有1個reducetask窥摄。
(5)具體多少個reducetask镶奉,需要根據(jù)集群性能而定。
(6)如果分區(qū)數(shù)不是1,但是reducetask為1哨苛,是否執(zhí)行分區(qū)過程鸽凶。答案是:不執(zhí)行分區(qū)過程。因為在maptask的源碼中移国,執(zhí)行分區(qū)的前提是先判斷reduceNum個數(shù)是否大于1吱瘩。不大于1肯定不執(zhí)行。
3)實驗:測試reducetask多少合適迹缀。
(1)實驗環(huán)境:1個master節(jié)點,16個slave節(jié)點:CPU:8GHZ蜜徽,內(nèi)存:?2G
(2)實驗結論:
表1 改變reduce task (數(shù)據(jù)量為1GB)
Map task =16
Reduce task151015162025304560
總時間8921461109288100128101145104
4)ReduceTask工作機制
(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數(shù)據(jù)祝懂,并針對某一片數(shù)據(jù),如果其大小超過一定閾值拘鞋,則寫到磁盤上砚蓬,否則直接放到內(nèi)存中。
(2)Merge階段:在遠程拷貝數(shù)據(jù)的同時盆色,ReduceTask啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進行合并灰蛙,以防止內(nèi)存使用過多或磁盤上文件過多。
(3)Sort階段:按照MapReduce語義隔躲,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進行聚集的一組數(shù)據(jù)摩梧。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略宣旱。由于各個MapTask已經(jīng)實現(xiàn)對自己的處理結果進行了局部排序仅父,因此,ReduceTask只需對所有數(shù)據(jù)進行一次歸并排序即可浑吟。
(4)Reduce階段:reduce()函數(shù)將計算結果寫到HDFS上笙纤。
3.6 OutputFormat數(shù)據(jù)輸出
3.6.1 OutputFormat接口實現(xiàn)類
?OutputFormat是MapReduce輸出的基類,所有實現(xiàn)MapReduce輸出都實現(xiàn)了?OutputFormat接口组力。下面我們介紹幾種常見的OutputFormat實現(xiàn)類省容。
1)文本輸出TextOutputFormat
????????默認的輸出格式是TextOutputFormat,它把每條記錄寫為文本行燎字。它的鍵和值可以是任意類型腥椒,因為TextOutputFormat調(diào)用toString()方法把它們轉(zhuǎn)換為字符串。
2)SequenceFileOutputFormat
?SequenceFileOutputFormat將它的輸出寫為一個順序文件轩触。如果輸出需要作為后續(xù)?MapReduce任務的輸入寞酿,這便是一種好的輸出格式,因為它的格式緊湊脱柱,很容易被壓縮伐弹。
3)自定義OutputFormat
根據(jù)用戶需求,自定義實現(xiàn)輸出。
3.6.2 自定義OutputFormat
為了實現(xiàn)控制最終文件的輸出路徑惨好,可以自定義OutputFormat煌茴。
要在一個mapreduce程序中根據(jù)數(shù)據(jù)的不同輸出兩類結果到不同目錄,這類靈活的輸出需求可以通過自定義outputformat來實現(xiàn)日川。
1)自定義OutputFormat步驟
(1)自定義一個類繼承FileOutputFormat蔓腐。
(2)改寫recordwriter,具體改寫輸出數(shù)據(jù)的方法write()龄句。
2)實操案例:
詳見7.5 修改日志內(nèi)容及自定義日志輸出路徑(自定義OutputFormat)回论。
3.7 Join多種應用
3.7.1 Map join(Distributedcache分布式緩存)
1)使用場景:一張表十分小、一張表很大分歇。
2)解決方案
在map端緩存多張表傀蓉,提前處理業(yè)務邏輯,這樣增加map端業(yè)務职抡,減少reduce端數(shù)據(jù)的壓力葬燎,盡可能的減少數(shù)據(jù)傾斜。
3)具體辦法:采用distributedcache
(1)在mapper的setup階段缚甩,將文件讀取到緩存集合中谱净。
(2)在驅(qū)動函數(shù)中加載緩存。
job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 緩存普通文件到task運行節(jié)點
4)實操案例:
詳見7.6.2需求2:map端表合并(Distributedcache)
3.7.2 Reduce join
1)原理:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區(qū)別不同來源的記錄擅威。然后用連接字段作為key壕探,其余部分和新加的標志作為value,最后進行輸出裕寨。
Reduce端的主要工作:在reduce端以連接字段作為key的分組已經(jīng)完成浩蓉,我們只需要在每一個分組當中將那些來源于不同文件的記錄(在map階段已經(jīng)打標志)分開,最后進行合并就ok了宾袜。
2)該方法的缺點
這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現(xiàn)大量的數(shù)據(jù)傳輸捻艳,效率很低。
3)案例實操
詳見7.6.1 需求1:reduce端表合并(數(shù)據(jù)傾斜)
3.8 數(shù)據(jù)清洗(ETL)
1)概述
在運行核心業(yè)務Mapreduce程序之前庆猫,往往要先對數(shù)據(jù)進行清洗认轨,清理掉不符合用戶要求的數(shù)據(jù)。清理的過程往往只需要運行mapper程序月培,不需要運行reduce程序嘁字。
2)實操案例
詳見7.7 日志清洗(數(shù)據(jù)清洗)。
3.9 計數(shù)器應用
Hadoop為每個作業(yè)維護若干內(nèi)置計數(shù)器杉畜,以描述多項指標纪蜒。例如,某些計數(shù)器記錄已處理的字節(jié)數(shù)和記錄數(shù)此叠,使用戶可監(jiān)控已處理的輸入數(shù)據(jù)量和已產(chǎn)生的輸出數(shù)據(jù)量纯续。
1)API
(1)采用枚舉的方式統(tǒng)計計數(shù)
enum MyCounter{MALFORORMED,NORMAL}
//對枚舉定義的自定義計數(shù)器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
(2)采用計數(shù)器組、計數(shù)器名稱的方式統(tǒng)計
context.getCounter("counterGroup", "countera").increment(1);
組名和計數(shù)器名稱隨便起,但最好有意義猬错。
(3)計數(shù)結果在程序運行后的控制臺上查看窗看。
2)案例實操
詳見7.7 日志清洗(數(shù)據(jù)清洗)。
3.10 MapReduce開發(fā)總結
在編寫mapreduce程序時倦炒,需要考慮的幾個方面:
1)輸入數(shù)據(jù)接口:InputFormat
???默認使用的實現(xiàn)類是:TextInputFormat
???TextInputFormat的功能邏輯是:一次讀一行文本显沈,然后將該行的起始偏移量作為key,行內(nèi)容作為value返回逢唤。
KeyValueTextInputFormat每一行均為一條記錄拉讯,被分隔符分割為key,value鳖藕。默認分隔符是tab(\t)遂唧。
NlineInputFormat按照指定的行數(shù)N來劃分切片。
CombineTextInputFormat可以把多個小文件合并成一個切片處理吊奢,提高處理效率。
用戶還可以自定義InputFormat纹烹。
2)邏輯處理接口:Mapper ?
???用戶根據(jù)業(yè)務需求實現(xiàn)其中三個方法:map() ??setup() ??cleanup ()
3)Partitioner分區(qū)
有默認實現(xiàn)?HashPartitioner页滚,邏輯是根據(jù)key的哈希值和numReduces來返回一個分區(qū)號;key.hashCode()&Integer.MAXVALUE % numReduces
如果業(yè)務上有特別的需求铺呵,可以自定義分區(qū)裹驰。
4)Comparable排序
當我們用自定義的對象作為key來輸出時,就必須要實現(xiàn)WritableComparable接口片挂,重寫其中的compareTo()方法幻林。
部分排序:對最終輸出的每一個文件進行內(nèi)部排序。
全排序:對所有數(shù)據(jù)進行排序音念,通常只有一個Reduce沪饺。
二次排序:排序的條件有兩個。
5)Combiner合并
Combiner合并可以提高程序執(zhí)行效率闷愤,減少io傳輸整葡。但是使用時必須不能影響原有的業(yè)務處理結果。
6)reduce端分組:Groupingcomparator
reduceTask拿到輸入數(shù)據(jù)(一個partition的所有數(shù)據(jù))后讥脐,首先需要對數(shù)據(jù)進行分組遭居,其分組的默認原則是key相同,然后對每一組kv數(shù)據(jù)調(diào)用一次reduce()方法旬渠,并且將這一組kv中的第一個kv的key作為參數(shù)傳給reduce的key俱萍,將這一組數(shù)據(jù)的value的迭代器傳給reduce()的values參數(shù)。
利用上述這個機制告丢,我們可以實現(xiàn)一個高效的分組取最大值的邏輯枪蘑。
自定義一個bean對象用來封裝我們的數(shù)據(jù),然后改寫其compareTo方法產(chǎn)生倒序排序的效果。然后自定義一個Groupingcomparator腥寇,將bean對象的分組邏輯改成按照我們的業(yè)務分組id來分組(比如訂單號)成翩。這樣,我們要取的最大值就是reduce()方法中傳進來key赦役。
7)邏輯處理接口:Reducer
用戶根據(jù)業(yè)務需求實現(xiàn)其中三個方法:reduce() ??setup() ??cleanup ()
8)輸出數(shù)據(jù)接口:OutputFormat
默認實現(xiàn)類是TextOutputFormat麻敌,功能邏輯是:將每一個KV對向目標文本文件中輸出為一行。
?SequenceFileOutputFormat將它的輸出寫為一個順序文件掂摔。如果輸出需要作為后續(xù)?MapReduce任務的輸入术羔,這便是一種好的輸出格式,因為它的格式緊湊乙漓,很容易被壓縮级历。
用戶還可以自定義OutputFormat。