這里講的hadoop1.0版本主要還是學(xué)習(xí)mr思想
大家都知道,當(dāng)我們需要編寫(xiě)一個(gè)簡(jiǎn)單的MapReduce作業(yè)時(shí)锁孟,只需實(shí)現(xiàn)map()和reduce()兩個(gè)函數(shù)即可,一旦將作業(yè)提交到集群上后,Hadoop內(nèi)部會(huì)將這兩個(gè)函數(shù)封裝到Map Task和Reduce Task中允耿,同時(shí)將它們調(diào)度到多個(gè)節(jié)點(diǎn)上并行執(zhí)行,而任務(wù)執(zhí)行過(guò)程中可能涉及的數(shù)據(jù)跨節(jié)點(diǎn)傳輸扒怖,記錄按key分組等操作均由Task內(nèi)部實(shí)現(xiàn)好了较锡,用戶無(wú)須關(guān)心。
為了深入了解Map Task和Reduce Task內(nèi)部實(shí)現(xiàn)原理盗痒,我們將Map Task分解成Read蚂蕴、Map、Collect俯邓、Spill和Combine五個(gè)階段骡楼,將Reduce Task分解成Shuffle、Merge看成、Sort君编、Reduce和Write五個(gè)階段,并依次詳細(xì)剖析每個(gè)階段的內(nèi)部實(shí)現(xiàn)細(xì)節(jié)川慌。
Task運(yùn)行過(guò)程概述
在MapReduce計(jì)算框架中吃嘿,一個(gè)應(yīng)用程序被劃分成Map和Reduce兩個(gè)計(jì)算階段,它們分別由一個(gè)或者多個(gè)Map Task和Reduce Task組成梦重。其中兑燥,每個(gè)Map Task處理輸入數(shù)據(jù)集合中的一片數(shù)據(jù)(InputSplit),并將產(chǎn)生的若干個(gè)數(shù)據(jù)片段寫(xiě)到本地磁盤上琴拧,而Reduce Task則從每個(gè)Map Task上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)片段降瞳,經(jīng)分組聚集和歸約后,將結(jié)果寫(xiě)到HDFS上作為最終結(jié)果蚓胸,具體如圖8-1所示挣饥。總體上看沛膳,Map Task與Reduce Task之間的數(shù)據(jù)傳輸采用了pull模型扔枫。為了能夠容錯(cuò),Map Task將中間計(jì)算結(jié)果存放到本地磁盤上锹安,而Reduce Task則通過(guò)HTTP請(qǐng)求從各個(gè)Map Task端拖榷碳觥(pull)相應(yīng)的輸入數(shù)據(jù)倚舀。為了更好地支持大量Reduce Task并發(fā)從Map Task端拷貝數(shù)據(jù),Hadoop采用了Jetty Server作為HTTP Server處理并發(fā)數(shù)據(jù)讀請(qǐng)求忍宋。
對(duì)于Map Task而言痕貌,它的執(zhí)行過(guò)程可概述為:首先,通過(guò)用戶提供的InputFormat將對(duì)應(yīng)的InputSplit解析成一系列key/value糠排,并依次交給用戶編寫(xiě)的map()函數(shù)處理舵稠;接著按照指定的Partitioner對(duì)數(shù)據(jù)分片,以確定每個(gè)key/value將交給哪個(gè)Reduce Task處理乳讥;之后將數(shù)據(jù)交給用戶定義的Combiner進(jìn)行一次本地規(guī)約(用戶沒(méi)有定義則直接跳過(guò))柱查;最后將處理結(jié)果保存到本地磁盤上。
對(duì)于Reduce Task而言云石,由于它的輸入數(shù)據(jù)來(lái)自各個(gè)Map Task唉工,因此首先需通過(guò)HTTP請(qǐng)求從各個(gè)已經(jīng)運(yùn)行完成的Map Task上拷貝對(duì)應(yīng)的數(shù)據(jù)分片,待所有數(shù)據(jù)拷貝完成后汹忠,再以key為關(guān)鍵字對(duì)所有數(shù)據(jù)進(jìn)行排序淋硝,通過(guò)排序,key相同的記錄聚集到一起形成若干分組宽菜,然后將每組數(shù)據(jù)交給用戶編寫(xiě)的reduce()函數(shù)處理谣膳,并將數(shù)據(jù)結(jié)果直接寫(xiě)到HDFS上作為最終輸出結(jié)果。
基本數(shù)據(jù)結(jié)構(gòu)和算法
在Map Task和Reduce Task實(shí)現(xiàn)過(guò)程中用到了大量數(shù)據(jù)結(jié)構(gòu)和算法铅乡,我們選取其中幾個(gè)非常核心的部分進(jìn)行介紹继谚。
前面提到,用戶可通過(guò)InputFormat和OuputFormat兩個(gè)組件自定義作業(yè)的輸入輸出格式阵幸,但并不能自定義Map Task的輸出格式(也就是Reduce Task的輸入格式)花履。考慮到Map Task的輸出文件需要到磁盤上并被Reduce Task遠(yuǎn)程拷貝挚赊,為盡可能減少數(shù)據(jù)量以避免不必要的磁盤和網(wǎng)絡(luò)開(kāi)銷诡壁,Hadoop內(nèi)部實(shí)現(xiàn)了支持行壓縮的數(shù)據(jù)存儲(chǔ)格式——IFile。
按照MapReduce語(yǔ)義荠割,Reduce Task需將拷貝自各個(gè)Map Task端的數(shù)據(jù)按照key進(jìn)行分組后才能交給reduce()函數(shù)處理妹卿,為此,Hadoop實(shí)現(xiàn)了基于排序的分組算法蔑鹦。但考慮到若完全由Reduce Task進(jìn)行全局排序會(huì)產(chǎn)生性能瓶頸夺克,Hadoop采用了分布式排序策略:先由各個(gè)Map Task對(duì)輸出數(shù)據(jù)進(jìn)行一次局部排序,然后由Reduce Task進(jìn)行一次全局排序嚎朽。
在任務(wù)運(yùn)行過(guò)程中懊直,為了能夠讓JobTracker獲取任務(wù)執(zhí)行進(jìn)度,各個(gè)任務(wù)會(huì)創(chuàng)建一個(gè)進(jìn)度匯報(bào)線程Reporter火鼻,只要任務(wù)處理一條新數(shù)據(jù)室囊,該線程將通過(guò)RPC告知TaskTracker,并由TaskTracker通過(guò)心跳進(jìn)一步告訴JobTracker魁索。
IFile存儲(chǔ)格式
IFile是一種支持行壓縮的存儲(chǔ)格式融撞。通常而言,Map Task中間輸出結(jié)果和Reduce Task遠(yuǎn)程拷貝結(jié)果被存放在IFile格式的磁盤文件或者內(nèi)存文件中粗蔚。為了盡可能減少M(fèi)ap Task寫(xiě)入磁盤數(shù)據(jù)量和跨網(wǎng)絡(luò)傳輸數(shù)據(jù)量尝偎,IFile支持按行壓縮數(shù)據(jù)記錄。當(dāng)前Hadoop提供了Zlib(默認(rèn)壓縮方式)鹏控、BZip2等壓縮算法致扯。如果用戶想啟用數(shù)據(jù)壓縮功能,則需為作業(yè)添加以下兩個(gè)配置選項(xiàng)当辐。
- mapred. compress.map.output:是否支持中間輸出結(jié)果壓縮抖僵,默認(rèn)為false。
- mapred. map.output.compression.codec:壓縮器(默認(rèn)是基于Zlib算法的壓縮器DefaultCodec)缘揪。任何一個(gè)壓縮器需實(shí)現(xiàn)CompressionCodec接口以提供壓縮輸出流和解壓縮輸入流耍群。
一旦啟用了壓縮機(jī)制,Hadoop會(huì)為每條記錄的key和value值進(jìn)行壓縮找筝。IFile定義的文件格式非常簡(jiǎn)單蹈垢,整個(gè)文件順次保存數(shù)據(jù)記錄,每條數(shù)據(jù)記錄格式為:
<key-len, value-len, key, value>
由于Map Task會(huì)按照key值對(duì)輸出數(shù)據(jù)進(jìn)行排序袖裕,因此IFile通常保存的是有序數(shù)據(jù)集曹抬。
IFile文件讀寫(xiě)操作由類IFile實(shí)現(xiàn),該類中包含兩個(gè)重要內(nèi)部類:Writer和Reader急鳄,分別用于Map Task生成IFile和Reduce Task讀取一個(gè)IFile(對(duì)于內(nèi)存中的數(shù)據(jù)讀取谤民,則使用InMemoryReader)。此外攒岛,為了保證數(shù)據(jù)一致性赖临,Hadoop分別為Writer和Reader提供了IFileOutputStream和IFileInputStream兩個(gè)支持CRC32校驗(yàn)的類,具體如圖8-2所示灾锯。
排序
排序是MapReduce框架中最重要的操作之一兢榨。Map Task和Reduce Task均會(huì)對(duì)數(shù)據(jù)(按照key)進(jìn)行排序。該操作屬于Hadoop的默認(rèn)行為顺饮。任何應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序吵聪,而不管邏輯上是否需要。
對(duì)于Map Task兼雄,它會(huì)將處理的結(jié)果暫時(shí)放到一個(gè)緩沖區(qū)中吟逝,當(dāng)緩沖區(qū)使用率達(dá)到一定閾值后,再對(duì)緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次排序赦肋,并將這些有序數(shù)據(jù)以IFile文件形式寫(xiě)到磁盤上块攒,而當(dāng)數(shù)據(jù)處理完畢后励稳,它會(huì)對(duì)磁盤上所有文件進(jìn)行一次合并,以將這些文件合并成一個(gè)大的有序文件囱井。
對(duì)于Reduce Task驹尼,它從每個(gè)Map Task上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過(guò)一定閾值庞呕,則放到磁盤上新翎,否則放到內(nèi)存中。如果磁盤上文件數(shù)目達(dá)到一定閾值住练,則進(jìn)行一次合并以生成一個(gè)更大文件地啰;如果內(nèi)存中文件大小或者數(shù)目超過(guò)一定閾值,則進(jìn)行一次合并后將數(shù)據(jù)寫(xiě)到磁盤上讲逛。當(dāng)所有數(shù)據(jù)拷貝完畢后亏吝,Reduce Task統(tǒng)一對(duì)內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次合并。
在Map Task和Reduce Task運(yùn)行過(guò)程中妆绞,緩沖區(qū)數(shù)據(jù)排序使用了Hadoop自己實(shí)現(xiàn)快速排序算法顺呕,而IFile文件合并則使用了基于堆實(shí)現(xiàn)的優(yōu)先隊(duì)列。
Reporter
前面文章提到括饶,所有Task需周期性向TaskTracker匯報(bào)最新進(jìn)度和計(jì)數(shù)器值株茶,而這正是由Reporter組件實(shí)現(xiàn)的。在Map/Reduce Task中图焰,TaskReporter類實(shí)現(xiàn)了Reporter接口启盛,并且以線程形式啟動(dòng)。TaskReporter匯報(bào)的信息中包含兩部分:任務(wù)執(zhí)行進(jìn)度和任務(wù)計(jì)數(shù)器值技羔。
1.任務(wù)執(zhí)行進(jìn)度
任務(wù)執(zhí)行進(jìn)度信息被封裝到類Progress中僵闯,且每個(gè)Progress實(shí)例以樹(shù)的形式存在。Hadoop采用了簡(jiǎn)單的線性模型計(jì)算每個(gè)階段的進(jìn)度值:如果一個(gè)大階段可被分解成若干個(gè)子階段藤滥,則可將大階段看作一棵樹(shù)的父節(jié)點(diǎn)鳖粟,而子階段可看作父節(jié)點(diǎn)對(duì)應(yīng)的子節(jié)點(diǎn),且大階段的進(jìn)度值可被均攤到各個(gè)子階段中拙绊;如果一個(gè)階段不可再分解向图,則該階段進(jìn)度值可表示成已讀取數(shù)據(jù)量占總數(shù)據(jù)量的比例。
對(duì)于Map Task而言标沪,它作為一個(gè)大階段不可再分解榄攀,為了簡(jiǎn)便,我們直接將已讀取數(shù)據(jù)量占總數(shù)據(jù)量的比例作為任務(wù)當(dāng)前執(zhí)行進(jìn)度值金句。
對(duì)于Reduce Task而言檩赢,我們可將其分解成三個(gè)階段:Shuffle、Sort和Reduce违寞,每個(gè)階段占任務(wù)總進(jìn)度的1/3贞瞒∨挤浚考慮到在Shuffle階段,Reduce Task需從M(M為Map Task數(shù)目)個(gè)Map Task上讀取一片數(shù)據(jù)憔狞,因此蝴悉,可被分解成M個(gè)階段,每個(gè)階段占Shuffle進(jìn)度的1/M瘾敢,具體如圖8-5所示。
對(duì)于TaskReporter線程而言尿这,它并不會(huì)總是每隔一段時(shí)間匯報(bào)進(jìn)度和計(jì)數(shù)器值簇抵,而是僅當(dāng)發(fā)現(xiàn)以下兩種情況之一時(shí)才會(huì)匯報(bào)。
- 任務(wù)執(zhí)行進(jìn)度發(fā)生變化射众;
- 任務(wù)的某個(gè)計(jì)數(shù)器值發(fā)生變化碟摆。
在某個(gè)時(shí)間間隔內(nèi),如果任務(wù)執(zhí)行進(jìn)度和計(jì)數(shù)器值均未發(fā)生變化叨橱,則Task只會(huì)簡(jiǎn)單地通過(guò)調(diào)用RPC函數(shù)ping探測(cè)TaskTracker是否活著典蜕。在一定時(shí)間內(nèi),如果某個(gè)任務(wù)的執(zhí)行進(jìn)度和計(jì)數(shù)器值均未發(fā)生變化罗洗,則TaskTracker認(rèn)為它處于懸掛(hang up)狀態(tài)愉舔,直接將其殺掉。為了防止某條記錄因處理時(shí)間過(guò)長(zhǎng)導(dǎo)致被殺伙菜,用戶可采用以下兩種方法:
- 每隔一段時(shí)間調(diào)用一次TaskReporter.progress()函數(shù)轩缤,以告訴TaskTracker自己仍然活著。
- 增大任務(wù)超時(shí)參數(shù)mapred.task.timeout(默認(rèn)是10 min)對(duì)應(yīng)的值贩绕。
2.任務(wù)計(jì)數(shù)器
任務(wù)計(jì)數(shù)器(Counter)是Hadoop提供的火的,用于實(shí)現(xiàn)跟蹤任務(wù)運(yùn)行進(jìn)度的全局計(jì)數(shù)功能。任務(wù)計(jì)數(shù)器(Counter)是Hadoop提供的淑倾,用于實(shí)現(xiàn)跟蹤任務(wù)運(yùn)行進(jìn)度的全局計(jì)數(shù)功能馏鹤。用戶可在自己的應(yīng)用程序中添加計(jì)數(shù)器。任務(wù)計(jì)數(shù)器由兩部分組成:<name, value>娇哆,其中湃累,name表示計(jì)數(shù)器名稱,value表示計(jì)數(shù)器值(long類型)迂尝。計(jì)數(shù)器通常以組為單位管理脱茉,一個(gè)計(jì)數(shù)器屬于一個(gè)計(jì)數(shù)器組(CounterGroup)。此外垄开,Hadoop規(guī)定一個(gè)作業(yè)最多包含120個(gè)計(jì)數(shù)器(可通過(guò)參數(shù)mapreduce.job.counters.limit設(shè)定)琴许,50個(gè)計(jì)數(shù)器組。
對(duì)于同一個(gè)任務(wù)而言溉躲,所有任務(wù)包含的計(jì)數(shù)器相同榜田,每個(gè)任務(wù)更新自己的計(jì)數(shù)器值益兄,然后匯報(bào)給TaskTracker,并由TaskTracker通過(guò)心跳匯報(bào)給JobTracker箭券,最后由JobTracker以作業(yè)為單位對(duì)所有計(jì)數(shù)器進(jìn)行累加净捅。作業(yè)的計(jì)數(shù)器分為兩類:MapReduce內(nèi)置計(jì)數(shù)器和用戶自定義計(jì)數(shù)器。
(1)MapReduce內(nèi)置計(jì)數(shù)器
MapReduce框架內(nèi)部為每個(gè)任務(wù)添加了三個(gè)計(jì)數(shù)器組辩块,分別位于File Input Format Counters, File Output Format Counters和Map-Reduce Framework中蛔六。它們包含的計(jì)數(shù)器分別見(jiàn)表8-1,表8-2和表8-3废亭。
(2)用戶自定義計(jì)數(shù)器
Map Task內(nèi)部實(shí)現(xiàn)
前面提到国章,Map Task分為4種,分別是Job-setup Task豆村、Job-cleanup Task液兽、Task-cleanup Task和Map Task。
其中掌动,Job-setup Task和Job-cleanup Task分別是作業(yè)運(yùn)行時(shí)啟動(dòng)的第一個(gè)任務(wù)和最后一個(gè)任務(wù)四啰,主要工作分別是進(jìn)行一些作業(yè)初始化和收尾工作,比如創(chuàng)建和刪除作業(yè)臨時(shí)輸出目錄粗恢;而Task-cleanup Task則是任務(wù)失敗或者被殺死后柑晒,用于清理已寫(xiě)入臨時(shí)目錄中數(shù)據(jù)的任務(wù)。本節(jié)主要講解第四種任務(wù)——普通的Map Task适滓。它需要處理數(shù)據(jù)敦迄,并將計(jì)算結(jié)果存到本地磁盤上。
Map Task整體流程
Map Task的整體計(jì)算流程如圖8-6所示凭迹,共分為5個(gè)階段罚屋,分別是:
- Read階段:Map Task通過(guò)用戶編寫(xiě)的RecordReader,從輸入InputSplit中解析出一個(gè)個(gè)key/value嗅绸。
- Map階段:該階段主要是將解析出的key/value交給用戶編寫(xiě)的map()函數(shù)處理脾猛,并產(chǎn)生一系列新的key/value。
- Collect階段:在用戶編寫(xiě)的map()函數(shù)中鱼鸠,當(dāng)數(shù)據(jù)處理完成后猛拴,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果。在該函數(shù)內(nèi)部蚀狰,它會(huì)將生成的key/value分片(通過(guò)調(diào)用Partitioner)愉昆,并寫(xiě)入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。
- Spill階段:即“溢寫(xiě)”麻蹋,當(dāng)環(huán)形緩沖區(qū)滿后跛溉,MapReduce會(huì)將數(shù)據(jù)寫(xiě)到本地磁盤上,生成一個(gè)臨時(shí)文件。需要注意的是芳室,將數(shù)據(jù)寫(xiě)入本地磁盤之前专肪,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并堪侯、壓縮等操作嚎尤。
-
Combine階段:當(dāng)所有數(shù)據(jù)處理完成后,Map Task對(duì)所有臨時(shí)文件進(jìn)行一次合并伍宦,以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件芽死。
MapReduce框架提供了兩套API,默認(rèn)情況下采用舊API雹拄,用戶可通過(guò)設(shè)置參數(shù)mapred.mapper.new-api為true啟用新API裆操。新API在封裝性和擴(kuò)展性等方面優(yōu)于舊API芹关,但性能上并沒(méi)有改進(jìn)。這里主要以舊API為例進(jìn)行講解叹阔。
在Map Task中质蕉,最重要的部分是輸出結(jié)果在內(nèi)存和磁盤中的組織方式势篡,具體涉及Collect、Spill和Combine三個(gè)階段模暗,也就是用戶調(diào)用OutputCollector.collect()函數(shù)之后依次經(jīng)歷的幾個(gè)階段禁悠。我們將在下面幾小節(jié)深入分析這幾個(gè)階段。
Collect過(guò)程分析
待map()函數(shù)處理完一對(duì)key/value兑宇,并產(chǎn)生新的key/value后碍侦,會(huì)調(diào)用OutputCollector.collect()函數(shù)輸出結(jié)果。本小節(jié)重點(diǎn)剖析該函數(shù)內(nèi)部實(shí)現(xiàn)機(jī)制隶糕。
跟蹤進(jìn)入Map Task的入口函數(shù)run()瓷产,可發(fā)現(xiàn),如果用戶選用舊API枚驻,則會(huì)調(diào)用runOldMapper函數(shù)處理數(shù)據(jù)濒旦。該函數(shù)根據(jù)實(shí)際的配置創(chuàng)建合適的MapRunnable以迭代調(diào)用用戶編寫(xiě)的map()函數(shù),而map()函數(shù)的參數(shù)OutputCollector正是MapRunnable傳入的OldOutputCollector對(duì)象再登。
OldOutputCollector根據(jù)作業(yè)是否包含Reduce Task封裝了不同的MapOutputCollector實(shí)現(xiàn)尔邓,如果Reduce Task數(shù)目為0,則封裝DirectMapOutputCollector對(duì)象直接將結(jié)果寫(xiě)入HDFS中作為最終結(jié)果锉矢,否則封裝MapOutputBuffer對(duì)象暫時(shí)將結(jié)果寫(xiě)入本地磁盤上以供Reduce Task進(jìn)一步處理梯嗽。本小節(jié)主要分析Reduce Task數(shù)目非0的情況。
用戶在map()函數(shù)中調(diào)用OldOutputCollector.collect(key, value)后沽损,在該函數(shù)內(nèi)部灯节,首先會(huì)調(diào)用Partitioner.getPartition()函數(shù)獲取記錄的分區(qū)號(hào)partition,然后將三元組<key, value, partition>傳遞給MapOutputBuffer.collect()函數(shù)做進(jìn)一步處理。
MapOutputBuffer內(nèi)部使用了一個(gè)緩沖區(qū)暫時(shí)存儲(chǔ)用戶輸出數(shù)據(jù)显晶,當(dāng)緩沖區(qū)使用率達(dá)到一定閾值后贷岸,再將緩沖區(qū)中的數(shù)據(jù)寫(xiě)到磁盤上。數(shù)據(jù)緩沖區(qū)的設(shè)計(jì)方式直接影響到Map Task的寫(xiě)效率磷雇,而現(xiàn)有多種數(shù)據(jù)結(jié)構(gòu)可供選擇偿警,最簡(jiǎn)單的是單向緩沖區(qū),生產(chǎn)者向緩沖區(qū)中單向?qū)懭胼敵鑫希?dāng)緩沖區(qū)寫(xiě)滿后螟蒸,一次性寫(xiě)到磁盤上,就這樣崩掘,不斷寫(xiě)緩沖區(qū)七嫌,直到所有數(shù)據(jù)寫(xiě)到磁盤上。單向緩沖區(qū)最大的問(wèn)題是性能不高苞慢,不能支持同時(shí)讀寫(xiě)數(shù)據(jù)诵原。雙緩沖區(qū)是對(duì)單向緩沖區(qū)的一個(gè)改進(jìn),它使用兩個(gè)緩沖區(qū)挽放,其中一個(gè)用于寫(xiě)入數(shù)據(jù)绍赛,另一個(gè)將寫(xiě)滿的數(shù)據(jù)寫(xiě)到磁盤上,這樣辑畦,兩個(gè)緩沖區(qū)交替讀寫(xiě)吗蚌,進(jìn)而提高效率。實(shí)際上纯出,雙緩沖區(qū)只能一定程度上讓讀寫(xiě)并行蚯妇,仍會(huì)存在讀寫(xiě)等待問(wèn)題。一種更好的緩沖區(qū)設(shè)計(jì)方式是采用環(huán)形緩沖區(qū):當(dāng)緩沖區(qū)使用率達(dá)到一定閾值后暂筝,便開(kāi)始向磁盤上寫(xiě)入數(shù)據(jù)箩言,同時(shí),生產(chǎn)者仍可以向不斷增加的剩余空間中循環(huán)寫(xiě)入數(shù)據(jù)乖杠,進(jìn)而達(dá)到真正的讀寫(xiě)并行分扎。三種緩沖區(qū)結(jié)構(gòu)如圖8-7所示。
MapOutputBuffer正是采用了環(huán)形內(nèi)存緩沖區(qū)保存數(shù)據(jù)胧洒,當(dāng)緩沖區(qū)使用率達(dá)到一定閾值后畏吓,由線程SpillThread將數(shù)據(jù)寫(xiě)到一個(gè)臨時(shí)文件中,當(dāng)所有數(shù)據(jù)處理完畢后卫漫,對(duì)所有臨時(shí)文件進(jìn)行一次合并以生成一個(gè)最終文件菲饼。環(huán)形緩沖區(qū)使得Map Task的Collect階段和Spill階段可并行進(jìn)行。
MapOutputBuffer內(nèi)部采用了兩級(jí)索引結(jié)構(gòu)(見(jiàn)圖8-8)列赎,涉及三個(gè)環(huán)形內(nèi)存緩沖區(qū)宏悦,分別是kvoffsets、kvindices和kvbuffer,這三個(gè)緩沖區(qū)所占內(nèi)存空間總大小為io.sort.mb(默認(rèn)是100 MB)饼煞。下面分別介紹這三個(gè)緩沖區(qū)的含義源葫。
(1)kvoffsets
kvoffsets即偏移量索引數(shù)組,用于保存key/value信息在位置索引kvindices中的偏移量砖瞧∠⑻茫考慮到一對(duì)key/value需占用數(shù)組kvoffsets的1個(gè)int(整型)大小,數(shù)組kvindices的3個(gè)int大锌榇佟(分別保存所在partition號(hào)荣堰、key開(kāi)始位置和value開(kāi)始位置),所以Hadoop按比例1:3將大小為{io.sort.mb}的內(nèi)存空間分配給數(shù)組kvoffsets和kvindices竭翠,其間涉及的緩沖區(qū)分配方式見(jiàn)圖8-9振坚,計(jì)算過(guò)程如下:
private static final int ACCTSIZE=3;//每對(duì)key/value占用kvindices中的三項(xiàng)
private static final int RECSIZE=(ACCTSIZE+1)*4斋扰;//每對(duì)key/value共占用
kvoffsets和kvindices中的4個(gè)字節(jié)(4*4=16 byte)
final float recper=job.getFloat("io.sort.record.percent"渡八,(float)0.05);
final int sortmb=job.getInt("io.sort.mb"传货,100)呀狼;
int maxMemUsage=sortmb<<20;//將內(nèi)存單位轉(zhuǎn)化為字節(jié)
int recordCapacity=(int)(maxMemUsage*recper)损离;
recordCapacity-=recordCapacity%RECSIZE;//保證recordCapacity是4*4的整數(shù)倍
recordCapacity/=RECSIZE绝编;//計(jì)算內(nèi)存中最多保存key/value數(shù)目
kvoffsets=new int[recordCapacity]僻澎;//kvoffsets占用1:3中的1
kvindices=new int[recordCapacity*ACCTSIZE];//kvindices占用1:3中的3
當(dāng)該數(shù)組使用率超過(guò)io.sort.spill.percent后十饥,便會(huì)觸發(fā)線程SpillThread將數(shù)據(jù)寫(xiě)入磁盤窟勃。
(2)kvindices
kvindices即位置索引數(shù)組,用于保存key/value值在數(shù)據(jù)緩沖區(qū)kvbuffer中的起始位置逗堵。
(3)kvbuffer
kvbuffer即數(shù)據(jù)緩沖區(qū)秉氧,用于保存實(shí)際的key/value值,默認(rèn)情況下最多可使用io.sort.mb中的95%蜒秤,當(dāng)該緩沖區(qū)使用率超過(guò)io.sort.spill.percent后汁咏,便會(huì)觸發(fā)線程SpillThread將數(shù)據(jù)寫(xiě)入磁盤。
以上幾個(gè)緩沖區(qū)讀寫(xiě)采用了典型的單生產(chǎn)者消費(fèi)者模型作媚,其中攘滩,MapOutputBuffer的collect方法和MapOutputBuffer.Buffer的write方法是生產(chǎn)者,spillThread線程是消費(fèi)者纸泡,它們之間同步是通過(guò)可重入的互斥鎖spillLock和spillLock上的兩個(gè)條件變量(spillDone和spillReady)完成的漂问。生產(chǎn)者主要的偽代碼如下:
//取得下一個(gè)可寫(xiě)入的位置
spillLock.lock();
if(緩沖區(qū)使用率達(dá)到閾值){
//喚醒SpillThread線程,將緩沖區(qū)數(shù)據(jù)寫(xiě)入磁盤
spillReady.signal()蚤假;
}
if(緩沖區(qū)滿){
//等待SpillThread線程結(jié)束
spillDone.wait()栏饮;
}
spillLock.lock();
//將數(shù)據(jù)寫(xiě)入緩沖區(qū)
下面分別介紹環(huán)形緩沖區(qū)kvoffsets和kvbuffer的數(shù)據(jù)寫(xiě)入過(guò)程磷仰。
(1)環(huán)形緩沖區(qū)kvoffsets
通常用一個(gè)線性緩沖區(qū)模擬實(shí)現(xiàn)環(huán)形緩沖區(qū)袍嬉,并通過(guò)取模操作實(shí)現(xiàn)循環(huán)數(shù)據(jù)存儲(chǔ)。下面介紹環(huán)形緩沖區(qū)kvoffsets的寫(xiě)數(shù)據(jù)過(guò)程芒划。該過(guò)程由指針kvstart/kvend/kvindex控制冬竟,其中kvstart表示存有數(shù)據(jù)的內(nèi)存段初始位置,kvindex表示未存儲(chǔ)數(shù)據(jù)的內(nèi)存段初始位置民逼,而在正常寫(xiě)入情況下泵殴,kvend=kvstart,一旦滿足溢寫(xiě)條件拼苍,則kvend=kvindex笑诅,此時(shí)指針區(qū)間[kvstart, kvend)為有效數(shù)據(jù)區(qū)間。具體涉及的操作如下疮鲫。
操作1:寫(xiě)入緩沖區(qū)吆你。
直接將數(shù)據(jù)寫(xiě)入kvindex指針指向的內(nèi)存空間,同時(shí)移動(dòng)kvindex指向下一個(gè)可寫(xiě)入的內(nèi)存空間首地址俊犯,kvindex移動(dòng)公式為:kvindex=(kvindex+1)%kvoffsets.length妇多。由于kvoffsets為環(huán)形緩沖區(qū),因此可能涉及兩種寫(xiě)入情況燕侠。
情況1:kvindex>kvend者祖,如圖8-10所示。在這種情況下绢彤,指針kvindex在指針kvend后面七问,如果向緩沖區(qū)中寫(xiě)入一個(gè)字符串,則kvindex指針后移一位茫舶。
情況2:kvindex<=kvend械巡,如圖8-11所示。在這種情況下饶氏,指針kvindex位于指針kvend前面讥耗,如果向緩沖區(qū)中寫(xiě)入一個(gè)字符串,則kvindex指針后移一位嚷往。
操作2:溢寫(xiě)到磁盤葛账。
當(dāng)kvoffsets內(nèi)存空間使用率超過(guò)io.sort.spill.percent(默認(rèn)是80%)后,需將內(nèi)存中數(shù)據(jù)寫(xiě)到磁盤上皮仁。為了判斷是否滿足該條件籍琳,需先求出kvoffsets已使用內(nèi)存菲宴。如果kvindex>kvend,則已使用內(nèi)存大小為kvindex-kvend趋急;否則喝峦,已使用內(nèi)存大小為kvoffsets.length-(kvend-kvindex)。
(2)環(huán)形緩沖區(qū)kvbuffer
環(huán)形緩沖區(qū)kvbuffer的讀寫(xiě)操作過(guò)程由指針bufstart/bufend/bufvoid/bufindex/bufmark控制呜达,其中谣蠢,bufstart/bufend/bufindex含義與kvstart/kvend/kvindex相同,而bufvoid指向kvbuffer中有效內(nèi)存結(jié)束為止查近,kvbuffer表示最后寫(xiě)入的一個(gè)完整key/value結(jié)束位置眉踱,具體寫(xiě)入過(guò)程中涉及的狀態(tài)和操作如下:
情況1:初始狀態(tài)。
初始狀態(tài)下霜威,bufstart=bufend=bufindex=bufmark=0谈喳,bufvoid=kvbuffer.length,如圖8-12所示戈泼。
情況2:寫(xiě)入一個(gè)key婿禽。
寫(xiě)入一個(gè)key后,需移動(dòng)bufindex指針到可寫(xiě)入內(nèi)存初始位置大猛,如圖8-13所示扭倾。
情況3:寫(xiě)入一個(gè)value。
寫(xiě)入key對(duì)應(yīng)的value后挽绩,除移動(dòng)bufindex指針外膛壹,還要移動(dòng)bufmark指針,表示已經(jīng)寫(xiě)入一個(gè)完整的key/value唉堪,具體如圖8-14所示恢筝。
情況4:不斷寫(xiě)入key/value,直到滿足溢寫(xiě)條件巨坊,即kvoffsets或者kvbuffer空間使用率超過(guò)io.sort.spill.percent(默認(rèn)值為80%)。此時(shí)需要將數(shù)據(jù)寫(xiě)到磁盤上此改,如圖8-15所示趾撵。
情況5:溢寫(xiě)。
如果達(dá)到溢寫(xiě)條件共啃,則令bufend←bufindex占调,并將緩沖區(qū)[bufstart, bufend)之間的數(shù)據(jù)寫(xiě)到磁盤上,具體如圖8-16所示移剪。
溢寫(xiě)完成之后究珊,恢復(fù)正常寫(xiě)入狀態(tài),令bufstart←bufend纵苛,如圖8-17所示剿涮。
在溢寫(xiě)的同時(shí)言津,Map Task仍可向kvbuffer中寫(xiě)入數(shù)據(jù),如圖8-18所示取试。
情況6:某個(gè)key或者value太大悬槽,以至于整個(gè)緩沖區(qū)不能容納它。
如果一條記錄的key或value太大瞬浓,整個(gè)緩沖區(qū)都不能容納它初婆,則Map Task會(huì)拋出MapBufferTooSmallException異常,并將該記錄單獨(dú)輸出到一個(gè)文件中猿棉。
前面提到磅叛,Map Task將可用的緩沖區(qū)空間io.sort.mb按照一定比例(由參數(shù)io.sort.record.percent決定)靜態(tài)分配給了kvoffsets、kvindices和kvbuffer三個(gè)緩沖區(qū)萨赁,而正如條件1所述弊琴,只要任何一個(gè)緩沖區(qū)的使用率達(dá)到一定比例,就會(huì)發(fā)生溢寫(xiě)現(xiàn)象位迂,即使另外的緩沖區(qū)使用率非常低访雪。因此,設(shè)置合理的io.sort.record.percent參數(shù)掂林,對(duì)于充分利用緩沖區(qū)空間和減少溢寫(xiě)次數(shù)臣缀,是十分必要的⌒喊铮考慮到每條數(shù)據(jù)(一個(gè)key/value對(duì))需占用索引大小為16 B精置,設(shè)置io.sort.record.percent:
io.sort.record.percent=16/(16+R)
其中R為平均每條記錄的長(zhǎng)度。
【實(shí)例】假設(shè)一個(gè)作業(yè)的Map Task輸入數(shù)據(jù)量和輸出數(shù)據(jù)量相同锣杂,每個(gè)Map Task輸入數(shù)據(jù)量大小為128 MB脂倦,且共有1 342 177條記錄,每條記錄大小約為100 B元莫,則需要索引大小為16*1 342 177=20.9 MB赖阻。根據(jù)這些信息,可設(shè)置參數(shù)如下:
根據(jù)這些信息踱蠢,可設(shè)置參數(shù)如下:
- io. sort.mb:128 MB+20.9 MB=148.9 MB
- io. sort.record.percent:16/(16+100)=0.138
- io. sort.spill.percent:1.0
這樣配置可保證數(shù)據(jù)只“落”一次地火欧,效率最高!當(dāng)然茎截,實(shí)際使用時(shí)可能很難達(dá)到這種情況苇侵,比如每個(gè)Map Task輸出數(shù)據(jù)量非常大,緩沖區(qū)難以全部容納它們企锌,但你至少可以設(shè)置合理的io.sort.record.percent以更充分地利用io.sort.mb并盡可能減少中間文件數(shù)目榆浓。
Spill過(guò)程分析
Spill過(guò)程由SpillThread線程完成。在前一小節(jié)中已經(jīng)提到撕攒,SpillThread線程實(shí)際上是緩沖區(qū)kvbuffer的消費(fèi)者陡鹃,其主要代碼如下:
spillLock.lock()烘浦;
while(true){
spillDone.signal();
while(kvstart==kvend){
spillReady.await()杉适;
}
spillLock.unlock()谎倔;
sortAndSpill();//排序猿推,然后將緩沖區(qū)kvbuffer中的數(shù)據(jù)寫(xiě)到磁盤上
spillLock.lock()片习;
//重置各個(gè)指針,以便為下一次溢寫(xiě)做準(zhǔn)備
if(bufend<bufindex&&bufindex<bufstart){
bufvoid=kvbuffer.length蹬叭;
}
vstart=kvend藕咏;
bufstart=bufend;
}
spillLock.unlock()秽五;
線程SpillThread調(diào)用函數(shù)sortAndSpill()將環(huán)形緩沖區(qū)kvbuffer中區(qū)間[bufstart, bufend)內(nèi)的數(shù)據(jù)寫(xiě)到磁盤上孽查。函數(shù)sortAndSpill()內(nèi)部工作流程如下:
步驟1 利用快速排序算法對(duì)緩沖區(qū)kvbuffer中區(qū)間[bufstart, bufend)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序方式是坦喘,先按照分區(qū)編號(hào)partition進(jìn)行排序盲再,然后按照key進(jìn)行排序。這樣瓣铣,經(jīng)過(guò)排序后答朋,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序棠笑。
步驟2 按照分區(qū)編號(hào)由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫(xiě)入任務(wù)工作目錄下的臨時(shí)文件output/spillN.out(N表示當(dāng)前溢寫(xiě)次數(shù))中梦碗。如果用戶設(shè)置了Combiner,則寫(xiě)入文件之前蓖救,對(duì)每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作洪规。
步驟3 將分區(qū)數(shù)據(jù)的元信息寫(xiě)到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpillRecord中,其中每個(gè)分區(qū)的元信息包括在臨時(shí)文件中的偏移量循捺、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小斩例。如果當(dāng)前內(nèi)存中索引大小超過(guò)1 MB,則將內(nèi)存索引寫(xiě)到文件output/spillN.out.index中从橘。
Combine過(guò)程分析
當(dāng)所有數(shù)據(jù)處理完后樱拴,Map Task會(huì)將所有臨時(shí)文件合并成一個(gè)大文件,并保存到文件output/file.out中洋满,同時(shí)生成相應(yīng)的索引文件output/file.out.index。
在進(jìn)行文件合并過(guò)程中珍坊,Map Task以分區(qū)為單位進(jìn)行合并牺勾。對(duì)于某個(gè)分區(qū),它將采用多輪遞歸合并的方式:每輪合并io.sort.factor(默認(rèn)為100)個(gè)文件阵漏,并將產(chǎn)生的文件重新加入待合并列表中驻民,對(duì)文件排序后翻具,重復(fù)以上過(guò)程,直到最終得到一個(gè)大文件回还。讓每個(gè)Map Task最終只生成一個(gè)數(shù)據(jù)文件裆泳,可避免同時(shí)打開(kāi)大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機(jī)讀取帶來(lái)的開(kāi)銷.
Reduce Task內(nèi)部實(shí)現(xiàn)
與Map Task一樣,Reduce Task也分為四種柠硕,即Job-setup Task, Job-cleanup Task, Task-cleanup Task和Reduce Task工禾。本節(jié)中重點(diǎn)介紹第四種——普通Reduce Task。Reduce Task要從各個(gè)Map Task上讀取一片數(shù)據(jù)蝗柔,經(jīng)排序后闻葵,以組為單位交給用戶編寫(xiě)的reduce()函數(shù)處理,并將結(jié)果寫(xiě)到HDFS上癣丧。本節(jié)將深入剖析Reduce Task內(nèi)部各個(gè)階段的實(shí)現(xiàn)原理槽畔。
Reduce Task整體流程
Reduce Task的整體計(jì)算流程如圖8-22所示,共分為5個(gè)階段胁编。
- Shuffle階段:也稱為Copy階段厢钧。Reduce Task從各個(gè)Map Task上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對(duì)某一片數(shù)據(jù)嬉橙,如果其大小超過(guò)一定閾值早直,則寫(xiě)到磁盤上,否則直接放到內(nèi)存中憎夷。
- Merge階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí)莽鸿,Reduce Task啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過(guò)多或磁盤上文件過(guò)多拾给。
- Sort階段:按照MapReduce語(yǔ)義祥得,用戶編寫(xiě)的reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起蒋得,Hadoop采用了基于排序的策略级及。由于各個(gè)Map Task已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序,因此额衙,Reduce Task只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可饮焦。
- Reduce階段:在該階段中,Reduce Task將每組數(shù)據(jù)依次交給用戶編寫(xiě)的reduce()函數(shù)處理窍侧。
- Write階段:reduce()函數(shù)將計(jì)算結(jié)果寫(xiě)到HDFS上县踢。
在接下來(lái)幾小節(jié)中,我們將詳細(xì)介紹Shuffle伟件、Merge硼啤、Sort和Reduce四個(gè)階段「耍考慮到Write階段比較簡(jiǎn)單谴返,我們不再介紹煞肾。
Shuffle和Merge階段分析
在Reduce Task中,Shuffle階段和Merge階段是并行進(jìn)行的嗓袱。當(dāng)遠(yuǎn)程拷貝數(shù)據(jù)量達(dá)到一定閾值后籍救,便會(huì)觸發(fā)相應(yīng)的合并線程對(duì)數(shù)據(jù)進(jìn)行合并。這兩個(gè)階段均是由類ReduceCopier實(shí)現(xiàn)的渠抹,如圖8-23所示蝙昙,總體上看,Shuffle&Merge階段可進(jìn)一步劃分為三個(gè)子階段逼肯。
(1)準(zhǔn)備運(yùn)行完成的Map Task列表
GetMapEventsThread線程周期性通過(guò)RPC從TaskTracker獲取已完成Map Task列表耸黑,并保存到映射表mapLocations(保存了TaskTracker Host與已完成任務(wù)列表的映射關(guān)系)中。為防止出現(xiàn)網(wǎng)絡(luò)熱點(diǎn)篮幢,Reduce Task通過(guò)對(duì)所有TaskTracker Host進(jìn)行“混洗”操作以打亂數(shù)據(jù)拷貝順序大刊,并將調(diào)整后的Map Task輸出數(shù)據(jù)位置保存到scheduledCopies列表中。
(2)遠(yuǎn)程拷貝數(shù)據(jù)
Reduce Task同時(shí)啟動(dòng)多個(gè)MapOutputCopier線程三椿,這些線程從scheduledCopies列表中獲取Map Task輸出位置缺菌,并通過(guò)HTTP Get遠(yuǎn)程拷貝數(shù)據(jù)。對(duì)于獲取的數(shù)據(jù)分片搜锰,如果大小超過(guò)一定閾值伴郁,則存放到磁盤上,否則直接放到內(nèi)存中蛋叼。
(3)合并內(nèi)存文件和磁盤文件
為了防止內(nèi)存或者磁盤上的文件數(shù)據(jù)過(guò)多焊傅,Reduce Task啟動(dòng)了LocalFSMerger和InMemFSMergeThread兩個(gè)線程分別對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并。
接下來(lái)狈涮,我們將詳細(xì)剖析每個(gè)階段的內(nèi)部實(shí)現(xiàn)細(xì)節(jié)狐胎。
(1)準(zhǔn)備運(yùn)行完成的Map Task列表
我們知道TaskTracker啟動(dòng)了MapEventsFetcherThread線程。該線程會(huì)周期性(周期為心跳時(shí)間間隔)通過(guò)RPC從JobTracker上獲取已經(jīng)運(yùn)行完成的Map Task列表歌馍,并保存到TaskCompletionEvent類型列表allMapEvents中握巢。
而對(duì)于Reduce Task而言,它會(huì)啟動(dòng)GetMapEventsThread線程松却。該線程周期性通過(guò)RPC從TaskTracker上獲取已運(yùn)行完成的Map Task列表暴浦,并將成功運(yùn)行完成的Map Task放到列表mapLocations中,具體如圖8-24所示晓锻。
為了避免出現(xiàn)數(shù)據(jù)訪問(wèn)熱點(diǎn)(大量進(jìn)程集中讀取某個(gè)TaskTracker上的數(shù)據(jù))歌焦,Reduce Task不會(huì)直接將列表mapLocations中的Map Task輸出數(shù)據(jù)位置交給MapOutputCopier線程,而是事先進(jìn)行一次預(yù)處理:將所有TaskTracker Host進(jìn)行混洗操作(隨機(jī)打亂順序)砚哆,然后保存到scheduledCopies列表中独撇,而MapOutputCopier線程將從該列表中獲取待拷貝的Map Task輸出數(shù)據(jù)位置。需要注意的是,對(duì)于一個(gè)TaskTracker而言券勺,曾拷貝失敗的Map Task將優(yōu)先獲得拷貝機(jī)會(huì)。
(2)遠(yuǎn)程拷貝數(shù)據(jù)
Reduce Task同時(shí)啟動(dòng)mapred.reduce.parallel.copies(默認(rèn)是5)個(gè)數(shù)據(jù)拷貝線程MapOutputCopier灿里。該線程從scheduledCopies列表中獲取Map Task數(shù)據(jù)輸出描述對(duì)象关炼,并利用HTTP Get從對(duì)應(yīng)的TaskTracker遠(yuǎn)程拷貝數(shù)據(jù),如果數(shù)據(jù)分片大小超過(guò)一定閾值匣吊,則將數(shù)據(jù)臨時(shí)寫(xiě)到工作目錄下儒拂,否則直接保存到內(nèi)存中。不管是保存到內(nèi)存中還是磁盤上色鸳,MapOutputCopier均會(huì)保存一個(gè)MapOutput對(duì)象描述數(shù)據(jù)的元信息社痛。如果數(shù)據(jù)被保存到內(nèi)存中,則將該對(duì)象添加到列表mapOutputsFilesInMemory中命雀,否則將該對(duì)象保存到列表mapOutputFilesOnDisk中蒜哀。
在Reduce Task中,大部分內(nèi)存用于緩存從Map Task端拷貝的數(shù)據(jù)分片吏砂,這些內(nèi)存占到JVM Max Heap Size(由參數(shù)-Xmx指定)的mapred.job.shuffle.input.buffer.percent(默認(rèn)是0.70)倍撵儿,并由類ShuffleRamManager管理。Reduce Task規(guī)定狐血,如果一個(gè)數(shù)據(jù)分片大小未超過(guò)該內(nèi)存的0.25倍淀歇,則可存放到內(nèi)存中。如果MapOutputCopier線程要拷貝的數(shù)據(jù)分片可存放到內(nèi)存中匈织,則它先要向ShuffleRamManager申請(qǐng)相應(yīng)的內(nèi)存浪默,待同意后才會(huì)正式拷貝數(shù)據(jù),否則需要等待它釋放內(nèi)存缀匕。
由于遠(yuǎn)程拷貝數(shù)據(jù)可能需要跨網(wǎng)絡(luò)讀取多個(gè)節(jié)點(diǎn)上的數(shù)據(jù)纳决,期間很容易由于網(wǎng)絡(luò)或者磁盤等原因造成讀取失敗,因此提供良好的容錯(cuò)機(jī)制是非常有必要的弦追。當(dāng)出現(xiàn)拷貝錯(cuò)誤時(shí)岳链,Reduce Task提供了以下幾個(gè)容錯(cuò)機(jī)制:
如果拷貝數(shù)據(jù)出錯(cuò)次數(shù)超過(guò)abortFailureLimit,則殺死該Reduce Task(等待調(diào)度器重新調(diào)度執(zhí)行)劲件,其中掸哑,abortFailureLimit計(jì)算方法如下:
abortFailureLimit=max{30,numMaps/10}如果拷貝數(shù)據(jù)出錯(cuò)次數(shù)超過(guò)maxFetchFailuresBeforeReporting(可通過(guò)參數(shù)mapreduce.reduce.shuffle.maxfetchfailures設(shè)置零远,默認(rèn)是10)苗分,則進(jìn)行一些必要的檢查以決定是否殺死該Reduce Task。
如果前兩個(gè)條件均不滿足牵辣,則采用對(duì)數(shù)回歸模型推遲一段時(shí)間后重新拷貝對(duì)應(yīng)MapTask的輸出數(shù)據(jù)摔癣,其中延遲時(shí)間delayTime的計(jì)算方法如下:
delayTime=10 000×1. 3noFailedFetches
其中noFailedFetches為拷貝錯(cuò)誤次數(shù)。
(3)合并內(nèi)存文件和磁盤文件
前面提到,Reduce Task從Map Task端拷貝的數(shù)據(jù)择浊,可能保存到內(nèi)存或者磁盤上戴卜。隨著拷貝數(shù)據(jù)的增多,內(nèi)存或者磁盤上的文件數(shù)目也必將增加琢岩,為了減少文件數(shù)目投剥,在數(shù)據(jù)拷貝過(guò)程中,線程LocalFSMerger和InMemFSMergeThread將分別對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并担孔。
對(duì)于磁盤上文件江锨,當(dāng)文件數(shù)目超過(guò)(2*ioSortFactor-1)后(ioSortFactor值由參數(shù)io.sort.factor指定,默認(rèn)是10)糕篇,線程LocalFSMerger會(huì)從列表mapOutputFilesOnDisk中取出最小的ioSortFactor個(gè)文件進(jìn)行合并啄育,并將合并后的文件再次寫(xiě)到磁盤上。
對(duì)于內(nèi)存中的文件拌消,當(dāng)滿足以下幾個(gè)條件之一時(shí)挑豌,InMemFSMergeThread線程會(huì)將內(nèi)存中所有數(shù)據(jù)合并后寫(xiě)到磁盤上:
- 所有數(shù)據(jù)拷貝完畢后,關(guān)閉ShuffleRamManager拼坎。
- ShuffleRamManager中已使用內(nèi)存超過(guò)可用內(nèi)存的mapred.job.shuffle.merge.percent(默認(rèn)是66%)倍且內(nèi)存文件數(shù)目超過(guò)2個(gè)痘儡。
- 內(nèi)存中的文件數(shù)目超過(guò)mapred.inmem.merge.threshold(默認(rèn)是1 000)捌蚊。
Sort和Reduce階段分析
當(dāng)所有數(shù)據(jù)拷貝完成后黔衡,數(shù)據(jù)可能存放在內(nèi)存中或者磁盤上奏纪,此時(shí)還不能將數(shù)據(jù)直接交給用戶編寫(xiě)的reduce()函數(shù)處理。根據(jù)MapReduce語(yǔ)義盛龄,Reduce Task需將key值相同的數(shù)據(jù)聚集到一起饰迹,并按組將數(shù)據(jù)交給reduce()函數(shù)處理。為此余舶,Hadoop采用了基于排序的數(shù)據(jù)聚集策略啊鸭。前面提到,各個(gè)Map Task已經(jīng)事先對(duì)自己的輸出分片進(jìn)行了局部排序匿值,因此赠制,Reduce Task只需進(jìn)行一次歸并排序即可保證數(shù)據(jù)整體有序。為了提高效率挟憔,Hadoop將Sort階段和Reduce階段并行化钟些。在Sort階段,Reduce Task為內(nèi)存和磁盤中的文件建立了小頂堆绊谭,保存了指向該小頂堆根節(jié)點(diǎn)的迭代器政恍,且該迭代器保證了以下兩個(gè)約束條件:
- 磁盤上文件數(shù)目小于io.sort.factor(默認(rèn)是10)。
- 當(dāng)Reduce階段開(kāi)始時(shí)达传,內(nèi)存中數(shù)據(jù)量小于最大可用內(nèi)存(JVM Max Heap Size)的mapred.job.reduce.input.buffer.percent(默認(rèn)是0)
在Reduce階段篙耗,Reduce Task不斷地移動(dòng)迭代器迫筑,以將key相同的數(shù)據(jù)順次交給reduce()函數(shù)處理,期間移動(dòng)迭代器的過(guò)程實(shí)際上就是不斷調(diào)整小頂堆的過(guò)程宗弯,這樣脯燃,Sort和Reduce可并行進(jìn)行。
Map/Reduce Task優(yōu)化
參數(shù)調(diào)優(yōu)
由于參數(shù)調(diào)優(yōu)與應(yīng)用程序的特點(diǎn)直接相關(guān)蒙保,因此本小節(jié)僅列出了Map Task和Reduce Task中直接影響任務(wù)性能的一些可調(diào)整參數(shù)(見(jiàn)表8-4和表8-5)曲伊,具體調(diào)整為何值需由用戶根據(jù)作業(yè)特點(diǎn)自行決定。
考慮到Hadoop中用戶可配置參數(shù)非常多追他,為了簡(jiǎn)化參數(shù)配置,一些研究機(jī)構(gòu)嘗試自動(dòng)調(diào)優(yōu)參數(shù)岛蚤。
系統(tǒng)優(yōu)化
Shuffle階段內(nèi)部?jī)?yōu)化
(1)Map端——用Netty代替Jetty
1.0.0版本中邑狸,TaskTracker采用了Jetty服務(wù)器處理來(lái)自各個(gè)Reduce Task的數(shù)據(jù)讀取請(qǐng)求。由于Jetty采用了非常簡(jiǎn)單的網(wǎng)絡(luò)模型涤妒,因此性能比較低单雾。在Apache Hadoop 2.0.0版本中,Hadoop改用Netty她紫,它是另一種開(kāi)源的客戶/服務(wù)器端編程框架硅堆。由于它內(nèi)部采用了Java NIO技術(shù),相比Jetty更加高效贿讹,且Netty社區(qū)更加活躍渐逃,其穩(wěn)定性比Jetty好。
(2)Reduce端——批拷貝
1.0.0版本中民褂,在Shuffle過(guò)程中茄菊,Reduce Task會(huì)為每個(gè)數(shù)據(jù)分片建立一個(gè)專門的HTTP連接(One-connection-per-map),即使多個(gè)分片同時(shí)出現(xiàn)在一個(gè)TaskTracker上赊堪,也是如此面殖。為了提高數(shù)據(jù)拷貝效率,Apache Hadoop 2.0.0嘗試采用批拷貝技術(shù):不再為每個(gè)Map Task建立一個(gè)HTTP連接哭廉,而是為同一個(gè)TaskTracker上的多個(gè)Map Task建立一個(gè)HTTP連接脊僚,進(jìn)而能夠一次讀取多個(gè)數(shù)據(jù)分片,具體如圖8-25所示遵绰。
將Shuffle階段從Reduce Task中拆分出來(lái)
前面提到辽幌,對(duì)于一個(gè)作業(yè)而言,當(dāng)一定比例(默認(rèn)是5%)的Map Task運(yùn)行完成后街立,Reduce Task才開(kāi)始被調(diào)度舶衬,且僅當(dāng)所有Map Task運(yùn)行完成后,Reduce Task才可能運(yùn)行完成赎离。在所有Map Task運(yùn)行完成之前逛犹,已經(jīng)啟動(dòng)的Reduce Task將始終處于Shuffle階段,此時(shí)它們不斷從已經(jīng)完成的Map Task上遠(yuǎn)程拷貝中間處理結(jié)果,由于隨著時(shí)間推移虽画,不斷會(huì)有新的Map Task運(yùn)行完成舞蔽,因此Reduce Task會(huì)一直處于“等待—拷貝—等待—拷貝……”的狀態(tài)。待所有Map Task運(yùn)行完成后码撰,Reduce Task才可能將結(jié)果全部拷貝過(guò)來(lái)渗柿,這時(shí)候才能夠進(jìn)一步調(diào)用用戶編寫(xiě)的reduce()函數(shù)處理數(shù)據(jù)。從以上Reduce Task內(nèi)部運(yùn)行流程分析可知脖岛,Shuffle階段會(huì)帶來(lái)兩個(gè)問(wèn)題:slot Hoarding和資源利用率低下朵栖。
(1)Slot Hoarding現(xiàn)象
Slot Hoarding是一種資源囤積現(xiàn)象,具體表現(xiàn)是:對(duì)于任意一個(gè)MapReduce作業(yè)而言柴梆,在所有Map Task運(yùn)行完成之前陨溅,已經(jīng)啟動(dòng)的Reduce Task將一直占用著slot不釋放。Slot Hoarding可能會(huì)導(dǎo)致一些作業(yè)產(chǎn)生饑餓現(xiàn)象绍在。下面給出一個(gè)例子進(jìn)行說(shuō)明门扇。
【實(shí)例】如圖8-26所示,整個(gè)集群中有三個(gè)作業(yè)偿渡,分別是job1臼寄、job2和job3,其中溜宽,job1的Map Task數(shù)目非常多吉拳,而其他兩個(gè)作業(yè)的Map Task相對(duì)較少。在t0時(shí)刻适揉,job1和job2的Reduce Task開(kāi)始被調(diào)度合武;在t3時(shí)刻,job2的所有Map Task運(yùn)行完成涡扼,不久之后(t3'時(shí)刻)稼跳,job2的第一批Reduce Task運(yùn)行完成;在t4'時(shí)刻吃沪,job2所有Reduce Task運(yùn)行完成汤善;在t4時(shí)刻,job3的Map Task開(kāi)始運(yùn)行并在t7時(shí)刻運(yùn)行完成票彪,但由于此時(shí)所有Reduce slot均被job1占用著红淡,因此,除非job1的所有Map Task運(yùn)行完成降铸,否則job3的Reduce Task永遠(yuǎn)不可能得到調(diào)度在旱。
(2)資源利用率低下
從資源利用率角度看,為了保證較高的系統(tǒng)資源利用率推掸,所有Task都應(yīng)充分使用一個(gè)slot所隱含的資源桶蝎,包括內(nèi)存驻仅、CPU、I/O等資源登渣。然而噪服,對(duì)單個(gè)Reduce Task而言,在整個(gè)運(yùn)行過(guò)程中胜茧,它的資源利用率很不均衡粘优,總體上看,剛開(kāi)始它主要使用I/O資源(Shuffle階段)呻顽,之后主要使用CPU資源(Reduce階段)雹顺。如圖8-27所示,t4時(shí)刻之前廊遍,所有已經(jīng)啟動(dòng)的Reduce Task處于Shuffle階段无拗,此時(shí)主要使用網(wǎng)絡(luò)I/O和磁盤I/O資源,而在t4時(shí)刻之后昧碉,所有Map Task運(yùn)行完成,則第一批Reduce Task逐漸開(kāi)始進(jìn)入Reduce階段揽惹,此時(shí)主要消耗CPU資源被饿。由此可見(jiàn),Reduce Task運(yùn)行過(guò)程中使用的資源依次以I/O搪搏、CPU為主狭握,并沒(méi)有重疊使用這兩種資源,這使得系統(tǒng)整體資源利用率低下疯溺。
經(jīng)過(guò)以上分析可知论颅,I/O密集型的數(shù)據(jù)拷貝(Shuffle階段)和CPU密集型的數(shù)據(jù)計(jì)算(Reduce階段)緊耦合在一起是導(dǎo)致“Slot Hoarding”現(xiàn)象和系統(tǒng)資源利用率低下的主要原因。為了解決該問(wèn)題囱嫩,一種可行的解決方案是將Shuffle階段從Reduce Task中分離出來(lái)恃疯,當(dāng)前主要有以下兩種具體的實(shí)現(xiàn)方案。
Copy-Compute Splitting:這是Berkeley的一篇論文提出的解決方案墨闲。該方案從邏輯上將Reduce Task拆分成“Copy Task”和“Compute Task”今妄,其中,Copy Task用于數(shù)據(jù)拷貝鸳碧,而Compute Task用于數(shù)據(jù)計(jì)算(調(diào)用用戶編寫(xiě)的reduce()函數(shù)處理數(shù)據(jù))盾鳞。當(dāng)一個(gè)Copy Task運(yùn)行完成后,它會(huì)觸發(fā)一個(gè)Compute Task進(jìn)行數(shù)據(jù)計(jì)算瞻离,同時(shí)另外一個(gè)Copy Task將被啟動(dòng)拷貝另外的數(shù)據(jù)腾仅,從而實(shí)現(xiàn)I/O和CPU資源重疊使用。
將Shuffle階段變?yōu)楠?dú)立的服務(wù):將Shuffle階段從Reduce Task處理邏輯中出來(lái)變成為一個(gè)獨(dú)立的服務(wù)套利,不再讓其占用Reduce slot推励,這樣也可達(dá)到I/O和CPU資源重疊使用的目的鹤耍。“百度”曾采用了這一方案吹艇。
小結(jié)
本文將Map Task分解成Read惰蜜、Map、Collect受神、Spill和Combine五個(gè)階段抛猖,并詳細(xì)介紹了后三個(gè)階段:map()函數(shù)處理完結(jié)果后,Map Task會(huì)將處理結(jié)果存放到一個(gè)內(nèi)存緩沖區(qū)中(Collect階段)鼻听,待緩沖區(qū)使用率達(dá)到一定閾值后财著,再將數(shù)據(jù)溢寫(xiě)到磁盤上(Spill階段),而當(dāng)所有數(shù)據(jù)處理完后撑碴,Map Task會(huì)將磁盤上所有文件合并成一個(gè)大文件(Combine階段)撑教。這幾個(gè)階段形成的流水線如圖8-28所示。
本章將Reduce Task分解成Shuffle醉拓、Merge伟姐、Sort、Reduce和Write五個(gè)階段亿卤,且重點(diǎn)介紹了前三個(gè)階段:Reduce Task首先進(jìn)入Shuffle階段愤兵,在該階段中,它會(huì)啟動(dòng)若干個(gè)線程排吴,從各個(gè)完成的Map Task上拷貝數(shù)據(jù)秆乳,并將數(shù)據(jù)放到磁盤上或者內(nèi)存中,待文件數(shù)目超過(guò)一定閾值后進(jìn)行一次合并(Merge階段)钻哩,當(dāng)所有數(shù)據(jù)拷貝完成后屹堰,再對(duì)所有數(shù)據(jù)進(jìn)行一次排序(Sort階段),并將key相同的記錄分組依次交給reduce()函數(shù)處理街氢。這幾個(gè)階段形成的流水線如圖8-29所示扯键。
Hadoop MapReduce shuffle的特性:
Reducer從Map端拉取屬于自己Partition的數(shù)據(jù)時(shí),該P(yáng)artition的數(shù)據(jù)已經(jīng)在Map端排好序珊肃。Reducer將屬于它的所有的partition拉取過(guò)去后忧陪,進(jìn)行Reducer端的歸并排序(歸并排序的原因是Reducer會(huì)從多個(gè)Mapper拉取相應(yīng)的Partition,Reducer需要將所有這些Partition進(jìn)行排序)
如果客戶端定義了Combiner近范,那么在數(shù)據(jù)在排好序后嘶摊,會(huì)調(diào)用CombinerClass對(duì)數(shù)據(jù)已經(jīng)combine,然后才spill到磁盤评矩。這就是說(shuō)Sort操作在Combine操作之前執(zhí)行叶堆,而Partititon操作在Sort之前執(zhí)行,也就是Parttion->Sort->Combine的過(guò)程
reducer如何得知map已經(jīng)產(chǎn)生了一個(gè)分區(qū)的輸出文件斥杜?在Hadoop2中虱颗,mapper直接通知ApplicationMaster沥匈。在Hadoop1中,mapper通知TaskTracker忘渔,任務(wù)已經(jīng)執(zhí)行完成高帖,而TaskTracker則通知JobTracker,那么JobTracker則會(huì)通知Reducer已經(jīng)有Mapper任務(wù)執(zhí)行完成并且數(shù)據(jù)的位置在什么地方(會(huì)通知嗎畦粮?難道不是自己去TaskTracker查詢散址?)(此處可見(jiàn),JobTracker確實(shí)承擔(dān)了很多的職責(zé))
reducer拉取分區(qū)數(shù)據(jù)后宣赔,如果拉過(guò)來(lái)的數(shù)據(jù)量較小预麸,那么直接加載到內(nèi)存;如果較大儒将,則存放到磁盤上吏祸。這跟Mapper端的處理過(guò)程類似,此時(shí)Reducer的內(nèi)存大小是50M钩蚊,隨著拉取的數(shù)據(jù)越來(lái)越多贡翘,內(nèi)存容不下,Reducer開(kāi)啟Spill到磁盤操作
每個(gè)partition經(jīng)過(guò)Map后得到一個(gè)排序的文件砰逻,那么這個(gè)文件中的數(shù)據(jù)只被一個(gè)Reducer消費(fèi)還是被所有的Reducer消費(fèi)鸣驱?
是被所有的Reducer消費(fèi),也就是說(shuō)诱渤,一個(gè)Map輸出文件包含了很多個(gè)Partition,Reducer只關(guān)心屬于自己的Partition谈况。比如一個(gè)Map產(chǎn)生的最終輸出文件包含了3個(gè)Partition勺美,而每個(gè)Partition由對(duì)應(yīng)的reducer進(jìn)行消費(fèi)。
Mapreduce中Map與Reduce任務(wù)的個(gè)數(shù)
1碑韵、Map任務(wù)的個(gè)數(shù)
讀取數(shù)據(jù)產(chǎn)生多少個(gè)Mapper赡茸??
Mapper數(shù)據(jù)過(guò)大的話祝闻,會(huì)產(chǎn)生大量的小文件,過(guò)多的Mapper創(chuàng)建和初始化都會(huì)消耗大量的硬件資源占卧,Mapper數(shù)太小,并發(fā)度過(guò)小联喘,Job執(zhí)行時(shí)間過(guò)長(zhǎng)华蜒,無(wú)法充分利用分布式硬件資源
Mapper數(shù)量由什么決定?豁遭?
1)輸入文件數(shù)目(2)輸入文件的大邪认病(3)配置參數(shù) 這三個(gè)因素決定的。
輸入的目錄中文件的數(shù)量決定多少個(gè)map會(huì)被運(yùn)行起來(lái)蓖谢,應(yīng)用針對(duì)每一個(gè)分片運(yùn)行一個(gè)map捂蕴,一般而言譬涡,對(duì)于每一個(gè)輸入的文件會(huì)有一個(gè)map split。如果輸入文件太大啥辨,超過(guò)了hdfs塊的大形性取(128M)那么對(duì)于同一個(gè)輸入文件我們會(huì)有多余2個(gè)的map運(yùn)行起來(lái)。
涉及參數(shù):
mapreduce.input.fileinputformat.split.minsize //啟動(dòng)map最小的split size大小溉知,默認(rèn)0
mapreduce.input.fileinputformat.split.maxsize //啟動(dòng)map最大的split size大小陨瘩,默認(rèn)256M
dfs.block.size//block塊大小,默認(rèn)128M
計(jì)算公式:splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
下面是FileInputFormat class 的getSplits()的偽代碼:
num_splits = 0
for each input file f:
remaining = f.length
while remaining / split_size > split_slope:
num_splits += 1
remaining -= split_size
where:
split_slope = 1.1 分割斜率
split_size =~ dfs.blocksize 分割大小約等于hdfs塊大小
會(huì)有一個(gè)比例進(jìn)行運(yùn)算來(lái)進(jìn)行切片着倾,為了減少資源的浪費(fèi)
例如一個(gè)文件大小為260M拾酝,在進(jìn)行MapReduce運(yùn)算時(shí),會(huì)首先使用260M/128M卡者,得出的結(jié)果和1.1進(jìn)行比較
大于則切分出一個(gè)128M作為一個(gè)分片蒿囤,剩余132M,再次除以128崇决,得到結(jié)果為1.03材诽,小于1.1
則將132作為一個(gè)切片,即最終260M被切分為兩個(gè)切片進(jìn)行處理恒傻,而非3個(gè)切片脸侥。
2、reduce任務(wù)的個(gè)數(shù)
Reduce任務(wù)是一個(gè)數(shù)據(jù)聚合的步驟盈厘,數(shù)量默認(rèn)為1睁枕。而使用過(guò)多的Reduce任務(wù)則意味著復(fù)雜的shuffle,并使輸出文件的數(shù)量激增沸手。
一個(gè)job的ReduceTasks數(shù)量是通過(guò)mapreduce.job.reduces參數(shù)設(shè)置
也可以通過(guò)編程的方式外遇,調(diào)用Job對(duì)象的setNumReduceTasks()方法來(lái)設(shè)置
一個(gè)節(jié)點(diǎn)Reduce任務(wù)數(shù)量上限由mapreduce.tasktracker.reduce.tasks.maximum設(shè)置(默認(rèn)2)。
可以采用以下探試法來(lái)決定Reduce任務(wù)的合理數(shù)量:
1.每個(gè)reducer都可以在Map任務(wù)完成后立即執(zhí)行:
0.95 * (節(jié)點(diǎn)數(shù)量 * mapreduce.tasktracker.reduce.tasks.maximum)
2.較快的節(jié)點(diǎn)在完成第一個(gè)Reduce任務(wù)后契吉,馬上執(zhí)行第二個(gè):
1.75 * (節(jié)點(diǎn)數(shù)量 * mapreduce.tasktracker.reduce.tasks.maximum)