Yarn上運(yùn)行Hello World

前言

上文提到y(tǒng)arn類似一個(gè)分布式操作系統(tǒng)白胀,那么我們就可以自定義寫一些應(yīng)用在這個(gè)操作系統(tǒng)上運(yùn)行

當(dāng)然也不能太過隨意寫摊腋,我們要運(yùn)行在操作系統(tǒng)上就必然要遵守操作系統(tǒng)本身的規(guī)矩

Yarn

Yarn體系中压汪,用戶的主程序被稱作ApplicationMaster,當(dāng)然我們可以在ApplicationMaster中繼續(xù)向RM申請(qǐng)資源來執(zhí)行子程序,比如MapReduce中的MapTask和ReduceTask都屬于子程序糊昙。

這就好比我們平時(shí)寫java,在main方法主線程中可以創(chuàng)建子線程跑一些邏輯

  • linux/windows中弥激,我們創(chuàng)建java子線程不需要關(guān)心這個(gè)線程任務(wù)到底由哪個(gè)cpu完成进陡,任務(wù)交給操作系統(tǒng)來調(diào)度
  • 同理yarn中,ApplicationMaster申請(qǐng)創(chuàng)建出來的子程序微服,我們不用考慮程序運(yùn)行在哪臺(tái)機(jī)器上趾疚,任務(wù)交給yarn來調(diào)度

Hello World

接下來我們就嘗試寫一個(gè)簡(jiǎn)單應(yīng)用(輸出Hello World),運(yùn)行在yarn中以蕴,我們先不考慮使用子程序糙麦,直接在ApplicationMaster中輸出Hello World

ApplicationMaster

寫一個(gè)Hello World應(yīng)用再簡(jiǎn)單不過了:

public class MyAppMaster {
    public static void main(String[] args) {
        System.out.println("HELLO WORLD");
    }
}

但還是那句話,在yarn上運(yùn)行就要遵守人家的規(guī)矩丛肮,而yarn規(guī)定:

ApplicationMaster程序運(yùn)行前需要向RM注冊(cè)赡磅,運(yùn)行結(jié)束后需要取消注冊(cè)

也就是說程序不是你想跑就能跑,你得告訴人家資源管理器一聲宝与,否則人家隊(duì)伍怎么帶焚廊?

注冊(cè)的相關(guān)邏輯如果真自己寫還挺復(fù)雜,但好在hadoop為我們提供了客戶端工具习劫,我們引入依賴就方便了

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-client</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.3</version>
</dependency>

最終ApplicationMaster代碼如下(就是增加了注冊(cè)到RM和取消注冊(cè))

public class MyAppMaster {

    /**
     * AppMaster 程序入口
     * @param args 執(zhí)行參數(shù)
     */
    public static void main(String[] args) {
        MyAppMaster master = new MyAppMaster();
        master.run();
    }

    /**
     * AppMaster 運(yùn)行
     */
    public void run() {
        try {
            // 開啟am-rm client咆瘟,建立rm-am的通道,用于注冊(cè)AM
            AMRMClientAsync amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, null);
            amRmClient.init(new Configuration());
            amRmClient.start();
            String hostName = NetUtils.getHostname();
            // 注冊(cè)至RM
            amRmClient.registerApplicationMaster(hostName, -1, null);
            // 運(yùn)行程序
            doRun();
            // 解除注冊(cè)
            amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 實(shí)際運(yùn)行程序榜聂,就一個(gè)輸出
     */
    private void doRun() {
        System.out.println("HELLO WORLD");
    }
}

到此我們的應(yīng)用程序就寫完了搞疗,并且遵守了yarn的規(guī)矩

YarnClient

應(yīng)用程序?qū)懲炅耍趺窗殉绦虿渴鸬統(tǒng)arn上運(yùn)行吶须肆?

yarn又有規(guī)定了:

想讓你的程序在我的平臺(tái)上跑匿乃,需要你在RM上創(chuàng)建應(yīng)用,并指定好應(yīng)用名稱豌汇、運(yùn)行環(huán)境幢炸、程序(jar包)位置、啟動(dòng)命令拒贱、所需資源等

當(dāng)然這些數(shù)據(jù)的提交是有一定格式的宛徊,就像我們前端對(duì)接后端api,肯定是有一個(gè)json格式

