1. Overview
This series of articles revisits Hadoop internals. Job submission breaks down into two phases: establishing the connection, and submitting the job.
waitForCompletion()
submit();
// 1. Establish the connection
connect();
// 1) Create the proxy used to submit the job
new Cluster(getConfiguration());
// (1) Decide whether the runner is local or remote (YARN)
initialize(jobTrackAddr, conf);
// 2. Submit the job
submitter.submitJobInternal(Job.this, cluster)
// 1) Create the staging path for submitting data to the cluster
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2) Get the job ID and create the job path
JobID jobId = submitClient.getNewJobID();
// 3) Copy the jar to the cluster
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4) Compute the input splits and write the split plan files
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5) Write the XML configuration file to the staging path
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6) Submit the job and return its status
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2. Establishing the connection
After the client submits an MR program, the first thing that runs is job.waitForCompletion(true), so the analysis starts from the waitForCompletion method.
/**
* Submit the job to the cluster and wait for it to finish.
* @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit(); // submit the job: the key call
}
if (verbose) {
monitorAndPrintJob(); // monitor the job status
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
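The else branch above is a plain polling loop: sleep, check isComplete(), repeat. Stripped of the Hadoop types, it reduces to the following standalone sketch; the BooleanSupplier stands in for isComplete() and the names are mine, not Hadoop's.

```java
import java.util.function.BooleanSupplier;

public class PollLoop {
    // Sleep between status checks until the job reports completion,
    // mirroring the while (!isComplete()) loop in waitForCompletion.
    static int pollUntilComplete(BooleanSupplier isComplete, long intervalMillis) {
        int polls = 0;
        while (!isComplete.getAsBoolean()) {
            polls++;
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException ignored) {
                // the original code also swallows the interrupt here
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        int[] checksLeft = {3}; // pretend the job completes on the third check
        int polls = pollUntilComplete(() -> --checksLeft[0] <= 0, 1L);
        System.out.println(polls); // prints 2: slept twice before completion
    }
}
```

In the real code the sleep interval comes from Job.getCompletionPollInterval(cluster.getConf()), i.e. it is client-configurable rather than hard-coded.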
Next, step into the submit method.
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// 1. Establish the connection
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
// 2. Submit the job
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
Step into the connect method.
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
// 1) Create the proxy used to submit the job
return new Cluster(getConfiguration());
}
});
}
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
// Decide whether this is local or remote
initialize(jobTrackAddr, conf);
}
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
if (jobTrackAddr == null) {
// Remote: create the YARN proxy; local: create the local proxy
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: " + e.getMessage());
}
}
}
}
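The loop above boils down to "ask each registered provider for a client protocol and keep the first non-null answer". Here is a minimal standalone sketch of that pattern; the Provider interface is a hypothetical stand-in, whereas the real code iterates a ServiceLoader of ClientProtocolProvider implementations (local and YARN).

```java
import java.util.List;

public class ProviderPick {
    // Hypothetical stand-in for ClientProtocolProvider: create() returns
    // null when the provider cannot serve the given configuration.
    interface Provider { String create(); }

    // Pick the first provider whose create() returns a non-null protocol,
    // mirroring the selection loop in Cluster.initialize().
    static String pickFirstNonNull(List<Provider> providers) {
        for (Provider p : providers) {
            String protocol = p.create();
            if (protocol != null) {
                return protocol;
            }
        }
        return null; // no provider matched the configuration
    }

    public static void main(String[] args) {
        Provider local = () -> null;  // declines this configuration
        Provider yarn = () -> "yarn"; // matches
        System.out.println(pickFirstNonNull(List.of(local, yarn))); // prints "yarn"
    }
}
```

This is why the debug log alternates between "Picked ... as the ClientProtocolProvider" and "Cannot pick ... - returned null protocol": returning null is a provider's normal way of declining a configuration it does not handle.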
3. Submitting the job
Now look at the submitJobInternal method, which submits the job to the cluster. It performs these main steps:
- check the job's input and output specifications
- compute the input splits for the job
- set up the job's computation-related information
- copy the required jar and configuration files to the file system
- submit the job and monitor its status
/**
* Internal method for submitting jobs to the system.
*/
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// Check whether the output path already exists; if it does, throw an exception
checkSpecs(job);
...
// 1) Create the staging directory for submitting data to the cluster
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
....
// 2) Get the JobID and create the job path
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString()); // create the JobID folder under the staging directory
...
try {
.....
// 3) Copy the jar to the cluster
copyAndConfigureFiles(job, submitJobDir);
.....
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// 4) Compute the map-side splits and write the split plan files
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
...
// 5) Write the job configuration to job.xml under the staging/JobID directory
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
// 6) Actually submit the job and return its status
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true); // delete everything under the staging/JobID directory
}
}
}
Next, look at the writeSplits method.
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir); // new-API split computation: the key path
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
Step into the writeNewSplits method.
private <T extends InputSplit> int writeNewSplits(JobContext job,
Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job); // compute the splits: the key call
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
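Note the Arrays.sort with SplitComparator: the splits are ordered largest first, so the biggest map tasks get scheduled earliest. The ordering itself is just a descending sort by length, sketched here with boxed longs standing in for InputSplit objects (the real comparator calls InputSplit.getLength()).

```java
import java.util.Arrays;

public class SplitOrder {
    // Order split lengths biggest-first, mimicking what SplitComparator
    // achieves for the array of InputSplit objects.
    static Long[] biggestFirst(Long[] lengths) {
        Long[] ordered = lengths.clone();
        Arrays.sort(ordered, (a, b) -> Long.compare(b, a)); // descending by size
        return ordered;
    }

    public static void main(String[] args) {
        Long[] ordered = biggestFirst(new Long[]{64L, 128L, 32L});
        System.out.println(Arrays.toString(ordered)); // prints [128, 64, 32]
    }
}
```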
Here is the getSplits method of FileInputFormat, the default implementation.
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize); // compute the split size: important
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { // decide whether to cut another split: important
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
....
}
return splits;
}
Two points deserve attention here:
- The split size is computed as Math.max(minSize, Math.min(maxSize, blockSize)). By default minSize is 1 and maxSize is Long.MAX_VALUE (9223372036854775807), and blockSize is the HDFS block size, so by default the split size equals the block size.
- Whether another split is cut is decided by ((double) bytesRemaining)/splitSize > SPLIT_SLOP. SPLIT_SLOP is 1.1: a new split is cut only while the remaining bytes divided by the split size exceed 1.1.
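Both rules are pure arithmetic, so they can be checked in isolation. Below is a minimal sketch, not Hadoop code: the formula and the SPLIT_SLOP constant are copied from the FileInputFormat source quoted above, and countSplits replays the while-loop for a single file.

```java
public class SplitMath {
    static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

    // computeSplitSize: Math.max(minSize, Math.min(maxSize, blockSize))
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Replay the while-loop above and count how many splits a file yields.
    static int countSplits(long length, long splitSize) {
        int splits = 0;
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++; // the remainder becomes one final split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long size = splitSize(128 * mb, 1L, Long.MAX_VALUE);
        System.out.println(size == 128 * mb);            // prints true: split size = block size
        System.out.println(countSplits(129 * mb, size)); // prints 1: 129/128 is below 1.1
        System.out.println(countSplits(260 * mb, size)); // prints 2: the 132 MB tail stays whole
    }
}
```

The 129 MB case is the one that surprises people: because of the 1.1 slop factor, a file slightly larger than one block still becomes a single split rather than a full split plus a tiny one.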
Now return to the submitJobInternal method and look at the submitClient.submitJob call. There are two kinds of submitClient, local and YARN; here we take the YARN one as the example.
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
addHistoryToken(ts);
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
// Submit the job to the ResourceManager
try {
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics =
(appMaster == null ?
"application report is null" : appMaster.getDiagnostics());
if (appMaster == null
|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
diagnostics);
}
return clientCache.getClient(jobId).getJobStatus(jobId);
} catch (YarnException e) {
throw new IOException(e);
}
}
Finally, the client monitors the job status and waits for the execution result; see the monitorAndPrintJob method.