冰解的破-MapReduce

MapReduce

MapReduce是一種編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算。 概念"Map(映射)"和"Reduce(歸約)"惕味,是它們的主要思想扇调,都是從函數(shù)式編程語言里借來的榛了,還有從矢量編程語言里借來的特性在讶。 它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統(tǒng)上忽冻。

一真朗、總覽

首先說下Hadoop 的四大組件: HDFS:分布式存儲系統(tǒng)。 MapReduce:分布式計算系統(tǒng)僧诚。 YARN: hadoop 的資源調(diào)度系統(tǒng)。 Common: 以上三大組件的底層支撐組件蝗碎,主要提供基礎(chǔ)工具包和 RPC 框架等湖笨。 Mapreduce 是一個分布式運算程序的編程框架,是用戶開發(fā)“基于 hadoop的數(shù)據(jù)分析 應(yīng)用”的核心框架蹦骑。 Mapreduce 核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序慈省,并發(fā)運行在一個 hadoop 集群上。

二眠菇、MapReduce作業(yè)運行流程


作業(yè)運行結(jié)構(gòu)

從整體層面上看边败,有五個獨立的實體: - 客戶端,提交 MapReduce 作業(yè)捎废。 - YARN 資源管理器(YARN resource manager)笑窜,負責(zé)協(xié)調(diào)集群上計算機資源的分配。 - YARN 節(jié)點管理器(YARN node manager)登疗,負責(zé)啟動和監(jiān)視集群中機器上的計算容器(container)排截。 - MapReduce的 application master,負責(zé)協(xié)調(diào)MapReduce 作業(yè)的任務(wù)辐益。MRAppMaster 和 MapReduce 任務(wù)運行在容器中断傲,該容器由資源管理器進行調(diào)度(schedule)[此處理解為劃分、分配更為合適] 且由節(jié)點管理器進行管理智政。 - 分布式文件系統(tǒng)(通常是 HDFS)认罩,用來在其他實體間共享作業(yè)文件。

作業(yè)提交(Job Submission)

在 Job 對象上面調(diào)用 submit() 方法续捂,在內(nèi)部創(chuàng)建一個 JobSubmitter 實例垦垂,然后調(diào)用該實例的 submitJobInternal() 方法(圖1步驟1)。如果使用waitForCompletion() 方法來進行提交作業(yè)疾忍,該方法每隔 1 秒輪詢作業(yè)的進度乔外,如果進度有所變化,將該進度報告給控制臺(console)一罩。當(dāng)作業(yè)成功完成杨幼,作業(yè)計數(shù)器被顯示出來。否則,導(dǎo)致作業(yè)失敗的錯誤被記錄到控制臺差购。

JobSubmitter所實現(xiàn)的作業(yè)提交過程如下:

  1. 向資源管理器(ResourceManager)請求一個 application ID四瘫,該 ID 被用作 MapReduce 作業(yè)的 ID(步驟2)。
  2. 檢查作業(yè)指定的輸出(output)目錄欲逃。例如找蜜,如果該輸出目錄沒有被指定或者已經(jīng)存在,作業(yè)不會被提交且一個錯誤被拋出給 MapReduce 程序
  3. 為作業(yè)計算輸入分片(input splits)稳析。如果分片不能被計算(可能因為輸入路徑(input paths)不存在)洗做,該作業(yè)不會被提交且一個錯誤被拋出給 MapReduce 程序。
  4. 拷貝作業(yè)運行必備的資源彰居,包括作業(yè) JAR 文件诚纸,配置文件以及計算的輸入分片,到一個以作業(yè) ID 命名的共享文件系統(tǒng)目錄中(步驟3)陈惰。作業(yè) JAR 文件以一個高副本因子(a high replication factor)進行拷貝(由 mapreduce.client.submit.file.replication 屬性控制畦徘,默認值為 10),所以在作業(yè)任務(wù)運行時抬闯,在集群中有很多的作業(yè) JAR 副本供節(jié)點管理器來訪問井辆。
  5. 通過在資源管理器(ResourceManager)上調(diào)用 submitApplication 來提交作業(yè)(步驟4)。

作業(yè)初始化(Job Initialization)

