distributedshell yarn編程指南

hadoop yarn是一個獨立的調(diào)度框架寥假,我們自己也可以通用yarn提供的api編寫程序?qū)⑽覀冏约旱膶懙某绦蛴脃arn來調(diào)度運行

hadoop 官提供了一個distributedshell yarn程序,該程序用途是實現(xiàn)使用yarn調(diào)度框架執(zhí)行shell濒持,Hadoop提供這個程序的目的是為了通過這個簡單的例子說明如果自己寫一個yarn程序
具體代碼地址:
https://github.com/apache/hadoop/tree/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell

編寫yarn程序一般需要

  • 編寫一個客戶端,客戶端定義了啟動ApplicationMaster的方式寺滚,提交application到RM
  • 編寫自己的ApplicationMaster柑营,在ApplicationMaster中創(chuàng)建與RM,NN交互的客戶端用于向RM申請資源并且在NN中啟動容器運行任務

接口

底層接口

ApplicationClientProtocol
clientsResourceManager之間用于提交/中斷job和獲取 application ,集群metrics ,node queueACLs信息的底層協(xié)議

ApplicationMasterProtocol
AM和RM通信的底層協(xié)議村视,ApplicationMasterProtocol.allocate用于AM和RM心跳

ContainerManagementProtocol
AM和NN通信的底層協(xié)議官套,用于啟動和停止容器以及獲取運行容器的狀態(tài)信息

高層接口

Client<-->ResourceManager
使用 YarnClient 對象

ApplicationMaster<-->ResourceManager

用于向RM申請Container,使用AMRMClientAsync對象, AMRMClientAsync.CallbackHandler 用于異步事件處理

ApplicationMaster<-->NodeManager
用于在NodeManager上啟動容器蚁孔,使用NMClientAsyncNodeManager通信,NMClientAsync.CallbackHandler用于異步事件處理

distributedshell

下面我們來看一下hadoop的這個例子程序distributedshell是怎么編寫的

編寫Client

org.apache.hadoop.yarn.applications.distributedshell.Client這個是客戶端入口程序用于和RM交互

  1. 程序首先初始化了YarnClient
  yarnClient = YarnClient.createYarnClient();
  yarnClient.init(conf);

然后調(diào)用yarnClient.createApplication方法創(chuàng)建App奶赔,獲取application id,底層api使用的是ApplicationClientProtocol

  // Get a new application id
    YarnClientApplication app = yarnClient.createApplication();
    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();

根據(jù)請求RM獲取到的application id 構(gòu)造ApplicationSubmissionContext,ApplicationSubmissionContext代表了RM啟動ApplicationMaster所需要的信息,客戶端需要在這個上下文中設置如下信息:

  • Application 信息: id, name
  • Queue, priority 信息:提交application到哪個隊列, 優(yōu)先級.
  • User: 提交application的用戶
  • ContainerLaunchContext: AM將要運行的容器的所有信息的定義杠氢,比如Local Resources (binaries, jars, files etc.), Environment settings (CLASSPATH etc.), 需要執(zhí)行的Command 和 security Tokens (RECT).
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
...
//設置appName
 appContext.setApplicationName(appName);
...
//準備Local Resources 站刑,Environment ,和Command
...
// 通過準備的信息構(gòu)造 application master的 ContainerLaunchContext
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources, env, commands, null, null, null);
...
//設置啟動資源信息
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
//設置ContainerLaunchContext
appContext.setAMContainerSpec(amContainer);
//設置優(yōu)先級
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
//設置隊列
appContext.setQueue(amQueue);
//提交應用
yarnClient.submitApplication(appContext);

舉個例子幫助理解
假如以test用戶運行hadoop-yarn-applications-distributedshell, 運行命令如下:

hadoop jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar -jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar  -queue root.download -shell_script /tmp/a.sh

假設yarnClient.createApplication()

  • 申請到的appid為 application_1576067711791_1132781 鼻百,
  • Client會將 -jar參數(shù) 傳過來的jar路徑即hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar作為為本地資源(Local Resources)绞旅,放到test用戶的如下hdfs 路徑
/user/test/DistributedShell/application_1576067711791_1132781/AppMaster.jar
  • 對應的執(zhí)行腳本/tmp/a.sh也會放到對于hadoop家目錄對于application 的hdfs路徑下:
/user/test/DistributedShell/application_1576067711791_1132781/ExecScript.sh

其中運行ApplicationMastercomond為:

org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
客戶端與RM交互圖
Client<->RM
ApplicationSubmissionContext結(jié)構(gòu)圖
image.png

編寫 ApplicationMaster (AM)

AM使用ApplicationAttemptId與RM交互
可以從傳進來的env環(huán)境變量中獲取 Container信息,進一步獲取ApplicationAttemptId

      ContainerId containerId = ConverterUtils.toContainerId(envs
          .get(Environment.CONTAINER_ID.name()));
      appAttemptID = containerId.getApplicationAttemptId();

