15)MapReduce框架原理

數(shù)據(jù)切片和MapTask并行度決定機制

1)一個Job的Map階段并行度由客戶端在提交Job時的切片數(shù)決定

2)每一個Split切片分配一個MapTask并行實例處理

3)默認情況下,切片大小=BlockSize

4)切片時不考慮數(shù)據(jù)集整體筏养,而是逐個針對每一個文件單獨切片


FileInputFormat切片源碼解析

1)程序先找到你數(shù)據(jù)存儲的目錄

2)開始遍歷處理目錄下的每一個文件

3)遍歷第一個文件ss.txt

? ? a)獲取文件大小fs.sizeOf(ss.txt)

? ? b)計算切片大小

computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

? ? c)默認情況下切片大小=blocksize塊大小

? ? d)開始切片炎辨,ss.txt=300M

第一個切片:ss.txt——0:128M?

第二個切片:ss.txt——128:256M

第三個切片:ss.txt——256:300M

(每次切片時,都要判斷切完剩下的部分是否大于塊的1.1倍,不大于1.1倍就劃分一塊切片)

? ? e)將切片信息寫到一個切片規(guī)劃文件中

? ? f)整個切片的核心過程在getSplit()方法中完成

? ? g)InputSplit只記錄了切片的元數(shù)據(jù)信息,比如起始位置,長度以及所在節(jié)點列表等

4)提交切片規(guī)劃文件到Y(jié)ARN上蟀悦,YARN上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計算開啟MapTask個數(shù)


FileInputFormat切片大小的參數(shù)配置

1)源碼中計算切片大小的公式

Math.max(minSize, Math.min(maxSize, blockSize))

mapreduce.input.fileinputformat.split.minsize=1 默認值為1

mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默認值為Long的最大值

2)切片大小設(shè)置

maxsize(切片最大值):參數(shù)如果調(diào)得比blockSize小,則會讓切片變小氧敢,而且就等于配置的這個參數(shù)的值

minsize(切片最小值):參數(shù)調(diào)得比blockSize大熬芜,則可以讓切片變得比blockSize還大

3)獲取切片信息API

//獲取切片的文件名稱

String name=inputSplit.getPath().getName();

//根據(jù)文件類型獲取切片信息

FileSplit inputSplit=(FileSplit)context.getInputSplit();


CombineTextInputFormat切片機制

????框架默認的TextInputFormat切片機制是對任務(wù)按文件規(guī)劃切片,不管文件多小福稳,都會是一個單獨的切片,都會交給一個MapTask瑞侮,這樣如果有大量小文件就會產(chǎn)生大量的MapTask的圆,處理效率極其低下

????CombineTextInputFormat用于小文件過多的場景,它可以將多個小文件從邏輯上規(guī)劃到一個切片中半火,這樣多個小文件就可以交給一個MapTask處理

虛擬存儲切片最大值設(shè)置:

// 如果不設(shè)置InputFormat越妈,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

// 虛擬存儲切片最大值設(shè)置20m = 20971520

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);? ? //4m

切片機制:


FileInputFormat實現(xiàn)類

????TextInputFormat? ? 處理文本

????KeyValueTextInputFormat? ? 處理鍵值對

????NLineInputFormat? ? 按行處理

????CombineTextInputFormat? ? 處理小文件

? ? 自定義InputFormat


TextInputFormat

? ? 是默認的FileInputFormat實現(xiàn)類。按行讀取每條記錄钮糖。鍵是存儲該行在整個文件中的起始字節(jié)偏移量梅掠,LongWritable類型酌住。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符)阎抒,Text類型酪我。

KeyValueTextInputFormat

? ? 每一行均為一條記錄,被分隔符分割為key,value且叁《伎蓿可以通過在驅(qū)動類中設(shè)置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");來設(shè)定分隔符。默認分隔符是tab(\t)逞带。

NLineInputFormat

? ? 如果使用NLineInputFormat欺矫,代表每個map進程處理的InputSplit不再按Block塊去劃分,而是按NlineInputFormat指定的行數(shù)N來劃分展氓。即輸入文件的總行數(shù)/N=切片數(shù)穆趴,如果不整數(shù),切片數(shù)=商+1遇汞。

自定義InputFormat

? ? 步驟:

