MRAppMaster
MRAppMaster是MapReduce的ApplicationMaster實(shí)現(xiàn)撒汉,它使得MapReduce計(jì)算框架可以運(yùn)行于YARN之上蟋字。在YARN中狱庇,MRAppMaster負(fù)責(zé)管理MapReduce作業(yè)的生命周期塘娶,包括創(chuàng)建MapReduce作業(yè)约巷,向ResourceManager申請資源记某,與NodeManage通信要求其啟動(dòng)Container司训,監(jiān)控作業(yè)的運(yùn)行狀態(tài),當(dāng)任務(wù)失敗時(shí)重新啟動(dòng)任務(wù)等液南。
YARN使用了基于事件驅(qū)動(dòng)的異步編程模型壳猜,它通過事件將各個(gè)組件聯(lián)系起來,并由一個(gè)中央事件調(diào)度器統(tǒng)一將各種事件分配給對應(yīng)的事件處理器滑凉。在YARN中统扳,每種組件是一種事件處理器喘帚,MRAppMaster在整個(gè)MapReduce任務(wù)中負(fù)責(zé)管理整個(gè)任務(wù)的生命周期。它是一個(gè)獨(dú)立的進(jìn)程org.apache.hadoop.mapreduce.v2.app.MRAppMaster#main咒钟,由AppClient向Yarn申請Container后啟動(dòng)吹由。MRAppMaster是MapReduce對ApplicationMaster的實(shí)現(xiàn),它讓MapReduce任務(wù)能運(yùn)行在Yarn上朱嘴。它主要作用在于管理作業(yè)的生命周期:
- 作業(yè)的管理:作業(yè)的創(chuàng)建倾鲫,初始化以及啟動(dòng)等
- 向RM申請資源和再分配資源
- Container的啟動(dòng)與釋放
- 監(jiān)控作業(yè)運(yùn)行狀態(tài)
- 作業(yè)恢復(fù)
當(dāng)MRAppMaster啟動(dòng)時(shí),它們會(huì)以服務(wù)的形式注冊到MRAppMaster的中央事件調(diào)度器上萍嬉,并告訴調(diào)度器它們處理的事件類型乌昔,這樣,當(dāng)出現(xiàn)某一種事件時(shí)壤追,MRAppMaster會(huì)查詢<事件磕道,事件處理器>表,并將該事件分配給對應(yīng)的事件處理器行冰。
接下來溺蕉,我們分別介紹MRAppMaster各種組件/服務(wù)的功能。
ContainerAllocator: 與ResourceManager通信资柔,為作業(yè)申請資源焙贷。作業(yè)的每個(gè)任務(wù)資源需求可描述為四元組<Priority, hostname,capability贿堰,containers>,分別表示作業(yè)優(yōu)先級啡彬、期望資源所在的host羹与,資源量(當(dāng)前僅支持內(nèi)存),container數(shù)目庶灿。ContainerAllocator周期性通過RPC與ResourceManager通信纵搁,而ResourceManager會(huì)為之返回已經(jīng)分配的container列表,完成的container列表等信息往踢。
ClientService: ClientService是一個(gè)接口腾誉,由MRClientService實(shí)現(xiàn)。MRClientService實(shí)現(xiàn)了MRClientProtocol協(xié)議峻呕,客戶端可通過該協(xié)議獲取作業(yè)的執(zhí)行狀態(tài)(而不必通過ResourceManager)控制作業(yè)(比如殺死作業(yè)等)利职。
Job
表示一個(gè)MapReduce作業(yè),與MRv1的JobInProgress功能一樣瘦癌,負(fù)責(zé)監(jiān)控作業(yè)的運(yùn)行狀態(tài)猪贪。它維護(hù)了一個(gè)作業(yè)狀態(tài)機(jī),以實(shí)現(xiàn)異步控制各種作業(yè)操作讯私。Task
表示一個(gè)MapReduce作業(yè)中的某個(gè)任務(wù)热押,與MRv1中的TaskInProgress功能類似西傀,負(fù)責(zé)監(jiān)控一個(gè)任務(wù)的運(yùn)行狀態(tài)。它維護(hù)了一個(gè)任務(wù)狀態(tài)機(jī)桶癣,以實(shí)現(xiàn)異步控制各種任務(wù)操作拥褂。TaskAttempt:
表示一個(gè)任務(wù)運(yùn)行實(shí)例,同MRv1中的概念一樣牙寞。Speculator:完成推測執(zhí)行功能饺鹃。當(dāng)一個(gè)任務(wù)運(yùn)行速度明顯慢于其他任務(wù)時(shí),Speculator會(huì)為該任務(wù)啟動(dòng)一個(gè)備份任務(wù)碎税,讓其同慢任務(wù)一同處理同一份數(shù)據(jù)尤慰,誰先計(jì)算完成則將誰的結(jié)果作為最終結(jié)果,另一個(gè)任務(wù)將被殺掉雷蹂。該機(jī)制可有效防止“拖后腿”任務(wù)拖慢整個(gè)作業(yè)的執(zhí)行進(jìn)度伟端。
ContainerLauncher
與NodeManager通信,要求其啟動(dòng)一個(gè)Container匪煌。當(dāng)ResourceManager為作業(yè)分配資源后责蝠,ContainerLauncher會(huì)將資源信息封裝成container,包括任務(wù)運(yùn)行所需資源萎庭、任務(wù)運(yùn)行命令霜医、任務(wù)運(yùn)行環(huán)境、任務(wù)依賴的外部文件等驳规,然后與對應(yīng)的節(jié)點(diǎn)通信肴敛,要求其啟動(dòng)container。TaskAttemptListener
管理各個(gè)任務(wù)的心跳信息吗购,如果一個(gè)任務(wù)一段時(shí)間內(nèi)未匯報(bào)心跳医男,則認(rèn)為它死掉了,會(huì)將其從系統(tǒng)中移除捻勉。同MRv1中的TaskTracker類似镀梭,它實(shí)現(xiàn)了TaskUmbilicalProtocol協(xié)議,任務(wù)會(huì)通過該協(xié)議匯報(bào)心跳踱启,并詢問是否能夠提交最終結(jié)果报账。JobHistoryEventHandler
對作業(yè)的各個(gè)事件記錄日志,比如作業(yè)創(chuàng)建埠偿、作業(yè)開始運(yùn)行透罢、一個(gè)任務(wù)開始運(yùn)行等,這些日志會(huì)被寫到HDFS的某個(gè)目錄下胚想,這對于作業(yè)恢復(fù)非常有用琐凭。當(dāng)MRAppMaster出現(xiàn)故障時(shí),YARN會(huì)將其重新調(diào)度到另外一個(gè)節(jié)點(diǎn)上浊服,為了避免重新計(jì)算统屈,MRAppMaster首先會(huì)從HDFS上讀取上次運(yùn)行產(chǎn)生的運(yùn)行日志胚吁,以恢復(fù)已經(jīng)運(yùn)行完成的任務(wù),進(jìn)而能夠只運(yùn)行尚未運(yùn)行完成的任務(wù)愁憔。Recovery
當(dāng)一個(gè)MRAppMaster故障后腕扶,它將被調(diào)度到另外一個(gè)節(jié)點(diǎn)上重新運(yùn)行,為了避免重新計(jì)算吨掌,MRAppMaster首先會(huì)從HDFS上讀取上次運(yùn)行產(chǎn)生的運(yùn)行日志半抱,并恢復(fù)作業(yè)運(yùn)行狀態(tài)。
MRAppMaster工作流程
- 用戶向YARN中(RM)提交應(yīng)用程序膜宋,其中包括ApplicationMaster程序窿侈、啟動(dòng)ApplicationMaster的命令、用戶程序等秋茫。
- ResourceManager為該應(yīng)用程序分配第一個(gè)Container史简,ResouceManag與某個(gè)NodeManager通信,啟動(dòng)應(yīng)用程序ApplicationMaster肛著,NodeManager接到命令后圆兵,首先從HDFS上下載文件(緩存),然后啟動(dòng)ApplicationMaser枢贿。
- 當(dāng)ApplicationMaster啟動(dòng)后殉农,它與ResouceManager通信,以請求和獲取資源局荚。ApplicationMaster獲取到資源后超凳,與對應(yīng)NodeManager通信以啟動(dòng)任務(wù)。
( 如果該應(yīng)用程序第一次在節(jié)點(diǎn)上啟動(dòng)任務(wù)耀态,則NodeManager首先從HDFS上下載文件緩存到本地聪建,然后啟動(dòng)該任務(wù)。) - ApplicationMaster首先向ResourceManager注冊茫陆,這樣用戶可以直接通過ResourceManage查看應(yīng)用程序的運(yùn)行狀態(tài),然后它將為各個(gè)任務(wù)申請資源擎析,并監(jiān)控它們的運(yùn)行狀態(tài)簿盅,直到運(yùn)行結(jié)束,即重復(fù)步驟5~8
- ApplicationMaster采用輪詢的方式通過RPC協(xié)議向ResourceManager申請和領(lǐng)取資源
- 一旦ApplicationMaster申請到資源后揍魂,ApplicationMaster就會(huì)將啟動(dòng)命令交給NodeManager,要求它啟動(dòng)任務(wù)桨醋。啟動(dòng)命令里包含了一些信息使得Container可以與ApplicationMaster進(jìn)行通信。
- NodeManager為任務(wù)設(shè)置好運(yùn)行環(huán)境(包括環(huán)境變量现斋、JAR包喜最、二進(jìn)制程序等)后,將任務(wù)啟動(dòng)命令寫到一個(gè)腳本中庄蹋,并通過運(yùn)行該腳本啟動(dòng)任務(wù)(Container)
- 在應(yīng)用程序運(yùn)行過程中瞬内,用戶可隨時(shí)通過RPC向ApplicationMaster查詢應(yīng)用程序的當(dāng)前運(yùn)行狀態(tài)
- 應(yīng)用程序運(yùn)行完成后迷雪,ApplicationMaster向ResourceManager注銷并關(guān)閉自己
ContainerLauncher
ContainerLauncher負(fù)責(zé)與NodeManager通信,以啟動(dòng)一個(gè)container虫蝶。在YARN中章咧,運(yùn)行Task所需的全部信息被封裝到Container中,包括所需資源能真、依賴的外部文件赁严、jar包、運(yùn)行時(shí)環(huán)境變量粉铐、運(yùn)行命令等疼约。ContainerLauncher通過ContainerManager協(xié)議與NodeManager通信,該協(xié)議定義了三個(gè)RPC接口蝙泼,具體如下:
tartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException;//啟動(dòng)一個(gè)container
StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException;//停止一個(gè)container
GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException;//獲取一個(gè)container運(yùn)行情況
ContainerLauncher是一個(gè)接口程剥,它定義了2種事件:
CONTAINER_REMOTE_LAUNCH 啟動(dòng)一個(gè)container。當(dāng)ContainerAllocator為某個(gè)任務(wù)申請到資源后踱承,會(huì)將運(yùn)行該任務(wù)相關(guān)的所有信息封裝到container中倡缠,并要求對應(yīng)的節(jié)點(diǎn)啟動(dòng)該container
CONTAINER_REMOTE_CLEANUP 停止/殺死一個(gè)container。存在多種可能觸發(fā)該事件的行為茎活,常見的有昙沦,1)推測執(zhí)行時(shí)一個(gè)任務(wù)運(yùn)行完成,則需殺死另一個(gè)同輸入數(shù)據(jù)的任務(wù) 2)用戶發(fā)送一個(gè)殺死任務(wù)請求 3)任意一個(gè)任務(wù)運(yùn)行結(jié)束時(shí)载荔,YARN會(huì)觸發(fā)一個(gè)殺死任務(wù)的命令盾饮,以便結(jié)束對應(yīng)進(jìn)程。
ContainerLauncher接口由ContainerLauncherImpl類實(shí)現(xiàn)懒熙,它是一個(gè)服務(wù)丘损,接收和處理來自事件調(diào)度器發(fā)送過來的CONTAINER_REMOTE_LAUNCH和CONTAINER_REMOTE_CLEANUP兩種事件,它采用了隊(duì)列+線程池的方式異步并行處理這兩種事件工扎。
對于CONTAINER_REMOTE_LAUNCH事件凹联,它會(huì)調(diào)用Container.launch()函數(shù)與對應(yīng)的NodeManager通信雏掠,以啟動(dòng)container,代碼如下:
proxy = getCMProxy(containerID, containerMgrAddress,
containerToken);//構(gòu)造一個(gè)RPC client
ContainerLaunchContext containerLaunchContext =
event.getContainer();
StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
StartContainerResponse response = proxy.startContainer(startRequest);//調(diào)用RPC函數(shù),獲取返回值
對于CONTAINER_REMOTE_CLEANUP事件量窘,它會(huì)調(diào)用Container. kill()函數(shù)與對應(yīng)的NodeManager通信乱陡,以殺死一個(gè)container斯撮,代碼如下:
proxy = getCMProxy(this.containerID, this.containerMgrAddress,
this.containerToken);
StopContainerRequest stopRequest = Records
.newRecord(StopContainerRequest.class);
stopRequest.setContainerId(this.containerID);
proxy.stopContainer(stopRequest);
總之台妆,ContainerLauncherImpl是一個(gè)非常簡單的服務(wù),其最核心的代碼組織方式是隊(duì)列+進(jìn)程池拘荡,以處理事件調(diào)度器發(fā)送過來的CONTAINER_REMOTE_LAUNCH和CONTAINER_REMOTE_CLEANUP兩種事件臼节。
作業(yè)恢復(fù)
在MRAppMaster中,記錄日志是由服務(wù)JobHistoryEventHandler完成的,而作業(yè)恢復(fù)是由服務(wù)RecoveryService完成的网缝。
同MRv1一樣巨税,MRv2也會(huì)對一些關(guān)鍵的時(shí)間記錄日志,這主要有兩個(gè)作用:(1)方便用戶查看歷史作業(yè)運(yùn)行信息 (2)作業(yè)因故障重新啟動(dòng)后途凫,可根據(jù)日志信息恢復(fù)之前已經(jīng)運(yùn)行完成的任務(wù)垢夹,以減少重新計(jì)算代價(jià)。
MRAppMaster采用的日志格式與MRv1一樣维费,但有兩個(gè)小的改動(dòng):
- 實(shí)現(xiàn)方式不同果元。MRv1采用了同步記錄日志的方式,也就是說犀盟,每發(fā)生一個(gè)行為而晒,會(huì)記錄一次日志,然后才可以執(zhí)行下面的代碼阅畴。由于YARN引入了基于事件的異步編程模型倡怎,因此,MRAppMaster也采用了異步方式記錄日志贱枣。
- 存儲位置不同监署。盡管MRv1允許用戶將作業(yè)日志存放到HDFS上,但默認(rèn)是存儲到本地的纽哥,MRAppMaster則不同钠乏,它直接將日志寫到HDFS上,這樣春塌,當(dāng)MRAppMaster失敗后晓避,另一個(gè)MRAppMaster啟動(dòng)時(shí),可直接讀取HDFS中上一個(gè)作業(yè)產(chǎn)生的日志只壳,以恢復(fù)已經(jīng)運(yùn)行完成的任務(wù)俏拱。
作業(yè)恢復(fù)的過程是重新解析作業(yè)日志,以恢復(fù)各個(gè)任務(wù)運(yùn)行狀態(tài)的過程(重做日志)吼句,這是由RecoveryService完成的锅必。如果用戶將yarn.app.mapreduce.am.job.recovery.enable參數(shù)置為true(默認(rèn)就是true),則MRAppMaster運(yùn)行作業(yè)之前惕艳,首先會(huì)檢查這是否是第一次運(yùn)行該作業(yè)况毅,如果不是,則從HDFS上讀取上次運(yùn)行的作業(yè)日志尔艇,并恢復(fù)作業(yè)的運(yùn)行狀態(tài),然后才會(huì)按照正常流程執(zhí)行么鹤。
public void init(final Configuration conf) {
boolean recoveryEnabled = conf.getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
if (recoveryEnabled && recoverySupportedByCommitter
&& appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
recoveryServ = createRecoveryService(context);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
inRecovery = true;
}
}
public void start() {
if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos();
}
……
}
生命周期
我們知道MRAppMaster中每個(gè)job终娃,若干干Map Task和Reduce Task組成,每個(gè)Task進(jìn)一步由若干個(gè)TaskAttempt組成蒸甜,Job棠耕、Task和TaskAttempt的生命周期均由一個(gè)狀態(tài)機(jī)表示余佛,
作業(yè)的創(chuàng)建入口在MRAppMaster類中,如下所示:
public class MRAppMaster extends CompositeService {
public void start() {
...
job = createJob(getConfig());//創(chuàng)建Job
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
jobEventDispatcher.handle(initJobEvent);//發(fā)送JOB_INI,創(chuàng)建MapTask,ReduceTask
startJobs();//啟動(dòng)作業(yè)窍荧,這是后續(xù)一切動(dòng)作的觸發(fā)之源
...
}
protected Job createJob(Configuration conf) {
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
}
JobImpl會(huì)接收到.JOB_INIT事件辉巡,然后觸發(fā)作業(yè)狀態(tài)從NEW變?yōu)镮NITED,并觸發(fā)函數(shù)InitTransition()蕊退,該函數(shù)會(huì)創(chuàng)建MapTask和 ReduceTask郊楣,
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
...
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
...
}
其中,createMapTasks函數(shù)實(shí)現(xiàn)如下:
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {
TaskImpl task =
new MapTaskImpl(job.jobId, i,
job.eventHandler,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens,
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
}
}
作業(yè)啟動(dòng)
public class MRAppMaster extends CompositeService {
protected void startJobs() {
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
dispatcher.getEventHandler().handle(startJobEvent);
}
}
JobImpl會(huì)接收到.JOB_START事件瓤荔,會(huì)觸發(fā)作業(yè)狀態(tài)從INITED變?yōu)镽UNNING净蚤,并觸發(fā)函數(shù)StartTransition(),進(jìn)而觸發(fā)Map Task和Reduce Task開始調(diào):
public static class StartTransition
implements SingleArcTransition<JobImpl, JobEvent> {
public void transition(JobImpl job, JobEvent event) {
job.scheduleTasks(job.mapTasks);
job.scheduleTasks(job.reduceTasks);
}
}
這之后输硝,所有Map Task和Reduce Task各自負(fù)責(zé)各自的狀態(tài)變化今瀑,ContainerAllocator模塊會(huì)首先為Map Task申請資源,然后是Reduce Task点把,一旦一個(gè)Task獲取到了資源橘荠,則會(huì)創(chuàng)建一個(gè)運(yùn)行實(shí)例TaskAttempt,如果該實(shí)例運(yùn)行成功郎逃,則Task運(yùn)行成功哥童,否則,Task還會(huì)啟動(dòng)下一個(gè)運(yùn)行實(shí)例TaskAttempt衣厘,直到一個(gè)TaskAttempt運(yùn)行成功或者達(dá)到嘗試次數(shù)上限如蚜。當(dāng)所有Task運(yùn)行成功后,Job運(yùn)行成功影暴。一個(gè)運(yùn)行成功的任務(wù)所經(jīng)歷的狀態(tài)變化如下(不包含失敗或者被殺死情況):