索性我們不要考慮這復(fù)雜的格式逻澳,因?yàn)?code>hadoop-yarn-client依賴同樣幫我們封裝好了闸天,就好似有了sdk,寫寫代碼就可以和RM對(duì)接了斜做,而這個(gè)負(fù)責(zé)對(duì)接RM上傳應(yīng)用程序和啟動(dòng)參數(shù)的代碼苞氮,一般我們叫它:YarnClient

我們開始寫代碼實(shí)現(xiàn)這個(gè)YarnClient

1.配置

首先我們要與RM溝通創(chuàng)建應(yīng)用,首先要搞清楚RM在哪才能和它交互瓤逼,所以先配置一下RM的IP地址

Configuration conf = new Configuration();
// 設(shè)置rm所在的ip地址
conf.set("yarn.resourcemanager.hostname", "192.168.10.101");

其中192.168.10.101就是你運(yùn)行RM的機(jī)器IP地址

2.申請(qǐng)應(yīng)用

有了地址笼吟,就可以申請(qǐng)應(yīng)用库物,這一步直接使用hadoop-yarn-client依賴的工具即可

// 創(chuàng)建客戶端
YarnClient yarnClient = YarnClient.createYarnClient();
// 初始配置
yarnClient.init(conf);
// 開啟(建立連接)
yarnClient.start();
// 向RM發(fā)送請(qǐng)求創(chuàng)建應(yīng)用
YarnClientApplication application = yarnClient.createApplication();
// 準(zhǔn)備應(yīng)用提交上下文(RM要求你提交的信息格式)
ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
// 獲取分配的應(yīng)用id
ApplicationId appId = applicationSubmissionContext.getApplicationId();
log.info("appId: {}", appId);

其中ApplicationId就是RM給我們分配的應(yīng)用ID,ApplicationSubmissionContext就是我們要提交的應(yīng)用相關(guān)信息的載體

所以接下來就是給applicationSubmissionContext填充應(yīng)用名稱贷帮、運(yùn)行環(huán)境戚揭、程序(jar包)位置、啟動(dòng)命令撵枢、所需資源等信息再次提交給RM

3.設(shè)置應(yīng)用名稱

應(yīng)用名稱就起個(gè)"Hello World"

// 設(shè)置應(yīng)用名稱
applicationSubmissionContext.setApplicationName("Hello World");
4.設(shè)置程序(jar包)位置

這一步最重要民晒,你得告訴RM你得程序在哪,一般都存在HDFS上诲侮,因?yàn)槲覒兄ド蟼鞫婆埃瑢懥艘粋€(gè)本地傳送到HDFS的方法

// 即上一步寫的AppMaster jar包的本地位置
String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
String jarName = "my-yarn-app.jar";
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
    put(jarName, addLocalToHdfs(jarPath, jarName));
}};

其中addLocalToHdfs就是上傳到HDFS,并獲取HDFS路徑

private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
    //獲取文件系統(tǒng)
    Configuration configuration = new Configuration();
    //NameNode的ip和端口
    FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
    // 目標(biāo)路徑
    String dst =
            "hello/" + jarName;
    Path dstPath =
            new Path(fs.getHomeDirectory(), dst);
    // 上傳
    fs.copyFromLocalFile(new Path(jarPath), dstPath);
    FileStatus scFileStatus = fs.getFileStatus(dstPath);
    // 關(guān)閉
    fs.close();
    LocalResource scRsrc = LocalResource.newInstance(
                    URL.fromURI(dstPath.toUri()),
                    LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                    scFileStatus.getLen(), scFileStatus.getModificationTime());
    return scRsrc;
}

這一步需要引入hdfs-client依賴

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>3.1.3</version>
</dependency>
5.設(shè)置程序環(huán)境

這一步同樣比較重要沟绪,我們需要設(shè)置程序運(yùn)行的環(huán)境刮便,jdk、yarn包什么的绽慈,設(shè)置了CLASSPATH

Map<String, String> env = new HashMap<>();
// 任務(wù)的運(yùn)行依賴jar包的準(zhǔn)備
StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
        .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
// yarn依賴包
for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
    classPathEnv.append(c.trim());
}
env.put("CLASSPATH", classPathEnv.toString());
6.設(shè)置啟動(dòng)腳本

這一步一樣至關(guān)重要恨旱,我們要告訴RM我們的程序怎么啟動(dòng),因?yàn)閅arn不光支持java包這一種程序坝疼,所以我們要寫java的啟動(dòng)命令搜贤,可以通過-Xms -Xmx等設(shè)置啟動(dòng)jvm參數(shù)

List<String> commands = new ArrayList<String>() {{
    add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
}};
7.配置Container啟動(dòng)上下文