當(dāng)資源管理器接受到 submitApplication() 方法的調(diào)用溶握,它把請求遞交給 YARN 調(diào)度器(scheduler)杯缺。調(diào)度器分配了一個容器(container),資源管理器在該容器中啟動 application master 進程奈虾,該進程被節(jié)點管理器(NodeManager)管理(步驟5a 和 5b)夺谁。

MapReduce 作業(yè)的 application master 是一個 Java 應(yīng)用,它的主類是 MRAppMaster肉微。它通過創(chuàng)建一定數(shù)量的簿記對象(bookkeeping object)跟蹤作業(yè)進度來初始化作業(yè)(步驟6)匾鸥,該簿記對象接受任務(wù)報告的進度和完成情況。接下來碉纳,application master 從共享文件系統(tǒng)中獲取客戶端計算的輸入分片(步驟7)勿负。然后它為每個分片創(chuàng)建一個 map 任務(wù),同樣創(chuàng)建由 mapreduce.job.reduces 屬性控制的多個reduce 任務(wù)對象(或者在 Job 對象上通過 setNumReduceTasks() 方法設(shè)置)劳曹。任務(wù)ID在此時分配奴愉。

Applcation master 必須決定如何運行組成 MapReduce 作業(yè)的任務(wù)。如果作業(yè)比較小铁孵,application master 可能選擇在和它自身運行的 JVM 上運行這些任務(wù)锭硼。這種情況發(fā)生的前提是,application master 判斷分配和運行任務(wù)在一個新的容器上的開銷超過并行運行這些任務(wù)所帶來的回報蜕劝,據(jù)此和順序地在同一個節(jié)點上運行這些任務(wù)進行比較檀头。這樣的作業(yè)被稱為 uberized轰异,或者作為一個 uber 任務(wù)運行。

一個小的作業(yè)具有哪些資格暑始?默認的情況下搭独,它擁有少于 10 個 mapper,只有一個 reducer廊镜,且單個輸入的 size 小于 HDFS block 的牙肝。(注意,這些值可以通過 mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, mapreduce.job.ubertask.maxbytes 進行設(shè)置)嗤朴。Uber 任務(wù)必須顯示地將 mapreduce.job.ubertask.enable 設(shè)置為 true

最后配椭,在任何任務(wù)運行之前, application master 調(diào)用 OutputCommiter 的 setupJob() 方法雹姊。系統(tǒng)默認是使用 FileOutputCommiter颂郎,它為作業(yè)創(chuàng)建最終的輸出目錄和任務(wù)輸出創(chuàng)建臨時工作空間(temporary working space)。

任務(wù)分配(Task Assignment)

如果作業(yè)沒有資格作為 uber 任務(wù)來運行容为,那么 application master 為作業(yè)中的 map 任務(wù)和 reduce 任務(wù)向資源管理器(ResourceManager)請求容器(container)(步驟8)。首先要為 map 任務(wù)發(fā)送請求寺酪,該請求優(yōu)先級高于 reduce 任務(wù)的請求坎背,因為所有的 map 任務(wù)必須在 reduce 的排序階段(sort phase)能夠啟動之前完成。reduce 任務(wù)的請求至少有 5% 的 map 任務(wù)已經(jīng)完成才會發(fā)出(可配置)寄雀。

