1. 背景
??在筆者開發(fā)的大數(shù)據(jù)平臺(tái)(XSailboat)中,包含一個(gè)任務(wù)開發(fā)椎例、調(diào)度原杂、監(jiān)控的模塊印颤,叫XTaskWorks。這里的任務(wù)是使用Java穿肄,在XTaskFrame框架之上開發(fā)的數(shù)據(jù)集成年局、數(shù)據(jù)計(jì)算际看、數(shù)據(jù)分析等類型的任務(wù),并且在通用的任務(wù)框架基礎(chǔ)之上矢否,抽象出可視化編輯的ETL工具仲闽。所以任務(wù)數(shù)量是很多的,通常在一個(gè)項(xiàng)目中能達(dá)到幾百的規(guī)模僵朗。每個(gè)任務(wù)可以視為一個(gè)Java進(jìn)程(有一些調(diào)度頻率不高的任務(wù)赖欣,出于節(jié)約內(nèi)存的目的,也會(huì)合并到一個(gè)進(jìn)程中)衣迷。
??為了維護(hù)畏鼓、部署酱酬、更新方便壶谒,筆者基于YARN的容器化和AM 架構(gòu),開發(fā)了任務(wù)引擎膳沽。任務(wù)引擎并不是一個(gè)ApplicationMaster汗菜、多個(gè)Application的模式,而是每一個(gè)任務(wù)進(jìn)程都是一個(gè)ApplicationMater挑社,主要原因是:
- XTaskFrame是先于大數(shù)據(jù)平臺(tái)XSailboat的產(chǎn)品陨界,它本身已經(jīng)有了任務(wù)注冊(cè)和管理服務(wù)(類似于AM),注冊(cè)和管理服務(wù)不存在痛阻,不會(huì)影響任務(wù)的正常調(diào)度執(zhí)行菌瘪,只是暫時(shí)暫時(shí)失去了從界面管控任務(wù)的能力。
- 如果采用一個(gè)AM阱当,多個(gè)APP的模式俏扩,一旦AM因?yàn)槟承┰颍V沽吮滋恚c它相關(guān)的幾百個(gè)任務(wù)可能會(huì)因?yàn)槭チ薃M录淡,而被回收,即使沒(méi)有回收油坝,AM重啟后也需要在YARN的框架下重新將這些容器納入到新AM的管理之下嫉戚,這會(huì)比較復(fù)雜。
??筆者的處理方法是:
- 在原來(lái)的任務(wù)注冊(cè)和管理服務(wù)基礎(chǔ)之上澈圈,增加了容器引擎的擴(kuò)展模塊彬檀。
- 通過(guò)任務(wù)注冊(cè)和管理服務(wù)實(shí)現(xiàn)定義任務(wù),一鍵啟停任務(wù)瞬女。啟動(dòng)任務(wù)會(huì)在YARN上以AM形式運(yùn)行起來(lái)窍帝。
- 任務(wù)啟動(dòng)成功后,會(huì)向任務(wù)注冊(cè)和管理服務(wù)注冊(cè)拆魏。
2. 出現(xiàn)的問(wèn)題
- 任務(wù)進(jìn)程運(yùn)行起來(lái)以后盯桦,一直沒(méi)有達(dá)到RUNNING狀態(tài)慈俯。
- 停止進(jìn)程之后,YARN又會(huì)自動(dòng)啟動(dòng)一次拥峦。
3. 原因:
??運(yùn)行起來(lái)的AM需要主動(dòng)連接RM贴膘,進(jìn)行注冊(cè),發(fā)送心跳和取消注冊(cè)略号。提供下面代碼做參考:
mLogger.info("開始啟動(dòng) ApplicationMaster ...");
Map<String, String> envMap = System.getenv() ;
String ctnIdString = envMap.get(ApplicationConstants.Environment.CONTAINER_ID.key());
mLogger.info("容器ID:{}", ctnIdString);
mLogger.info("JAVA_HOME:{}", envMap.get(ApplicationConstants.Environment.JAVA_HOME.key()));
try
{
ContainerId ctnId = ContainerId.fromString(ctnIdString);
ApplicationAttemptId attempId = ctnId.getApplicationAttemptId();
mAmCtx.setApplicationAttemptId(attempId) ;
RMCallbackHandler rmCall = new RMCallbackHandler(mAmCtx);
mAMRMClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCall);
Configuration conf = mAmCtx.getHadoopConf() ;
Assert.notNull(conf) ;
conf = new Configuration(conf) ;
conf.set(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS , Integer.toString(YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS));
mAMRMClient.init(conf);
mUgf = UserGroupInformation.getCurrentUser();
mUgf.addToken(mAmCtx.getAMRMToken()) ;
mUgf.doAs((PrivilegedExceptionAction<Void>) () -> {
try
{
mAMRMClient.start();
}
catch(Exception e)
{
mLogger.error("啟動(dòng)AMRMClientAsync出現(xiàn)異常刑峡,異常消息:{}" , ExceptionAssist.getStackTrace(e)) ;
System.exit(0) ;
}
return null;
}) ;
}
catch (IOException | InterruptedException e)
{
mLogger.error("安全執(zhí)行出現(xiàn)異常,異常消息:{}" , ExceptionAssist.getStackTrace(e)) ;
System.exit(0) ;
}
// AM必須向RM發(fā)出心跳信號(hào)玄柠,以便讓它知道AM已存在且仍在運(yùn)行突梦。
// RM的超時(shí)到期間隔由可通過(guò)YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS訪問(wèn)的配置設(shè)置定義,
// 默認(rèn)值由YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS定義羽利。
// ApplicationMaster需要使用ResourceManager注冊(cè)自己以開始心跳宫患。
String hostName = NetUtils.getHostname();
try
{
// 服務(wù)地址
String serviceAddr = "http://"+XNet.getPreferedIpv4()+":"
+ MSApp.instance().getHttpPort() ;
//注冊(cè)服務(wù)地址
RegisterApplicationMasterResponse resp = mAMRMClient.registerApplicationMaster(hostName
, 0 , serviceAddr);
long maxMem = resp.getMaximumResourceCapability().getMemorySize();
mLogger.info("集群資源分配的最大內(nèi)存是:{}MB" , maxMem);
int maxVCores = resp.getMaximumResourceCapability().getVirtualCores();
mLogger.info("集群資源分配的最大虛擬核數(shù)是:{}個(gè) " , maxVCores);
mLogger.info("AM啟動(dòng)完成。");
// 取消注冊(cè)
Runnable stopPerformer = ()->{
try
{
mAMRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED , "用戶主動(dòng)關(guān)閉應(yīng)用这弧。", "N/A");
}
catch (YarnException | IOException e)
{
e.printStackTrace();
}
} ;
// 任務(wù)停止時(shí)娃闲,取消注冊(cè)。
MSApp.instance().withStopPerformer(stopPerformer) ;
// ShutdownHook形式不可靠
// Runtime.getRuntime().addShutdownHook(new Thread(stopPerformer)) ;
}
catch (YarnException | IOException e)
{
mLogger.error("AM運(yùn)行過(guò)程中出現(xiàn)異常匾浪。異常消息:"+ExceptionAssist.getStackTrace(e)) ;
}