MapReduce框架原理

3.1 MapReduce工作流程

1)流程示意圖

2)流程詳解

上面的流程是整個mapreduce最全工作流程匠题,但是shuffle過程只是從第7步開始到第16步結束啊央,具體shuffle過程詳解眶诈,如下:

1)maptask收集我們的map()方法輸出的kv對涨醋,放到內(nèi)存緩沖區(qū)中

2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會溢出多個文件

3)多個溢出文件會被合并成大的溢出文件

4)在溢出過程中逝撬,及合并的過程中东帅,都要調(diào)用partitioner進行分區(qū)和針對key進行排序

5)reducetask根據(jù)自己的分區(qū)號,去各個maptask機器上取相應的結果分區(qū)數(shù)據(jù)

6)reducetask會取到同一個分區(qū)的來自不同maptask的結果文件球拦,reducetask會將這些文件再進行合并(歸并排序)

7)合并成大文件后靠闭,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group坎炼,調(diào)用用戶自定義的reduce()方法)

3)注意

Shuffle中的緩沖區(qū)大小會影響到mapreduce程序的執(zhí)行效率愧膀,原則上說,緩沖區(qū)越大谣光,磁盤io的次數(shù)越少檩淋,執(zhí)行速度就越快。

緩沖區(qū)的大小可以通過參數(shù)調(diào)整萄金,參數(shù):io.sort.mb ?默認100M蟀悦。

3.2 InputFormat數(shù)據(jù)輸入

3.2.1 Job提交流程和切片源碼詳解

1)job提交流程源碼詳解


waitForCompletion()

submit();

// 1建立連接

connect();

// 1)創(chuàng)建提交job的代理

new Cluster(getConfiguration());

// (1)判斷是本地yarn還是遠程

initialize(jobTrackAddr, conf);

// 2 提交job

submitter.submitJobInternal(Job.this, cluster)

// 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

// 2)獲取jobid ,并創(chuàng)建job路徑

JobID jobId = submitClient.getNewJobID();

// 3)拷貝jar包到集群

copyAndConfigureFiles(job, submitJobDir);

rUploader.uploadFiles(job, jobSubmitDir);

// 4)計算切片氧敢,生成切片規(guī)劃文件

writeSplits(job, submitJobDir);

maps = writeNewSplits(job, jobSubmitDir);

input.getSplits(job);

// 5)向Stag路徑寫xml配置文件

writeConf(conf, submitJobFile);

conf.writeXml(out);

// 6)提交job,返回提交狀態(tài)

status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

2)FileInputFormat源碼解析(input.getSplits(job))

(1)找到你數(shù)據(jù)存儲的目錄日戈。

(2)開始遍歷處理(規(guī)劃切片)目錄下的每一個文件

(3)遍歷第一個文件ss.txt

a)獲取文件大小fs.sizeOf(ss.txt);

b)計算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

c)默認情況下,切片大小=blocksize

d)開始切孙乖,形成第1個切片:ss.txt—0:128M 第2個切片ss.txt—128:256M 第3個切片ss.txt—256M:300M(每次切片時浙炼,都要判斷切完剩下的部分是否大于塊的1.1倍,不大于1.1倍就劃分一塊切片)

e)將切片信息寫到一個切片規(guī)劃文件中

f)整個切片的核心過程在getSplit()方法中完成唯袄。

g)數(shù)據(jù)切片只是在邏輯上對輸入數(shù)據(jù)進行分片弯屈,并不會再磁盤上將其切分成分片進行存儲。InputSplit只記錄了分片的元數(shù)據(jù)信息恋拷,比如起始位置资厉、長度以及所在的節(jié)點列表等。

h)注意:block是HDFS物理上存儲的數(shù)據(jù)蔬顾,切片是對數(shù)據(jù)邏輯上的劃分宴偿。

(4)提交切片規(guī)劃文件到y(tǒng)arn上,yarn上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計算開啟maptask個數(shù)阎抒。

3.2.2 FileInputFormat切片機制

1)FileInputFormat中默認的切片機制:

(1)簡單地按照文件的內(nèi)容長度進行切片

(2)切片大小酪我,默認等于block大小

(3)切片時不考慮數(shù)據(jù)集整體,而是逐個針對每一個文件單獨切片

比如待處理數(shù)據(jù)有兩個文件:

file1.txt ???320M

file2.txt ???10M

經(jīng)過FileInputFormat的切片機制運算后且叁,形成的切片信息如下:??

file1.txt.split1-- ?0~128

file1.txt.split2-- ?128~256

file1.txt.split3-- ?256~320

file2.txt.split1-- ?0~10M

2)FileInputFormat切片大小的參數(shù)配置