reduce 任務(wù)可以運行在集群中的任何地方得滤,但是 map 任務(wù)的請求有數(shù)據(jù)本地約束(data locality constraint),調(diào)度器盡力遵守該約束(try to honor)盒犹。在最佳的情況下懂更,任務(wù)的輸入是數(shù)據(jù)本地的(data local)-- 也就是任務(wù)運行在分片駐留的節(jié)點上〖卑颍或者沮协,任務(wù)可能是機架本地的(rack local),也就是和分片在同一個機架上卓嫂,而不是同一個節(jié)點上慷暂。有一些任務(wù)既不是數(shù)據(jù)本地的也不是機架本地的,該任務(wù)從不同機架上面獲取數(shù)據(jù)而不是任務(wù)本身運行的節(jié)點上晨雳。對于特定的作業(yè)行瑞,你可以通過查看作業(yè)計數(shù)器(job's counters)來確定任務(wù)的位置級別(locality level)。

請求也為任務(wù)指定內(nèi)存需求和 CPU 數(shù)量餐禁。默認血久,每個 map 和 recude 任務(wù)被分配 1024 MB的內(nèi)存和一個虛擬的核(virtual core)。這些值可以通過如下屬性(mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores, mapreduce.reduce.cpu.vcores)在每個作業(yè)基礎(chǔ)上進行配置(遵守 Memory settings in YARN and MapReduce 中描述的最小最大值)帮非。

任務(wù)執(zhí)行

一旦資源調(diào)度器在一個特定的節(jié)點上為一個任務(wù)分配一個容器所需的資源氧吐,application master 通過連接節(jié)點管理器來啟動這個容器(步驟9a 和9b)讹蘑。任務(wù)通過一個主類為 YarnChild 的 Java 應(yīng)用程序來執(zhí)行。在它運行任務(wù)之前副砍,它會將任務(wù)所需的資源本地化衔肢,包括作業(yè)配置,JAR 文件以及一些在分布式緩存中的文件(步驟10)豁翎。最后角骤,它運行 map 或者 reduce 任務(wù)(步驟11)。

YarnChild 在一個指定的 JVM 中運行心剥,所以任何用戶自定義的 map 和 reduce 函數(shù)的 bugs(或者甚至在 YarnChild)都不會影響到節(jié)點管理器(NodeManager) -- 比如造成節(jié)點管理的崩潰或者掛起邦尊。

每個任務(wù)能夠執(zhí)行計劃(setup)和提交(commit)動作,它們運行在和任務(wù)本身相同的 JVM 當(dāng)中优烧,由作業(yè)的 OutputCommiter 來確定蝉揍。對于基于文件的作業(yè),提交動作把任務(wù)的輸出從臨時位置移動到最終位置畦娄。提交協(xié)議確保當(dāng)推測執(zhí)行可用時又沾,在復(fù)制的任務(wù)中只有一個被提交,其他的都被取消掉熙卡。

進度和狀態(tài)的更新

MapReduce 作業(yè)是長時間運行的批處理作業(yè)(long-running batch jobs)杖刷,運行時間從幾十秒到幾小時。由于可能運行時間很長驳癌,所以用戶得到該作業(yè)的處理進度反饋是很重要的滑燃。

作業(yè)和任務(wù)都含有一個狀態(tài),包括運行狀態(tài)颓鲜、maps 和 reduces 的處理進度表窘,作業(yè)計數(shù)器的值,以及一個狀態(tài)消息或描述(可能在用戶代碼中設(shè)置)甜滨。這些狀態(tài)會在作業(yè)的過程中改變乐严。那么它是如何與客戶端進行通信的?

當(dāng)一個任務(wù)運行艳吠,它會保持進度的跟蹤(就是任務(wù)完成的比例)麦备。對于 map 任務(wù),就是被處理的輸入的比例昭娩。對于 reduce 任務(wù)凛篙,稍微復(fù)雜一點,但是系統(tǒng)任然能夠估算已處理的 reduce 輸入的比例栏渺。通過把整個過程分為三個部分呛梆,對應(yīng)于 shuffle 的三個階段。例如磕诊,如果一個任務(wù)運行 reducer 完成了一半的輸入填物,該任務(wù)的進度就是 5/6纹腌,因為它已經(jīng)完成了 copy 和 sort 階段(1/3 each)以及 reduce 階段完成了一半(1/6)。

MapReduce 的進度組成 進度不總是可測的滞磺,但是它告訴 Hadoop 一個任務(wù)在做的一些事情升薯。例如,任務(wù)的寫輸出記錄是有進度的击困,即使不能用總進度的百分比(因為它自己也可能不知道到底有多少輸出要寫涎劈,也可能不知道需要寫的總量)來表示進度報告非常重要,Hadoop 不會使一個報告進度的任務(wù)失斣牟琛(not fail a task that's making progress)蛛枚。如下的操作構(gòu)成了進度: - 讀取輸入記錄(在 mapper 或者 reducer 中)。 - 寫輸出記錄(在 mapper 或者 reducer 中)脸哀。 - 設(shè)置狀態(tài)描述(由 Reporter 的或 TaskAttempContext 的 setStatus() 方法設(shè)置)蹦浦。 - 計數(shù)器的增長(使用 Reporter 的 incrCounter() 方法 或者 Counter 的 increment() 方法)。 - 調(diào)用 Reporter 的或者 TaskAttemptContext 的 progress() 方法撞蜂。

任務(wù)有一些計數(shù)器盲镶,它們在任務(wù)運行時記錄各種事件,這些計數(shù)器要么是框架內(nèi)置的蝌诡,例如:已寫入的map輸出記錄數(shù)徒河,要么是用戶自定義的。

當(dāng) map 或 reduce 任務(wù)運行時送漠,子進程使用 umbilical 接口和父 application master 進行通信。任務(wù)每隔三秒鐘通過 umbilical 接口報告其進度和狀態(tài)(包括計數(shù)器)給 application master由蘑,application master會形成一個作業(yè)的聚合視圖闽寡。

在作業(yè)執(zhí)行的過程中,客戶端每秒通過輪詢 application master 獲取最新的狀態(tài)(間隔通過 mapreduce.client.progressmonitor.polinterval 設(shè)置)尼酿∫罚客戶端也可使用 Job 的 getStatus() 方法獲取一個包含作業(yè)所有狀態(tài)信息的 JobStatus 實例,過程如下:

狀態(tài)查詢

作業(yè)完成(Job Completion)

當(dāng) application master 接受到最后一個任務(wù)完成的通知裳擎,它改變該作業(yè)的狀態(tài)為 “successful”涎永。當(dāng) Job 對象輪詢狀態(tài),它知道作業(yè)已經(jīng)成功完成鹿响,所以它打印一條消息告訴用戶以及從 waitForCompletion() 方法返回羡微。此時,作業(yè)的統(tǒng)計信息和計數(shù)器被打印到控制臺惶我。

Application master 也可以發(fā)送一條 HTTP 作業(yè)通知妈倔,如果配置了的話。當(dāng)客戶端想要接受回調(diào)時绸贡,可以通過 mapreduce.job.end-notification.url 屬性進行配置盯蝴。

最后毅哗,當(dāng)作業(yè)完成,application master 和作業(yè)容器清理他們的工作狀態(tài)(所以中間輸入會被刪除)捧挺,然后 OutputCommiter 的 commitJob() 方法被調(diào)用虑绵。作業(yè)的信息被作業(yè)歷史服務(wù)器存檔,以便日后用戶查詢闽烙。

三翅睛、 MapReduce計算流程


這里先給出官網(wǎng)上關(guān)于這個過程的經(jīng)典流程圖:


官方流程圖

樹形圖如下:

MR處理結(jié)構(gòu)

Map

在進行海量數(shù)據(jù)處理時蒿往,外存文件數(shù)據(jù)I/O訪問會成為一個制約系統(tǒng)性能的瓶頸终畅,因此,Hadoop的Map過程實現(xiàn)的一個重要原則就是:計算靠近數(shù)據(jù)壹堰,這里主要指兩個方面

  1. 代碼靠近數(shù)據(jù):
    原則:本地化數(shù)據(jù)處理(locality)摊溶,即一個計算節(jié)點盡可能處理本地磁盤上所存儲的數(shù)據(jù)爬骤;
    盡量選擇數(shù)據(jù)所在DataNode啟動Map任務(wù);
    這樣可以減少數(shù)據(jù)通信莫换,提高計算效率霞玄;
  2. 數(shù)據(jù)靠近代碼:
    當(dāng)本地沒有數(shù)據(jù)處理時,盡可能從同一機架或最近其他節(jié)點傳輸數(shù)據(jù)進行處理(host選擇算法)拉岁。


    map-shuffle

輸入

  1. map task只讀取split分片坷剧,split與block(hdfs的最小存儲單位,默認為64MB)可能是一對一也能是一對多喊暖,但是對于一個split只會對應(yīng)一個文件的一個block或多個block惫企,不允許一個split對應(yīng)多個文件的多個block;
  2. 這里切分和輸入數(shù)據(jù)的時會涉及到InputFormat的文件切分算法和host選擇算法陵叽。

文件切分算法狞尔,主要用于確定InputSplit的個數(shù)以及每個InputSplit對應(yīng)的數(shù)據(jù)段。FileInputFormat以文件為單位切分生成InputSplit巩掺,對于每個文件偏序,由以下三個屬性值決定其對應(yīng)的InputSplit的個數(shù):

  • goalSize: 它是根據(jù)用戶期望的InputSplit數(shù)目計算出來的,即totalSize/numSplits胖替。其中研儒,totalSize為文件的總大小独令;numSplits為用戶設(shè)定的Map Task個數(shù)端朵,默認情況下是1;
  • minSize:InputSplit的最小值燃箭,由配置參數(shù)mapred.min.split.size確定逸月,默認是1;
  • blockSize:文件在hdfs中存儲的block大小遍膜,不同文件可能不同碗硬,默認是64MB瓤湘。

這三個參數(shù)共同決定InputSplit的最終大小,計算方法如下:
splitSize=max{minSize, min{gogalSize,blockSize}}

Partition

  • 作用:將map的結(jié)果發(fā)送到相應(yīng)的reduce端恩尾,總的partition的數(shù)目等于reducer的數(shù)量弛说。
  • 實現(xiàn)功能:
  1. map輸出的是key/value對,決定于當(dāng)前的mapper的part交給哪個reduce的方法是:mapreduce提供的Partitioner接口翰意,對key進行hash后木人,再以reducetask數(shù)量取模,然后到指定的job上(HashPartitioner冀偶,可以通過job.setPartitionerClass(MyPartition.class)自定義)醒第。
  2. 然后將數(shù)據(jù)寫入到內(nèi)存緩沖區(qū),緩沖區(qū)的作用是批量收集map結(jié)果进鸠,減少磁盤IO的影響稠曼。key/value對以及Partition的結(jié)果都會被寫入緩沖區(qū)。在寫入之前客年,key與value值都會被序列化成字節(jié)數(shù)組霞幅。
  • 要求:負載均衡,效率量瓜;

spill(溢寫):sort & combiner

  • 作用:把內(nèi)存緩沖區(qū)中的數(shù)據(jù)寫入到本地磁盤司恳,在寫入本地磁盤時先按照partition、再按照key進行排序(quick sort)绍傲;
  • 注意:
  1. 這個spill是由另外單獨的線程來完成扔傅,不影響往緩沖區(qū)寫map結(jié)果的線程;
  2. 內(nèi)存緩沖區(qū)默認大小限制為100MB烫饼,它有個溢寫比例(spill.percent)铅鲤,默認為0.8,當(dāng)緩沖區(qū)的數(shù)據(jù)達到閾值時枫弟,溢寫線程就會啟動,先鎖定這80MB的內(nèi)存鹏往,執(zhí)行溢寫過程淡诗,maptask的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫,互不影響伊履。然后再重新利用這塊緩沖區(qū)韩容,因此Map的內(nèi)存緩沖區(qū)又叫做環(huán)形緩沖區(qū)(兩個指針的方向不會變,下面會詳述)唐瀑;
  3. 在將數(shù)據(jù)寫入磁盤之前群凶,先要對要寫入磁盤的數(shù)據(jù)進行一次排序操作,先按<key,value,partition>中的partition分區(qū)號排序哄辣,然后再按key排序请梢,這個就是sort操作赠尾,最后溢出的小文件是分區(qū)的,且同一個分區(qū)內(nèi)是保證key有序的毅弧;

combine:

執(zhí)行combine操作要求開發(fā)者必須在程序中設(shè)置了combine(程序中通過job.setCombinerClass(myCombine.class)自定義combine操作)气嫁。

  • 程序中有兩個階段可能會執(zhí)行combine操作:
  1. map輸出數(shù)據(jù)根據(jù)分區(qū)排序完成后,在寫入文件之前會執(zhí)行一次combine操作(前提是作業(yè)中設(shè)置了這個操作)够坐;
  2. 如果map輸出比較大寸宵,溢出文件個數(shù)大于3(此值可以通過屬性min.num.spills.for.combine配置)時,在merge的過程(多個spill文件合并為一個大文件)中還會執(zhí)行combine操作元咙;
  • combine主要是把形如<aa,1>,<aa,2>這樣的key值相同的數(shù)據(jù)進行計算梯影,計算規(guī)則與reduce一致,比如:當(dāng)前計算是求key對應(yīng)的值求和庶香,則combine操作后得到<aa,3>這樣的結(jié)果甲棍。
  • 注意事項:不是每種作業(yè)都可以做combine操作的,只有滿足以下條件才可以:
  1. reduce的輸入輸出類型都一樣脉课,因為combine本質(zhì)上就是reduce操作救军;
  2. 計算邏輯上,combine操作后不會影響計算結(jié)果倘零,像求和就不會影響唱遭;

merge

  • merge過程:當(dāng)map很大時,每次溢寫會產(chǎn)生一個spill_file呈驶,這樣會有多個spill_file拷泽,而最終的一個map task輸出只有一個文件,因此袖瞻,最終的結(jié)果輸出之前會對多個中間過程進行多次溢寫文件(spill_file)的合并司致,此過程就是merge過程。也即是聋迎,待Map Task任務(wù)的所有數(shù)據(jù)都處理完后脂矫,會對任務(wù)產(chǎn)生的所有中間數(shù)據(jù)文件做一次合并操作,以確保一個Map Task最終只生成一個中間數(shù)據(jù)文件霉晕。
  • 注意:
  1. 如果生成的文件太多庭再,可能會執(zhí)行多次合并,每次最多能合并的文件數(shù)默認為10牺堰,可以通過屬性min.num.spills.for.combine配置拄轻;
  2. 多個溢出文件合并時,會進行一次排序伟葫,排序算法是多路歸并排序恨搓;
  3. 是否還需要做combine操作,一是看是否設(shè)置了combine,二是看溢出的文件數(shù)是否大于等于3斧抱;
  4. 最終生成的文件格式與單個溢出文件一致常拓,也是按分區(qū)順序存儲,并且輸出文件會有一個對應(yīng)的索引文件夺姑,記錄每個分區(qū)數(shù)據(jù)的起始位置墩邀,長度以及壓縮長度,這個索引文件名叫做file.out.index盏浙。

內(nèi)存緩沖區(qū)

  1. 在Map Task任務(wù)的業(yè)務(wù)處理方法map()中眉睹,最后一步通過OutputCollector.collect(key,value)或context.write(key,value)輸出Map Task的中間處理結(jié)果,在相關(guān)的collect(key,value)方法中废膘,會調(diào)用Partitioner.getPartition(K2 key, V2 value, int numPartitions)方法獲得輸出的key/value對應(yīng)的分區(qū)號(分區(qū)號可以認為對應(yīng)著一個要執(zhí)行Reduce Task的節(jié)點)竹海,然后將<key,value,partition>暫時保存在內(nèi)存中的MapOutputBuffe內(nèi)部的環(huán)形數(shù)據(jù)緩沖區(qū),該緩沖區(qū)的默認大小是100MB丐黄,可以通過參數(shù)io.sort.mb來調(diào)整其大小斋配。
  2. 當(dāng)緩沖區(qū)中的數(shù)據(jù)使用率達到一定閥值后,觸發(fā)一次Spill操作灌闺,將環(huán)形緩沖區(qū)中的部分數(shù)據(jù)寫到磁盤上艰争,生成一個臨時的Linux本地數(shù)據(jù)的spill文件;然后在緩沖區(qū)的使用率再次達到閥值后桂对,再次生成一個spill文件甩卓。直到數(shù)據(jù)處理完畢,在磁盤上會生成很多的臨時文件蕉斜。
  3. 緩存有一個閥值比例配置逾柿,當(dāng)達到整個緩存的這個比例時,會觸發(fā)spill操作宅此;觸發(fā)時机错,map輸出還會接著往剩下的空間寫入,但是寫滿的空間會被鎖定父腕,數(shù)據(jù)溢出寫入磁盤弱匪。當(dāng)這部分溢出的數(shù)據(jù)寫完后,空出的內(nèi)存空間可以接著被使用璧亮,形成像環(huán)一樣的被循環(huán)使用的效果萧诫,所以又叫做環(huán)形內(nèi)存緩沖區(qū);
  4. MapOutputBuffer內(nèi)部存數(shù)的數(shù)據(jù)采用了兩個索引結(jié)構(gòu)杜顺,涉及三個環(huán)形內(nèi)存緩沖區(qū)。下來看一下兩級索引結(jié)構(gòu):


    buffer

    這三個環(huán)形緩沖區(qū)的含義分別如下:

  1. kvoffsets緩沖區(qū):也叫偏移量索引數(shù)組蘸炸,用于保存key/value信息在位置索引 kvindices 中的偏移量躬络。當(dāng) kvoffsets 的使用率超過 io.sort.spill.percent (默認為80%)后,便會觸發(fā)一次 SpillThread 線程的“溢寫”操作搭儒,也就是開始一次 Spill 階段的操作穷当。
  2. kvindices緩沖區(qū):也叫位置索引數(shù)組提茁,用于保存 key/value 在數(shù)據(jù)緩沖區(qū) kvbuffer 中的起始位置。
  3. kvbuffer即數(shù)據(jù)緩沖區(qū):用于保存實際的 key/value 的值馁菜。默認情況下該緩沖區(qū)最多可以使用 io.sort.mb 的95%茴扁,當(dāng) kvbuffer 使用率超過 io.sort.spill.percent (默認為80%)后,便會觸發(fā)一次 SpillThread 線程的“溢寫”操作汪疮,也就是開始一次 Spill 階段的操作峭火。

寫入到本地磁盤時,對數(shù)據(jù)進行排序智嚷,實際上是對kvoffsets這個偏移量索引數(shù)組進行排序卖丸。

Reduce

Reduce過程的經(jīng)典流程圖如下:


reduce-shuffle

copy過程

  • 作用:拉取數(shù)據(jù);
  • 過程:Reduce進程啟動一些數(shù)據(jù)copy線程(Fetcher)盏道,通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件稍浆。因為這時map task早已結(jié)束,這些文件就歸NodeManager管理在本地磁盤中猜嘱。
  • 默認情況下衅枫,當(dāng)整個MapReduce作業(yè)的所有已執(zhí)行完成的Map Task任務(wù)數(shù)超過Map Task總數(shù)的5%后,JobTracker便會開始調(diào)度執(zhí)行Reduce Task任務(wù)朗伶。然后Reduce Task任務(wù)默認啟動mapred.reduce.parallel.copies(默認為5)個MapOutputCopier線程到已完成的Map Task任務(wù)節(jié)點上分別copy一份屬于自己的數(shù)據(jù)弦撩。 這些copy的數(shù)據(jù)會首先保存的內(nèi)存緩沖區(qū)中,當(dāng)內(nèi)沖緩沖區(qū)的使用率達到一定閥值后腕让,則寫到磁盤上孤钦。

內(nèi)存緩沖區(qū)

  • 這個內(nèi)存緩沖區(qū)大小的控制就不像map那樣可以通過io.sort.mb來設(shè)定了,而是通過另外一個參數(shù)來設(shè)置:mapred.job.shuffle.input.buffer.percent(default 0.7)纯丸, 這個參數(shù)其實是一個百分比偏形,意思是說,shuffile在reduce內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為:0.7 × maxHeap of reduce task觉鼻。
  • 如果該reduce task的最大heap使用量(通常通過mapred.child.java.opts來設(shè)置俊扭,比如設(shè)置為-Xmx1024m)的一定比例用來緩存數(shù)據(jù)。默認情況下坠陈,reduce會使用其heapsize的70%來在內(nèi)存中緩存數(shù)據(jù)萨惑。如果reduce的heap由于業(yè)務(wù)原因調(diào)整的比較大,相應(yīng)的緩存大小也會變大仇矾,這也是為什么reduce用來做緩存的參數(shù)是一個百分比庸蔼,而不是一個固定的值了。

merge過程

  • Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中贮匕,這里的緩沖區(qū)大小要比 map 端的更為靈活姐仅,它基于 JVM 的heap size設(shè)置,因為 Shuffle 階段 Reducer 不運行,所以應(yīng)該把絕大部分的內(nèi)存都給 Shuffle 用掏膏。
  • 這里需要強調(diào)的是劳翰,merge 有三種形式:1)內(nèi)存到內(nèi)存 2)內(nèi)存到磁盤 3)磁盤到磁盤。默認情況下第一種形式是不啟用的馒疹。當(dāng)內(nèi)存中的數(shù)據(jù)量到達一定閾值佳簸,就啟動內(nèi)存到磁盤的 merge(圖中的第一個merge,之所以進行merge是因為reduce端在從多個map端copy數(shù)據(jù)的時候颖变,并沒有進行sort生均,只是把它們加載到內(nèi)存,當(dāng)達到閾值寫入磁盤時悼做,需要進行merge) 疯特。這和map端的很類似,這實際上就是溢寫的過程肛走,在這個過程中如果你設(shè)置有Combiner漓雅,它也是會啟用的,然后在磁盤中生成了眾多的溢寫文件朽色,這種merge方式一直在運行邻吞,直到?jīng)]有 map 端的數(shù)據(jù)時才結(jié)束,然后才會啟動第三種磁盤到磁盤的 merge (圖中的第二個merge)方式生成最終的那個文件葫男。
  • 在遠程copy數(shù)據(jù)的同時抱冷,Reduce Task在后臺啟動了兩個后臺線程對內(nèi)存和磁盤上的數(shù)據(jù)文件做合并操作,以防止內(nèi)存使用過多或磁盤生的文件過多梢褐。

