從源碼角度分析MapReduce運(yùn)作_一.準(zhǔn)備階段

一.目錄

本系列文章對(duì)Hadoop知識(shí)進(jìn)行復(fù)盤。
分為兩個(gè)階段佳吞,建立連接階段,提交job階段旭斥。

waitForCompletion()
 
submit();
 
// 1建立連接
   connect();  
      // 1)創(chuàng)建提交Job的代理
      new Cluster(getConfiguration());
         // (1)判斷是本地yarn還是遠(yuǎn)程
         initialize(jobTrackAddr, conf);
 
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
   // 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
   Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
 
   // 2)獲取jobid 容达,并創(chuàng)建Job路徑
   JobID jobId = submitClient.getNewJobID();
 
   // 3)拷貝jar包到集群
copyAndConfigureFiles(job, submitJobDir); 
   rUploader.uploadFiles(job, jobSubmitDir);
 
   // 4)計(jì)算切片,生成切片規(guī)劃文件
writeSplits(job, submitJobDir);
      maps = writeNewSplits(job, jobSubmitDir);
      input.getSplits(job);
 
   // 5)向Stag路徑寫XML配置文件
writeConf(conf, submitJobFile);
   conf.writeXml(out);
 
   // 6)提交Job,返回提交狀態(tài)
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

二.建立連接

客戶端提交MR程序后垂券,首先是運(yùn)行job.waitForCompletion(true)花盐,所以從waitForCompletion方法開始分析羡滑。

/**
   * Submit the job to the cluster and wait for it to finish. 
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();    //提交作業(yè),重點(diǎn)
    }
    if (verbose) {
      monitorAndPrintJob();  // 監(jiān)控任務(wù)狀態(tài)
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

進(jìn)入submit方法

/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    // 1.建立連接
    connect(); 
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
     // 2.提交job
        return submitter.submitJobInternal(Job.this, cluster); 
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

進(jìn)入connect方法

private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     //  1)創(chuàng)建提交job的代理 
                     return new Cluster(getConfiguration()); 
                   }
                 });
    }
  }

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    // 判斷是本地還是遠(yuǎn)程
    initialize(jobTrackAddr, conf); 
  }
  
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
            // 如果是遠(yuǎn)程算芯,則創(chuàng)建yarn代理柒昏;如果是本地,則創(chuàng)建local代理
            clientProtocol = provider.create(conf);  
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

三.提交job

接著來(lái)看submitJobInternal方法熙揍,用來(lái)提交作業(yè)到集群上职祷,主要是以下幾個(gè)步驟:

  • 檢查作業(yè)的輸入輸出
  • 計(jì)算作業(yè)的分片
  • 設(shè)置job相關(guān)的計(jì)算信息
  • 復(fù)制需要的jar和配置信息到文件系統(tǒng)上
  • 提交作業(yè)以及監(jiān)控其狀態(tài)
/**
   * Internal method for submitting jobs to the system.
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {
    //檢查輸出路徑是否存在,若存在拋出異常
    checkSpecs(job);    

    ...

    // 1)創(chuàng)建給集群提交數(shù)據(jù)的stage目錄
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    ....

    // 2)獲取JobID届囚,并創(chuàng)建job路徑
    JobID jobId = submitClient.getNewJobID(); 
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());//創(chuàng)建staging目錄下的JobID文件夾
    
    ...

    try {
      .....
     
      // 3)拷貝jar包到集群
      copyAndConfigureFiles(job, submitJobDir);
    
      .....

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // 4)計(jì)算Jobmap端的切片有梆,生成切片規(guī)劃文件
      int maps = writeSplits(job, submitJobDir);  
      conf.setInt(MRJobConfig.NUM_MAPS, maps);

      ...

      // 5)把job的配置信息寫入staging+JobID目錄下的job.xml
      writeConf(conf, submitJobFile); 
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // 6)真正開始提交作業(yè),返回提交狀態(tài)
      status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); 
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)     
          jtFs.delete(submitJobDir, true);//刪除staging+JobID目錄下所有東西
      }
    }
  }

接著我們看writeSplits方法

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);  // 獲取新的切片意系,重點(diǎn)
    } else {  
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

進(jìn)入writeNewSplits方法

private <T extends InputSplit> int writeNewSplits(JobContext job, 
Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf); 
    List<InputSplit> splits = input.getSplits(job); // 獲取切片泥耀,重點(diǎn)
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

這里我們來(lái)看下FileInputFormat對(duì)應(yīng)的getSplits方法

/** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 獲取切片大小,重點(diǎn)

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  // 判斷文件是否切片蛔添,重點(diǎn)
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          ....
    }
    return splits;
  }

這里有兩個(gè)點(diǎn)需要注意下:

  • 切片的大小痰催,是通過Math.max(minSize, Math.min(maxSize, blockSize))獲取。默認(rèn)minSize的值為1迎瞧,maxSize的值為L(zhǎng)ong類型的最大值(即9223372036854775807)夸溶,blockSize是塊大小,故默認(rèn)切片大小為塊大小凶硅。
  • 文件是否切片缝裁,是通過((double) bytesRemaining)/splitSize > SPLIT_SLOP判斷。SPLIT_SLOP的值為1.1咏尝,如果剩余文件大小/切片大小>1.1压语,則切片。
    接著我們回到submitJobInternal方法中编检,查看submitClient.submitJob動(dòng)作。submitClient有本地和yarn兩種扰才,這里以yarn方式舉例允懂。
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    addHistoryToken(ts);
    
    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    // 把job提交到ResourceManager上
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

最后就是監(jiān)控任務(wù)狀態(tài),等待返回任務(wù)執(zhí)行結(jié)果衩匣,參考monitorAndPrintJob方法蕾总。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市琅捏,隨后出現(xiàn)的幾起案子生百,更是在濱河造成了極大的恐慌,老刑警劉巖柄延,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蚀浆,死亡現(xiàn)場(chǎng)離奇詭異缀程,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)市俊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門杨凑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人摆昧,你說我怎么就攤上這事撩满。” “怎么了绅你?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵伺帘,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我忌锯,道長(zhǎng)伪嫁,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任汉规,我火速辦了婚禮礼殊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘针史。我一直安慰自己晶伦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布啄枕。 她就那樣靜靜地躺著婚陪,像睡著了一般。 火紅的嫁衣襯著肌膚如雪频祝。 梳的紋絲不亂的頭發(fā)上泌参,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音常空,去河邊找鬼沽一。 笑死,一個(gè)胖子當(dāng)著我的面吹牛漓糙,可吹牛的內(nèi)容都是我干的铣缠。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼昆禽,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼蝗蛙!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起醉鳖,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤捡硅,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后盗棵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體壮韭,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡北发,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了泰涂。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鲫竞。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖逼蒙,靈堂內(nèi)的尸體忽然破棺而出从绘,到底是詐尸還是另有隱情,我是刑警寧澤是牢,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布僵井,位于F島的核電站,受9級(jí)特大地震影響驳棱,放射性物質(zhì)發(fā)生泄漏批什。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一社搅、第九天 我趴在偏房一處隱蔽的房頂上張望驻债。 院中可真熱鬧,春花似錦形葬、人聲如沸合呐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)淌实。三九已至,卻和暖如春猖腕,著一層夾襖步出監(jiān)牢的瞬間拆祈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工倘感, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留放坏,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓老玛,卻偏偏與公主長(zhǎng)得像轻姿,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子逻炊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353