通過分析源碼都哭,在FileInputFormat的279行中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));

切片主要由這幾個值來運算決定

mapreduce.input.fileinputformat.split.minsize=1 默認值為1

mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue

因此,默認情況下欺矫,切片大小=blocksize纱新。

maxsize(切片最大值):參數(shù)如果調(diào)得比blocksize小,則會讓切片變小穆趴,而且就等于配置的這個參數(shù)的值脸爱。

minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大未妹。

3)獲取切片信息API

// 根據(jù)文件類型獲取切片信息

FileSplit inputSplit = (FileSplit) context.getInputSplit();

// 獲取切片的文件名稱

String name = inputSplit.getPath().getName();

3.2.3 CombineTextInputFormat切片機制

關于大量小文件的優(yōu)化策略

1)默認情況下TextInputformat對任務的切片機制是按文件規(guī)劃切片簿废,不管文件多小,都會是一個單獨的切片络它,都會交給一個maptask族檬,這樣如果有大量小文件,就會產(chǎn)生大量的maptask化戳,處理效率極其低下单料。

2)優(yōu)化策略

(1)最好的辦法,在數(shù)據(jù)處理系統(tǒng)的最前端(預處理/采集)点楼,將小文件先合并成大文件扫尖,再上傳到HDFS做后續(xù)分析。

(2)補救措施:如果已經(jīng)是大量小文件在HDFS中了掠廓,可以使用另一種InputFormat來做切片(CombineTextInputFormat)换怖,它的切片邏輯跟TextInputFormat不同:它可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣却盘,多個小文件就可以交給一個maptask狰域。

(3)優(yōu)先滿足最小切片大小,不超過最大切片大小

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

3)具體實現(xiàn)步驟

// ?如果不設置InputFormat,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class)

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

4)案例實操

3.2.4 InputFormat接口實現(xiàn)類

MapReduce任務的輸入文件一般是存儲在HDFS里面黄橘。輸入的文件格式包括:基于行的日志文件、二進制格式文件等屈溉。這些文件一般會很大塞关,達到數(shù)十GB,甚至更大子巾。那么MapReduce是如何讀取這些數(shù)據(jù)的呢帆赢?下面我們首先學習InputFormat接口。

InputFormat常見的接口實現(xiàn)類包括:TextInputFormat线梗、KeyValueTextInputFormat椰于、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等仪搔。

1)TextInputFormat

TextInputFormat是默認的InputFormat瘾婿。每條記錄是一行輸入。鍵K是LongWritable類型,存儲該行在整個文件中的字節(jié)偏移量偏陪。值是這行的內(nèi)容抢呆,不包括任何行終止符(換行符和回車符)。

以下是一個示例笛谦,比如抱虐,一個分片包含了如下4條文本記錄。

Rich learning form

Intelligent learning engine

Learning more convenient

From the real demand for more close to the enterprise

每條記錄表示為以下鍵/值對:

(0,Rich learning form)

(20,Intelligent learning engine)

(49,Learning more convenient)

(75,From the real demand for more close to the enterprise)

很明顯饥脑,鍵并不是行號恳邀。一般情況下,很難取得行號灶轰,因為文件按字節(jié)而不是按行切分為分片谣沸,計算方式:字符+空格+1個偏移位=數(shù)

2)KeyValueTextInputFormat

每一行均為一條記錄,被分隔符分割為key框往,value鳄抒。可以通過在驅(qū)動類中設置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");來設定分隔符椰弊。默認分隔符是tab(\t)许溅。

job.setInputFormatClass(KeyValueTextInputFormat.class);

以下是一個示例,輸入是一個包含4條記錄的分片秉版。其中——>表示一個(水平方向的)制表符贤重。

line1 ——>Rich learning form

line2 ——>Intelligent learning engine

line3 ——>Learning more convenient

line4 ——>From the real demand for more close to the enterprise

每條記錄表示為以下鍵/值對:

(Rich,learning form)

(Intelligent清焕,learning engine)

(Learning并蝗,more convenient)

(From the,real demand for more close to the enterprise)

?此時的鍵是每行排在制表符之前的Text序列秸妥。

?3)NLineInputFormat

如果使用NlineInputFormat滚停,代表每個map進程處理的InputSplit不再按block塊去劃分,而是按NlineInputFormat指定的行數(shù)N來劃分粥惧。即輸入文件的總行數(shù)/N=切片數(shù)(20)键畴,如果不整除,切片數(shù)=商+1突雪。

以下是一個示例起惕,仍然以上面的4行輸入為例。

Rich learning form

Intelligent learning engine

Learning more convenient

From the real demand for more close to the enterprise

