MapReduce框架結(jié)構(gòu)##
MapReduce是一個用于大規(guī)模數(shù)據(jù)處理的分布式計算模型
MapReduce模型主要有Mapper和Reducer兩個抽象類.
Mapper端主要負責(zé)對數(shù)據(jù)的分析處理,最終轉(zhuǎn)化為Key-value的數(shù)據(jù)結(jié)構(gòu)
Reducer端主要是獲取Mapper出來的結(jié)果,對結(jié)果進行統(tǒng)計
MapReduce實現(xiàn)存儲的均衡,未實現(xiàn)計算的均衡
MapReduce框架組成
注意:TaskTracker都需要運行在HDFS的DataNode上
Mapper和Reducer
運行于Hadoop的MapReduce應(yīng)用程序最基本的組成部分包括:一個Mapper抽象類和一個Reducer抽象類,以及創(chuàng)建JobConf的執(zhí)行程序迫悠,在一些應(yīng)用中還可以包括Combiner類今布,Combiner實際也是Reducer的實現(xiàn)
JobTracker
JobTracker是一個master服務(wù),軟件啟動后JobTracker接收Job,負責(zé)調(diào)度Job的每一個子任務(wù)Task運行于TaskTracker上抹镊,并監(jiān)控它們专钉,如果發(fā)現(xiàn)有失敗的Task就重新運行它。
一般情況下應(yīng)該把JobTracker部署在單獨的機器上
負責(zé)任務(wù)的分發(fā)和監(jiān)控
TaskTracker
運行在多個節(jié)點上的slave服務(wù),TaskTracker主動與JobTracker通信(與DataNode和NameNode相似,通過心跳來實現(xiàn))接收作業(yè)
負責(zé)直接執(zhí)行每一個任務(wù)
JobClient
每一個job都會在用戶端通過JobClient類將應(yīng)用程序以及配置參數(shù)Configuration打包成JAR文件存儲在HDFS,并把路徑提交到JobTracker的master服務(wù)蒋腮,然后由master創(chuàng)建每一個Task(即Map Task和Reduce Task)將它們分發(fā)到各個TaskTracker服務(wù)中去執(zhí)行
JobInProgress
JobClient提交Job后,JobTracker會創(chuàng)建一個JobInProgress來跟蹤和調(diào)度這個Job,并把它添加到Job隊列里藕各。
JobInProgress會根據(jù)提交的任務(wù)JAR中定義的輸入數(shù)據(jù)集(已分解成FileSplit)創(chuàng)建對應(yīng)的一批TaskInProgress用戶監(jiān)控和調(diào)度MapTask,同時創(chuàng)建指定數(shù)目的TaskInProgress用于監(jiān)控和調(diào)度ReduceTask,默認為1個ReduceTask
TaskInProgress
JobTracker啟動任務(wù)時通過每一個TaskInProgress來運行Task,這時會把Task對象(即MapTask和ReduceTask)序列化寫入相應(yīng)的TaskTracker服務(wù)中,TaskTracker收到后會創(chuàng)建對應(yīng)的TaskInProgress(此TaskInProgress實現(xiàn)非JobTracker中使用的TaskInProgress管理,通過TaskRunner對象來運行)
TaskRunner會自動裝載任務(wù)JAR文件并設(shè)置好環(huán)境變量后,啟動一個獨立的Java Child進程來執(zhí)行Task,即MapTask或者ReduceTask池摧,但它們不一定運行在同一個TaskTracker中
MapTask和ReduceTask
一個完整的Job會自動依次執(zhí)行Mapper、Combiner(在JobConf指定Combiner時執(zhí)行)和Reducer激况,其中Mapper和Combiner是由MapTask調(diào)用執(zhí)行的作彤,Reducer則由ReduceTask調(diào)用
Combiner實際也是Reducer接口類的實現(xiàn)。
MapReduce運行原理
一個MapReduce作業(yè)(Job)通常會把輸入的數(shù)據(jù)集切分為若干獨立的數(shù)據(jù)塊,由Map任務(wù)以完全并行的方式處理它們乌逐〗呋洌框架會對map函數(shù)的輸出先進行排序,然后把結(jié)果輸入給Reduce任務(wù)
通常浙踢,MapReduce框架和分布式文件系統(tǒng)是運行在一組相同的節(jié)點上的绢慢,也就是說,計算節(jié)點和存儲節(jié)點通常在一起的洛波。
作業(yè)的提交
JobClient的runJob()方法用于新建JobClient實例并調(diào)用其submitJob()方法胰舆,這種便捷方式提交作業(yè)后,runJob()每秒輪詢作業(yè)的進度,如果發(fā)現(xiàn)自上次上報后的信息有改動蹬挤,便把進度報告輸出到控制臺缚窿。
Hadoop運行MapReduce作業(yè)的工作原理如圖所示:
JobClient的submitJob()方法實現(xiàn)作業(yè)提交過程如下:
1.向JobTracker請求一個新的作業(yè)ID(通過JobTracker的getNewJobId()獲取)
2.檢查作業(yè)的輸出說明。例如焰扳,如果沒有指定輸出目錄或者它已經(jīng)存在倦零,作業(yè)就不會被提交,并將錯誤返回給MapReduce程序
3.計算作業(yè)的輸出劃分.如果劃分無法計算蓝翰,例如光绕,因為輸入路徑不存在女嘲,作業(yè)就不會被提交畜份,并將錯誤返回給MapReduce程序
4.將運行作業(yè)所需要的資源(包括作業(yè)的JAR文件、配置文件和計算所得的輸入劃分)復(fù)制到一個以作業(yè)ID命名的目錄中JobTracker的文件系統(tǒng)中欣尼。作業(yè)JAR的副本較多(由mapred.submit.replication屬性控制.默認為10)爆雹,因此在TaskTracker運行作業(yè)任務(wù)時,集群能為它們提供許多副本進行訪問
5.調(diào)用JobTracker的submitJob()方法,告訴JobTracker作業(yè)準(zhǔn)備執(zhí)行
6.JobTracker接收到對其submitJob()方法調(diào)用后愕鼓,會把此調(diào)用放入一個內(nèi)部隊列中钙态,交由作業(yè)調(diào)度器進行調(diào)度,并對其進行初始化菇晃。初始化包括創(chuàng)建一個代表該正在運行的作業(yè)對象册倒,它封裝任務(wù)和記錄信息,以便跟蹤任務(wù)的狀態(tài)和進程
7.要創(chuàng)建運行任務(wù)列表磺送,作業(yè)調(diào)度器首先從共享文件系統(tǒng)中獲取JobClient已經(jīng)計算好的輸入劃分信息驻子,然后為每個劃分創(chuàng)建一個Map任務(wù)灿意。創(chuàng)建的Reduce任務(wù)的數(shù)量由JobConf的mapred.reduce.tasks屬性決定,它是用setNumReduceTasks()方法設(shè)定的崇呵。然后調(diào)度器便創(chuàng)建指定個數(shù)的Reduce來運行任務(wù)缤剧。
8.TaskTracker執(zhí)行一個簡單的循環(huán),定期發(fā)送心跳方法調(diào)用JobTracker域慷。作業(yè)心跳方法調(diào)用指TaskTracker會向JobTracker匯報當(dāng)前的狀態(tài),如果正常荒辕,JobTracker會為它分配一個任務(wù),并使用心跳方法的返回值與TaskTracker進行通信
現(xiàn)在TaskTracker已經(jīng)被分配了任務(wù),下面是運行任務(wù)步驟
1:本地化作業(yè)的JAR文件犹褒,將它從共享文件系統(tǒng)復(fù)制到TaskTracker所在文件系統(tǒng)抵窒。同時,將應(yīng)用程序所需要的全部文件從分布式緩存復(fù)制到本地磁盤
2:為任務(wù)新建一個本地工作目錄化漆,并把JAR文件中的內(nèi)容解壓到這個文件夾下
3:新建一個TaskRunner實例來運行任務(wù)
TaskRunner啟動一個新的Java虛擬機來運行每個任務(wù)估脆,使得用戶執(zhí)行任務(wù)啟動map和reduce函數(shù)的任何缺陷都不會影響TaskTracker。但在不同的任務(wù)之間重用JVM還是可能的座云。
作業(yè)初始化(map分發(fā)策略優(yōu)化)
Job初始化過程主要是在JobTracker建立一個slave node對Task的映射模型疙赠,其他都是附屬工作。
首先需要知道的是Task是Job的基本單元朦拖,由JobTracker分發(fā)到TaskTracker來執(zhí)行圃阳。Task分為以下兩類:
Map Task:處理輸入數(shù)據(jù),它就應(yīng)該是輸入數(shù)據(jù)璧帝、Job相關(guān)信息等組成的對象
Reduce Task:匯總Map Task的輸出結(jié)果捍岳,最后生成Job的輸出,它也應(yīng)是Job相關(guān)信息的組成
Job將所有輸入數(shù)據(jù)組裝成邏輯分片睬隶,這些邏輯分片只是在HDFS上物理數(shù)據(jù)Block的索引以及存儲信息锣夹。
Map Task依賴于這些信息來決定將Task分發(fā)到哪些TaskTracker上。JobTracker可以取到Job相關(guān)的metadata信息苏潜,然后由這些信息決定如何分發(fā)Task,這些分片的相關(guān)信息就存放在特定的目錄下银萍,jobTracker通過JobId可以訪問到
Reduce Task不管在哪個TaskTracker上執(zhí)行,都得從其他那些執(zhí)行Map Task的TaskTracker上拉取數(shù)據(jù)恤左,所以對它的分發(fā)JobTracker不需要準(zhǔn)備什么贴唇,只要在合適的時候放到某臺TaskTracker上執(zhí)行即可
JobTracker主要還是關(guān)注Map Task的準(zhǔn)備工作(Reduce Task并不是從所有Map Task拉取臨時數(shù)據(jù)。如果有多個Reduce Task飞袋,每個Reduce Task只是拉取一部分Map Task的臨時數(shù)據(jù))
Map Task的執(zhí)行效率依賴于讀取輸入數(shù)據(jù)的效率戳气。
根據(jù)數(shù)據(jù)所處的位置與TaskTracker的距離,有三種本地數(shù)據(jù)級別
node-local 輸入分片就在TaskTracker本地
rack-local 輸入分片在TaskTracker所在Rack的其他節(jié)點上
off-switch 輸入分片在其他的Rack內(nèi)
>JobTracker在Task分發(fā)時應(yīng)充分考慮本地數(shù)據(jù)級別。
分發(fā)策略對job執(zhí)行效率的影響很大程度是如何優(yōu)化Map Task的本地數(shù)據(jù)
JobTracker可以從Job的metadata中得到并維護這樣一種映射關(guān)系:
job split--->HDFS Block && slave node
這種映射關(guān)系就是生成Map Task的基礎(chǔ)巧鸭。有多少個Split,就會有多少個Map Task
響應(yīng)心跳而選擇Map Task 的處理步驟如下所示:
1:根據(jù)TaskTracker的機器,查看JobTracker中是否存在一個Map Task,它關(guān)聯(lián)的Block(假設(shè)一個Block劃分為一個Split)存儲在 TaskTracker的本地磁盤上,那么就優(yōu)先執(zhí)行這個Map Task
2:如果沒有1可選的Map Task瓶您,那么查看是否有Map關(guān)聯(lián)的Block在TaskTracker所在的Rack內(nèi)
3:如果上面兩步都沒有選到某個Map Task,那么就根據(jù)情況看是否執(zhí)行跨Rack的Task或其他推測式執(zhí)行Task
>當(dāng)用戶開啟Task推測式執(zhí)行,推測式執(zhí)行就會發(fā)生在JobTracker意識到某個Task執(zhí)行效率低的時候,盡量要讓推測式Task是node local級別的。
任務(wù)的分配
每個TaskTracker定期向JobTracker發(fā)送心跳信息,心跳信息包含TaskTracker的狀態(tài)呀袱,是否可以接收新的任務(wù).
JobTracker以此來決定將任務(wù)分配給誰(仍然使用心跳的返回值與TaskTracker通信).
每個TaskTracker會有固定數(shù)量的任務(wù)槽(slot)來處理Map和Reduce(表示TaskTracker可以同時運行兩個Map和Reduce)芯肤,由機器內(nèi)核的數(shù)量和內(nèi)存大小來決定。
jobTracker會先將TaskTracker的Map槽填滿,然后分配Reduce任務(wù)到TaskTracker
JobTracker選擇哪個TaskTracker來運行Map任務(wù)需要考慮網(wǎng)絡(luò)位置,它會選擇一個離輸入分片較近的TaskTracker压鉴,優(yōu)先級是數(shù)據(jù)本地化(data-local) ,然后再到機架本地化(rack-local)
任務(wù)的執(zhí)行
TaskTracker分配到一個任務(wù)后崖咨,首先從HDFS中把作業(yè)的JAR文件復(fù)制到TaskTracker所在的本地文件系統(tǒng)(JAR本地化用來啟動JVM).
同時將應(yīng)用程序所需要的全部文件從分布式緩存復(fù)制到本地磁盤上。
接下來TaskTracker為任務(wù)新建一個本地工作目錄work油吭,并把JAR文件的內(nèi)容解壓到這個文件夾下
TaskTracker新建一個TaskRunner實例來運行該任務(wù).TaskRunner啟動一個新的JVM來運行每個任務(wù),以便客戶的MapReduce不會影響TaskTracker守護進程击蹲。
但在不同任務(wù)之間重用JVM還是可能的。
進度和狀態(tài)的更新
一個作業(yè)和每個任務(wù)都有一個狀態(tài)信息,包括作業(yè)或任務(wù)的運行狀態(tài)(running,successful,failed)婉宰、Map和Reduce的進度歌豺、計數(shù)器值、狀態(tài)消息和描述等
這些信息通過一定的時間間隔由Child JVM-->TaskTracker-->JobTracker匯聚心包。
JobTracker將產(chǎn)生一個聲明所有運行作業(yè)及其任務(wù)狀態(tài)的全局視圖
mapreduce的進度組成
MapReduce構(gòu)成的所有操作如下:
讀入一條輸入記錄(在Mapper或Reducer中)
寫入一條輸入記錄(在Mapper或Reducer中)
在一個Context中設(shè)置狀態(tài)描述
增加計數(shù)器Counter
調(diào)用progress()方法
任務(wù)完成
當(dāng)JobTracker收到作業(yè)最后一個任務(wù)已完成的通知后,便把作業(yè)的狀態(tài)設(shè)置成"成功”
MapReduce容錯
任務(wù)失敗
最常見的是Map或Reduce任務(wù)的失敗类咧,發(fā)生Map或Reduce失敗的時候,子任務(wù)JVM進程會在退出之前向其上一級TaskTracker發(fā)送錯誤報告
另一個錯誤情況就是子進程JVM突然退出,可能由JVM的bug導(dǎo)致蟹腾,從而導(dǎo)致MapReduce用戶代碼執(zhí)行失敗
還有一種情況,如果超時設(shè)置成0將關(guān)閉超時檢測痕惋,所有長時間運行的任務(wù)永遠不會被標(biāo)記為failed
TaskTracker失敗
當(dāng)TaskTracker停止或者很少向JobTracker發(fā)送心跳,JobTracker會注意到此TaskTracker發(fā)送心跳的情況,從而將此TaskTracker從等待任務(wù)調(diào)度的TaskTracker池中移除,JobTracker會安排此TaskTracker上一個成功運行的Map任務(wù)返回
下面介紹兩種TaskTracker失敗的情況
1:如果它們屬于未完成的作業(yè),Reduce階段無法獲取本地Map輸出的文件結(jié)果,任務(wù)都需要重新調(diào)度和執(zhí)行,只要是Map階段失敗必然是重新執(zhí)行這個任務(wù)
2:如果是Reduce階段娃殖,自然是執(zhí)行未完成的Reduce任務(wù)值戳,因為Reduce只要執(zhí)行完就會把輸出寫入到HDFS上
JobTracker失敗
JobTracker失敗應(yīng)該說是最嚴(yán)重的失敗方式,而且在Hadoop中存在單點故障的情況下是相當(dāng)嚴(yán)重的,因為這種情況下作業(yè)最終失敗.
可以通過啟動多個JobTracker炉爆,在這種情況下只運行一個主JobTracker堕虹。使用Zookeeper作為JobTracker的協(xié)調(diào)機制來決定哪一個是主JobTracker
子任務(wù)失敗
Map任務(wù)和Reduce任務(wù)失敗的三種情況:
1:當(dāng)Map或者Reduce子任務(wù)中的代碼拋出異常,JVM進程會在退出之前向TaskTracker進程發(fā)送錯誤報告,TaskTracker會將此(任務(wù)嘗試) task attempt標(biāo)記為failed狀態(tài),釋放一個槽以便運行另外一個任務(wù)
2:對于流任務(wù),如果流進程以非零退出代碼退出執(zhí)行,會標(biāo)記為failed
3:子JVM突然退出,即JVM錯誤,這時TaskTracker會注意到進程已經(jīng)退出芬首,標(biāo)記為failed
TaskTracker將子任務(wù)標(biāo)記為失敗后會將自身計數(shù)器減1,為了JobTracker申請新的任務(wù),也是通過心跳告知JobTracker本地的一個任務(wù)嘗試失敗
JobTracker接到任務(wù)失敗的通知后,會將其重新加入到調(diào)度隊列重新分配給其他的TaskTracker執(zhí)行(避免將失敗的任務(wù)分配給執(zhí)行失敗的TaskTracker)赴捞,但是這個嘗試也是有次數(shù)限制的,默認情況下郁稍,任務(wù)嘗試4次仍然沒有完成赦政,就不會再重試(JobTracker會將其標(biāo)記為Killed),此時整個作業(yè)就執(zhí)行失敗了
任務(wù)失敗反復(fù)次數(shù)的處理方法
當(dāng)Map Task執(zhí)行失敗后會重試,超過重試次數(shù)(由mapred.map.max.attempts指定,默認認為4)整個Job會失敗
Hadoop提供配置參數(shù)mapred.max.map.failures.percent解決這個問題
如果一個Job有200個Map Task艺晴,該參數(shù)設(shè)置為5,則單個Job最多允許10個Map Task(200*5%=10)失敗
把該配置放到mapred-site.xml中即可
Reduce Task也有類似配置mapred.max.reduce.failures.percent屬性
Shuffle階段和sort階段
當(dāng)Map開始產(chǎn)生輸出時昼钻,它并不是簡單地把數(shù)據(jù)寫到磁盤上掸屡,因為頻繁的磁盤操作會導(dǎo)致性能嚴(yán)重下降封寞。它的處理過程更復(fù)雜,數(shù)據(jù)首先寫到內(nèi)存中的一個緩沖區(qū)仅财,并做一些預(yù)排序狈究,以提升效率
Paste_Image.png
map端的shuffle
每個Map任務(wù)都有一個用來寫入輸出數(shù)據(jù)的循環(huán)內(nèi)部緩沖區(qū)。這緩沖區(qū)默認大小是100MB盏求,可以通過io.sort.mb屬性來設(shè)置具體大小抖锥。
當(dāng)緩沖區(qū)的數(shù)據(jù)量達到一個特定閥值時(io.sort.mb*io.srot.spill.percent亿眠,其中io,sort,spill.percent默認是0.8)系統(tǒng)將會啟動一個后臺線程把緩沖區(qū)中的內(nèi)容spill到磁盤
在spill過程中,Map的輸出將會繼續(xù)寫入到緩沖區(qū)磅废,但如果緩沖區(qū)已滿纳像,Map就會被阻塞直到spill完成。spill線程在把緩沖區(qū)的數(shù)據(jù)寫到磁盤前拯勉,會對它進行一個二次快速排序
首先根據(jù)數(shù)據(jù)所屬的Partition排序,然后每個partition中再按Key排序.輸出包括一個索引文件和數(shù)據(jù)文件竟趾。
如果設(shè)定了Combiner,將在排序輸出的基礎(chǔ)上運行
Combiner就是一個Mini Reducer.它在執(zhí)行Map任務(wù)的節(jié)點本身運行宫峦,先對Map的輸出做一次簡單的Reduce,使得Map的輸出更緊湊
spill文件保存在由mapred.local.dir指定的目錄中岔帽,map任務(wù)結(jié)束后刪除
每當(dāng)內(nèi)存中的數(shù)據(jù)達到spill閥值的時候,都會產(chǎn)生一個新的spill文件导绷,所以在map任務(wù)寫完它的最后一個輸出記錄時,可能會有多個spill文件犀勒。在Map任務(wù)完成前,所有的spill文件將會被歸并排序為一個索引文件和數(shù)據(jù)文件妥曲,這是一個多路歸并過程贾费,最大歸并路數(shù)由io.sort.factor控制(默認是10)
如果設(shè)定了Combiner,并且spill文件的數(shù)量至少是3(由min.num.spills.for.combine屬性控制)檐盟,Combiner將在輸出文件被寫入磁盤前運行以壓縮數(shù)據(jù)
默認輸出是不被壓縮的铸本,但可以很簡單的設(shè)置mapred.compress.map.output為true啟用該功能
壓縮所使用的庫由mapred.map.output.compression.codec來設(shè)定
當(dāng)spill文件歸并完畢后,map將刪除所有臨時spill文件遵堵,并告知TaskTracker任務(wù)已完成箱玷。
Reduce端通過HTTP獲取對應(yīng)的數(shù)據(jù)
用來傳輸partitions數(shù)據(jù)的工作線程個數(shù)由tasktracker.http.threads控制,這個設(shè)定是針對每一個TaskTracker的陌宿,并不是單個Map,默認值是40
注意:Map輸出總是寫到本地磁盤锡足,但Reduce輸出不是,一般是寫到HDFS
Reduce任務(wù)的輸入數(shù)據(jù)分布在集群內(nèi)的多個Map任務(wù)的輸出中,Map任務(wù)可能會在不同的時間內(nèi)完成壳坪,只要有其中的一個map任務(wù)完成舶得,Reduce任務(wù)就開始復(fù)制它的 輸出,這個階段稱為Cope階段
Reduce任務(wù)擁有多個cope線程爽蝴,可以并行的獲取Map輸出沐批,可以通過設(shè)定mapred.reduce.parallel.copies來改變線程數(shù),默認是5
reduce端的shuffle
Recduce端怎么知道從哪些TaskTracker中獲取Map端輸出呢蝎亚?
當(dāng)Map任務(wù)完成之后九孩,會通知它們的父TaskTracker,告知狀態(tài)更新发框,然后TaskTracker再轉(zhuǎn)告JobTracker躺彬。
這些通知信息是通過心跳通信機制傳輸?shù)?br> 因此針對一個特定的作業(yè),JobTracker知道Map輸出與TaskTracker的映射關(guān)系
Reduce端中有一個線程會間歇地向JobTracker詢問Map輸出的地址,直到把所有的數(shù)據(jù)都獲取到宪拥。
在Redce取走了Map輸出之后仿野,TaskTracker不會立即刪除這些數(shù)據(jù),因為Reduce可能會失敗她君。它們會在整個作業(yè)完成后脚作,JobTracker告知它們要刪除的時候才去刪除
如果map輸出足夠小,它們會被復(fù)制到Reduce TaskTracker的內(nèi)存中(緩沖區(qū)的大小由mapred.job.shuffle.input.buffer.percent控制缔刹,制定了用于此目的的堆內(nèi)存的百分比)鳖枕;如果緩沖區(qū)空間不足,會被復(fù)制到磁盤上桨螺。當(dāng)被內(nèi)存中的緩沖去用量達到一定的比例閥值(由mapred.job.shuffle.merge.threshold)控制宾符,或者達到了Map輸出的閥值大小(由mapred.inmem.merge.threshold控制),緩沖區(qū)中的數(shù)據(jù)會被歸并然后spill到磁盤
下面分段描述Reduce端的shuffle細節(jié)
1:Copy過程
簡單地拉取數(shù)據(jù)灭翔。Reduce進程啟動一些數(shù)據(jù)copy線程(fetcher)魏烫,通過HTTP方式請求Map Task所在的TaskTracker獲取Map Task的輸出文件
因為Map Task早已結(jié)束,這些文件就歸TaskTracker管理在本地磁盤上
2:Merge階段
這里的merge如Map端的Merge動作,只是數(shù)組中存放的是不同的Map端的數(shù)值
復(fù)制來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)肝箱,它是基于JVM的heap size設(shè)置哄褒,因為Shuffle階段Reduce不運行,所以應(yīng)該把絕大部分的內(nèi)存都給shuffle用
Merge有三種形式:內(nèi)存到內(nèi)存煌张;內(nèi)存到磁盤呐赡;磁盤到磁盤
默認情況下第一種形式不啟用,第二種merge方式一直在運行骏融,直到?jīng)]有Map端的數(shù)據(jù)時才結(jié)束链嘀,然后啟動第三種磁盤到磁盤的merge方式生成最終的文件
3:Reducer的輸入文件
不斷地Merge操作后,最后會生成一個“最終文件”档玻,因為該文件可能存在于磁盤之上怀泊,也可能存在內(nèi)存中
Yarn介紹
Yarn架構(gòu)
Yarn最基本的設(shè)計思想是將JobTracker的兩個主要功能,即資源管理和作業(yè)調(diào)度、監(jiān)控分成兩個獨立的進程误趴。
在該解決方案中包含兩個組件:
全局的ResourceManager(RM)霹琼、與每一個應(yīng)用相關(guān)的ApplicationManager(AM)
這里,“應(yīng)用”指的是一個單獨的MapReduce 作業(yè)或DAG作業(yè)凉当。RM與NM(NodeManager)共同組成整個計算框架枣申。
RM是系統(tǒng)中將資源分配給各個應(yīng)用的最終決策者
AM實際上是一個具體的框架庫,它的任務(wù)是與RM協(xié)商獲取應(yīng)用所需資源和與NM合作,以完成執(zhí)行和監(jiān)控Task的任務(wù)
ResourceManager
ResourceManager有點類似于JobTracker看杭,它有兩個主要的組件:調(diào)度器(Scheduler)和應(yīng)用程序管理器(ApplicationManager)
Scheduler負責(zé)分配資源到各個正在運行的應(yīng)用程序中.
調(diào)度器不執(zhí)行監(jiān)控和應(yīng)用程序忠藤,從這個意義上來,它是純粹的調(diào)度器。此外泊窘,它也不保證重啟失敗的任務(wù)熄驼。
調(diào)度器是基于資源請求來執(zhí)行它的調(diào)度功能,它是基于資源容器的抽象概念
調(diào)度器支持可插入的策略
ApplicationManager
ApplicationManager負責(zé)接送提交的作業(yè)烘豹,協(xié)商第一個執(zhí)行該任務(wù)的容器瓜贾,并提供失敗作業(yè)的重啟。
NodeManager是每個節(jié)點的框架代理携悯,它負責(zé)監(jiān)控資源的使用情況祭芦,并報告給ResourceManager.
每個應(yīng)用的ApplicationMaster負責(zé)與調(diào)度器談判資源占用的Container數(shù)量宝冕,追蹤狀態(tài)和監(jiān)控進程
NodeManager
NodeManager類似于TaskTracker,它負責(zé)啟動應(yīng)用程序Container(類似于JVM)壶冒,監(jiān)控Container的資源(cpu,內(nèi)存喉磁,磁盤诸衔,網(wǎng)絡(luò)等),并將信息上報給ResourceManager.調(diào)度器就是根據(jù)應(yīng)用程序的Container進行調(diào)度的
Yarn工作流程
首先說的概念是"Application Submission Client"市埋,它負責(zé)將“Application”提交到Y(jié)arn的Resourcemanager.客戶端通過ClientRMProtocol協(xié)議與ResourceManager聯(lián)系,如果需要Client拼余,會通過ClientRPProtocol::getNewApplication來獲取新的ApplicationId,然后通過ClientRMProtocol::submitApplication將應(yīng)用提交運行
Yarn上的ResourceManager會在一個獲得的Container上啟動ApplicationMaster.
ApplicationMaster通過AMRMProtocol協(xié)議與ResourceManager通信秉撇,首先ApplicationMaster需要將自身注冊到ResourceManager.
ApplicationMaster為了完成交給它的任務(wù)灵迫,會通過AMRMProtocol::allocate申請Container.
如果獲得了Container,ApplicationMaster會通過ContainerManager::startContainer和NodeManager聯(lián)系為任務(wù)啟動一個Container.
作為啟動Container的一部分照雁,ApplicationManager需要指定ContainerLauchContext
ContainerLauchhContext和ApplicationSubmissionContext相似,包括一些啟動時需要的信息蚕愤,如命令行命令,環(huán)境變量等
一旦任務(wù)完成饺蚊,ApplicationManager會通過AMRProtocol::finishApplicationMaster通知ResourceManager任務(wù)完成萍诱。
同時Client可以通過查詢ResourceManager來獲取Application的狀態(tài)信息,或者如果ApplicationMaster支持也可以直接從ApplicationMaster查詢信息污呼。如果需要裕坊,Client可以通過ClientRMProtocol::forceKillApplication來kill掉Application