最近在定位Yarn的crash問題時(shí),順便把spark怎么使用yarn的好好的梳理了一遍晌杰。不過我先了解一下Yarn和怎么提交yarn的job的。
首先我們先看看Yarn的架構(gòu):
ResourceManager
a)一個(gè)純粹的調(diào)度器
b)根據(jù)應(yīng)用程序的資源請(qǐng)求嚴(yán)格限制系統(tǒng)的可用資源
c)在保證容量筷弦、公平性及服務(wù)等級(jí)的情況下肋演,優(yōu)化集群資源利用率,讓所有資源都得到充分的利用
d)由可插拔的調(diào)度器來應(yīng)用不同的調(diào)度算法烂琴,如注重容量調(diào)度還是注意公平調(diào)度
ApplicationManager
a)負(fù)責(zé)與ResourceManager協(xié)商資源爹殊,并和NodeManager協(xié)同工作來執(zhí)行和監(jiān)控Container以及他們的資源消耗
b)有責(zé)任與ResourceManager協(xié)商并獲取合適的資源Container,跟蹤他們的狀態(tài)奸绷,以及監(jiān)控其進(jìn)展
c)在真實(shí)環(huán)境中梗夸,每一個(gè)應(yīng)用都有自己的ApplicationMaster的實(shí)例,但是也可為一組提供一個(gè)ApplicationMaster号醉,比如Pig或者Hive的ApplicationMaster
一反症、Yarn
Client編寫
1.創(chuàng)建Yarn客戶端
YarnClient yarnClient =
YarnClient.createYarnClien。t();
yarnClient.init(conf);
yarnClient.start();
2.創(chuàng)建Yarn應(yīng)用
YarnClientApplication app =yarnClient.createApplication();
3.設(shè)置Applicaton的名字畔派,內(nèi)存和cpu需求以及優(yōu)先級(jí)和Queue信息铅碍,YARN將根據(jù)這些信息來調(diào)度AppMaster
app.getApplicationSubmissionContext().setApplicationName("jenkins.ApplicationMaster");
app.getApplicationSubmissionContext().setResource(Resource.newInstance(100,1));
app.getApplicationSubmissionContext().setPriority(Priority.newInstance(0));
app.getApplicationSubmissionContext().setQueue("default");
4.設(shè)置ContainerLaunchContext,這一步,amContainer中包含了App Master執(zhí)行需要的資源文件线椰,環(huán)境變量和啟動(dòng)命令胞谈,這里將資源文件上傳到了HDFS,這樣在NODE Manager就可以通過HDFS取得這些文件
app.getApplicationSubmissionContext().setAMContainerSpec(amContainer);
5.提交應(yīng)用
ApplicationId appId =yarnClient.submitApplication(app.getApplicationSubmissionContext());
二、YARN ApplicationMaster編寫
ApplicationMaster編寫的編寫比較復(fù)雜烦绳,其需要通Resource Manager和Node Manager交互悔叽,
通過ResourceManager:申請(qǐng)Container,并接收Resource Manager的一些消息爵嗅,如可用的Container,結(jié)束的Container等笨蚁。
通過NodeManage:啟動(dòng)Container睹晒,并接收Node Manage的一些消息,如Container的狀態(tài)變化以及Node狀態(tài)變化括细。
1.創(chuàng)建一個(gè)AMRMClientAsync對(duì)象伪很,這個(gè)對(duì)象負(fù)責(zé)與Resource
Manager交互
amRMClient= AMRMClientAsync.createAMRMClientAsync( 1000, new RMCallbackHandler());
這里的RMCallbackHandler是我們編寫的繼承自AMRMClientAsync.CallbackHandler的一個(gè)類,其功能是處理由Resource
Manager收到的消息,
其需要實(shí)現(xiàn)的方法由如下
publicvoid onContainersCompleted(List statuses);
publicvoid onContainersAllocated(List containers) ;
publicvoid onShutdownRequest() ;
publicvoid onNodesUpdated(List updatedNodes) ;
publicvoid onError(Throwable e) ;
這里不考慮異常的情況下奋单,只寫onContainersAllocated锉试,onContainersCompleted這兩個(gè)既可以,一個(gè)是當(dāng)有新的Container可以使用览濒,一個(gè)是Container運(yùn)行結(jié)束呆盖。
在onContainersAllocated我們需要編寫啟動(dòng)container的代碼,amNMClient.startContainerAsync(container, ctx);這里的ctx同Yarn Client中第4步中的amContainer是同一個(gè)類型贷笛,即這個(gè)container執(zhí)行的一些資源应又,環(huán)境變量與命令等,因?yàn)檫@是在回調(diào)函數(shù)中乏苦,為了保證時(shí)效性株扛,這個(gè)操作最好放在線程池中異步操作。
在onContainersCompleted中汇荐,如果是失敗的Container洞就,我們需要重新申請(qǐng)并啟動(dòng)Container,(這一點(diǎn)有可能是YARN的Fair Schedule中會(huì)強(qiáng)制退出某些Container以釋放資源)成功的將做記錄既可以掀淘。
2.創(chuàng)建一個(gè)NMClientAsyncImpl對(duì)象旬蟋,這個(gè)對(duì)象負(fù)責(zé)與Node
Manager交互
amNMClient= new NMClientAsyncImpl(new NMCallbackHandler());
這里NMCallbackHandler使我們需要編寫的繼承自NMClientAsync.CallbackHandler的對(duì)象,其功能是處理由Node
Manager收到的消息
publicvoid onContainerStarted(ContainerId containerId,MapallServiceResponse);
publicvoid onContainerStatusReceived(ContainerId containerId,ContainerStatus containerStatus);
publicvoid onContainerStopped(ContainerId containerId) ;
publicvoid onStartContainerError(ContainerId containerId, Throwable t);
publicvoid onGetContainerStatusError(ContainerId containerId,Throwable t) ;
publicvoid onStopContainerError(ContainerId containerId, Throwable t);
這里簡(jiǎn)單的不考慮異常的情況下革娄,這些函數(shù)可以寫一個(gè)空函數(shù)體咖为,忽略掉處理
3.將ApplicationMaster注冊(cè)到Resource
Manager上
RegisterApplicationMasterResponseresponse = amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1,"");
這個(gè)函數(shù)將自己注冊(cè)到RM上,這里沒有提供RPC
port和TrackURL.
4. ApplicationMaster向Resource
Manager申請(qǐng)Container
ContainerRequestcontainerAsk = new ContainerRequest(
//100*10M + 1vcpu
Resource.newInstance(100, 1), null, null,
Priority.newInstance(0));
amRMClient.addContainerRequest(containerAsk);
這里一個(gè)containerAsk表示申請(qǐng)一個(gè)Container稠腊,這里的對(duì)nodes和rasks設(shè)置為NULL躁染,猜測(cè)MapReduce應(yīng)該由參數(shù)來嘗試申請(qǐng)靠近HDFS
block的container的
5.申請(qǐng)到Container后,回調(diào)AMRMClientAsync.CallbackHandler的onContainersAllocated就會(huì)響應(yīng)架忌,然后通過amNMClient在Container運(yùn)行計(jì)算任務(wù):
Listcommands = new LinkedList();
commands.add("sleep" + sleepSeconds.addAndGet(1));
ContainerLaunchContextctx = ContainerLaunchContext.newInstance(null, null, commands, null, null,null);
amNMClient.startContainerAsync(container,ctx);
6.等待Container執(zhí)行完畢吞彤,清理退出
我的代碼如下,循環(huán)等待container執(zhí)行完畢,并上報(bào)執(zhí)行結(jié)果
voidwaitComplete() throws YarnException, IOException{
while(numTotalContainers.get() !=numCompletedConatiners.get()){
try{
Thread.sleep(1000);
LOG.info("waitComplete"+
", numTotalContainers=" +numTotalContainers.get() +
", numCompletedConatiners=" +numCompletedConatiners.get());
} catch (InterruptedException ex){}
}
exeService.shutdown()饰恕;
amNMClient.stop();
amRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,"dummy Message", null)挠羔;
amRMClient.stop();
}
三、YARN Container Application
真正處理數(shù)據(jù)的是由ApplicationMaster由amNMClient.startContainerAsync(container, ctx)提交的Containerapplication,然后這這個(gè)應(yīng)用并不需要特殊編寫埋嵌,任何程序通過提交相應(yīng)的運(yùn)行信息都可以在這些Node中的某個(gè)Container中執(zhí)行破加,所以這個(gè)程序可以是一個(gè)復(fù)雜的MapReduce Task或者是一個(gè)簡(jiǎn)單的腳本。
總結(jié):
YARN提供了對(duì)cluster資源管理和作業(yè)調(diào)度的功能雹嗦。
編寫一個(gè)應(yīng)用運(yùn)行在YARN之上范舀,比較復(fù)雜的是ApplicationMaster的編寫,其需要維護(hù)container的狀態(tài)并能共做一些錯(cuò)誤恢復(fù)了罪,重啟應(yīng)用的操作锭环。比較簡(jiǎn)答的是Client的編寫,只需要提交必須的信息既可以泊藕,不需要維護(hù)狀態(tài)辅辩。真正運(yùn)行處理數(shù)據(jù)的是Container Application,這個(gè)程序可以不需要針對(duì)YARN做代碼編寫
四娃圆、Spark Yarn Client模式
Spark Yarn有兩種模式玫锋,一直是client模式,一種是cluster模式讼呢,今天我們先說說client模式景醇,以下是Spark YarnClient的交互圖。