?例如咏删,如果N是2惹想,則每個輸入分片包含兩行。開啟2個maptask督函。

(0,Rich learning form)

(19,Intelligent learning engine)

另一個?mapper 則收到后兩行:

(47,Learning more convenient)

(72,From the real demand for more close to the enterprise)

????????這里的鍵和值與TextInputFormat生成的一樣嘀粱。

3.2.5 自定義InputFormat

1)概述

(1)自定義一個類繼承FileInputFormat激挪。

(2)改寫RecordReader,實現(xiàn)一次讀取一個完整文件封裝為KV草穆。

(3)在輸出時使用SequenceFileOutPutFormat輸出合并文件灌灾。

2)案例實操

詳見7.4小文件處理(自定義InputFormat)。

3.3 MapTask工作機制

3.3.1 并行度決定機制

1)問題引出

maptask的并行度決定map階段的任務處理并發(fā)度悲柱,進而影響到整個job的處理速度锋喜。那么,mapTask并行任務是否越多越好呢豌鸡?

2)MapTask并行度決定機制

一個job的map階段MapTask并行度(個數(shù))嘿般,由客戶端提交job時的切片個數(shù)決定。

3.3.2 MapTask工作機制

(1)Read階段:Map Task通過用戶編寫的RecordReader涯冠,從輸入InputSplit中解析出一個個key/value炉奴。

(2)Map階段:該節(jié)點主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value蛇更。

(3)Collect收集階段:在用戶編寫map()函數(shù)中瞻赶,當數(shù)據(jù)處理完成后,一般會調(diào)用OutputCollector.collect()輸出結果派任。在該函數(shù)內(nèi)部砸逊,它會將生成的key/value分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中掌逛。

(4)Spill階段:即“溢寫”师逸,當環(huán)形緩沖區(qū)滿后,MapReduce會將數(shù)據(jù)寫到本地磁盤上豆混,生成一個臨時文件篓像。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前皿伺,先要對數(shù)據(jù)進行一次本地排序员辩,并在必要時對數(shù)據(jù)進行合并、壓縮等操作鸵鸥。

溢寫階段詳情:

步驟1:利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進行排序屈暗,排序方式是,先按照分區(qū)編號partition進行排序脂男,然后按照key進行排序。這樣种呐,經(jīng)過排序后宰翅,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序爽室。


步驟2:按照分區(qū)編號由小到大依次將每個分區(qū)中的數(shù)據(jù)寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數(shù))中汁讼。如果用戶設置了Combiner裂明,則寫入文件之前沈撞,對每個分區(qū)中的數(shù)據(jù)進行一次聚集操作。

步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結構SpillRecord中,其中每個分區(qū)的元信息包括在臨時文件中的偏移量惦费、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當前內(nèi)存索引大小超過1MB隘世,則將內(nèi)存索引寫到文件output/spillN.out.index中栈雳。

(5)Combine階段:當所有數(shù)據(jù)處理完成后,MapTask對所有臨時文件進行一次合并蝉娜,以確保最終只會生成一個數(shù)據(jù)文件唱较。

當所有數(shù)據(jù)處理完后,MapTask會將所有臨時文件合并成一個大文件召川,并保存到文件output/file.out中南缓,同時生成相應的索引文件output/file.out.index。

在進行文件合并過程中荧呐,MapTask以分區(qū)為單位進行合并汉形。對于某個分區(qū),它將采用多輪遞歸合并的方式倍阐。每輪合并io.sort.factor(默認100)個文件概疆,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后收捣,重復以上過程届案,直到最終得到一個大文件。

讓每個MapTask最終只生成一個數(shù)據(jù)文件罢艾,可避免同時打開大量文件和同時讀取大量小文件產(chǎn)生的隨機讀取帶來的開銷楣颠。

3.4 Shuffle機制

3.4.1 Shuffle機制

Mapreduce確保每個reducer的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle咐蚯。

3.4.2 Partition分區(qū)

0)問題引出:要求將統(tǒng)計結果按照條件輸出到不同文件中(分區(qū))童漩。比如:將統(tǒng)計結果按照手機歸屬地不同省份輸出到不同文件中(分區(qū))

1)默認partition分區(qū)

public class HashPartitioner<K, V> extends Partitioner<K, V> {

??public int getPartition(K key, V value, int numReduceTasks) {

????return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

??}

}

默認分區(qū)是根據(jù)key的hashCode對reduceTasks個數(shù)取模得到的。用戶沒法控制哪個key存儲到哪個分區(qū)春锋。


&是為了不出現(xiàn)負數(shù)矫膨,也可以用??hashcode()%num + num來代替,但是位運算性能比較高期奔。