資源、環(huán)境钝凶、啟動(dòng)命令等就組成了一個(gè)Container(AM的Container)啟動(dòng)的所需參數(shù)仪芒,把它們打包為container啟動(dòng)上下文,通過setAMContainerSpec設(shè)置到要提交的參數(shù)中

ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
        localResources, env, commands, null, null, null);
// 準(zhǔn)備am Container的運(yùn)行環(huán)境
applicationSubmissionContext.setAMContainerSpec(amContainer);
8.設(shè)置am程序所需硬件資源

準(zhǔn)備好了所有啟動(dòng)程序的信息耕陷,下一步就是告訴RM你這個(gè)AppMaster需要多少硬件資源掂名,這樣RM才能給你找合適的節(jié)點(diǎn)運(yùn)行你的程序,通過setResource設(shè)置到要提交的參數(shù)中

int memory = 1024;
int vCores = 2;
applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
9.提交作業(yè)

完事具備哟沫,提交給RM你的程序就會(huì)被跑起來了

yarnClient.submitApplication(applicationSubmissionContext);
完整代碼

YarnClient完整代碼如下

package me.pq.yarn;

/**
 * @Author pq217
 * @Date 2022/11/18 17:47
 * @Description
 */
public class MyYarnClient {

    private static Logger log = LoggerFactory.getLogger(MyYarnClient.class);

    public static void main(String[] args) {
        MyYarnClient client = new MyYarnClient();
        try {
            client.run();
        } catch (Exception e) {
            log.error("client run exception , please check log file.", e);
        }
    }

    /**
     * 客戶端運(yùn)行
     * @throws IOException
     * @throws YarnException
     * @throws URISyntaxException
     * @throws InterruptedException
     */
    public void run() throws IOException, YarnException, URISyntaxException, InterruptedException {
        /**=====1.配置=====**/
        Configuration conf = new Configuration();
        // 設(shè)置rm所在的ip地址
        conf.set("yarn.resourcemanager.hostname", "192.168.10.101");
        /**=====2.申請(qǐng)app=====**/
        // 創(chuàng)建YarnClient和ResourceManager進(jìn)行交互
        YarnClient yarnClient = YarnClient.createYarnClient();
        // 初始配置
        yarnClient.init(conf);
        // 開啟(建立連接)
        yarnClient.start();
        // 向RM發(fā)送請(qǐng)求創(chuàng)建應(yīng)用
        YarnClientApplication application = yarnClient.createApplication();
        // 準(zhǔn)備應(yīng)用提交上下文(RM要求你提交的信息格式)
        ApplicationSubmissionContext applicationSubmissionContext = application.getApplicationSubmissionContext();
        // 獲取分配的應(yīng)用id
        ApplicationId appId = applicationSubmissionContext.getApplicationId();
        log.info("appId: {}", appId);
        /**=====3.設(shè)置應(yīng)用名稱=====**/
        // 設(shè)置應(yīng)用名稱
        applicationSubmissionContext.setApplicationName("Hello World");
        /**=====4.準(zhǔn)備程序(jar包)=====**/
        String jarPath = "E:\\projects\\hadoop2\\MyYarnApp\\target\\my-yarn-app.jar";
        String jarName = "my-yarn-app.jar";
        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
            put(jarName, addLocalToHdfs(jarPath, jarName));
        }};
        /**=====5.準(zhǔn)備程序環(huán)境=====**/
        Map<String, String> env = new HashMap<>();
        // 任務(wù)的運(yùn)行依賴jar包的準(zhǔn)備
        StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
                .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
        // yarn依賴包
        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
            classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
            classPathEnv.append(c.trim());
        }
        env.put("CLASSPATH", classPathEnv.toString());

        /**=====6.準(zhǔn)備啟動(dòng)命令=====**/
        List<String> commands = new ArrayList<String>() {{
            add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx300m me.pq.yarn.MyAppMaster");
        }};

        /**=====7.構(gòu)造am container運(yùn)行資源+環(huán)境+腳本=====**/
        ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
                localResources, env, commands, null, null, null);
        // 準(zhǔn)備am Container的運(yùn)行環(huán)境
        applicationSubmissionContext.setAMContainerSpec(amContainer);
        /**=====8.設(shè)置am程序所需資源=====**/
        int memory = 1024;
        int vCores = 2;
        applicationSubmissionContext.setResource(Resource.newInstance(memory, vCores));
        /**=====9.提交并開始作業(yè)=====**/
        yarnClient.submitApplication(applicationSubmissionContext);
        /**=====10.查詢作業(yè)是否完成=====**/
        for (;;) {
            Thread.sleep(500);
            ApplicationReport applicationReport = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = applicationReport.getYarnApplicationState();
            FinalApplicationStatus status = applicationReport.getFinalApplicationStatus();
            if (state.equals(YarnApplicationState.FINISHED)) {
                if (status.equals(FinalApplicationStatus.SUCCEEDED)) {
                    log.info("程序運(yùn)行成功!");
                    break;
                } else  {
                    log.error("程序運(yùn)行失敗!");
                    break;
                }
            } else if (state.equals(YarnApplicationState.FAILED) || state.equals(YarnApplicationState.KILLED) ) {
                log.error("程序運(yùn)行失敗!");
                break;
            }
            log.info("計(jì)算中...");
        }
    }

    /**
     * 上傳本地jar包到hdfs
     * @param jarPath
     * @param jarName
     * @throws IOException
     */
    private LocalResource addLocalToHdfs(String jarPath, String jarName) throws IOException, URISyntaxException, InterruptedException {
        //獲取文件系統(tǒng)
        Configuration configuration = new Configuration();
        //NameNode的ip和端口
        FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
        // 目標(biāo)路徑
        String dst =
                "hello/" + jarName;
        Path dstPath =
                new Path(fs.getHomeDirectory(), dst);
        // 上傳
        fs.copyFromLocalFile(new Path(jarPath), dstPath);
        FileStatus scFileStatus = fs.getFileStatus(dstPath);
        // 關(guān)閉
        fs.close();
        LocalResource scRsrc = LocalResource.newInstance(
                        URL.fromURI(dstPath.toUri()),
                        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                        scFileStatus.getLen(), scFileStatus.getModificationTime());
        return scRsrc;
    }

}