????????1)自定義一個類繼承FileInputFormat

????????2)改寫RecordReader未妹,實現(xiàn)一次讀取一個完整文件封裝為KV

????????3)在輸出時使用SequenceFileOutPutFormat輸出合并文件


MapReduce工作流程

? ? Map階段

? ? 1)客戶端形成任務(wù)切片數(shù)

? ? 2)客戶端提交切片信息(.split) 配置信息(.xml)和.jar

? ? 3)Yarn根據(jù)切片數(shù)啟動相應(yīng)的MapTask

? ? 4)MapTask中讀取客戶端提交的數(shù)據(jù),默認使用TextInputFormat按行讀取勺疼,然后在MapTask中進行邏輯運算并寫出

? ? 5)MapTask寫出到環(huán)形緩存區(qū)教寂,環(huán)形緩存區(qū)對數(shù)據(jù)進行分區(qū)排序(字典順序,快排)执庐,環(huán)形緩存區(qū)默認100M酪耕,寫到80%時將環(huán)形緩存區(qū)數(shù)據(jù)溢出到文件(分區(qū)且區(qū)內(nèi)有序),然后在繼續(xù)回寫環(huán)形緩存區(qū)

? ? 6)對溢出的文件根據(jù)分區(qū)歸并排序

? ? 7)Combiner合并

????????案例:

????????????Combiner合并案例實操

? ? Reduce階段

? ? 1)所有MapTask任務(wù)完成后轨淌,啟動相應(yīng)數(shù)量的ReduceTask(根據(jù)分區(qū)數(shù)決定啟動幾個ReduceTask)

? ? 2)下載MapTask歸并排序后的數(shù)據(jù)到本地磁盤迂烁,如果數(shù)據(jù)小則直接放到緩存中

? ? 3)合并下載的數(shù)據(jù)歸并排序

? ? 4)分組排序

????? ??案例:

????????????WritableComparable排序案例實操(全排序)

????????????WritableComparable排序案例實操(區(qū)內(nèi)排序)

????????????GroupingComparator分組案例實操

? ? 5)將數(shù)據(jù)按key相同的原則(一次讀取一組)傳入Reduce方法中

? ? 6)寫出到Part-r-0000000等文件


Shuffle機制

? ? Map方法之后,Reduce方法之前的數(shù)據(jù)處理過程稱之為Shuffle


Partition分區(qū)

????默認Partition分區(qū)

public class HashPartitioner<K, V> extends Partitioner<K, V> {

? ? public int getPartition(K key, V value, int numReduceTasks){

? ? ? ? return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

????}

}


自定義Partitioner分區(qū)

需求:根據(jù)手機號前三位輸出到不同文件中

? ? 1)自定義類繼承Partitioner递鹉,重寫getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

? ? @Override

? ? public int getPartition(Text key, FlowBean value, int i) {

? ? ? ? // key 是手機號

? ? ? ? // value 流量信息

? ? ? ? // 獲取手機號前三位

? ? ? ? String prePhoneNum = key.toString().substring(0, 3);


? ? ? ? int partition = 4;

? ? ? ? if ("136".equals(prePhoneNum)) {

? ? ? ? ? ? partition = 0;

? ? ? ? } else if ("137".equals(prePhoneNum)) {

? ? ? ? ? ? partition = 1;

? ? ? ? } else if ("138".equals(prePhoneNum)) {

? ? ? ? ? ? partition = 2;

? ? ? ? } else if ("139".equals(prePhoneNum)) {

? ? ? ? ? ? partition = 3;

? ? ? ? }

? ? ? ? return partition;

? ? }

}

? ? 2)在Job驅(qū)動中盟步,設(shè)置自定義Partitioner

job.setPartitionerClass(ProvincePartitioner.class);

? ? 3)自定義Partition后,要根據(jù)自定義Partitioner的邏輯設(shè)置相應(yīng)數(shù)量的ReduceTask

job.setNumReduceTasks(5);


排序

? ? 默認排序是按照字典順序排序躏结,且實現(xiàn)該排序的方法是快速排序却盘。


排序分類

? ? 1)部分排序

? ? ? ? MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序。保證輸出的每個文件內(nèi)部有序

? ? 2)全排序