2)自定義Partitioner步驟

(1)自定義類繼承Partitioner侧馅,重寫getPartition()方法

public?class?ProvincePartitioner extends?Partitioner<Text, FlowBean> {


@Override

public?int?getPartition(Text key, FlowBean value, int?numPartitions) {


// 1 獲取電話號碼的前三位

String preNum = key.toString().substring(0, 3);

partition = 4;

// 2 判斷是哪個省

if?("136".equals(preNum)) {

partition = 0;

}else?if?("137".equals(preNum)) {

partition = 1;

}else?if?("138".equals(preNum)) {

partition = 2;

}else?if?("139".equals(preNum)) {

partition = 3;

}

return?partition;

}

}

(2)在job驅(qū)動中,設置自定義partitioner:?

job.setPartitionerClass(CustomPartitioner.class);

(3)自定義partition后呐萌,要根據(jù)自定義partitioner的邏輯設置相應數(shù)量的reduce task

job.setNumReduceTasks(5);

3)注意:

如果reduceTask的數(shù)量> getPartition的結果數(shù)馁痴,則會多產(chǎn)生幾個空的輸出文件part-r-000xx;

如果1<reduceTask的數(shù)量<getPartition的結果數(shù)肺孤,則有一部分分區(qū)數(shù)據(jù)無處安放罗晕,會Exception济欢;

如果reduceTask的數(shù)量=1,則不管mapTask端輸出多少個分區(qū)文件小渊,最終結果都交給這一個reduceTask法褥,最終也就只會產(chǎn)生一個結果文件?part-r-00000;

例如:假設自定義分區(qū)數(shù)為5酬屉,則

(1)job.setNumReduceTasks(1);會正常運行半等,只不過會產(chǎn)生一個輸出文件

(2)job.setNumReduceTasks(2);會報錯

(3)job.setNumReduceTasks(6);大于5,程序會正常運行梆惯,會產(chǎn)生空文件

4)案例實操

詳見7.2.2 需求2:將統(tǒng)計結果按照手機歸屬地不同省份輸出到不同文件中(Partitioner)

詳見7.1.2 需求2:把單詞按照ASCII碼奇偶分區(qū)(Partitioner)

3.4.3 WritableComparable排序

排序是MapReduce框架中最重要的操作之一酱鸭。Map Task和Reduce Task均會對數(shù)據(jù)(按照key)進行排序。該操作屬于Hadoop的默認行為垛吗。任何應用程序中的數(shù)據(jù)均會被排序凹髓,而不管邏輯上是否需要。默認排序是按照字典順序排序怯屉,且實現(xiàn)該排序的方法是快速排序蔚舀。

對于Map Task,它會將處理的結果暫時放到一個緩沖區(qū)中锨络,當緩沖區(qū)使用率達到一定閾值后赌躺,再對緩沖區(qū)中的數(shù)據(jù)進行一次排序,并將這些有序數(shù)據(jù)寫到磁盤上羡儿,而當數(shù)據(jù)處理完畢后礼患,它會對磁盤上所有文件進行一次合并,以將這些文件合并成一個大的有序文件掠归。

對于Reduce Task缅叠,它從每個Map Task上遠程拷貝相應的數(shù)據(jù)文件,如果文件大小超過一定閾值虏冻,則放到磁盤上肤粱,否則放到內(nèi)存中。如果磁盤上文件數(shù)目達到一定閾值厨相,則進行一次合并以生成一個更大文件领曼;如果內(nèi)存中文件大小或者數(shù)目超過一定閾值,則進行一次合并后將數(shù)據(jù)寫到磁盤上蛮穿。當所有數(shù)據(jù)拷貝完畢后庶骄,Reduce Task統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進行一次合并。

每個階段的默認排序

1)排序的分類:

(1)部分排序:

MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序践磅。保證輸出的每個文件內(nèi)部排序瓢姻。

(2)全排序:

如何用Hadoop產(chǎn)生一個全局排序的文件?最簡單的方法是使用一個分區(qū)音诈。但該方法在處理大型文件時效率極低幻碱,因為一臺機器必須處理所有輸出文件,從而完全喪失了MapReduce所提供的并行架構细溅。

替代方案:首先創(chuàng)建一系列排好序的文件褥傍;其次,串聯(lián)這些文件喇聊;最后恍风,生成一個全局排序的文件。主要思路是使用一個分區(qū)來描述輸出的全局排序誓篱。例如:可以為上述文件創(chuàng)建3個分區(qū)朋贬,在第一分區(qū)中,記錄的單詞首字母a-g窜骄,第二分區(qū)記錄單詞首字母h-n, 第三分區(qū)記錄單詞首字母o-z锦募。