測(cè)試

應(yīng)用寫好了饺蔑,上傳應(yīng)用的client也寫好了,下面測(cè)一下

首先使用maven-assembly插件給程序打jar包

mvn clean package

其次嗜诀,本地idea直接運(yùn)行YarnClient的main方法

注意替換一下代碼中的jar包地址和名稱猾警,以及AppMaster的全路徑名,以及hadoop的ip地址等信息

MyYarnClient的運(yùn)行結(jié)果idea輸出如下

MyYarnClient

打開yarn-web再看一下日志

yarn-web

成功實(shí)現(xiàn)了一個(gè)運(yùn)行在Yarn上的小程序隆敢!

分布式計(jì)算

以上发皿,我們完成了一個(gè)簡(jiǎn)單的程序運(yùn)行在yarn上,但其實(shí)這個(gè)應(yīng)用程序?qū)嶋H上只在一個(gè)節(jié)點(diǎn)上實(shí)際運(yùn)行了System.out.println的代碼拂蝎,這就像去了一趟沃爾瑪雳窟,買了瓶礦泉水

yarn的優(yōu)勢(shì)是可以讓我們的計(jì)算程序分給多個(gè)機(jī)器節(jié)點(diǎn)去執(zhí)行,我們繼續(xù)改造一下AppMaster,實(shí)現(xiàn)如下功能:

  • 添加兩個(gè)子任務(wù)封救,子任務(wù)分別在HDFS中創(chuàng)建一個(gè)文件夾
  • 兩個(gè)子任務(wù)結(jié)束之后,再運(yùn)行輸出Hello World

ChildTask

首先編寫子任務(wù)捣作,我為了省事誉结,直接和AppMaster放一個(gè)項(xiàng)目中了,很簡(jiǎn)單的代碼券躁,創(chuàng)建一個(gè)/child/+服務(wù)器hostName的文件夾

public class ChildTask {

    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        //獲取文件系統(tǒng)
        Configuration configuration = new Configuration();
        //NameNode的ip和端口
        FileSystem fs  = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), configuration, "root");
        // hostName
        String hostName = NetUtils.getHostname();
        // 創(chuàng)建一個(gè)文件夾
        fs.mkdirs(new Path("/child/"+hostName));
        fs.close();
    }

}

AppMaster

接下來要改造AppMaster惩坑,原來只是輸出Hello World,現(xiàn)在要向RM申請(qǐng)Container用來執(zhí)行子任務(wù)

container請(qǐng)求

首先申請(qǐng)Container需要向RM申請(qǐng)也拜,所以使用amRmClient即可發(fā)出請(qǐng)求