? ? ? ? 最終輸出結(jié)果只有一個文件媳拴,且文件每部有序黄橘。實現(xiàn)方式是只設(shè)置一個ReduceTask。但該方法在處理大型文件時效率極低屈溉,因為一臺機器處理所有文件塞关,完全喪失了MapReduce所提供的并行架構(gòu)。

????3)輔助排序:(GroupingComparator分組)

? ? ? ? 在Reduce端對key進行分組子巾。應(yīng)用于:在接受key為Bean對象時帆赢,想讓一個或幾個字段相同(全部字段比較不相同)的key進入到同一個reduce方法時小压,可以采用分組排序

? ? 4)二次排序

? ? ? ? 在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序

案例:

????WritableComparable排序案例實操(全排序)

????WritableComparable排序案例實操(區(qū)內(nèi)排序)

????GroupingComparator分組案例實操


Combiner合并

? ? 1)Combiner是MR程序中Mapper和Reducer之外的一種組件

? ? 2)Combiner組件的父類就是Reducer

? ? 3)Combiner和Reducer的區(qū)別在于運行的位置:

? ? ? ? Combiner是在每一個MapTask所在的節(jié)點運行椰于;

? ? ? ? Reducer是接收全局所有Mapper的輸出結(jié)果怠益;

? ? 4)Combiner的意義就是對每一個 MapTask的輸出進行局部匯總,以減小網(wǎng)絡(luò)傳輸量

? ? 5)Combiner能夠應(yīng)用的前提是不能影響最終的業(yè)務(wù)邏輯廉羔,而且溉痢,Combiner的輸出kv應(yīng)該跟Reducer的輸入kv類型要對應(yīng)起來

案例:

????Combiner合并案例實操


設(shè)置ReduceTask并行度(個數(shù))

//驅(qū)動類中加入以下代碼即可

//默認值是1 手動設(shè)置為4

job.setNumReduceTasks(4);

注意:

1)ReduceTask如果為0,表示沒有Reduce階段憋他,輸出文件個數(shù)和Map個數(shù)一致

2)ReduceTask默認值就是1孩饼,所以輸出文件個數(shù)為一個

3)如果數(shù)據(jù)分布不均勻,就有可能在Reduce階段產(chǎn)生數(shù)據(jù)傾斜

4)ReduceTask數(shù)量并不是任意設(shè)置竹挡,還要考慮業(yè)務(wù)邏輯需求镀娶,有些情況下,需要計算全局匯總結(jié)果揪罕,就只能有一個ReduceTask

5)具體多少個ReduceTask梯码,需要根據(jù)集群性能而定

6)如果分區(qū)數(shù)不是1,但是ReduceTask為1好啰,是否執(zhí)行分區(qū)過程轩娶?答案是:不執(zhí)行分區(qū)過程。因為在MapTask的源碼中框往,執(zhí)行分區(qū)的前提是先判斷ReduceNum個數(shù)是否大于1鳄抒。不大于1肯定不執(zhí)行


OutputFormat接口實現(xiàn)類

? ? OutputFormat是MapReduce輸出的基類,所有實現(xiàn)MapReduce輸出都實現(xiàn)了OutputFormat接口

? ? 1.文本輸出TextOutputFormat

????? ? 默認的輸出格式是TextOutputFormat椰弊,它把每條記錄寫為文本行许溅。它的鍵和值可以是任意類型,因為TextOutputFormat調(diào)用toString()方法把他們轉(zhuǎn)換為字符串

? ? 2.SequenceFileOutputFormat

? ? ? ? 將SequenceFileOutputFormat輸出作為后續(xù)MapReduce任務(wù)的輸入秉版,這便是一種好的輸出格式贤重,因為它的格式緊湊,很容易被壓縮

????3.自定義OutputFormat


Reduce Join

? ? Map端的主要工作:為來自不同表或文件的key/value對清焕,打標簽以區(qū)別不同來源的記錄并蝗。然后用連接字段作為key,其余部分和新加的標志作為value秸妥,最后進行輸出

? ? Reduce端的主要工作:在Reduce端以連接字段作為key的分組已經(jīng)完成借卧,我們只需要在每一個分組當中將那些來源于不同文件的記錄(在Map階段已經(jīng)打標志)分開,最后進行合并就ok了筛峭。

????Reduce Join案例實操

Reduce Join缺點及解決方案