(3)輔助排序:(GroupingComparator分組)

Mapreduce框架在記錄到達reducer之前按鍵對記錄排序,但鍵所對應的值并沒有被排序邻遏。甚至在不同的執(zhí)行輪次中糠亩,這些值的排序也不固定,因為它們來自不同的map任務且這些map任務在不同輪次中完成時間各不相同准验。一般來說赎线,大多數(shù)MapReduce程序會避免讓reduce函數(shù)依賴于值的排序。但是糊饱,有時也需要通過特定的方法對鍵進行排序和分組等以實現(xiàn)對值的排序垂寥。

(4)二次排序:

在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序另锋。

2)自定義排序WritableComparable

(1)原理分析

bean對象實現(xiàn)WritableComparable接口重寫compareTo方法滞项,就可以實現(xiàn)排序

@Override

public?int?compareTo(FlowBean o) {

// 倒序排列,從大到小

return?this.sumFlow > o.getSumFlow() ? -1 : 1;

}

(2)案例實操

詳見7.2.3 需求3:將統(tǒng)計結果按照總流量倒序排序(排序)

詳見7.2.4 需求4:不同省份輸出文件內(nèi)部排序(部分排序)

3.4.4 GroupingComparator分組(輔助排序)

1)對reduce階段的數(shù)據(jù)根據(jù)某一個或幾個字段進行分組砰蠢。

2)案例實操

詳見7.3 求出每一個訂單中最貴的商品(GroupingComparator)

3.4.5 合并

0)在分布式的架構中蓖扑,分布式文件系統(tǒng)HDFS,和分布式運算程序編程框架mapreduce台舱。

HDFS:不怕大文件律杠,怕很多小文件

mapreduce :怕數(shù)據(jù)傾斜

那么mapreduce是如果解決多個小文件的問題呢?

mapreduce關于大量小文件的優(yōu)化策略

(1)?默認情況下竞惋,TextInputFormat對任務的切片機制是按照文件規(guī)劃切片柜去,不管有多少個小文件,都會是單獨的切片拆宛,都會交給一個maptask嗓奢,這樣,如果有大量的小文件

就會產(chǎn)生大量的maptask浑厚,處理效率極端底下

(2)優(yōu)化策略

最好的方法:在數(shù)據(jù)處理的最前端(預處理股耽、采集)根盒,就將小文件合并成大文件,在上傳到HDFS做后續(xù)的分析

補救措施:如果已經(jīng)是大量的小文件在HDFS中了物蝙,可以使用另一種inputformat來做切片(CombineFileInputformat)炎滞,它的切片邏輯跟TextInputformat

注:CombineTextInputFormat是CombineFileInputformat的子類

不同:

它可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣诬乞,多個小文件就可以交給一個maptask了

//如果不設置InputFormat册赛,它默認的用的是TextInputFormat.class

/*CombineTextInputFormat為系統(tǒng)自帶的組件類

?* setMinInputSplitSize 中的2048是表示n個小文件之和不能小于2048

?* setMaxInputSplitSize 中的4096當滿足setMinInputSplitSize中的2048情況下 ?在滿足n+1個小文件之和不能大于4096

?*/

job.setInputFormatClass(CombineTextInputFormat.class);

CombineTextInputFormat.setMinInputSplitSize(job, 2048);

CombineTextInputFormat.setMaxInputSplitSize(job, 4096);

1)輸入數(shù)據(jù):準備5個小文件

2)實現(xiàn)過程

(1)不做任何處理,運行需求1中的wordcount程序震嫉,觀察切片個數(shù)為5

(2)在WordcountDriver中增加如下代碼森瘪,運行程序,并觀察運行的切片個數(shù)為1

// 如果不設置InputFormat票堵,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

CombineTextInputFormat.setMaxInputSplitSize(job, 4*1024*1024);// 4m

CombineTextInputFormat.setMinInputSplitSize(job, 2*1024*1024);// 2m


注:在看number of splits時扼睬,和最大值(MaxSplitSize)有關、總體規(guī)律就是和低于最大值是一片换衬、高于最大值1.5倍+痰驱,則為兩片;高于最大值2倍以上則向下取整瞳浦,比如文件大小65MB担映,切片最大值為4MB,那么切片為16個.總體來說,切片差值不超過1個叫潦,不影響整體性能


6)自定義Combiner實現(xiàn)步驟:

(1)自定義一個combiner繼承Reducer蝇完,重寫reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override

protected void reduce(Text key, Iterable<IntWritable> values,

Context context) throws IOException, InterruptedException {

????????// 1 匯總操作

int count = 0;

for(IntWritable v :values){

count = v.get();

}

????????// 2 寫出

context.write(key, new IntWritable(count));

}

}

