數(shù)據(jù)切片和MapTask并行度決定機制
1)一個Job的Map階段并行度由客戶端在提交Job時的切片數(shù)決定
2)每一個Split切片分配一個MapTask并行實例處理
3)默認情況下,切片大小=BlockSize
4)切片時不考慮數(shù)據(jù)集整體筏养,而是逐個針對每一個文件單獨切片
FileInputFormat切片源碼解析
1)程序先找到你數(shù)據(jù)存儲的目錄
2)開始遍歷處理目錄下的每一個文件
3)遍歷第一個文件ss.txt
? ? a)獲取文件大小fs.sizeOf(ss.txt)
? ? b)計算切片大小
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
? ? c)默認情況下切片大小=blocksize塊大小
? ? d)開始切片炎辨,ss.txt=300M
第一個切片:ss.txt——0:128M?
第二個切片:ss.txt——128:256M
第三個切片:ss.txt——256:300M
(每次切片時,都要判斷切完剩下的部分是否大于塊的1.1倍,不大于1.1倍就劃分一塊切片)
? ? e)將切片信息寫到一個切片規(guī)劃文件中
? ? f)整個切片的核心過程在getSplit()方法中完成
? ? g)InputSplit只記錄了切片的元數(shù)據(jù)信息,比如起始位置,長度以及所在節(jié)點列表等
4)提交切片規(guī)劃文件到Y(jié)ARN上蟀悦,YARN上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計算開啟MapTask個數(shù)
FileInputFormat切片大小的參數(shù)配置
1)源碼中計算切片大小的公式
Math.max(minSize, Math.min(maxSize, blockSize))
mapreduce.input.fileinputformat.split.minsize=1 默認值為1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默認值為Long的最大值
2)切片大小設(shè)置
maxsize(切片最大值):參數(shù)如果調(diào)得比blockSize小,則會讓切片變小氧敢,而且就等于配置的這個參數(shù)的值
minsize(切片最小值):參數(shù)調(diào)得比blockSize大熬芜,則可以讓切片變得比blockSize還大
3)獲取切片信息API
//獲取切片的文件名稱
String name=inputSplit.getPath().getName();
//根據(jù)文件類型獲取切片信息
FileSplit inputSplit=(FileSplit)context.getInputSplit();
CombineTextInputFormat切片機制
????框架默認的TextInputFormat切片機制是對任務(wù)按文件規(guī)劃切片,不管文件多小福稳,都會是一個單獨的切片,都會交給一個MapTask瑞侮,這樣如果有大量小文件就會產(chǎn)生大量的MapTask的圆,處理效率極其低下
????CombineTextInputFormat用于小文件過多的場景,它可以將多個小文件從邏輯上規(guī)劃到一個切片中半火,這樣多個小文件就可以交給一個MapTask處理
虛擬存儲切片最大值設(shè)置:
// 如果不設(shè)置InputFormat越妈,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// 虛擬存儲切片最大值設(shè)置20m = 20971520
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);? ? //4m
切片機制:
FileInputFormat實現(xiàn)類
????TextInputFormat? ? 處理文本
????KeyValueTextInputFormat? ? 處理鍵值對
????NLineInputFormat? ? 按行處理
????CombineTextInputFormat? ? 處理小文件
? ? 自定義InputFormat
TextInputFormat
? ? 是默認的FileInputFormat實現(xiàn)類。按行讀取每條記錄钮糖。鍵是存儲該行在整個文件中的起始字節(jié)偏移量梅掠,LongWritable類型酌住。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符)阎抒,Text類型酪我。
KeyValueTextInputFormat
? ? 每一行均為一條記錄,被分隔符分割為key,value且叁《伎蓿可以通過在驅(qū)動類中設(shè)置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");來設(shè)定分隔符。默認分隔符是tab(\t)逞带。
NLineInputFormat
? ? 如果使用NLineInputFormat欺矫,代表每個map進程處理的InputSplit不再按Block塊去劃分,而是按NlineInputFormat指定的行數(shù)N來劃分展氓。即輸入文件的總行數(shù)/N=切片數(shù)穆趴,如果不整數(shù),切片數(shù)=商+1遇汞。
自定義InputFormat
? ? 步驟:
????????1)自定義一個類繼承FileInputFormat
????????2)改寫RecordReader未妹,實現(xiàn)一次讀取一個完整文件封裝為KV
????????3)在輸出時使用SequenceFileOutPutFormat輸出合并文件
MapReduce工作流程
? ? Map階段
? ? 1)客戶端形成任務(wù)切片數(shù)
? ? 2)客戶端提交切片信息(.split) 配置信息(.xml)和.jar
? ? 3)Yarn根據(jù)切片數(shù)啟動相應(yīng)的MapTask
? ? 4)MapTask中讀取客戶端提交的數(shù)據(jù),默認使用TextInputFormat按行讀取勺疼,然后在MapTask中進行邏輯運算并寫出
? ? 5)MapTask寫出到環(huán)形緩存區(qū)教寂,環(huán)形緩存區(qū)對數(shù)據(jù)進行分區(qū)排序(字典順序,快排)执庐,環(huán)形緩存區(qū)默認100M酪耕,寫到80%時將環(huán)形緩存區(qū)數(shù)據(jù)溢出到文件(分區(qū)且區(qū)內(nèi)有序),然后在繼續(xù)回寫環(huán)形緩存區(qū)
? ? 6)對溢出的文件根據(jù)分區(qū)歸并排序
? ? 7)Combiner合并
????????案例:
? ? Reduce階段
? ? 1)所有MapTask任務(wù)完成后轨淌,啟動相應(yīng)數(shù)量的ReduceTask(根據(jù)分區(qū)數(shù)決定啟動幾個ReduceTask)
? ? 2)下載MapTask歸并排序后的數(shù)據(jù)到本地磁盤迂烁,如果數(shù)據(jù)小則直接放到緩存中
? ? 3)合并下載的數(shù)據(jù)歸并排序
? ? 4)分組排序
????? ??案例:
????????????WritableComparable排序案例實操(全排序)
????????????WritableComparable排序案例實操(區(qū)內(nèi)排序)
????????????GroupingComparator分組案例實操
? ? 5)將數(shù)據(jù)按key相同的原則(一次讀取一組)傳入Reduce方法中
? ? 6)寫出到Part-r-0000000等文件
Shuffle機制
? ? Map方法之后,Reduce方法之前的數(shù)據(jù)處理過程稱之為Shuffle
Partition分區(qū)
????默認Partition分區(qū)
public class HashPartitioner<K, V> extends Partitioner<K, V> {
? ? public int getPartition(K key, V value, int numReduceTasks){
? ? ? ? return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
????}
}
自定義Partitioner分區(qū)
需求:根據(jù)手機號前三位輸出到不同文件中
? ? 1)自定義類繼承Partitioner递鹉,重寫getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
? ? @Override
? ? public int getPartition(Text key, FlowBean value, int i) {
? ? ? ? // key 是手機號
? ? ? ? // value 流量信息
? ? ? ? // 獲取手機號前三位
? ? ? ? String prePhoneNum = key.toString().substring(0, 3);
? ? ? ? int partition = 4;
? ? ? ? if ("136".equals(prePhoneNum)) {
? ? ? ? ? ? partition = 0;
? ? ? ? } else if ("137".equals(prePhoneNum)) {
? ? ? ? ? ? partition = 1;
? ? ? ? } else if ("138".equals(prePhoneNum)) {
? ? ? ? ? ? partition = 2;
? ? ? ? } else if ("139".equals(prePhoneNum)) {
? ? ? ? ? ? partition = 3;
? ? ? ? }
? ? ? ? return partition;
? ? }
}
? ? 2)在Job驅(qū)動中盟步,設(shè)置自定義Partitioner
job.setPartitionerClass(ProvincePartitioner.class);
? ? 3)自定義Partition后,要根據(jù)自定義Partitioner的邏輯設(shè)置相應(yīng)數(shù)量的ReduceTask
job.setNumReduceTasks(5);
排序
? ? 默認排序是按照字典順序排序躏结,且實現(xiàn)該排序的方法是快速排序却盘。
排序分類
? ? 1)部分排序
? ? ? ? MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序。保證輸出的每個文件內(nèi)部有序
? ? 2)全排序
? ? ? ? 最終輸出結(jié)果只有一個文件媳拴,且文件每部有序黄橘。實現(xiàn)方式是只設(shè)置一個ReduceTask。但該方法在處理大型文件時效率極低屈溉,因為一臺機器處理所有文件塞关,完全喪失了MapReduce所提供的并行架構(gòu)。
????3)輔助排序:(GroupingComparator分組)
? ? ? ? 在Reduce端對key進行分組子巾。應(yīng)用于:在接受key為Bean對象時帆赢,想讓一個或幾個字段相同(全部字段比較不相同)的key進入到同一個reduce方法時小压,可以采用分組排序
? ? 4)二次排序
? ? ? ? 在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序
案例:
????WritableComparable排序案例實操(全排序)
????WritableComparable排序案例實操(區(qū)內(nèi)排序)
Combiner合并
? ? 1)Combiner是MR程序中Mapper和Reducer之外的一種組件
? ? 2)Combiner組件的父類就是Reducer
? ? 3)Combiner和Reducer的區(qū)別在于運行的位置:
? ? ? ? Combiner是在每一個MapTask所在的節(jié)點運行椰于;
? ? ? ? Reducer是接收全局所有Mapper的輸出結(jié)果怠益;
? ? 4)Combiner的意義就是對每一個 MapTask的輸出進行局部匯總,以減小網(wǎng)絡(luò)傳輸量
? ? 5)Combiner能夠應(yīng)用的前提是不能影響最終的業(yè)務(wù)邏輯廉羔,而且溉痢,Combiner的輸出kv應(yīng)該跟Reducer的輸入kv類型要對應(yīng)起來
案例:
設(shè)置ReduceTask并行度(個數(shù))
//驅(qū)動類中加入以下代碼即可
//默認值是1 手動設(shè)置為4
job.setNumReduceTasks(4);
注意:
1)ReduceTask如果為0,表示沒有Reduce階段憋他,輸出文件個數(shù)和Map個數(shù)一致
2)ReduceTask默認值就是1孩饼,所以輸出文件個數(shù)為一個
3)如果數(shù)據(jù)分布不均勻,就有可能在Reduce階段產(chǎn)生數(shù)據(jù)傾斜
4)ReduceTask數(shù)量并不是任意設(shè)置竹挡,還要考慮業(yè)務(wù)邏輯需求镀娶,有些情況下,需要計算全局匯總結(jié)果揪罕,就只能有一個ReduceTask
5)具體多少個ReduceTask梯码,需要根據(jù)集群性能而定
6)如果分區(qū)數(shù)不是1,但是ReduceTask為1好啰,是否執(zhí)行分區(qū)過程轩娶?答案是:不執(zhí)行分區(qū)過程。因為在MapTask的源碼中框往,執(zhí)行分區(qū)的前提是先判斷ReduceNum個數(shù)是否大于1鳄抒。不大于1肯定不執(zhí)行
OutputFormat接口實現(xiàn)類
? ? OutputFormat是MapReduce輸出的基類,所有實現(xiàn)MapReduce輸出都實現(xiàn)了OutputFormat接口
? ? 1.文本輸出TextOutputFormat
????? ? 默認的輸出格式是TextOutputFormat椰弊,它把每條記錄寫為文本行许溅。它的鍵和值可以是任意類型,因為TextOutputFormat調(diào)用toString()方法把他們轉(zhuǎn)換為字符串
? ? 2.SequenceFileOutputFormat
? ? ? ? 將SequenceFileOutputFormat輸出作為后續(xù)MapReduce任務(wù)的輸入秉版,這便是一種好的輸出格式贤重,因為它的格式緊湊,很容易被壓縮
Reduce Join
? ? Map端的主要工作:為來自不同表或文件的key/value對清焕,打標簽以區(qū)別不同來源的記錄并蝗。然后用連接字段作為key,其余部分和新加的標志作為value秸妥,最后進行輸出
? ? Reduce端的主要工作:在Reduce端以連接字段作為key的分組已經(jīng)完成借卧,我們只需要在每一個分組當中將那些來源于不同文件的記錄(在Map階段已經(jīng)打標志)分開,最后進行合并就ok了筛峭。
Reduce Join缺點及解決方案
? ? 缺點:這種方式中,合并的操作是在Reducer階段完成陪每,Reduce端的處理壓力太大影晓,Map節(jié)點的運算負載則很低镰吵,資源利用率不高,且在Reduce階段極易產(chǎn)生數(shù)據(jù)傾斜挂签。
? ? 解決方案:Map端實現(xiàn)數(shù)據(jù)合并
Map Join
? ? 使用場景:Map join適用于一張表十分小疤祭,一張表很大的場景
? ? 問:在Reduce端處理過多的表,非常容易產(chǎn)生數(shù)據(jù)傾斜饵婆。怎么辦勺馆?
? ? 答:在Map端緩存多張表,提前處理業(yè)務(wù)邏輯侨核,這樣增加Map端業(yè)務(wù)草穆,減少Reduce端數(shù)據(jù)的壓力,盡可能的減少數(shù)據(jù)傾斜
計數(shù)器應(yīng)用
? ? Hadoop為每個作業(yè)維護若干個內(nèi)置計數(shù)器搓译,以描述多項指標悲柱。例如:某些計數(shù)器記錄已處理的字節(jié)數(shù)和記錄數(shù),使用戶可監(jiān)控已處理的輸入數(shù)據(jù)量和已產(chǎn)生的輸出數(shù)據(jù)量些己。
? ? 1.計數(shù)器API
? ? ? ? 1)采用枚舉方式統(tǒng)計計數(shù)
? ? ? ? ? ? enum MyCounter {MALFORORMED,NORMAL}
? ? ? ? ? ? //對枚舉定義的自定義計數(shù)器加1
? ? ? ? ? ? context.getCounter(MyCounter.MALFORORMED).increment(1);
? ? ? ? 2)采用計數(shù)器組豌鸡,計數(shù)器名稱的方式統(tǒng)計
? ? ? ? ? ? context.getCounter("counterGroup", "counter").increment(1);
? ? ? ? 3)計數(shù)結(jié)果在程序運行后控制臺上查看
數(shù)據(jù)清洗(ETL)
? ? 在運行核心業(yè)務(wù)MapReducer程序之前,往往要先對數(shù)據(jù)進行清洗段标,清理掉不符合用戶要求的數(shù)據(jù)涯冠。清理過程往往只需要運行Mapper程序,不需要執(zhí)行Reducer程序逼庞。