? ? 缺點:這種方式中,合并的操作是在Reducer階段完成陪每,Reduce端的處理壓力太大影晓,Map節(jié)點的運算負載則很低镰吵,資源利用率不高,且在Reduce階段極易產(chǎn)生數(shù)據(jù)傾斜挂签。

? ? 解決方案:Map端實現(xiàn)數(shù)據(jù)合并


Map Join

? ? 使用場景:Map join適用于一張表十分小疤祭,一張表很大的場景

? ? 問:在Reduce端處理過多的表,非常容易產(chǎn)生數(shù)據(jù)傾斜饵婆。怎么辦勺馆?

? ? 答:在Map端緩存多張表,提前處理業(yè)務(wù)邏輯侨核,這樣增加Map端業(yè)務(wù)草穆,減少Reduce端數(shù)據(jù)的壓力,盡可能的減少數(shù)據(jù)傾斜

????Map Join案例實操


計數(shù)器應(yīng)用

? ? Hadoop為每個作業(yè)維護若干個內(nèi)置計數(shù)器搓译,以描述多項指標悲柱。例如:某些計數(shù)器記錄已處理的字節(jié)數(shù)和記錄數(shù),使用戶可監(jiān)控已處理的輸入數(shù)據(jù)量和已產(chǎn)生的輸出數(shù)據(jù)量些己。

? ? 1.計數(shù)器API

? ? ? ? 1)采用枚舉方式統(tǒng)計計數(shù)

? ? ? ? ? ? enum MyCounter {MALFORORMED,NORMAL}

? ? ? ? ? ? //對枚舉定義的自定義計數(shù)器加1

? ? ? ? ? ? context.getCounter(MyCounter.MALFORORMED).increment(1);

? ? ? ? 2)采用計數(shù)器組豌鸡,計數(shù)器名稱的方式統(tǒng)計

? ? ? ? ? ? context.getCounter("counterGroup", "counter").increment(1);

? ? ? ? 3)計數(shù)結(jié)果在程序運行后控制臺上查看

計數(shù)器案例實操


數(shù)據(jù)清洗(ETL)

? ? 在運行核心業(yè)務(wù)MapReducer程序之前,往往要先對數(shù)據(jù)進行清洗段标,清理掉不符合用戶要求的數(shù)據(jù)涯冠。清理過程往往只需要運行Mapper程序,不需要執(zhí)行Reducer程序逼庞。

數(shù)據(jù)清洗案例實操

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蛇更,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子往堡,更是在濱河造成了極大的恐慌械荷,老刑警劉巖瀑构,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件朗徊,死亡現(xiàn)場離奇詭異,居然都是意外死亡蛮粮,警方通過查閱死者的電腦和手機穆咐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進店門颤诀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人对湃,你說我怎么就攤上這事崖叫。” “怎么了拍柒?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵心傀,是天一觀的道長。 經(jīng)常有香客問我拆讯,道長脂男,這世上最難降的妖魔是什么养叛? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮宰翅,結(jié)果婚禮上弃甥,老公的妹妹穿的比我還像新娘。我一直安慰自己汁讼,他們只是感情好淆攻,可當我...
    茶點故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著嘿架,像睡著了一般瓶珊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上眶明,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天艰毒,我揣著相機與錄音,去河邊找鬼搜囱。 笑死丑瞧,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的蜀肘。 我是一名探鬼主播绊汹,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼扮宠!你這毒婦竟也來了西乖?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤坛增,失蹤者是張志新(化名)和其女友劉穎获雕,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體收捣,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡届案,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了罢艾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片楣颠。...
    茶點故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖咐蚯,靈堂內(nèi)的尸體忽然破棺而出童漩,到底是詐尸還是另有隱情,我是刑警寧澤春锋,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布矫膨,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏豆拨。R本人自食惡果不足惜直奋,卻給世界環(huán)境...
    茶點故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望施禾。 院中可真熱鬧,春花似錦搁胆、人聲如沸弥搞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽攀例。三九已至,卻和暖如春顾腊,著一層夾襖步出監(jiān)牢的瞬間粤铭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工杂靶, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留梆惯,地道東北人。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓吗垮,卻偏偏與公主長得像垛吗,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子烁登,可洞房花燭夜當晚...
    茶點故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容