How a MapReduce Job Runs on YARN: A Walkthrough of the Submission Flow

1. Using WordCount as the example
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
  public static void main(String[] args)
    throws Exception
  {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; i++) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
    // submit the job and wait for it to finish
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
  {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
      throws IOException, InterruptedException
    {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      this.result.set(sum);
      context.write(key, this.result);
    }
  }

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
  {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException
    {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        this.word.set(itr.nextToken());
        context.write(this.word, one);
      }
    }
  }
}
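Packaged into a jar, a job like this is launched in the usual way (the jar name and paths here are just examples):

    hadoop jar wordcount.jar WordCount /user/root/input /user/root/output

Everything below traces what happens inside job.waitForCompletion() once that command runs.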

2. The Job class: waitForCompletion()

 public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();    // submit the job
    }
    if (verbose) {
      monitorAndPrintJob();   // if submission succeeded, monitor the job and print its progress
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
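When verbose is false, the client simply polls isComplete() on an interval read from the cluster configuration. That interval comes from mapreduce.client.completion.pollinterval, 5000 ms by default; a client that wants faster feedback can lower it. A minimal sketch, property name as in Hadoop 2.x:

    Configuration conf = new Configuration();
    // poll for job completion every second instead of the default 5 s
    conf.setInt("mapreduce.client.completion.pollinterval", 1000);
    Job job = Job.getInstance(conf, "word count");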

3. The Job.submit() method

public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();        // connect to the cluster (the ResourceManager, per the configuration)
    // obtain the job submitter
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
      // the submitter does the actual submission
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
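connect() is what ties the client to a cluster: it builds a Cluster object, whose ClientProtocolProvider picks a LocalJobRunner or a YARNRunner depending on mapreduce.framework.name. Roughly (a simplified sketch, not the exact source):

    private synchronized void connect() throws IOException, InterruptedException {
      if (cluster == null) {
        // with mapreduce.framework.name=yarn this resolves to a YARNRunner-backed client
        cluster = new Cluster(getConfiguration());
      }
    }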
4. JobSubmitter.submitJobInternal(Job.this, cluster)
 //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // obtain a new JobID from the ResourceManager via RPC
    JobID jobId = submitClient.getNewJobID();
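    // the returned id has the form job_<cluster timestamp>_<sequence>, e.g. job_1515076284174_0005,
    // and lines up with the applicationId (application_1515076284174_0005) in the RM logs of section 9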
    job.setJobID(jobId);
    // the job's staging directory on HDFS, holding the jar, resource files, etc.
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }
      // copy the job jar (and any other job files) into the staging directory on HDFS
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // write the input split info
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // set the job's queue; each user has a queue
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      // write all job configuration to the staging directory
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // the real submission happens here
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
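After these steps the staging directory (under yarn.app.mapreduce.am.staging-dir, by default /tmp/hadoop-yarn/staging/<user>/.staging/job_xxx) holds roughly the following for a jar-based job:

    job.jar            the application jar copied by copyAndConfigureFiles()
    job.xml            the full job configuration written by writeConf()
    job.split          the serialized input splits
    job.splitmetainfo  the split metadata written by JobSplitWriter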

5. Writing split info: writeSplits(job, submitJobDir)

     private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
 private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // compute the splits over the input files
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    // sort the splits by size so the largest run first
    Arrays.sort(array, new SplitComparator());
    // write the split metadata to files under the job's staging directory on HDFS
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }
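For FileInputFormat-based jobs, getSplits() derives each split's size from the HDFS block size bounded by the configured minimum and maximum, and the split count becomes the number of map tasks. The core of that computation, simplified (property names as in Hadoop 2.x):

    long minSize = Math.max(1L,
        conf.getLong("mapreduce.input.fileinputformat.split.minsize", 1L));
    long maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize",
        Long.MAX_VALUE);
    // a 128 MB block with default min/max yields 128 MB splits
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));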
  

6. org.apache.hadoop.mapred.YARNRunner carries out the submission

 @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    addHistoryToken(ts);
    
    // Construct necessary information to start the MR AM
    // set up the ApplicationMaster's launch info (jar, resources, launch command, etc.);
    // every application gets its own ApplicationMaster
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
     // delegate to the lower layer, which submits to the ResourceManager via RPC
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);
      // everything below checks the submission state of the AppMaster
      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }
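resMgrDelegate wraps the generic YARN client API, so stripped of the MapReduce specifics the hand-off looks roughly like this (a sketch against the public YarnClient API, not the exact YARNRunner code):

    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();
    // appContext carries the AM's resources, local files and launch command (built in step 7)
    ApplicationId applicationId = yarnClient.submitApplication(appContext);
    ApplicationReport report = yarnClient.getApplicationReport(applicationId);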

7. The ApplicationMaster's initialization info

public ApplicationSubmissionContext createApplicationSubmissionContext(
  Configuration jobConf,
  String jobSubmitDir, Credentials ts) throws IOException {
ApplicationId applicationId = resMgrDelegate.getApplicationId();
// Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory(
    conf.getInt(
        MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
        )
    );
capability.setVirtualCores(
    conf.getInt(
        MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
        )
    );
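// with the defaults this requests 1536 MB (yarn.app.mapreduce.am.resource.mb)
// and 1 vcore (yarn.app.mapreduce.am.resource.cpu-vcores) for the AM container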
LOG.debug("AppMaster capability = " + capability);

// Setup LocalResources
Map<String, LocalResource> localResources =
    new HashMap<String, LocalResource>();

Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);

URL yarnUrlForJobSubmitDir = ConverterUtils
    .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
        .resolvePath(
            defaultFileContext.makeQualified(new Path(jobSubmitDir))));
LOG.debug("Creating setup context, jobSubmitDir url is "
    + yarnUrlForJobSubmitDir);

localResources.put(MRJobConfig.JOB_CONF_FILE,
    createApplicationResource(defaultFileContext,
        jobConfPath, LocalResourceType.FILE));
if (jobConf.get(MRJobConfig.JAR) != null) {
  Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
  LocalResource rc = createApplicationResource(
      FileContext.getFileContext(jobJarPath.toUri(), jobConf),
      jobJarPath,
      LocalResourceType.PATTERN);
  String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
      JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
  rc.setPattern(pattern);
  localResources.put(MRJobConfig.JOB_JAR, rc);
} else {
  // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
  // mapreduce jar itself which is already on the classpath.
  LOG.info("Job jar is not present. "
      + "Not adding any jar to the list of resources.");
}

// TODO gross hack
for (String s : new String[] {
    MRJobConfig.JOB_SPLIT,
    MRJobConfig.JOB_SPLIT_METAINFO }) {
  localResources.put(
      MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
      createApplicationResource(defaultFileContext,
          new Path(jobSubmitDir, s), LocalResourceType.FILE));
}

// Setup security tokens
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens  = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

// Setup the command to run the AM
List<String> vargs = new ArrayList<String>(8);
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
    + "/bin/java");

Path amTmpDir =
    new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + amTmpDir);
MRApps.addLog4jSystemProperties(null, vargs, conf);

// Check for Java Lib Path usage in MAP and REDUCE configs
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", 
    MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map", 
    MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce", 
    MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce", 
    MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);

// Add AM admin command opts before user command opts
// so that it can be overridden by user
String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
    MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
warnForJavaLibPath(mrAppMasterAdminOptions, "app master", 
    MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
vargs.add(mrAppMasterAdminOptions);

// Add AM user command opts
String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
    MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
warnForJavaLibPath(mrAppMasterUserOptions, "app master", 
    MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
vargs.add(mrAppMasterUserOptions);

if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
    MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
  final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
      MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
  if (profileParams != null) {
    vargs.add(String.format(profileParams,
        ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
            + TaskLog.LogName.PROFILE));
  }
}

vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
    Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
    Path.SEPARATOR + ApplicationConstants.STDERR);


Vector<String> vargsFinal = new Vector<String>(8);
// Final command
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
  mergedCommand.append(str).append(" ");
}
vargsFinal.add(mergedCommand.toString());

LOG.debug("Command to launch container for ApplicationMaster is : "
    + mergedCommand);

// Setup the CLASSPATH in environment
// i.e. add { Hadoop jars, job jar, CWD } to classpath.
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf);

// Shell
environment.put(Environment.SHELL.name(),
    conf.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL,
        MRJobConfig.DEFAULT_SHELL));

// Add the container working directory in front of LD_LIBRARY_PATH
MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
    MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);

// Setup the environment variables for Admin first
MRApps.setEnvFromInputString(environment, 
    conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_USER_ENV), conf);
// Setup the environment variables (LD_LIBRARY_PATH, etc)
MRApps.setEnvFromInputString(environment, 
    conf.get(MRJobConfig.MR_AM_ENV), conf);

// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources);

Map<ApplicationAccessType, String> acls
    = new HashMap<ApplicationAccessType, String>(2);
acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
    MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
    MRJobConfig.JOB_ACL_MODIFY_JOB,
    MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
    ContainerLaunchContext.newInstance(localResources, environment,
      vargsFinal, null, securityTokens, acls);

Collection<String> tagsFromConf =
    jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

// Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext =
    recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
appContext.setApplicationId(applicationId);                // ApplicationId
appContext.setQueue(                                       // Queue name
    jobConf.get(JobContext.QUEUE_NAME,
    YarnConfiguration.DEFAULT_QUEUE_NAME));
// add reservationID if present
ReservationId reservationID = null;
try {
  reservationID =
      ReservationId.parseReservationId(jobConf
          .get(JobContext.RESERVATION_ID));
} catch (NumberFormatException e) {
  // throw exception as reservationid as is invalid
  String errMsg =
      "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
          + " specified for the app: " + applicationId;
  LOG.warn(errMsg);
  throw new IOException(errMsg);
}
if (reservationID != null) {
  appContext.setReservationID(reservationID);
  LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
      + " to queue:" + appContext.getQueue() + " with reservationId:"
      + appContext.getReservationID());
}
appContext.setApplicationName(                             // Job name
    jobConf.get(JobContext.JOB_NAME,
    YarnConfiguration.DEFAULT_APPLICATION_NAME));
appContext.setCancelTokensWhenComplete(
    conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
appContext.setAMContainerSpec(amContainer);         // AM Container
appContext.setMaxAppAttempts(
    conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
        MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
appContext.setResource(capability);

// set labels for the AM container request if present
String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
if (null != amNodelabelExpression
    && amNodelabelExpression.trim().length() != 0) {
  ResourceRequest amResourceRequest =
      recordFactory.newRecordInstance(ResourceRequest.class);
  amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
  amResourceRequest.setResourceName(ResourceRequest.ANY);
  amResourceRequest.setCapability(capability);
  amResourceRequest.setNumContainers(1);
  amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
  appContext.setAMContainerResourceRequest(amResourceRequest);
}
// set labels for the Job containers
appContext.setNodeLabelExpression(jobConf
    .get(JobContext.JOB_NODE_LABEL_EXP));

appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
  appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}

return appContext;
}
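Assembled, the launch command for the AM container comes out roughly as follows (MRJobConfig.APPLICATION_MASTER_CLASS is org.apache.hadoop.mapreduce.v2.app.MRAppMaster; the exact JVM options depend on configuration):

    $JAVA_HOME/bin/java -Djava.io.tmpdir=$PWD/tmp <log4j and AM JVM opts> \
        org.apache.hadoop.mapreduce.v2.app.MRAppMaster \
        1><LOG_DIR>/stdout 2><LOG_DIR>/stderr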

8. When the ResourceManager receives the request, it tells a NodeManager to run the AppMaster: the scheduler allocates a Container on that NodeManager, and the NodeManager launches the AppMaster in it (every application gets one ApplicationMaster).
After starting, the AppMaster first registers itself with the ResourceManager, which is how the RM learns about the job's progress; it then asks the ResourceManager for Containers. Containers are granted by the scheduler, and each grant carries the Container's memory, CPU, and the node it lives on, as in this log line:

resourcemanager.scheduler.SchedulerNode: Assigned container container_1515076284174_0005_01_000004 of capacity <memory:1024, vCores:1> on host centos-3:8041, which has 1 containers, <memory:1024, vCores:1> used and <memory:36, vCores:0> available after allocation

Once the AppMaster has the Container grants, it directs the corresponding NodeManagers to launch the Containers and run the tasks; it monitors how each Container's task is doing and reports job status back to the ResourceManager.
Every NodeManager sends heartbeats to the ResourceManager; from these heartbeats the ResourceManager knows the resources of the whole cluster, which is what lets the scheduler place Containers sensibly.
When its job finishes, each AppMaster unregisters from the ResourceManager. At that point the job is complete.
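The AM's side of this register / request / unregister conversation goes through the AMRMClient API. A minimal sketch of the lifecycle just described (allocate loop and error handling omitted):

    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();
    // register, so the RM can track this application
    rmClient.registerApplicationMaster("", 0, "");
    // ask the scheduler for one 1024 MB / 1 vcore container, anywhere in the cluster
    Resource capability = Resource.newInstance(1024, 1);
    rmClient.addContainerRequest(
        new ContainerRequest(capability, null, null, Priority.newInstance(0)));
    // ... granted containers arrive through rmClient.allocate(progress) ...
    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");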
9. Analyzing the ResourceManager's job log. All Container allocations appear in the log on the ResourceManager node; only the first Container's entries are excerpted here.


resourcemanager.ClientRMService: Allocated new applicationId: 5
resourcemanager.ClientRMService: Application with id 5 submitted by user root
resourcemanager.rmapp.RMAppImpl: Storing application with id application_1515076284174_0005
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from NEW to NEW_SAVING
resourcemanager.RMAuditLogger: USER=root    IP=172.31.109.168   OPERATION=Submit Application Request    TARGET=ClientRMService  RESULT=SUCCESS  APPID=application_1515076284174_0005
resourcemanager.recovery.RMStateStore: Storing info for app: application_1515076284174_0005
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from NEW_SAVING to SUBMITTED
resourcemanager.scheduler.fair.FairScheduler: Accepted application application_1515076284174_0005 from user: root, in queue: default, currently num of applications: 1
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from SUBMITTED to ACCEPTED
resourcemanager.ApplicationMasterService: Registering app attempt : appattempt_1515076284174_0005_000001
resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1515076284174_0005_000001 State change from NEW to SUBMITTED
resourcemanager.scheduler.fair.FairScheduler: Added Application Attempt appattempt_1515076284174_0005_000001 to scheduler from user: root
resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1515076284174_0005_000001 State change from SUBMITTED to SCHEDULED
resourcemanager.rmcontainer.RMContainerImpl: container_1515076284174_0005_01_000001 Container Transitioned from NEW to ALLOCATED
resourcemanager.RMAuditLogger: USER=root    OPERATION=AM Allocated Container    TARGET=SchedulerApp RESULT=SUCCESS  APPID=application_1515076284174_0005    CONTAINERID=container_1515076284174_0005_01_000001
// the container ID ends in 000001, so this is the first Container; it was assigned to host centos-2
**resourcemanager.scheduler.SchedulerNode: Assigned container container_1515076284174_0005_01_000001 of capacity <memory:1024, vCores:1> on host centos-2:8041, which has 1 containers, <memory:1024, vCores:1> used and <memory:36, vCores:0> available after allocation**

10. My run had 6 map tasks and the default single reduce task; together with the Container that runs the AppMaster, 8 Containers were allocated in total. The first Container was placed on centos-2 and the other 7 on centos-3, as the ResourceManager log shows. You can verify this on centos-3 itself.
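One way to check (assuming log aggregation is enabled) is to pull the application's logs, which are grouped per container and host:

    yarn logs -applicationId application_1515076284174_0005

The NodeManager log on centos-3 likewise shows the seven container launches directly.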