在AM完全初始化自身之后温艇,我們可以啟動兩個客戶端:一個與RM通信因悲,一個與NM通信。并設置相關的事件處理函數(shù)

    //與RM通信并設置相關的事件處理函數(shù)
    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
    amRMClient.init(conf);
    amRMClient.start();
    
    //與NM通信并設置相關的事件處理函數(shù)
    containerListener = createNMCallbackHandler();
    nmClientAsync = new NMClientAsyncImpl(containerListener);
    nmClientAsync.init(conf);
    nmClientAsync.start();

AM必須向RM發(fā)出心跳信息

// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl);

AM向RM申請container資源

// Dump out information about cluster capability as seen by the
    // resource manager
    int maxMem = response.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
    
    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);

    // A resource ask cannot exceed the max.
    if (containerMemory > maxMem) {
      LOG.info("Container memory specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerMemory + ", max="
          + maxMem);
      containerMemory = maxMem;
    }

    if (containerVirtualCores > maxVCores) {
      LOG.info("Container virtual cores specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
          + maxVCores);
      containerVirtualCores = maxVCores;
    }

    List<Container> previousAMRunningContainers =
        response.getContainersFromPreviousAttempts();
    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
      + " previous attempts' running containers on AM registration.");
    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

    int numTotalContainersToRequest =
        numTotalContainers - previousAMRunningContainers.size();
    // Setup ask for containers from RM
    // Send request for containers to RM
    // Until we get our fully allocated quota, we keep on polling RM for
    // containers
    // Keep looping until all the containers are launched and shell script
    // executed on them ( regardless of success/failure).
   //  申請啟動container
    for (int i = 0; i < numTotalContainersToRequest; ++i) {
      ContainerRequest containerAsk = setupContainerAskForRM();
      amRMClient.addContainerRequest(containerAsk);
    }
    numRequestedContainers.set(numTotalContainers);

AMRMClientAsync.CallbackHandler

onContainersAllocated回調(diào)函數(shù)啟動LaunchContainerRunnable線程執(zhí)行

        //containerListener是 containerListener = createNMCallbackHandler();
//containerListener是NMCallbackHandler類型的
        LaunchContainerRunnable runnableLaunchContainer =
            new LaunchContainerRunnable(allocatedContainer, containerListener);
        Thread launchThread = new Thread(runnableLaunchContainer);

LaunchContainerRunnable

LaunchContainerRunnablerun方法里面構(gòu)造了需要在container中運行shellContainerLaunchContext,并且綁定containerListener回調(diào)函數(shù)勺爱,然后使用nmClientAsync 異步啟動container

      //封裝用于啟動shell腳本的ctx
      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
        localResources, shellEnv, commands, null, allTokens.duplicate(), null);
      //注冊NMCallbackHandler回調(diào)函數(shù)
      containerListener.addContainer(container.getId(), container);
      //使用nmClientAysnc異步啟動container
      nmClientAsync.startContainerAsync(container, ctx);

整體交互流程圖

yarn 交互
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末晃琳,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子邻寿,更是在濱河造成了極大的恐慌蝎土,老刑警劉巖视哑,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件绣否,死亡現(xiàn)場離奇詭異,居然都是意外死亡挡毅,警方通過查閱死者的電腦和手機蒜撮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人段磨,你說我怎么就攤上這事取逾。” “怎么了苹支?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵砾隅,是天一觀的道長。 經(jīng)常有香客問我债蜜,道長晴埂,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任寻定,我火速辦了婚禮儒洛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘狼速。我一直安慰自己琅锻,他們只是感情好,可當我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布向胡。 她就那樣靜靜地躺著恼蓬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪僵芹。 梳的紋絲不亂的頭發(fā)上滚秩,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天,我揣著相機與錄音淮捆,去河邊找鬼郁油。 笑死,一個胖子當著我的面吹牛攀痊,可吹牛的內(nèi)容都是我干的桐腌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼苟径,長吁一口氣:“原來是場噩夢啊……” “哼案站!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起棘街,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蟆盐,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后遭殉,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體石挂,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年险污,在試婚紗的時候發(fā)現(xiàn)自己被綠了痹愚。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片富岳。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖拯腮,靈堂內(nèi)的尸體忽然破棺而出窖式,到底是詐尸還是另有隱情,我是刑警寧澤动壤,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布萝喘,位于F島的核電站,受9級特大地震影響琼懊,放射性物質(zhì)發(fā)生泄漏蜒灰。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一肩碟、第九天 我趴在偏房一處隱蔽的房頂上張望强窖。 院中可真熱鬧,春花似錦削祈、人聲如沸翅溺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咙崎。三九已至,卻和暖如春吨拍,著一層夾襖步出監(jiān)牢的瞬間褪猛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工羹饰, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留伊滋,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓队秩,卻偏偏與公主長得像笑旺,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子馍资,可洞房花燭夜當晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容