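Hadoop YARN is a standalone scheduling framework. Using the APIs that YARN provides, we can write our own programs and have YARN schedule and run them.
Hadoop officially ships a YARN application called distributedshell, which uses the YARN scheduling framework to run shell commands. Hadoop provides it as a simple example of how to write your own YARN application.
Source code: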
https://github.com/apache/hadoop/tree/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell
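Writing a YARN application generally requires:
- Writing a client. The client defines how the ApplicationMaster is launched and submits the application to the RM (ResourceManager).
- Writing your own ApplicationMaster. Inside the ApplicationMaster, create clients that talk to the RM and to the NM (NodeManager); they are used to request resources from the RM and to launch containers on the NMs to run tasks.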
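Interfaces
Low-level interfaces
ApplicationClientProtocol
The low-level protocol between clients and the ResourceManager, used to submit or abort jobs and to get information on applications, cluster metrics, nodes, queues, and ACLs.
ApplicationMasterProtocol
The low-level protocol between the AM and the RM. ApplicationMasterProtocol.allocate also serves as the heartbeat from the AM to the RM.
ContainerManagementProtocol
The low-level protocol between the AM and the NM, used to start and stop containers and to get the status of running containers.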
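High-level interfaces
Client<-->ResourceManager
Uses the YarnClient object.
ApplicationMaster<-->ResourceManager
Used to request containers from the RM. Uses the AMRMClientAsync object, with AMRMClientAsync.CallbackHandler handling events asynchronously.
ApplicationMaster<-->NodeManager
Used to launch containers on NodeManagers. Uses NMClientAsync to communicate with the NodeManager, with NMClientAsync.CallbackHandler handling events asynchronously.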
distributedshell
Let's look at how Hadoop's example program distributedshell is written.
Writing the Client
org.apache.hadoop.yarn.applications.distributedshell.Client
This is the client entry point; it talks to the RM.
- The program first initializes a YarnClient:
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
It then calls yarnClient.createApplication to create the application and obtain an application id. Under the hood this uses ApplicationClientProtocol.
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
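Using the application id obtained from the RM, the client builds an ApplicationSubmissionContext. The ApplicationSubmissionContext holds all the information the RM needs to launch the ApplicationMaster. The client must set the following in this context:
- Application info: id, name
- Queue, priority info: the queue the application is submitted to, and its priority.
- User: the user submitting the application
- ContainerLaunchContext: defines everything about the container in which the AM will run, such as local resources (binaries, jars, files etc.), environment settings (CLASSPATH etc.), the command to execute, and security tokens (RECT).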
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
...
// Set the application name
appContext.setApplicationName(appName);
...
// Prepare the local resources, environment, and command
...
// Build the application master's ContainerLaunchContext from the prepared information
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources, env, commands, null, null, null);
...
// Set the resources needed to launch the AM
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
// Set the ContainerLaunchContext
appContext.setAMContainerSpec(amContainer);
// Set the priority
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// Set the queue
appContext.setQueue(amQueue);
// Submit the application
yarnClient.submitApplication(appContext);
An example to make this concrete
Suppose the user test runs hadoop-yarn-applications-distributedshell with the following command:
hadoop jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar -jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar -queue root.download -shell_script /tmp/a.sh
Suppose yarnClient.createApplication() returns the application id application_1576067711791_1132781. Then:
- The Client takes the jar path passed via the -jar argument, hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar, treats it as a local resource, and uploads it to the following HDFS path under the test user's home directory:
/user/test/DistributedShell/application_1576067711791_1132781/AppMaster.jar
- The script to execute, /tmp/a.sh, is likewise uploaded to the application's HDFS path under the user's home directory:
/user/test/DistributedShell/application_1576067711791_1132781/ExecScript.sh
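The way the two HDFS paths above are composed can be sketched in plain Java. This is only an illustration with no Hadoop dependency: the class StagingPathSketch and the method stagingPath are hypothetical names, and the sketch assumes the user's HDFS home directory is /user/<user>, which matches the paths in this example but depends on cluster configuration.

```java
class StagingPathSketch {
    // Hypothetical helper: joins home directory, application name,
    // application id, and destination file name into one HDFS path.
    // Assumption: the HDFS home directory of a user is /user/<user>.
    static String stagingPath(String user, String appName, String appId, String fileName) {
        return "/user/" + user + "/" + appName + "/" + appId + "/" + fileName;
    }

    public static void main(String[] args) {
        String appId = "application_1576067711791_1132781";
        // The jar passed via -jar is stored as AppMaster.jar
        System.out.println(stagingPath("test", "DistributedShell", appId, "AppMaster.jar"));
        // The script passed via -shell_script is stored as ExecScript.sh
        System.out.println(stagingPath("test", "DistributedShell", appId, "ExecScript.sh"));
    }
}
```

Because the application id is part of the path, each submission gets its own directory and files from different runs do not overwrite each other.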
The command that runs the ApplicationMaster is:
org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
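A command line like the one above is typically assembled from a list of fragments that are joined with spaces. The following is a minimal sketch in plain Java that reproduces only the portion of the command shown above; the class AmCommandSketch and the method buildCommand are hypothetical names, and the real client may add further parts that are not shown here.

```java
import java.util.ArrayList;
import java.util.List;

class AmCommandSketch {
    // Hypothetical helper: collects the command fragments in order and
    // joins them with single spaces.
    static String buildCommand(int containerMemory, int containerVCores,
                               int numContainers, int priority) {
        List<String> vargs = new ArrayList<>();
        vargs.add("org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");
        vargs.add("--container_memory " + containerMemory);
        vargs.add("--container_vcores " + containerVCores);
        vargs.add("--num_containers " + numContainers);
        vargs.add("--priority " + priority);
        // <LOG_DIR> is kept as a literal placeholder, as in the command above
        vargs.add("1><LOG_DIR>/AppMaster.stdout");
        vargs.add("2><LOG_DIR>/AppMaster.stderr");
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        System.out.println(buildCommand(10, 1, 1, 0));
    }
}
```

The resulting string is what ends up in the commands list passed to ContainerLaunchContext.newInstance.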
Figure: interaction between the client and the RM
Figure: structure of ApplicationSubmissionContext
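Writing the ApplicationMaster (AM)
The AM uses its ApplicationAttemptId when interacting with the RM. The container information can be read from the environment variables passed in, and the ApplicationAttemptId is derived from it: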
ContainerId containerId = ConverterUtils.toContainerId(envs
.get(Environment.CONTAINER_ID.name()));
appAttemptID = containerId.getApplicationAttemptId();
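Once the AM has fully initialized itself, it can start two clients, one that talks to the RM and one that talks to the NM, and register the corresponding event handlers.
// Client for the RM, with its event handler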
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
// Client for the NM, with its event handler
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
The AM must send heartbeats to the RM. Registering with the RM starts the heartbeating:
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
The AM requests container resources from the RM
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerVirtualCores + ", max="
+ maxVCores);
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
+ " previous attempts' running containers on AM registration.");
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
int numTotalContainersToRequest =
numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
// Request the containers
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainers);
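The arithmetic in the code above comes down to two steps: cap each requested resource at the cluster maximum, then subtract the containers still running from a previous attempt from the total. A minimal sketch in plain Java follows; the class ContainerAskSketch and its method names are hypothetical, and the numbers in main are made-up inputs, not values from a real cluster.

```java
class ContainerAskSketch {
    // A resource ask cannot exceed the cluster maximum, so cap it.
    static int clamp(int requested, int clusterMax) {
        return Math.min(requested, clusterMax);
    }

    // Containers that survived a previous AM attempt count as already
    // allocated, so only the remainder has to be requested.
    static int containersToRequest(int numTotalContainers, int previouslyRunning) {
        return numTotalContainers - previouslyRunning;
    }

    public static void main(String[] args) {
        System.out.println(clamp(10, 8192));           // 10, below the max
        System.out.println(clamp(16384, 8192));        // 8192, capped at the max
        System.out.println(containersToRequest(5, 2)); // 3 new requests
    }
}
```

As in the original loop, a result of zero or less simply means that no new container requests are sent.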
The onContainersAllocated callback of AMRMClientAsync.CallbackHandler starts a LaunchContainerRunnable thread to do the launch:
// containerListener was created by: containerListener = createNMCallbackHandler();
// and is of type NMCallbackHandler
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
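The pattern here is a callback that hands each allocated container to its own thread, so the callback itself returns quickly. The following plain-Java sketch illustrates that pattern only; it does not use the YARN API. AllocationHandler and LaunchingHandler are hypothetical stand-ins, container ids are plain strings, and the "launch" is reduced to incrementing a counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class AllocationCallbackSketch {
    // Hypothetical stand-in for an allocation callback interface.
    interface AllocationHandler {
        void onContainersAllocated(List<String> containerIds);
    }

    static class LaunchingHandler implements AllocationHandler {
        final AtomicInteger launched = new AtomicInteger();
        final List<Thread> launchThreads = new ArrayList<>();

        @Override
        public void onContainersAllocated(List<String> containerIds) {
            for (String id : containerIds) {
                // One thread per container keeps the callback non-blocking.
                Thread t = new Thread(launched::incrementAndGet, "launch-" + id);
                launchThreads.add(t);
                t.start();
            }
        }

        // Waits until every launch thread has finished.
        void awaitLaunches() {
            try {
                for (Thread t : launchThreads) {
                    t.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        LaunchingHandler handler = new LaunchingHandler();
        handler.onContainersAllocated(Arrays.asList("c1", "c2", "c3"));
        handler.awaitLaunches();
        System.out.println("launched: " + handler.launched.get());
    }
}
```

Keeping references to the launch threads lets the caller join them later, for example during shutdown.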
LaunchContainerRunnable
The run method of LaunchContainerRunnable builds the ContainerLaunchContext for the shell that will run in the container, registers the container with the containerListener callback handler, and then uses nmClientAsync to start the container asynchronously.
// Build the ctx used to launch the shell script
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
// Register the container with the NMCallbackHandler
containerListener.addContainer(container.getId(), container);
// Start the container asynchronously via nmClientAsync
nmClientAsync.startContainerAsync(container, ctx);