前言
上文提到y(tǒng)arn類似一個(gè)分布式操作系統(tǒng)白胀,那么我們就可以自定義寫一些應(yīng)用在這個(gè)操作系統(tǒng)上運(yùn)行
當(dāng)然也不能太過隨意寫摊腋,我們要運(yùn)行在操作系統(tǒng)上就必然要遵守操作系統(tǒng)本身的規(guī)矩
Yarn
Yarn體系中压汪,用戶的主程序被稱作ApplicationMaster
,當(dāng)然我們可以在ApplicationMaster中繼續(xù)向RM申請(qǐng)資源來執(zhí)行子程序,比如MapReduce中的MapTask和ReduceTask都屬于子程序糊昙。
這就好比我們平時(shí)寫java,在main方法主線程中可以創(chuàng)建子線程跑一些邏輯
- linux/windows中弥激,我們創(chuàng)建java子線程不需要關(guān)心這個(gè)線程任務(wù)到底由哪個(gè)cpu完成进陡,任務(wù)交給操作系統(tǒng)來調(diào)度
- 同理yarn中,
ApplicationMaster
申請(qǐng)創(chuàng)建出來的子程序微服,我們不用考慮程序運(yùn)行在哪臺(tái)機(jī)器上趾疚,任務(wù)交給yarn來調(diào)度
Hello World
接下來我們就嘗試寫一個(gè)簡(jiǎn)單應(yīng)用(輸出Hello World),運(yùn)行在yarn中以蕴,我們先不考慮使用子程序糙麦,直接在ApplicationMaster
中輸出Hello World
ApplicationMaster
寫一個(gè)Hello World應(yīng)用再簡(jiǎn)單不過了:
public class MyAppMaster {
public static void main(String[] args) {
System.out.println("HELLO WORLD");
}
}
但還是那句話,在yarn上運(yùn)行就要遵守人家的規(guī)矩丛肮,而yarn規(guī)定:
ApplicationMaster程序運(yùn)行前需要向RM注冊(cè)赡磅,運(yùn)行結(jié)束后需要取消注冊(cè)
也就是說程序不是你想跑就能跑,你得告訴人家資源管理器一聲宝与,否則人家隊(duì)伍怎么帶焚廊?
注冊(cè)的相關(guān)邏輯如果真自己寫還挺復(fù)雜,但好在hadoop為我們提供了客戶端工具习劫,我們引入依賴就方便了
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
最終ApplicationMaster
代碼如下(就是增加了注冊(cè)到RM和取消注冊(cè))
public class MyAppMaster {
/**
* AppMaster 程序入口
* @param args 執(zhí)行參數(shù)
*/
public static void main(String[] args) {
MyAppMaster master = new MyAppMaster();
master.run();
}
/**
* AppMaster 運(yùn)行
*/
public void run() {
try {
// 開啟am-rm client咆瘟,建立rm-am的通道,用于注冊(cè)AM
AMRMClientAsync amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, null);
amRmClient.init(new Configuration());
amRmClient.start();
String hostName = NetUtils.getHostname();
// 注冊(cè)至RM
amRmClient.registerApplicationMaster(hostName, -1, null);
// 運(yùn)行程序
doRun();
// 解除注冊(cè)
amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 實(shí)際運(yùn)行程序榜聂,就一個(gè)輸出
*/
private void doRun() {
System.out.println("HELLO WORLD");
}
}
到此我們的應(yīng)用程序就寫完了搞疗,并且遵守了yarn的規(guī)矩
YarnClient
應(yīng)用程序?qū)懲炅耍趺窗殉绦虿渴鸬統(tǒng)arn上運(yùn)行吶须肆?
yarn又有規(guī)定了:
想讓你的程序在我的平臺(tái)上跑匿乃,需要你在RM上創(chuàng)建應(yīng)用,并指定好應(yīng)用名稱豌汇、運(yùn)行環(huán)境幢炸、程序(jar包)位置、啟動(dòng)命令拒贱、所需資源等
當(dāng)然這些數(shù)據(jù)的提交是有一定格式的宛徊,就像我們前端對(duì)接后端api,肯定是有一個(gè)json格式
索性我們不要考慮這復(fù)雜的格式逻澳,因?yàn)?code>hadoop-yarn-client依賴同樣幫我們封裝好了闸天,就好似有了sdk,寫寫代碼就可以和RM對(duì)接了斜做,而這個(gè)負(fù)責(zé)對(duì)接RM上傳應(yīng)用程序和啟動(dòng)參數(shù)的代碼苞氮,一般我們叫它:YarnClient
我們開始寫代碼實(shí)現(xiàn)這個(gè)YarnClient
1.配置
首先我們要與RM溝通創(chuàng)建應(yīng)用,首先要搞清楚RM在哪才能和它交互瓤逼,所以先配置一下RM的IP地址
Configuration conf = new Configuration();
// 設(shè)置rm所在的ip地址
conf.set("yarn.resourcemanager.hostname", "192.168.10.101");
其中192.168.10.101就是你運(yùn)行RM的機(jī)器IP地址
2.申請(qǐng)應(yīng)用
有了地址笼吟,就可以申請(qǐng)應(yīng)用库物,這一步直接使用hadoop-yarn-client
依賴的工具即可
// 創(chuàng)建客戶端
YarnClient yarnClient = YarnClient.createYarnClient();
// 初始配置
yarnClient.init(conf);
// 開啟(建立連接)
yarnClient.start();
// 向RM發(fā)送請(qǐng)求創(chuàng)建應(yīng)用
YarnClientApplication application = yarnClient.createApplication();
// 準(zhǔn)備應(yīng)用提交上下文(RM要求你提交的信息格式)
ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
// 獲取分配的應(yīng)用id
ApplicationId appId = applicationSubmissionContext.getApplicationId();
log.info("appId: {}", appId);
其中ApplicationId
就是RM給我們分配的應(yīng)用ID,ApplicationSubmissionContext
就是我們要提交的應(yīng)用相關(guān)信息的載體
所以接下來就是給applicationSubmissionContext填充應(yīng)用名稱贷帮、運(yùn)行環(huán)境戚揭、程序(jar包)位置、啟動(dòng)命令撵枢、所需資源等信息再次提交給RM
3.設(shè)置應(yīng)用名稱
應(yīng)用名稱就起個(gè)"Hello World"
// 設(shè)置應(yīng)用名稱
applicationSubmissionContext.setApplicationName("Hello World");
4.設(shè)置程序(jar包)位置
這一步最重要民晒,你得告訴RM你得程序在哪,一般都存在HDFS上诲侮,因?yàn)槲覒兄ド蟼鞫婆埃瑢懥艘粋€(gè)本地傳送到HDFS的方法
// 即上一步寫的AppMaster jar包的本地位置
String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
String jarName = "my-yarn-app.jar";
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
put(jarName, addLocalToHdfs(jarPath, jarName));
}};
其中addLocalToHdfs
就是上傳到HDFS,并獲取HDFS路徑
private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
//獲取文件系統(tǒng)
Configuration configuration = new Configuration();
//NameNode的ip和端口
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
// 目標(biāo)路徑
String dst =
"hello/" + jarName;
Path dstPath =
new Path(fs.getHomeDirectory(), dst);
// 上傳
fs.copyFromLocalFile(new Path(jarPath), dstPath);
FileStatus scFileStatus = fs.getFileStatus(dstPath);
// 關(guān)閉
fs.close();
LocalResource scRsrc = LocalResource.newInstance(
URL.fromURI(dstPath.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
scFileStatus.getLen(), scFileStatus.getModificationTime());
return scRsrc;
}
這一步需要引入hdfs-client
依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.1.3</version>
</dependency>
5.設(shè)置程序環(huán)境
這一步同樣比較重要沟绪,我們需要設(shè)置程序運(yùn)行的環(huán)境刮便,jdk、yarn包什么的绽慈,設(shè)置了CLASSPATH
Map<String, String> env = new HashMap<>();
// 任務(wù)的運(yùn)行依賴jar包的準(zhǔn)備
StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
// yarn依賴包
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
6.設(shè)置啟動(dòng)腳本
這一步一樣至關(guān)重要恨旱,我們要告訴RM我們的程序怎么啟動(dòng),因?yàn)閅arn不光支持java包這一種程序坝疼,所以我們要寫java的啟動(dòng)命令搜贤,可以通過-Xms -Xmx等設(shè)置啟動(dòng)jvm參數(shù)
List<String> commands = new ArrayList<String>() {{
add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
}};
7.配置Container啟動(dòng)上下文
資源、環(huán)境钝凶、啟動(dòng)命令等就組成了一個(gè)Container(AM的Container)啟動(dòng)的所需參數(shù)仪芒,把它們打包為container啟動(dòng)上下文,通過setAMContainerSpec
設(shè)置到要提交的參數(shù)中
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// 準(zhǔn)備am Container的運(yùn)行環(huán)境
applicationSubmissionContext.setAMContainerSpec(amContainer);
8.設(shè)置am程序所需硬件資源
準(zhǔn)備好了所有啟動(dòng)程序的信息耕陷,下一步就是告訴RM你這個(gè)AppMaster需要多少硬件資源掂名,這樣RM才能給你找合適的節(jié)點(diǎn)運(yùn)行你的程序,通過setResource
設(shè)置到要提交的參數(shù)中
int memory = 1024;
int vCores = 2;
applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
9.提交作業(yè)
完事具備哟沫,提交給RM你的程序就會(huì)被跑起來了
yarnClient.submitApplication(applicationSubmissionContext);
完整代碼
YarnClient完整代碼如下
package me.pq.yarn;
/**
* @Author pq217
* @Date 2022/11/18 17:47
* @Description
*/
public class MyYarnClient {
private static Logger log = LoggerFactory.getLogger(MyYarnClient.class);
public static void main(String[] args) {
MyYarnClient client = new MyYarnClient();
try {
client.run();
} catch (Exception e) {
log.error("client run exception , please check log file.", e);
}
}
/**
* 客戶端運(yùn)行
* @throws IOException
* @throws YarnException
* @throws URISyntaxException
* @throws InterruptedException
*/
public void run() throws IOException, YarnException, URISyntaxException, InterruptedException {
/**=====1.配置=====**/
Configuration conf = new Configuration();
// 設(shè)置rm所在的ip地址
conf.set("yarn.resourcemanager.hostname", "192.168.10.101");
/**=====2.申請(qǐng)app=====**/
// 創(chuàng)建YarnClient和ResourceManager進(jìn)行交互
YarnClient yarnClient = YarnClient.createYarnClient();
// 初始配置
yarnClient.init(conf);
// 開啟(建立連接)
yarnClient.start();
// 向RM發(fā)送請(qǐng)求創(chuàng)建應(yīng)用
YarnClientApplication application = yarnClient.createApplication();
// 準(zhǔn)備應(yīng)用提交上下文(RM要求你提交的信息格式)
ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
// 獲取分配的應(yīng)用id
ApplicationId appId = applicationSubmissionContext.getApplicationId();
log.info("appId: {}", appId);
/**=====3.設(shè)置應(yīng)用名稱=====**/
// 設(shè)置應(yīng)用名稱
applicationSubmissionContext.setApplicationName("Hello World");
/**=====4.準(zhǔn)備程序(jar包)=====**/
String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
String jarName = "my-yarn-app.jar";
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
put(jarName, addLocalToHdfs(jarPath, jarName));
}};
/**=====5.準(zhǔn)備程序環(huán)境=====**/
Map<String, String> env = new HashMap<>();
// 任務(wù)的運(yùn)行依賴jar包的準(zhǔn)備
StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
// yarn依賴包
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
/**=====6.準(zhǔn)備啟動(dòng)命令=====**/
List<String> commands = new ArrayList<String>() {{
add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
}};
/**=====7.構(gòu)造am container運(yùn)行資源+環(huán)境+腳本=====**/
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// 準(zhǔn)備am Container的運(yùn)行環(huán)境
applicationSubmissionContext.setAMContainerSpec(amContainer);
/**=====8.設(shè)置am程序所需資源=====**/
int memory = 1024;
int vCores = 2;
applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
/**=====9.提交并開始作業(yè)=====**/
yarnClient.submitApplication(applicationSubmissionContext);
/**=====10.查詢作業(yè)是否完成=====**/
for (;;) {
Thread.sleep(500);
ApplicationReport applicationReport = yarnClient.getApplicationReport(appId);
YarnApplicationState state = applicationReport.getYarnApplicationState();
FinalApplicationStatus status = applicationReport.getFinalApplicationStatus();
if (state.equals(YarnApplicationState.FINISHED)) {
if (status.equals(FinalApplicationStatus.SUCCEEDED)) {
log.info("程序運(yùn)行成功!");
break;
} else {
log.error("程序運(yùn)行失敗!");
break;
}
} else if (state.equals(YarnApplicationState.FAILED) || state.equals(YarnApplicationState.KILLED) ) {
log.error("程序運(yùn)行失敗!");
break;
}
log.info("計(jì)算中...");
}
}
/**
* 上傳本地jar包到hdfs
* @param jarPath
* @param jarName
* @throws IOException
*/
private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
//獲取文件系統(tǒng)
Configuration configuration = new Configuration();
//NameNode的ip和端口
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
// 目標(biāo)路徑
String dst =
"hello/" + jarName;
Path dstPath =
new Path(fs.getHomeDirectory(), dst);
// 上傳
fs.copyFromLocalFile(new Path(jarPath), dstPath);
FileStatus scFileStatus = fs.getFileStatus(dstPath);
// 關(guān)閉
fs.close();
LocalResource scRsrc = LocalResource.newInstance(
URL.fromURI(dstPath.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
scFileStatus.getLen(), scFileStatus.getModificationTime());
return scRsrc;
}
}
測(cè)試
應(yīng)用寫好了饺蔑,上傳應(yīng)用的client也寫好了,下面測(cè)一下
首先使用maven-assembly插件給程序打jar包
mvn clean package
其次嗜诀,本地idea直接運(yùn)行YarnClient的main方法
注意替換一下代碼中的jar包地址和名稱猾警,以及AppMaster的全路徑名,以及hadoop的ip地址等信息
MyYarnClient的運(yùn)行結(jié)果idea輸出如下
打開yarn-web再看一下日志
成功實(shí)現(xiàn)了一個(gè)運(yùn)行在Yarn上的小程序隆敢!
分布式計(jì)算
以上发皿,我們完成了一個(gè)簡(jiǎn)單的程序運(yùn)行在yarn上,但其實(shí)這個(gè)應(yīng)用程序?qū)嶋H上只在一個(gè)節(jié)點(diǎn)上實(shí)際運(yùn)行了System.out.println
的代碼拂蝎,這就像去了一趟沃爾瑪雳窟,買了瓶礦泉水
yarn的優(yōu)勢(shì)是可以讓我們的計(jì)算程序分給多個(gè)機(jī)器節(jié)點(diǎn)去執(zhí)行,我們繼續(xù)改造一下AppMaster,實(shí)現(xiàn)如下功能:
- 添加兩個(gè)子任務(wù)封救,子任務(wù)分別在HDFS中創(chuàng)建一個(gè)文件夾
- 兩個(gè)子任務(wù)結(jié)束之后,再運(yùn)行輸出Hello World
ChildTask
首先編寫子任務(wù)捣作,我為了省事誉结,直接和AppMaster放一個(gè)項(xiàng)目中了,很簡(jiǎn)單的代碼券躁,創(chuàng)建一個(gè)/child/+服務(wù)器hostName的文件夾
public class ChildTask {
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
//獲取文件系統(tǒng)
Configuration configuration = new Configuration();
//NameNode的ip和端口
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
// hostName
String hostName = NetUtils.getHostname();
// 創(chuàng)建一個(gè)文件夾
fs.mkdirs(new Path("/child/"+hostName));
fs.close();
}
}
AppMaster
接下來要改造AppMaster惩坑,原來只是輸出Hello World,現(xiàn)在要向RM申請(qǐng)Container用來執(zhí)行子任務(wù)
container請(qǐng)求
首先申請(qǐng)Container需要向RM申請(qǐng)也拜,所以使用amRmClient即可發(fā)出請(qǐng)求
// 兩個(gè)子任務(wù)以舒,對(duì)應(yīng)兩個(gè)container
int childTaskNum = 2;
for (int i = 0; i < childTaskNum; i++) {
// 向rm申請(qǐng)一個(gè)1M內(nèi)存,1個(gè)CPU的資源容器
int memory = 1024;
int vCores = 1;
AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
amRmClient.addContainerRequest(containerRequest);
}
rm回調(diào)
申請(qǐng)成功后,當(dāng)rm分配出container時(shí)還要進(jìn)行相關(guān)回調(diào)處理慢哈,所以amRmClient定義時(shí)要加上一個(gè)回調(diào)處理類
// rm回調(diào)處理器
AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
// 開啟am-rm client蔓钟,建立rm-am的通道,用于注冊(cè)AM, allocListener負(fù)責(zé)處理AM的響應(yīng)
AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);
RMCallBackHandler是rm響應(yīng)的處理器
private class RMCallBackHandler extends AMRMClientAsync.AbstractCallbackHandler {
重點(diǎn)要實(shí)現(xiàn)兩個(gè)方法
- onContainersAllocated rm分配出containers的回調(diào)方法
- onContainersCompleted container運(yùn)行結(jié)束的方法
onContainersCompleted
這個(gè)方法主要是子任務(wù)運(yùn)行完成卵贱,我們?cè)贏ppMaster加幾個(gè)內(nèi)部變量控制所有子任務(wù)完成再輸出"Hello World"
// 充當(dāng)鎖
private Object lock = new Object();
// 任務(wù)個(gè)數(shù)
private int childTaskNum = 2;
// 已完成任務(wù)個(gè)數(shù)
private int childTaskCompletedNum = 0;
RMCallBackHandler的onContainersCompleted方法實(shí)現(xiàn)如下:
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
for (ContainerStatus status : statuses) {
synchronized (lock) {
System.out.println(++childTaskCompletedNum + " container completed");
// 子任務(wù)全部完成
if (childTaskCompletedNum == childTaskNum) {
lock.notify();
}
}
}
}
doRun方法修改為如下
private void doRun(AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient) throws InterruptedException {
// 申請(qǐng)兩個(gè)資源容器
for (int i = 0; i < childTaskNum; i++) {
// 向rm申請(qǐng)一個(gè)1M內(nèi)存,1個(gè)CPU的資源容器
int memory = 1024;
int vCores = 1;
AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
amRmClient.addContainerRequest(containerRequest);
}
synchronized (lock) {
// 等待子任務(wù)完成
lock.wait();
}
System.out.println("HELLO WORLD");
}
到此即可實(shí)現(xiàn)申請(qǐng)兩個(gè)container滥沫,兩個(gè)container運(yùn)行完后再執(zhí)行輸出"HELLO WORLD"
onContainersAllocated
這是RMCallBackHandler中要實(shí)現(xiàn)的重點(diǎn)方法,當(dāng)container分配成功后要做什么键俱?
思路很簡(jiǎn)單兰绣,container分配之后當(dāng)然要在對(duì)應(yīng)的容器上運(yùn)行我們的子任務(wù):ChildTask
,而子任務(wù)的運(yùn)行一定是在container所指定的NM節(jié)點(diǎn)上编振,所以我們要提前初始化一個(gè)NM客戶端:
加一個(gè)內(nèi)部屬性以供AppMaster整個(gè)類使用
NMClientAsyncImpl nmClientAsync;
此時(shí)AppMaster run
方法修改如下
public void run() {
try {
// rm回調(diào)處理器
AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
// 開啟am-rm client缀辩,建立rm-am的通道,用于注冊(cè)AM, allocListener負(fù)責(zé)處理AM的響應(yīng)
AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);
amRmClient.init(new Configuration());
amRmClient.start();
String hostName = NetUtils.getHostname();
// 注冊(cè)至RM
amRmClient.registerApplicationMaster(hostName, -1, null);
// 初始化nmClient
nmClientAsync = new NMClientAsyncImpl(new NMCallBackHandler());
nmClientAsync.init(conf);
nmClientAsync.start();
// 運(yùn)行程序
doRun(amRmClient);
// 解除注冊(cè)
amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
// am-rm客戶端關(guān)閉
amRmClient.stop();
// nm客戶端關(guān)閉
nmClientAsync.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
其中NMCallBackHandler是一個(gè)NM響應(yīng)的Callback踪央,可以通過實(shí)現(xiàn)其方法在container聲明周期加入一些邏輯
private class NMCallBackHandler extends NMClientAsync.AbstractCallbackHandler {
}
接下來就是實(shí)現(xiàn)onContainersAllocated
臀玄,代碼如下
@Override
public void onContainersAllocated(List<Container> containers) {
try {
for (Container container : containers) {
System.out.println("container allocated, Node=" + container.getNodeHttpAddress());
// 構(gòu)建AM<->NM客戶端并開啟
// 還是YarnClient containerLaunchContext那一套,這把直接去HDFS系統(tǒng)取文件杯瞻,因?yàn)楹蚘arnClient打包到一個(gè)jar上傳
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
//NameNode的ip和端口
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), conf, "root");
URI appUri = new URI("/user/root/hello/my-yarn-app.jar");
FileStatus fileStatus = fs.getFileStatus(new Path(appUri));
put("my-yarn-app.jar", LocalResource.newInstance(
URL.fromURI(appUri),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
fileStatus.getLen(), fileStatus.getModificationTime()));
}};
Map<String, String> env = new HashMap<>();
StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
List<String> commands = new ArrayList<String>() {{
// 傳入ip地址作為參數(shù)
add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx200m me.pq.yarn.ChildTask");
}};
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// nm節(jié)點(diǎn)啟動(dòng)container
nmClientAsync.startContainerAsync(container, containerLaunchContext);
}
} catch (Exception e) {
e.printStackTrace();
}
}
代碼就不詳解了镐牺,和YarnClient提交的ContainerLaunchContext寫法基本一致(最終運(yùn)行me.pq.yarn.ChildTask而不是MyAppMaster),最后使用NM客戶端的startContainerAsync
方法讓子任務(wù)運(yùn)行在NM上
值得一提的是我的ChildTask和AppMaster都在一個(gè)jar包下魁莉,所以這里不用上傳了睬涧,直接去HDFS取即可
測(cè)試
代碼寫完了,測(cè)試一下旗唁,mvn clean package然后執(zhí)行MyYarnClient main
方法
idea輸出
HDFS-WEB上看一下子任務(wù)的文件夾創(chuàng)建是否成功
可見文件夾創(chuàng)建出來了
YARN-WEB看一下AppMaster的日志
到此畦浓,實(shí)現(xiàn)了一個(gè)運(yùn)行在yarn上的簡(jiǎn)單分布式計(jì)算程序~