reducer的輸入文件

  • merge的最后會生成一個文件旺遮,大多數(shù)情況下存在于磁盤中,但是需要將其放入內(nèi)存中盈咳。當(dāng)reducer 輸入文件已定耿眉,整個 Shuffle 階段才算結(jié)束。然后就是 Reducer 執(zhí)行鱼响,把結(jié)果放到 HDFS 上鸣剪。

小結(jié):

  1. shuffle是mapreduce優(yōu)化的重點地方;
  2. 環(huán)形內(nèi)存緩沖區(qū) :因此map寫入磁盤的過程十分的復(fù)雜丈积,更何況map輸出時候要對結(jié)果進行排序筐骇,內(nèi)存開銷是很大的,所以開啟環(huán)形內(nèi)存緩沖區(qū)專門用于輸出江滨;默認是100MB铛纬,閾值是0.8;
  3. spill(溢寫):緩沖區(qū)>80%唬滑,寫入磁盤告唆;溢寫前先排序莫秆,后合并,寫入磁盤悔详;
  4. Partition:Partitioner操作和map階段的輸入分片(Input split)很像,Partitioner會找到對應(yīng)的map輸出文件惹挟,然后進行復(fù)制操作茄螃,作為reduce的輸入;
  5. reduce階段:和map函數(shù)一樣也是程序員編寫的连锯,最終結(jié)果是存儲在hdfs上的归苍。

