/**
* Submit the job to the cluster and wait for it to finish.
* @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
// 如果Job.state是DEFINE星澳,就可以提交任務(wù)债朵。
submit(); // 參見(jiàn):2. org.apache.hadoop.mapreduce.Job#submit
}
if (verbose) {
// 在進(jìn)度和任務(wù)失敗時(shí)實(shí)時(shí)監(jiān)控作業(yè)和打印狀態(tài)。
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
// 獲取waitForCompletion() 應(yīng)檢查的時(shí)間間隔勺拣。
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
// 任務(wù)未完成略就,則睡眠一會(huì)般卑。
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
// 檢查作業(yè)是否成功完成酿秸。
return isSuccessful();
}
2. org.apache.hadoop.mapreduce.Job#submit
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
// 提交之前,確定Job.state是不是DEFINE嵌言,如果不是則拋出異常嗅回。
// 因?yàn)?lt;設(shè)置參數(shù)>方法只能在作業(yè)提交之前起作用,之后它們將拋出IllegalStateException
ensureState(JobState.DEFINE);
// 默認(rèn)為新API摧茴,除非它們被顯式設(shè)置绵载,或者使用了舊的mapper或reduce屬性。
// 做一些兼容性檢查苛白,避免沖突娃豹。
// 新API:org.apache.hadoop.mapreduce及其子包
// 老API:org.apache.hadoop.mapred及其子包
setUseNewAPI();
// 初始化org.apache.hadoop.mapreduce.Cluster對(duì)象,用于鏈接/訪問(wèn)map/reduce群集信息购裙。
connect();
// 初始化org.apache.hadoop.mapreduce.JobSubmitter對(duì)象懂版。
// 本質(zhì)就是:new JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)。
// FileSystem :本地文件系統(tǒng)或者分布式文件系統(tǒng)
// ClientProtocol :是JobClient和中央JobTracker用于通信的協(xié)議躏率。
// JobClient可以使用這些方法提交作業(yè)以供執(zhí)行定续,并了解當(dāng)前系統(tǒng)狀態(tài)。
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
// 包含用于向系統(tǒng)提交作業(yè)時(shí)的一些操作禾锤,并返回其最新的Job資料信息。
// 參數(shù):job->要提交的配置摹察;cluster->cluster的句柄(用于鏈接/訪問(wèn)map/reduce群集信息)
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
// 參見(jiàn):《MapReduce源碼解析》之二
return submitter.submitJobInternal(Job.this, cluster);
}
});
// 更改Job.state為RUNNING恩掷,此時(shí)不能再修改任何配置信息。
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}