(2)在job驅(qū)動類中設置:??

job.setCombinerClass(WordcountCombiner.class);

3.5 ReduceTask工作機制

1)設置ReduceTask并行度(個數(shù))

reducetask的并行度同樣影響整個job的執(zhí)行并發(fā)度和執(zhí)行效率,但與maptask的并發(fā)數(shù)由切片數(shù)決定不同矗蕊,Reducetask數(shù)量的決定是可以直接手動設置:

//默認值是1短蜕,手動設置為4

job.setNumReduceTasks(4);

2)注意

(1)reducetask=0 ,表示沒有reduce階段傻咖,輸出文件個數(shù)和map個數(shù)一致朋魔。

(2)reducetask默認值就是1,所以輸出文件個數(shù)為一個卿操。

(3)如果數(shù)據(jù)分布不均勻警检,就有可能在reduce階段產(chǎn)生數(shù)據(jù)傾斜

(4)reducetask數(shù)量并不是任意設置,還要考慮業(yè)務邏輯需求害淤,有些情況下扇雕,需要計算全局匯總結果,就只能有1個reducetask窥摄。

(5)具體多少個reducetask镶奉,需要根據(jù)集群性能而定。

(6)如果分區(qū)數(shù)不是1,但是reducetask為1哨苛,是否執(zhí)行分區(qū)過程鸽凶。答案是:不執(zhí)行分區(qū)過程。因為在maptask的源碼中移国,執(zhí)行分區(qū)的前提是先判斷reduceNum個數(shù)是否大于1吱瘩。不大于1肯定不執(zhí)行。

3)實驗:測試reducetask多少合適迹缀。

(1)實驗環(huán)境:1個master節(jié)點,16個slave節(jié)點:CPU:8GHZ蜜徽,內(nèi)存:?2G

(2)實驗結論:

表1 改變reduce task (數(shù)據(jù)量為1GB)

Map task =16

Reduce task151015162025304560

總時間8921461109288100128101145104

4)ReduceTask工作機制

(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數(shù)據(jù)祝懂,并針對某一片數(shù)據(jù),如果其大小超過一定閾值拘鞋,則寫到磁盤上砚蓬,否則直接放到內(nèi)存中。

(2)Merge階段:在遠程拷貝數(shù)據(jù)的同時盆色,ReduceTask啟動了兩個后臺線程對內(nèi)存和磁盤上的文件進行合并灰蛙,以防止內(nèi)存使用過多或磁盤上文件過多。

(3)Sort階段:按照MapReduce語義隔躲,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進行聚集的一組數(shù)據(jù)摩梧。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略宣旱。由于各個MapTask已經(jīng)實現(xiàn)對自己的處理結果進行了局部排序仅父,因此,ReduceTask只需對所有數(shù)據(jù)進行一次歸并排序即可浑吟。

(4)Reduce階段:reduce()函數(shù)將計算結果寫到HDFS上笙纤。

3.6 OutputFormat數(shù)據(jù)輸出

3.6.1 OutputFormat接口實現(xiàn)類

?OutputFormat是MapReduce輸出的基類,所有實現(xiàn)MapReduce輸出都實現(xiàn)了?OutputFormat接口组力。下面我們介紹幾種常見的OutputFormat實現(xiàn)類省容。

1)文本輸出TextOutputFormat

????????默認的輸出格式是TextOutputFormat,它把每條記錄寫為文本行燎字。它的鍵和值可以是任意類型腥椒,因為TextOutputFormat調(diào)用toString()方法把它們轉(zhuǎn)換為字符串。

2)SequenceFileOutputFormat

?SequenceFileOutputFormat將它的輸出寫為一個順序文件轩触。如果輸出需要作為后續(xù)?MapReduce任務的輸入寞酿,這便是一種好的輸出格式,因為它的格式緊湊脱柱,很容易被壓縮伐弹。

3)自定義OutputFormat

根據(jù)用戶需求,自定義實現(xiàn)輸出。

3.6.2 自定義OutputFormat

為了實現(xiàn)控制最終文件的輸出路徑惨好,可以自定義OutputFormat煌茴。

要在一個mapreduce程序中根據(jù)數(shù)據(jù)的不同輸出兩類結果到不同目錄,這類靈活的輸出需求可以通過自定義outputformat來實現(xiàn)日川。

1)自定義OutputFormat步驟

(1)自定義一個類繼承FileOutputFormat蔓腐。

(2)改寫recordwriter,具體改寫輸出數(shù)據(jù)的方法write()龄句。

2)實操案例:

詳見7.5 修改日志內(nèi)容及自定義日志輸出路徑(自定義OutputFormat)回论。