Split是怎么劃分的? 參考FileInputFormat類中split切分算法和host選擇算法介紹

參考:

MapReduce 過程詳解:https://www.cnblogs.com/npumenglei/p/3631244.html

MapReduce之Shuffle過程詳述:http://matt33.com/2016/03/02/hadoop-shuffle/

MapReduce過程簡析:http://www.reibang.com/p/0ec306605df8

MapReduce執(zhí)行過程:https://zhuanlan.zhihu.com/p/45305945

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末运怖,一起剝皮案震驚了整個濱河市拼弃,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌摇展,老刑警劉巖吻氧,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異咏连,居然都是意外死亡盯孙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門祟滴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來振惰,“玉大人,你說我怎么就攤上這事垄懂∑锞В” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵草慧,是天一觀的道長桶蛔。 經(jīng)常有香客問我,道長冠蒋,這世上最難降的妖魔是什么羽圃? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮抖剿,結(jié)果婚禮上朽寞,老公的妹妹穿的比我還像新娘。我一直安慰自己斩郎,他們只是感情好脑融,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著缩宜,像睡著了一般肘迎。 火紅的嫁衣襯著肌膚如雪甥温。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天妓布,我揣著相機與錄音姻蚓,去河邊找鬼。 笑死匣沼,一個胖子當(dāng)著我的面吹牛狰挡,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播释涛,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼加叁,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了唇撬?” 一聲冷哼從身側(cè)響起它匕,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎窖认,沒想到半個月后豫柬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡扑浸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年轮傍,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片首装。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡创夜,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出仙逻,到底是詐尸還是另有隱情驰吓,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布系奉,位于F島的核電站檬贰,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏缺亮。R本人自食惡果不足惜翁涤,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望萌踱。 院中可真熱鬧葵礼,春花似錦、人聲如沸并鸵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽园担。三九已至届谈,卻和暖如春枯夜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背艰山。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工湖雹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人曙搬。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓劝枣,卻偏偏與公主長得像,于是被迫代替她去往敵國和親织鲸。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355