// 兩個(gè)子任務(wù)以舒,對(duì)應(yīng)兩個(gè)container
int childTaskNum = 2;
for (int i = 0; i < childTaskNum; i++) {
    // 向rm申請(qǐng)一個(gè)1M內(nèi)存,1個(gè)CPU的資源容器
    int memory = 1024;
    int vCores = 1;
    AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
    amRmClient.addContainerRequest(containerRequest);
}
rm回調(diào)

申請(qǐng)成功后,當(dāng)rm分配出container時(shí)還要進(jìn)行相關(guān)回調(diào)處理慢哈,所以amRmClient定義時(shí)要加上一個(gè)回調(diào)處理類

// rm回調(diào)處理器
AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
// 開啟am-rm client蔓钟,建立rm-am的通道,用于注冊(cè)AM, allocListener負(fù)責(zé)處理AM的響應(yīng)
AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);

RMCallBackHandler是rm響應(yīng)的處理器

private class RMCallBackHandler extends AMRMClientAsync.AbstractCallbackHandler {

重點(diǎn)要實(shí)現(xiàn)兩個(gè)方法

  • onContainersAllocated rm分配出containers的回調(diào)方法
  • onContainersCompleted container運(yùn)行結(jié)束的方法
onContainersCompleted

這個(gè)方法主要是子任務(wù)運(yùn)行完成卵贱,我們?cè)贏ppMaster加幾個(gè)內(nèi)部變量控制所有子任務(wù)完成再輸出"Hello World"

// 充當(dāng)鎖
private Object lock = new Object();
// 任務(wù)個(gè)數(shù)
private int childTaskNum = 2;
// 已完成任務(wù)個(gè)數(shù)
private int childTaskCompletedNum = 0;

RMCallBackHandler的onContainersCompleted方法實(shí)現(xiàn)如下:

@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
    for (ContainerStatus status : statuses) {
        synchronized (lock) {
            System.out.println(++childTaskCompletedNum + " container completed");
            // 子任務(wù)全部完成
            if (childTaskCompletedNum == childTaskNum) {
                lock.notify();
            }
        }
    }
}

doRun方法修改為如下

private void doRun(AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient) throws InterruptedException {
    // 申請(qǐng)兩個(gè)資源容器
    for (int i = 0; i < childTaskNum; i++) {
        // 向rm申請(qǐng)一個(gè)1M內(nèi)存,1個(gè)CPU的資源容器
        int memory = 1024;
        int vCores = 1;
        AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(Resource.newInstance(memory, vCores), null, null, Priority.UNDEFINED);
        amRmClient.addContainerRequest(containerRequest);
    }
    synchronized (lock) {
        // 等待子任務(wù)完成
        lock.wait();
    }
    System.out.println("HELLO WORLD");
}

到此即可實(shí)現(xiàn)申請(qǐng)兩個(gè)container滥沫,兩個(gè)container運(yùn)行完后再執(zhí)行輸出"HELLO WORLD"

onContainersAllocated

這是RMCallBackHandler中要實(shí)現(xiàn)的重點(diǎn)方法,當(dāng)container分配成功后要做什么键俱?

思路很簡(jiǎn)單兰绣,container分配之后當(dāng)然要在對(duì)應(yīng)的容器上運(yùn)行我們的子任務(wù):ChildTask,而子任務(wù)的運(yùn)行一定是在container所指定的NM節(jié)點(diǎn)上编振,所以我們要提前初始化一個(gè)NM客戶端:
加一個(gè)內(nèi)部屬性以供AppMaster整個(gè)類使用

NMClientAsyncImpl nmClientAsync;

此時(shí)AppMaster run方法修改如下

public void run() {
    try {
        // rm回調(diào)處理器
        AMRMClientAsync.AbstractCallbackHandler rmCallBackHandler = new RMCallBackHandler();
        // 開啟am-rm client缀辩,建立rm-am的通道,用于注冊(cè)AM, allocListener負(fù)責(zé)處理AM的響應(yīng)
        AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, rmCallBackHandler);
        amRmClient.init(new Configuration());
        amRmClient.start();
        String hostName = NetUtils.getHostname();
        // 注冊(cè)至RM
        amRmClient.registerApplicationMaster(hostName, -1, null);
        // 初始化nmClient
        nmClientAsync = new NMClientAsyncImpl(new NMCallBackHandler());
        nmClientAsync.init(conf);
        nmClientAsync.start();
        // 運(yùn)行程序
        doRun(amRmClient);
        // 解除注冊(cè)
        amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "SUCCESS", null);
        // am-rm客戶端關(guān)閉
        amRmClient.stop();
        // nm客戶端關(guān)閉
        nmClientAsync.stop();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