3.7 Join多種應用

3.7.1 Map joinDistributedcache分布式緩存

1)使用場景:一張表十分小、一張表很大分歇。

2)解決方案

在map端緩存多張表傀蓉,提前處理業(yè)務邏輯,這樣增加map端業(yè)務职抡,減少reduce端數(shù)據(jù)的壓力葬燎,盡可能的減少數(shù)據(jù)傾斜。

3)具體辦法:采用distributedcache

(1)在mapper的setup階段缚甩,將文件讀取到緩存集合中谱净。

(2)在驅(qū)動函數(shù)中加載緩存。

job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 緩存普通文件到task運行節(jié)點

4)實操案例:

詳見7.6.2需求2:map端表合并(Distributedcache)

3.7.2 Reduce join

1)原理:

Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區(qū)別不同來源的記錄擅威。然后用連接字段作為key壕探,其余部分和新加的標志作為value,最后進行輸出裕寨。

Reduce端的主要工作:在reduce端以連接字段作為key的分組已經(jīng)完成浩蓉,我們只需要在每一個分組當中將那些來源于不同文件的記錄(在map階段已經(jīng)打標志)分開,最后進行合并就ok了宾袜。

2)該方法的缺點

這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現(xiàn)大量的數(shù)據(jù)傳輸捻艳,效率很低。

3)案例實操

詳見7.6.1 需求1:reduce端表合并(數(shù)據(jù)傾斜)

3.8 數(shù)據(jù)清洗(ETL)

1)概述

在運行核心業(yè)務Mapreduce程序之前庆猫,往往要先對數(shù)據(jù)進行清洗认轨,清理掉不符合用戶要求的數(shù)據(jù)。清理的過程往往只需要運行mapper程序月培,不需要運行reduce程序嘁字。

2)實操案例

詳見7.7 日志清洗(數(shù)據(jù)清洗)。

3.9 計數(shù)器應用

Hadoop為每個作業(yè)維護若干內(nèi)置計數(shù)器杉畜,以描述多項指標纪蜒。例如,某些計數(shù)器記錄已處理的字節(jié)數(shù)和記錄數(shù)此叠,使用戶可監(jiān)控已處理的輸入數(shù)據(jù)量和已產(chǎn)生的輸出數(shù)據(jù)量纯续。

1)API

(1)采用枚舉的方式統(tǒng)計計數(shù)

enum MyCounter{MALFORORMED,NORMAL}

//對枚舉定義的自定義計數(shù)器加1

context.getCounter(MyCounter.MALFORORMED).increment(1);

(2)采用計數(shù)器組、計數(shù)器名稱的方式統(tǒng)計

context.getCounter("counterGroup", "countera").increment(1);

組名和計數(shù)器名稱隨便起,但最好有意義猬错。

(3)計數(shù)結果在程序運行后的控制臺上查看窗看。

2)案例實操

詳見7.7 日志清洗(數(shù)據(jù)清洗)。

3.10 MapReduce開發(fā)總結

在編寫mapreduce程序時倦炒,需要考慮的幾個方面:

1)輸入數(shù)據(jù)接口:InputFormat

???默認使用的實現(xiàn)類是:TextInputFormat

???TextInputFormat的功能邏輯是:一次讀一行文本显沈,然后將該行的起始偏移量作為key,行內(nèi)容作為value返回逢唤。

KeyValueTextInputFormat每一行均為一條記錄拉讯,被分隔符分割為key,value鳖藕。默認分隔符是tab(\t)遂唧。

NlineInputFormat按照指定的行數(shù)N來劃分切片。

CombineTextInputFormat可以把多個小文件合并成一個切片處理吊奢,提高處理效率。

用戶還可以自定義InputFormat纹烹。

2)邏輯處理接口:Mapper ?

???用戶根據(jù)業(yè)務需求實現(xiàn)其中三個方法:map() ??setup() ??cleanup ()

3)Partitioner分區(qū)

有默認實現(xiàn)?HashPartitioner页滚,邏輯是根據(jù)key的哈希值和numReduces來返回一個分區(qū)號;key.hashCode()&Integer.MAXVALUE % numReduces

如果業(yè)務上有特別的需求铺呵,可以自定義分區(qū)裹驰。

4)Comparable排序

當我們用自定義的對象作為key來輸出時,就必須要實現(xiàn)WritableComparable接口片挂,重寫其中的compareTo()方法幻林。

部分排序:對最終輸出的每一個文件進行內(nèi)部排序。

全排序:對所有數(shù)據(jù)進行排序音念,通常只有一個Reduce沪饺。

二次排序:排序的條件有兩個。

