《Hadoop-MapReduce源碼解析》之二: org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal


1. org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

  /**
   * Internal method for submitting jobs to the system.
   * The job submission process involves:
   *   1. Checking the input and output specifications of the job.
   *   1. 檢查作業(yè)輸入輸出規(guī)范
   *   2. Computing the InputSplits for the job.
   *   2. 計算作業(yè)的輸入分片
   *   3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.
   *   3. 如有必要缎脾,請為作業(yè)的分布式緩存設置必要的記帳信息
   *   4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
   *   4. 將作業(yè)的jar和配置復制到分布式文件系統上的map reduce系統目錄中
   *   5. Submitting the job to the JobTracker and optionally monitoring it's status.
   *   5. 將作業(yè)提交給JobTracker,并可選擇監(jiān)視其狀態(tài)
   * Params:
   *   job – the configuration to submit cluster – the handle to the Cluster
   * Throws:
   *   ClassNotFoundException –
   *   InterruptedException –
   *   IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    // 驗證作業(yè)的輸出規(guī)范
    // 通常檢查輸出路徑是否已經存在,當它已經存在時拋出異常,這樣輸出就不會被覆蓋
    checkSpecs(job);

    // 除非明確關閉鸦难,否則Hadoop默認指定兩個資源,按類路徑的順序加載:
    // core-default.xml:hadoop的只讀默認值御吞。
    // core-site.xml:給定hadoop安裝的特定于站點的配置宽堆。
    Configuration conf = job.getConfiguration();

    // 從MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH("mapreduce.application.framework.path")中
    // 解析路徑中的任何符號鏈接
    // 解析后的uri添加到分布式緩存中:
    // MRJobConfig.CACHE_ARCHIVES = "mapreduce.job.cache.archives";
    // conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
    //       : archives + "," + uri.toString());
    addMRFrameworkToDistributedCache(conf);

    // 獲取放置作業(yè)特定文件的暫存目錄
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      // 設置提交任務的主機名稱和地址
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }

    // 這里的submitClient在后續(xù)解釋,即:LocalJobRunner或者YARNRunner
    // 創(chuàng)建Applicant唉韭,生成JobId夜涕,返回JobId
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);

    // 提交作業(yè)的路徑(Path parent, String child),將兩個參數拼接為一個新路徑
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // 作業(yè)狀態(tài)
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      // 從與傳遞的路徑(作業(yè)文件目錄)相對應的名稱節(jié)點獲取委派令牌
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      // 從所有NAMENODE節(jié)點處獲取委派令牌
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      // 獲取Shuffle密鑰來授權Shuffle轉換
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      // 在Hadoop MapReduce中属愤,當進行數據溢出(spill)時女器,會將部分數據從內存中寫入磁盤以釋內存間
      // 為保證數據安全,當啟用加密中間數據溢出時住诸,最大ApplicationMaster(AM)嘗試次數設置為1
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }

      // 使用命令行選項-libjars驾胆、-files涣澡、-archives配置用戶的jobconf
      // 并上載和配置與傳遞的作業(yè)相關的文件、libjar丧诺、jobjar和歸檔文件
      copyAndConfigureFiles(job, submitJobDir);
      
      // 獲取作業(yè)conf文件暑塑,即:new Path(jobSubmitDir, "job.xml");
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      // 計算任務輸入的分片,并返回分片數量锅必,即map任務的數量
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);  
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // 如果計算出來的map數大于設置的或者默認的最大map數事格,拋出異常
      int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
          MRJobConfig.DEFAULT_JOB_MAX_MAP);
      if (maxMaps >= 0 && maxMaps < maps) {
        throw new IllegalArgumentException("The number of map tasks " + maps +
            " exceeded limit " + maxMaps);
      }

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // 將“作業(yè)提交到什么隊列”寫入job文件
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      // 在將jobconf復制到HDFS之前刪除jobtoken引用,因為任務不需要此設置;
      // 實際上它們可能會因此而中斷搞隐,因為引用將指向不同的作業(yè)驹愚。
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        // 添加DHFS tracking ids:跟蹤標識符,該標識符可用于在多個客戶端會話中關聯令牌的使用情況
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      // reservationId是全局唯一作業(yè)的保留標識符劣纲,如果作業(yè)沒有任何關聯的保留逢捺,則為null
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      // 將submitJobFile寫到JobTracker的文件系統中去
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // 正式提交Job到Yarn或者本地
      status = submitClient.submitJob(  // 具體見Hadoop-MapReduce源碼解析》之三
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        // 返回Job提交后的狀態(tài)
        return status;
      } else {
        // 任務啟動失敗
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          // 清空暫存目錄
          jtFs.delete(submitJobDir, true);

      }
    }
  }
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市癞季,隨后出現的幾起案子劫瞳,更是在濱河造成了極大的恐慌,老刑警劉巖绷柒,帶你破解...
    沈念sama閱讀 218,386評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件志于,死亡現場離奇詭異,居然都是意外死亡废睦,警方通過查閱死者的電腦和手機伺绽,發(fā)現死者居然都...
    沈念sama閱讀 93,142評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嗜湃,“玉大人奈应,你說我怎么就攤上這事」号” “怎么了杖挣?”我有些...
    開封第一講書人閱讀 164,704評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長刚陡。 經常有香客問我惩妇,道長,這世上最難降的妖魔是什么橘荠? 我笑而不...
    開封第一講書人閱讀 58,702評論 1 294
  • 正文 為了忘掉前任屿附,我火速辦了婚禮,結果婚禮上哥童,老公的妹妹穿的比我還像新娘挺份。我一直安慰自己,他們只是感情好贮懈,可當我...
    茶點故事閱讀 67,716評論 6 392
  • 文/花漫 我一把揭開白布匀泊。 她就那樣靜靜地躺著优训,像睡著了一般。 火紅的嫁衣襯著肌膚如雪各聘。 梳的紋絲不亂的頭發(fā)上揣非,一...
    開封第一講書人閱讀 51,573評論 1 305
  • 那天,我揣著相機與錄音躲因,去河邊找鬼早敬。 笑死,一個胖子當著我的面吹牛大脉,可吹牛的內容都是我干的搞监。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼镰矿,長吁一口氣:“原來是場噩夢啊……” “哼琐驴!你這毒婦竟也來了?” 一聲冷哼從身側響起秤标,我...
    開封第一講書人閱讀 39,230評論 0 276
  • 序言:老撾萬榮一對情侶失蹤绝淡,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后苍姜,有當地人在樹林里發(fā)現了一具尸體牢酵,經...
    沈念sama閱讀 45,680評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,873評論 3 336
  • 正文 我和宋清朗相戀三年怖现,在試婚紗的時候發(fā)現自己被綠了茁帽。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片玉罐。...
    茶點故事閱讀 39,991評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡屈嗤,死狀恐怖,靈堂內的尸體忽然破棺而出吊输,到底是詐尸還是另有隱情饶号,我是刑警寧澤,帶...
    沈念sama閱讀 35,706評論 5 346
  • 正文 年R本政府宣布季蚂,位于F島的核電站茫船,受9級特大地震影響,放射性物質發(fā)生泄漏扭屁。R本人自食惡果不足惜算谈,卻給世界環(huán)境...
    茶點故事閱讀 41,329評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望料滥。 院中可真熱鬧然眼,春花似錦、人聲如沸葵腹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至鲸匿,卻和暖如春爷怀,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背带欢。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評論 1 270
  • 我被黑心中介騙來泰國打工运授, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人乔煞。 一個月前我還...
    沈念sama閱讀 48,158評論 3 370
  • 正文 我出身青樓徒坡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親瘤缩。 傳聞我的和親對象是個殘疾皇子喇完,可洞房花燭夜當晚...
    茶點故事閱讀 44,941評論 2 355

推薦閱讀更多精彩內容