其中NMCallBackHandler是一個(gè)NM響應(yīng)的Callback踪央,可以通過實(shí)現(xiàn)其方法在container聲明周期加入一些邏輯

private class NMCallBackHandler extends NMClientAsync.AbstractCallbackHandler {
}

接下來就是實(shí)現(xiàn)onContainersAllocated臀玄,代碼如下

@Override
public void onContainersAllocated(List<Container> containers) {
    try {
        for (Container container : containers) {
            System.out.println("container allocated, Node=" + container.getNodeHttpAddress());
            // 構(gòu)建AM<->NM客戶端并開啟
            // 還是YarnClient containerLaunchContext那一套,這把直接去HDFS系統(tǒng)取文件杯瞻,因?yàn)楹蚘arnClient打包到一個(gè)jar上傳
            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>() {{
                //NameNode的ip和端口
                FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.100:8020"), conf, "root");
                URI appUri = new URI("/user/root/hello/my-yarn-app.jar");
                FileStatus fileStatus = fs.getFileStatus(new Path(appUri));
                put("my-yarn-app.jar", LocalResource.newInstance(
                        URL.fromURI(appUri),
                        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
                        fileStatus.getLen(), fileStatus.getModificationTime()));
            }};
            Map<String, String> env = new HashMap<>();
            StringBuilder classPathEnv = new StringBuilder(ApplicationConstants.Environment.CLASSPATH.$$())
                    .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
            for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
                classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
                classPathEnv.append(c.trim());
            }
            env.put("CLASSPATH", classPathEnv.toString());
            List<String> commands = new ArrayList<String>() {{
                // 傳入ip地址作為參數(shù)
                add(ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java -Xmx200m me.pq.yarn.ChildTask");
            }};
            ContainerLaunchContext containerLaunchContext = ContainerLaunchContext.newInstance(
                    localResources, env, commands, null, null, null);
            // nm節(jié)點(diǎn)啟動(dòng)container
            nmClientAsync.startContainerAsync(container, containerLaunchContext);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

代碼就不詳解了镐牺,和YarnClient提交的ContainerLaunchContext寫法基本一致(最終運(yùn)行me.pq.yarn.ChildTask而不是MyAppMaster),最后使用NM客戶端的startContainerAsync方法讓子任務(wù)運(yùn)行在NM上

值得一提的是我的ChildTask和AppMaster都在一個(gè)jar包下魁莉,所以這里不用上傳了睬涧,直接去HDFS取即可

測(cè)試

代碼寫完了,測(cè)試一下旗唁,mvn clean package然后執(zhí)行MyYarnClient main方法

idea輸出

MyYarnClient

HDFS-WEB上看一下子任務(wù)的文件夾創(chuàng)建是否成功

HDFS-WEB

可見文件夾創(chuàng)建出來了

YARN-WEB看一下AppMaster的日志

YARN-WEB
AppMaster

到此畦浓,實(shí)現(xiàn)了一個(gè)運(yùn)行在yarn上的簡(jiǎn)單分布式計(jì)算程序~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市检疫,隨后出現(xiàn)的幾起案子讶请,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件夺溢,死亡現(xiàn)場(chǎng)離奇詭異论巍,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)风响,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門嘉汰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人状勤,你說我怎么就攤上這事鞋怀。” “怎么了持搜?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵密似,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我葫盼,道長(zhǎng)残腌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任剪返,我火速辦了婚禮废累,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘脱盲。我一直安慰自己邑滨,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布钱反。 她就那樣靜靜地躺著掖看,像睡著了一般。 火紅的嫁衣襯著肌膚如雪面哥。 梳的紋絲不亂的頭發(fā)上哎壳,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音尚卫,去河邊找鬼归榕。 笑死,一個(gè)胖子當(dāng)著我的面吹牛吱涉,可吹牛的內(nèi)容都是我干的刹泄。 我是一名探鬼主播,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼怎爵,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼特石!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鳖链,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤姆蘸,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體逞敷,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡狂秦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了推捐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片故痊。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖玖姑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情慨菱,我是刑警寧澤焰络,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站符喝,受9級(jí)特大地震影響闪彼,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜协饲,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一畏腕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧茉稠,春花似錦描馅、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至膀篮,卻和暖如春嘹狞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背誓竿。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工磅网, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人筷屡。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓涧偷,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親速蕊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子嫂丙,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容