5)Combiner合并

Combiner合并可以提高程序執(zhí)行效率闷愤,減少io傳輸整葡。但是使用時必須不能影響原有的業(yè)務處理結果。

6)reduce端分組:Groupingcomparator

reduceTask拿到輸入數(shù)據(jù)(一個partition的所有數(shù)據(jù))后讥脐,首先需要對數(shù)據(jù)進行分組遭居,其分組的默認原則是key相同,然后對每一組kv數(shù)據(jù)調(diào)用一次reduce()方法旬渠,并且將這一組kv中的第一個kv的key作為參數(shù)傳給reduce的key俱萍,將這一組數(shù)據(jù)的value的迭代器傳給reduce()的values參數(shù)。

利用上述這個機制告丢,我們可以實現(xiàn)一個高效的分組取最大值的邏輯枪蘑。

自定義一個bean對象用來封裝我們的數(shù)據(jù),然后改寫其compareTo方法產(chǎn)生倒序排序的效果。然后自定義一個Groupingcomparator腥寇,將bean對象的分組邏輯改成按照我們的業(yè)務分組id來分組(比如訂單號)成翩。這樣,我們要取的最大值就是reduce()方法中傳進來key赦役。

7)邏輯處理接口:Reducer

用戶根據(jù)業(yè)務需求實現(xiàn)其中三個方法:reduce() ??setup() ??cleanup ()

8)輸出數(shù)據(jù)接口:OutputFormat

默認實現(xiàn)類是TextOutputFormat麻敌,功能邏輯是:將每一個KV對向目標文本文件中輸出為一行。

?SequenceFileOutputFormat將它的輸出寫為一個順序文件掂摔。如果輸出需要作為后續(xù)?MapReduce任務的輸入术羔,這便是一種好的輸出格式,因為它的格式緊湊乙漓,很容易被壓縮级历。

用戶還可以自定義OutputFormat。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末叭披,一起剝皮案震驚了整個濱河市寥殖,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌涩蜘,老刑警劉巖嚼贡,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異同诫,居然都是意外死亡粤策,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門误窖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來叮盘,“玉大人,你說我怎么就攤上這事霹俺∪岷穑” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵吭服,是天一觀的道長嚷堡。 經(jīng)常有香客問我,道長艇棕,這世上最難降的妖魔是什么蝌戒? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮沼琉,結果婚禮上北苟,老公的妹妹穿的比我還像新娘。我一直安慰自己打瘪,他們只是感情好友鼻,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布傻昙。 她就那樣靜靜地躺著薪者,像睡著了一般秒赤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上春弥,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天虫碉,我揣著相機與錄音贾惦,去河邊找鬼。 笑死敦捧,一個胖子當著我的面吹牛须板,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播兢卵,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼习瑰,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了秽荤?” 一聲冷哼從身側響起甜奄,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎窃款,沒想到半個月后贺嫂,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡雁乡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了糜俗。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片踱稍。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖悠抹,靈堂內(nèi)的尸體忽然破棺而出珠月,到底是詐尸還是另有隱情,我是刑警寧澤楔敌,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布啤挎,位于F島的核電站,受9級特大地震影響卵凑,放射性物質(zhì)發(fā)生泄漏庆聘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一勺卢、第九天 我趴在偏房一處隱蔽的房頂上張望伙判。 院中可真熱鬧,春花似錦黑忱、人聲如沸宴抚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽菇曲。三九已至冠绢,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間常潮,已是汗流浹背弟胀。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蕊玷,地道東北人邮利。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像垃帅,于是被迫代替她去往敵國和親延届。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容

  • MapReduce工作流程 流程圖如下 解釋上面的流程是整個mapreduce最全工作流程,但是shuffle過程...
    ZFH__ZJ閱讀 559評論 0 3
  • 數(shù)據(jù)切片和MapTask并行度決定機制 1)一個Job的Map階段并行度由客戶端在提交Job時的切片數(shù)決定 2)每...
    bullion閱讀 791評論 0 1
  • 要是你是個Java開發(fā)酱固,那么你肯定聽說過MapReduce械念,下面就來看看這個東東吧 一、簡介 MapReduce:...
    帥可兒妞閱讀 270評論 0 0
  • 一运悲、Mapreduce簡介 ??Mapreduce是一個分布式運算程序的編程框架龄减,是用戶開發(fā)“基于hadoop的數(shù)...
    無劍_君閱讀 568評論 0 0
  • 第一次聽到寂靜法師,是從一個朋友的聊天中班眯,當時只是覺得有這么個人 第二次希停,聽到寂靜法師,還是從這位朋友那署隘,那時候已...
    翁霞閱讀 2,240評論 0 0