MapReduce是一種編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算。 概念"Map(映射)"和"Reduce(歸約)"惕味,是它們的主要思想扇调,都是從函數(shù)式編程語言里借來的榛了,還有從矢量編程語言里借來的特性在讶。 它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統(tǒng)上忽冻。
一真朗、總覽
首先說下Hadoop 的四大組件: HDFS:分布式存儲系統(tǒng)。 MapReduce:分布式計算系統(tǒng)僧诚。 YARN: hadoop 的資源調(diào)度系統(tǒng)。 Common: 以上三大組件的底層支撐組件蝗碎,主要提供基礎(chǔ)工具包和 RPC 框架等湖笨。 Mapreduce 是一個分布式運算程序的編程框架,是用戶開發(fā)“基于 hadoop的數(shù)據(jù)分析 應(yīng)用”的核心框架蹦骑。 Mapreduce 核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序慈省,并發(fā)運行在一個 hadoop 集群上。
二眠菇、MapReduce作業(yè)運行流程
從整體層面上看边败,有五個獨立的實體: - 客戶端,提交 MapReduce 作業(yè)捎废。 - YARN 資源管理器(YARN resource manager)笑窜,負責(zé)協(xié)調(diào)集群上計算機資源的分配。 - YARN 節(jié)點管理器(YARN node manager)登疗,負責(zé)啟動和監(jiān)視集群中機器上的計算容器(container)排截。 - MapReduce的 application master,負責(zé)協(xié)調(diào)MapReduce 作業(yè)的任務(wù)辐益。MRAppMaster 和 MapReduce 任務(wù)運行在容器中断傲,該容器由資源管理器進行調(diào)度(schedule)[此處理解為劃分、分配更為合適] 且由節(jié)點管理器進行管理智政。 - 分布式文件系統(tǒng)(通常是 HDFS)认罩,用來在其他實體間共享作業(yè)文件。
作業(yè)提交(Job Submission)
在 Job 對象上面調(diào)用 submit() 方法续捂,在內(nèi)部創(chuàng)建一個 JobSubmitter 實例垦垂,然后調(diào)用該實例的 submitJobInternal() 方法(圖1步驟1)。如果使用waitForCompletion() 方法來進行提交作業(yè)疾忍,該方法每隔 1 秒輪詢作業(yè)的進度乔外,如果進度有所變化,將該進度報告給控制臺(console)一罩。當(dāng)作業(yè)成功完成杨幼,作業(yè)計數(shù)器被顯示出來。否則,導(dǎo)致作業(yè)失敗的錯誤被記錄到控制臺差购。
JobSubmitter所實現(xiàn)的作業(yè)提交過程如下:
- 向資源管理器(ResourceManager)請求一個 application ID四瘫,該 ID 被用作 MapReduce 作業(yè)的 ID(步驟2)。
- 檢查作業(yè)指定的輸出(output)目錄欲逃。例如找蜜,如果該輸出目錄沒有被指定或者已經(jīng)存在,作業(yè)不會被提交且一個錯誤被拋出給 MapReduce 程序
- 為作業(yè)計算輸入分片(input splits)稳析。如果分片不能被計算(可能因為輸入路徑(input paths)不存在)洗做,該作業(yè)不會被提交且一個錯誤被拋出給 MapReduce 程序。
- 拷貝作業(yè)運行必備的資源彰居,包括作業(yè) JAR 文件诚纸,配置文件以及計算的輸入分片,到一個以作業(yè) ID 命名的共享文件系統(tǒng)目錄中(步驟3)陈惰。作業(yè) JAR 文件以一個高副本因子(a high replication factor)進行拷貝(由 mapreduce.client.submit.file.replication 屬性控制畦徘,默認值為 10),所以在作業(yè)任務(wù)運行時抬闯,在集群中有很多的作業(yè) JAR 副本供節(jié)點管理器來訪問井辆。
- 通過在資源管理器(ResourceManager)上調(diào)用 submitApplication 來提交作業(yè)(步驟4)。
作業(yè)初始化(Job Initialization)
當(dāng)資源管理器接受到 submitApplication() 方法的調(diào)用溶握,它把請求遞交給 YARN 調(diào)度器(scheduler)杯缺。調(diào)度器分配了一個容器(container),資源管理器在該容器中啟動 application master 進程奈虾,該進程被節(jié)點管理器(NodeManager)管理(步驟5a 和 5b)夺谁。
MapReduce 作業(yè)的 application master 是一個 Java 應(yīng)用,它的主類是 MRAppMaster肉微。它通過創(chuàng)建一定數(shù)量的簿記對象(bookkeeping object)跟蹤作業(yè)進度來初始化作業(yè)(步驟6)匾鸥,該簿記對象接受任務(wù)報告的進度和完成情況。接下來碉纳,application master 從共享文件系統(tǒng)中獲取客戶端計算的輸入分片(步驟7)勿负。然后它為每個分片創(chuàng)建一個 map 任務(wù),同樣創(chuàng)建由 mapreduce.job.reduces 屬性控制的多個reduce 任務(wù)對象(或者在 Job 對象上通過 setNumReduceTasks() 方法設(shè)置)劳曹。任務(wù)ID在此時分配奴愉。
Applcation master 必須決定如何運行組成 MapReduce 作業(yè)的任務(wù)。如果作業(yè)比較小铁孵,application master 可能選擇在和它自身運行的 JVM 上運行這些任務(wù)锭硼。這種情況發(fā)生的前提是,application master 判斷分配和運行任務(wù)在一個新的容器上的開銷超過并行運行這些任務(wù)所帶來的回報蜕劝,據(jù)此和順序地在同一個節(jié)點上運行這些任務(wù)進行比較檀头。這樣的作業(yè)被稱為 uberized轰异,或者作為一個 uber 任務(wù)運行。
一個小的作業(yè)具有哪些資格暑始?默認的情況下搭独,它擁有少于 10 個 mapper,只有一個 reducer廊镜,且單個輸入的 size 小于 HDFS block 的牙肝。(注意,這些值可以通過 mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, mapreduce.job.ubertask.maxbytes 進行設(shè)置)嗤朴。Uber 任務(wù)必須顯示地將 mapreduce.job.ubertask.enable 設(shè)置為 true
最后配椭,在任何任務(wù)運行之前, application master 調(diào)用 OutputCommiter 的 setupJob() 方法雹姊。系統(tǒng)默認是使用 FileOutputCommiter颂郎,它為作業(yè)創(chuàng)建最終的輸出目錄和任務(wù)輸出創(chuàng)建臨時工作空間(temporary working space)。
任務(wù)分配(Task Assignment)
如果作業(yè)沒有資格作為 uber 任務(wù)來運行容为,那么 application master 為作業(yè)中的 map 任務(wù)和 reduce 任務(wù)向資源管理器(ResourceManager)請求容器(container)(步驟8)。首先要為 map 任務(wù)發(fā)送請求寺酪,該請求優(yōu)先級高于 reduce 任務(wù)的請求坎背,因為所有的 map 任務(wù)必須在 reduce 的排序階段(sort phase)能夠啟動之前完成。reduce 任務(wù)的請求至少有 5% 的 map 任務(wù)已經(jīng)完成才會發(fā)出(可配置)寄雀。
reduce 任務(wù)可以運行在集群中的任何地方得滤,但是 map 任務(wù)的請求有數(shù)據(jù)本地約束(data locality constraint),調(diào)度器盡力遵守該約束(try to honor)盒犹。在最佳的情況下懂更,任務(wù)的輸入是數(shù)據(jù)本地的(data local)-- 也就是任務(wù)運行在分片駐留的節(jié)點上〖卑颍或者沮协,任務(wù)可能是機架本地的(rack local),也就是和分片在同一個機架上卓嫂,而不是同一個節(jié)點上慷暂。有一些任務(wù)既不是數(shù)據(jù)本地的也不是機架本地的,該任務(wù)從不同機架上面獲取數(shù)據(jù)而不是任務(wù)本身運行的節(jié)點上晨雳。對于特定的作業(yè)行瑞,你可以通過查看作業(yè)計數(shù)器(job's counters)來確定任務(wù)的位置級別(locality level)。
請求也為任務(wù)指定內(nèi)存需求和 CPU 數(shù)量餐禁。默認血久,每個 map 和 recude 任務(wù)被分配 1024 MB的內(nèi)存和一個虛擬的核(virtual core)。這些值可以通過如下屬性(mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores, mapreduce.reduce.cpu.vcores)在每個作業(yè)基礎(chǔ)上進行配置(遵守 Memory settings in YARN and MapReduce 中描述的最小最大值)帮非。
任務(wù)執(zhí)行
一旦資源調(diào)度器在一個特定的節(jié)點上為一個任務(wù)分配一個容器所需的資源氧吐,application master 通過連接節(jié)點管理器來啟動這個容器(步驟9a 和9b)讹蘑。任務(wù)通過一個主類為 YarnChild 的 Java 應(yīng)用程序來執(zhí)行。在它運行任務(wù)之前副砍,它會將任務(wù)所需的資源本地化衔肢,包括作業(yè)配置,JAR 文件以及一些在分布式緩存中的文件(步驟10)豁翎。最后角骤,它運行 map 或者 reduce 任務(wù)(步驟11)。
YarnChild 在一個指定的 JVM 中運行心剥,所以任何用戶自定義的 map 和 reduce 函數(shù)的 bugs(或者甚至在 YarnChild)都不會影響到節(jié)點管理器(NodeManager) -- 比如造成節(jié)點管理的崩潰或者掛起邦尊。
每個任務(wù)能夠執(zhí)行計劃(setup)和提交(commit)動作,它們運行在和任務(wù)本身相同的 JVM 當(dāng)中优烧,由作業(yè)的 OutputCommiter 來確定蝉揍。對于基于文件的作業(yè),提交動作把任務(wù)的輸出從臨時位置移動到最終位置畦娄。提交協(xié)議確保當(dāng)推測執(zhí)行可用時又沾,在復(fù)制的任務(wù)中只有一個被提交,其他的都被取消掉熙卡。
進度和狀態(tài)的更新
MapReduce 作業(yè)是長時間運行的批處理作業(yè)(long-running batch jobs)杖刷,運行時間從幾十秒到幾小時。由于可能運行時間很長驳癌,所以用戶得到該作業(yè)的處理進度反饋是很重要的滑燃。
作業(yè)和任務(wù)都含有一個狀態(tài),包括運行狀態(tài)颓鲜、maps 和 reduces 的處理進度表窘,作業(yè)計數(shù)器的值,以及一個狀態(tài)消息或描述(可能在用戶代碼中設(shè)置)甜滨。這些狀態(tài)會在作業(yè)的過程中改變乐严。那么它是如何與客戶端進行通信的?
當(dāng)一個任務(wù)運行艳吠,它會保持進度的跟蹤(就是任務(wù)完成的比例)麦备。對于 map 任務(wù),就是被處理的輸入的比例昭娩。對于 reduce 任務(wù)凛篙,稍微復(fù)雜一點,但是系統(tǒng)任然能夠估算已處理的 reduce 輸入的比例栏渺。通過把整個過程分為三個部分呛梆,對應(yīng)于 shuffle 的三個階段。例如磕诊,如果一個任務(wù)運行 reducer 完成了一半的輸入填物,該任務(wù)的進度就是 5/6纹腌,因為它已經(jīng)完成了 copy 和 sort 階段(1/3 each)以及 reduce 階段完成了一半(1/6)。
MapReduce 的進度組成 進度不總是可測的滞磺,但是它告訴 Hadoop 一個任務(wù)在做的一些事情升薯。例如,任務(wù)的寫輸出記錄是有進度的击困,即使不能用總進度的百分比(因為它自己也可能不知道到底有多少輸出要寫涎劈,也可能不知道需要寫的總量)來表示進度報告非常重要,Hadoop 不會使一個報告進度的任務(wù)失斣牟琛(not fail a task that's making progress)蛛枚。如下的操作構(gòu)成了進度: - 讀取輸入記錄(在 mapper 或者 reducer 中)。 - 寫輸出記錄(在 mapper 或者 reducer 中)脸哀。 - 設(shè)置狀態(tài)描述(由 Reporter 的或 TaskAttempContext 的 setStatus() 方法設(shè)置)蹦浦。 - 計數(shù)器的增長(使用 Reporter 的 incrCounter() 方法 或者 Counter 的 increment() 方法)。 - 調(diào)用 Reporter 的或者 TaskAttemptContext 的 progress() 方法撞蜂。
任務(wù)有一些計數(shù)器盲镶,它們在任務(wù)運行時記錄各種事件,這些計數(shù)器要么是框架內(nèi)置的蝌诡,例如:已寫入的map輸出記錄數(shù)徒河,要么是用戶自定義的。
當(dāng) map 或 reduce 任務(wù)運行時送漠,子進程使用 umbilical 接口和父 application master 進行通信。任務(wù)每隔三秒鐘通過 umbilical 接口報告其進度和狀態(tài)(包括計數(shù)器)給 application master由蘑,application master會形成一個作業(yè)的聚合視圖闽寡。
在作業(yè)執(zhí)行的過程中,客戶端每秒通過輪詢 application master 獲取最新的狀態(tài)(間隔通過 mapreduce.client.progressmonitor.polinterval 設(shè)置)尼酿∫罚客戶端也可使用 Job 的 getStatus() 方法獲取一個包含作業(yè)所有狀態(tài)信息的 JobStatus 實例,過程如下:
作業(yè)完成(Job Completion)
當(dāng) application master 接受到最后一個任務(wù)完成的通知裳擎,它改變該作業(yè)的狀態(tài)為 “successful”涎永。當(dāng) Job 對象輪詢狀態(tài),它知道作業(yè)已經(jīng)成功完成鹿响,所以它打印一條消息告訴用戶以及從 waitForCompletion() 方法返回羡微。此時,作業(yè)的統(tǒng)計信息和計數(shù)器被打印到控制臺惶我。
Application master 也可以發(fā)送一條 HTTP 作業(yè)通知妈倔,如果配置了的話。當(dāng)客戶端想要接受回調(diào)時绸贡,可以通過 mapreduce.job.end-notification.url 屬性進行配置盯蝴。
最后毅哗,當(dāng)作業(yè)完成,application master 和作業(yè)容器清理他們的工作狀態(tài)(所以中間輸入會被刪除)捧挺,然后 OutputCommiter 的 commitJob() 方法被調(diào)用虑绵。作業(yè)的信息被作業(yè)歷史服務(wù)器存檔,以便日后用戶查詢闽烙。
三翅睛、 MapReduce計算流程
這里先給出官網(wǎng)上關(guān)于這個過程的經(jīng)典流程圖:
樹形圖如下:
Map
在進行海量數(shù)據(jù)處理時蒿往,外存文件數(shù)據(jù)I/O訪問會成為一個制約系統(tǒng)性能的瓶頸终畅,因此,Hadoop的Map過程實現(xiàn)的一個重要原則就是:計算靠近數(shù)據(jù)壹堰,這里主要指兩個方面
- 代碼靠近數(shù)據(jù):
原則:本地化數(shù)據(jù)處理(locality)摊溶,即一個計算節(jié)點盡可能處理本地磁盤上所存儲的數(shù)據(jù)爬骤;
盡量選擇數(shù)據(jù)所在DataNode啟動Map任務(wù);
這樣可以減少數(shù)據(jù)通信莫换,提高計算效率霞玄; -
數(shù)據(jù)靠近代碼:
當(dāng)本地沒有數(shù)據(jù)處理時,盡可能從同一機架或最近其他節(jié)點傳輸數(shù)據(jù)進行處理(host選擇算法)拉岁。
map-shuffle
輸入
- map task只讀取split分片坷剧,split與block(hdfs的最小存儲單位,默認為64MB)可能是一對一也能是一對多喊暖,但是對于一個split只會對應(yīng)一個文件的一個block或多個block惫企,不允許一個split對應(yīng)多個文件的多個block;
- 這里切分和輸入數(shù)據(jù)的時會涉及到InputFormat的文件切分算法和host選擇算法陵叽。
文件切分算法狞尔,主要用于確定InputSplit的個數(shù)以及每個InputSplit對應(yīng)的數(shù)據(jù)段。FileInputFormat以文件為單位切分生成InputSplit巩掺,對于每個文件偏序,由以下三個屬性值決定其對應(yīng)的InputSplit的個數(shù):
- goalSize: 它是根據(jù)用戶期望的InputSplit數(shù)目計算出來的,即totalSize/numSplits胖替。其中研儒,totalSize為文件的總大小独令;numSplits為用戶設(shè)定的Map Task個數(shù)端朵,默認情況下是1;
- minSize:InputSplit的最小值燃箭,由配置參數(shù)mapred.min.split.size確定逸月,默認是1;
- blockSize:文件在hdfs中存儲的block大小遍膜,不同文件可能不同碗硬,默認是64MB瓤湘。
這三個參數(shù)共同決定InputSplit的最終大小,計算方法如下:
splitSize=max{minSize, min{gogalSize,blockSize}}
Partition
- 作用:將map的結(jié)果發(fā)送到相應(yīng)的reduce端恩尾,總的partition的數(shù)目等于reducer的數(shù)量弛说。
- 實現(xiàn)功能:
- map輸出的是key/value對,決定于當(dāng)前的mapper的part交給哪個reduce的方法是:mapreduce提供的Partitioner接口翰意,對key進行hash后木人,再以reducetask數(shù)量取模,然后到指定的job上(HashPartitioner冀偶,可以通過
job.setPartitionerClass(MyPartition.class
)自定義)醒第。- 然后將數(shù)據(jù)寫入到內(nèi)存緩沖區(qū),緩沖區(qū)的作用是批量收集map結(jié)果进鸠,減少磁盤IO的影響稠曼。key/value對以及Partition的結(jié)果都會被寫入緩沖區(qū)。在寫入之前客年,key與value值都會被序列化成字節(jié)數(shù)組霞幅。
- 要求:負載均衡,效率量瓜;
spill(溢寫):sort & combiner
- 作用:把內(nèi)存緩沖區(qū)中的數(shù)據(jù)寫入到本地磁盤司恳,在寫入本地磁盤時先按照partition、再按照key進行排序(quick sort)绍傲;
- 注意:
- 這個spill是由另外單獨的線程來完成扔傅,不影響往緩沖區(qū)寫map結(jié)果的線程;
- 內(nèi)存緩沖區(qū)默認大小限制為100MB烫饼,它有個溢寫比例(spill.percent)铅鲤,默認為0.8,當(dāng)緩沖區(qū)的數(shù)據(jù)達到閾值時枫弟,溢寫線程就會啟動,先鎖定這80MB的內(nèi)存鹏往,執(zhí)行溢寫過程淡诗,maptask的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫,互不影響伊履。然后再重新利用這塊緩沖區(qū)韩容,因此Map的內(nèi)存緩沖區(qū)又叫做環(huán)形緩沖區(qū)(兩個指針的方向不會變,下面會詳述)唐瀑;
- 在將數(shù)據(jù)寫入磁盤之前群凶,先要對要寫入磁盤的數(shù)據(jù)進行一次排序操作,先按<key,value,partition>中的partition分區(qū)號排序哄辣,然后再按key排序请梢,這個就是sort操作赠尾,最后溢出的小文件是分區(qū)的,且同一個分區(qū)內(nèi)是保證key有序的毅弧;
combine:
執(zhí)行combine操作要求開發(fā)者必須在程序中設(shè)置了combine(程序中通過job.setCombinerClass(myCombine.class
)自定義combine操作)气嫁。
- 程序中有兩個階段可能會執(zhí)行combine操作:
- map輸出數(shù)據(jù)根據(jù)分區(qū)排序完成后,在寫入文件之前會執(zhí)行一次combine操作(前提是作業(yè)中設(shè)置了這個操作)够坐;
- 如果map輸出比較大寸宵,溢出文件個數(shù)大于3(此值可以通過屬性min.num.spills.for.combine配置)時,在merge的過程(多個spill文件合并為一個大文件)中還會執(zhí)行combine操作元咙;
- combine主要是把形如<aa,1>,<aa,2>這樣的key值相同的數(shù)據(jù)進行計算梯影,計算規(guī)則與reduce一致,比如:當(dāng)前計算是求key對應(yīng)的值求和庶香,則combine操作后得到<aa,3>這樣的結(jié)果甲棍。
- 注意事項:不是每種作業(yè)都可以做combine操作的,只有滿足以下條件才可以:
- reduce的輸入輸出類型都一樣脉课,因為combine本質(zhì)上就是reduce操作救军;
- 計算邏輯上,combine操作后不會影響計算結(jié)果倘零,像求和就不會影響唱遭;
merge
- merge過程:當(dāng)map很大時,每次溢寫會產(chǎn)生一個spill_file呈驶,這樣會有多個spill_file拷泽,而最終的一個map task輸出只有一個文件,因此袖瞻,最終的結(jié)果輸出之前會對多個中間過程進行多次溢寫文件(spill_file)的合并司致,此過程就是merge過程。也即是聋迎,待Map Task任務(wù)的所有數(shù)據(jù)都處理完后脂矫,會對任務(wù)產(chǎn)生的所有中間數(shù)據(jù)文件做一次合并操作,以確保一個Map Task最終只生成一個中間數(shù)據(jù)文件霉晕。
- 注意:
- 如果生成的文件太多庭再,可能會執(zhí)行多次合并,每次最多能合并的文件數(shù)默認為10牺堰,可以通過屬性min.num.spills.for.combine配置拄轻;
- 多個溢出文件合并時,會進行一次排序伟葫,排序算法是多路歸并排序恨搓;
- 是否還需要做combine操作,一是看是否設(shè)置了combine,二是看溢出的文件數(shù)是否大于等于3斧抱;
- 最終生成的文件格式與單個溢出文件一致常拓,也是按分區(qū)順序存儲,并且輸出文件會有一個對應(yīng)的索引文件夺姑,記錄每個分區(qū)數(shù)據(jù)的起始位置墩邀,長度以及壓縮長度,這個索引文件名叫做file.out.index盏浙。
內(nèi)存緩沖區(qū)
- 在Map Task任務(wù)的業(yè)務(wù)處理方法map()中眉睹,最后一步通過OutputCollector.collect(key,value)或context.write(key,value)輸出Map Task的中間處理結(jié)果,在相關(guān)的collect(key,value)方法中废膘,會調(diào)用Partitioner.getPartition(K2 key, V2 value, int numPartitions)方法獲得輸出的key/value對應(yīng)的分區(qū)號(分區(qū)號可以認為對應(yīng)著一個要執(zhí)行Reduce Task的節(jié)點)竹海,然后將<key,value,partition>暫時保存在內(nèi)存中的MapOutputBuffe內(nèi)部的環(huán)形數(shù)據(jù)緩沖區(qū),該緩沖區(qū)的默認大小是100MB丐黄,可以通過參數(shù)io.sort.mb來調(diào)整其大小斋配。
- 當(dāng)緩沖區(qū)中的數(shù)據(jù)使用率達到一定閥值后,觸發(fā)一次Spill操作灌闺,將環(huán)形緩沖區(qū)中的部分數(shù)據(jù)寫到磁盤上艰争,生成一個臨時的Linux本地數(shù)據(jù)的spill文件;然后在緩沖區(qū)的使用率再次達到閥值后桂对,再次生成一個spill文件甩卓。直到數(shù)據(jù)處理完畢,在磁盤上會生成很多的臨時文件蕉斜。
- 緩存有一個閥值比例配置逾柿,當(dāng)達到整個緩存的這個比例時,會觸發(fā)spill操作宅此;觸發(fā)時机错,map輸出還會接著往剩下的空間寫入,但是寫滿的空間會被鎖定父腕,數(shù)據(jù)溢出寫入磁盤弱匪。當(dāng)這部分溢出的數(shù)據(jù)寫完后,空出的內(nèi)存空間可以接著被使用璧亮,形成像環(huán)一樣的被循環(huán)使用的效果萧诫,所以又叫做環(huán)形內(nèi)存緩沖區(qū);
-
MapOutputBuffer內(nèi)部存數(shù)的數(shù)據(jù)采用了兩個索引結(jié)構(gòu)杜顺,涉及三個環(huán)形內(nèi)存緩沖區(qū)。下來看一下兩級索引結(jié)構(gòu):
buffer
這三個環(huán)形緩沖區(qū)的含義分別如下:
- kvoffsets緩沖區(qū):也叫偏移量索引數(shù)組蘸炸,用于保存key/value信息在位置索引 kvindices 中的偏移量躬络。當(dāng) kvoffsets 的使用率超過 io.sort.spill.percent (默認為80%)后,便會觸發(fā)一次 SpillThread 線程的“溢寫”操作搭儒,也就是開始一次 Spill 階段的操作穷当。
- kvindices緩沖區(qū):也叫位置索引數(shù)組提茁,用于保存 key/value 在數(shù)據(jù)緩沖區(qū) kvbuffer 中的起始位置。
- kvbuffer即數(shù)據(jù)緩沖區(qū):用于保存實際的 key/value 的值馁菜。默認情況下該緩沖區(qū)最多可以使用 io.sort.mb 的95%茴扁,當(dāng) kvbuffer 使用率超過 io.sort.spill.percent (默認為80%)后,便會觸發(fā)一次 SpillThread 線程的“溢寫”操作汪疮,也就是開始一次 Spill 階段的操作峭火。
寫入到本地磁盤時,對數(shù)據(jù)進行排序智嚷,實際上是對kvoffsets這個偏移量索引數(shù)組進行排序卖丸。
Reduce
Reduce過程的經(jīng)典流程圖如下:
copy過程
- 作用:拉取數(shù)據(jù);
- 過程:Reduce進程啟動一些數(shù)據(jù)copy線程(Fetcher)盏道,通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件稍浆。因為這時map task早已結(jié)束,這些文件就歸NodeManager管理在本地磁盤中猜嘱。
- 默認情況下衅枫,當(dāng)整個MapReduce作業(yè)的所有已執(zhí)行完成的Map Task任務(wù)數(shù)超過Map Task總數(shù)的5%后,JobTracker便會開始調(diào)度執(zhí)行Reduce Task任務(wù)朗伶。然后Reduce Task任務(wù)默認啟動mapred.reduce.parallel.copies(默認為5)個MapOutputCopier線程到已完成的Map Task任務(wù)節(jié)點上分別copy一份屬于自己的數(shù)據(jù)弦撩。 這些copy的數(shù)據(jù)會首先保存的內(nèi)存緩沖區(qū)中,當(dāng)內(nèi)沖緩沖區(qū)的使用率達到一定閥值后腕让,則寫到磁盤上孤钦。
內(nèi)存緩沖區(qū)
- 這個內(nèi)存緩沖區(qū)大小的控制就不像map那樣可以通過io.sort.mb來設(shè)定了,而是通過另外一個參數(shù)來設(shè)置:mapred.job.shuffle.input.buffer.percent(default 0.7)纯丸, 這個參數(shù)其實是一個百分比偏形,意思是說,shuffile在reduce內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為:0.7 × maxHeap of reduce task觉鼻。
- 如果該reduce task的最大heap使用量(通常通過mapred.child.java.opts來設(shè)置俊扭,比如設(shè)置為-Xmx1024m)的一定比例用來緩存數(shù)據(jù)。默認情況下坠陈,reduce會使用其heapsize的70%來在內(nèi)存中緩存數(shù)據(jù)萨惑。如果reduce的heap由于業(yè)務(wù)原因調(diào)整的比較大,相應(yīng)的緩存大小也會變大仇矾,這也是為什么reduce用來做緩存的參數(shù)是一個百分比庸蔼,而不是一個固定的值了。
merge過程
- Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中贮匕,這里的緩沖區(qū)大小要比 map 端的更為靈活姐仅,它基于 JVM 的heap size設(shè)置,因為 Shuffle 階段 Reducer 不運行,所以應(yīng)該把絕大部分的內(nèi)存都給 Shuffle 用掏膏。
- 這里需要強調(diào)的是劳翰,merge 有三種形式:1)內(nèi)存到內(nèi)存 2)內(nèi)存到磁盤 3)磁盤到磁盤。默認情況下第一種形式是不啟用的馒疹。當(dāng)內(nèi)存中的數(shù)據(jù)量到達一定閾值佳簸,就啟動內(nèi)存到磁盤的 merge(圖中的第一個merge,之所以進行merge是因為reduce端在從多個map端copy數(shù)據(jù)的時候颖变,并沒有進行sort生均,只是把它們加載到內(nèi)存,當(dāng)達到閾值寫入磁盤時悼做,需要進行merge) 疯特。這和map端的很類似,這實際上就是溢寫的過程肛走,在這個過程中如果你設(shè)置有Combiner漓雅,它也是會啟用的,然后在磁盤中生成了眾多的溢寫文件朽色,這種merge方式一直在運行邻吞,直到?jīng)]有 map 端的數(shù)據(jù)時才結(jié)束,然后才會啟動第三種磁盤到磁盤的 merge (圖中的第二個merge)方式生成最終的那個文件葫男。
- 在遠程copy數(shù)據(jù)的同時抱冷,Reduce Task在后臺啟動了兩個后臺線程對內(nèi)存和磁盤上的數(shù)據(jù)文件做合并操作,以防止內(nèi)存使用過多或磁盤生的文件過多梢褐。
reducer的輸入文件
- merge的最后會生成一個文件旺遮,大多數(shù)情況下存在于磁盤中,但是需要將其放入內(nèi)存中盈咳。當(dāng)reducer 輸入文件已定耿眉,整個 Shuffle 階段才算結(jié)束。然后就是 Reducer 執(zhí)行鱼响,把結(jié)果放到 HDFS 上鸣剪。
小結(jié):
- shuffle是mapreduce優(yōu)化的重點地方;
- 環(huán)形內(nèi)存緩沖區(qū) :因此map寫入磁盤的過程十分的復(fù)雜丈积,更何況map輸出時候要對結(jié)果進行排序筐骇,內(nèi)存開銷是很大的,所以開啟環(huán)形內(nèi)存緩沖區(qū)專門用于輸出江滨;默認是100MB铛纬,閾值是0.8;
- spill(溢寫):緩沖區(qū)>80%唬滑,寫入磁盤告唆;溢寫前先排序莫秆,后合并,寫入磁盤悔详;
- Partition:Partitioner操作和map階段的輸入分片(Input split)很像,Partitioner會找到對應(yīng)的map輸出文件惹挟,然后進行復(fù)制操作茄螃,作為reduce的輸入;
- reduce階段:和map函數(shù)一樣也是程序員編寫的连锯,最終結(jié)果是存儲在hdfs上的归苍。
Split是怎么劃分的? 參考FileInputFormat類中split切分算法和host選擇算法介紹
參考:
MapReduce 過程詳解:https://www.cnblogs.com/npumenglei/p/3631244.html
MapReduce之Shuffle過程詳述:http://matt33.com/2016/03/02/hadoop-shuffle/
MapReduce過程簡析:http://www.reibang.com/p/0ec306605df8
MapReduce執(zhí)行過程:https://zhuanlan.zhihu.com/p/45305945