1. org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal
/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>Checking the input and output specifications of the job.</li>
 *   <li>Computing the InputSplits for the job.</li>
 *   <li>Setting up the requisite accounting information for the
 *       DistributedCache of the job, if necessary.</li>
 *   <li>Copying the job's jar and configuration to the map-reduce system
 *       directory on the distributed file-system.</li>
 *   <li>Submitting the job to the JobTracker and optionally monitoring
 *       its status.</li>
 * </ol>
 *
 * <p>If no status was obtained (submission failed or threw), the job's
 * submit directory under the staging area is deleted recursively before
 * this method exits.
 *
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @return the non-null status returned by the submit client
 * @throws ClassNotFoundException declared by the signature; presumably
 *         propagated from helpers such as split computation (not visible here)
 * @throws InterruptedException declared by the signature; presumably
 *         propagated from helpers (not visible here)
 * @throws IOException if the shuffle key generator algorithm is unavailable,
 *         if the submit client returns a null status, or if a helper fails
 * @throws IllegalArgumentException if the number of computed splits exceeds
 *         the configured maximum number of map tasks
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  // Validate the job's output specification. Typically this checks whether
  // the output path already exists and fails if it does, so existing output
  // is never overwritten.
  checkSpecs(job);

  // Note: unless explicitly turned off, Hadoop loads two default resources
  // from the classpath, in order: core-default.xml (read-only defaults) and
  // core-site.xml (site-specific configuration for this installation).
  Configuration conf = job.getConfiguration();

  // Resolves any symlinks in MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH
  // ("mapreduce.application.framework.path") and appends the resolved URI to
  // the distributed cache archives (MRJobConfig.CACHE_ARCHIVES,
  // "mapreduce.job.cache.archives") -- per the helper's implementation,
  // which is not visible in this block.
  addMRFrameworkToDistributedCache(conf);

  // Staging directory under which job-specific files are placed.
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  // configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    // Record the submitting host's name and address in the job configuration.
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }

  // Ask the submit client for a fresh job id and attach it to the job.
  // (submitClient is presumably a LocalJobRunner or a YARNRunner, depending
  // on the configured framework -- confirm against the enclosing class.)
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);

  // Path(parent, child) joins the two parts: <staging area>/<job id>.
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());

  // Stays null until the job has actually been submitted; the finally block
  // uses that to decide whether the submit directory must be cleaned up.
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");

    // get delegation token for the dir, i.e. from the namenode that serves
    // the job's submit directory
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    // Obtain delegation tokens from the remaining configured namenodes.
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers, unless the
    // credentials already carry one
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    // During a spill, MapReduce writes part of the in-memory data to disk to
    // free memory. When encrypted intermediate data spill is enabled, the
    // maximum number of ApplicationMaster attempts is forced to 1.
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      // Fix: the two literals were concatenated without a separating space,
      // which logged "intermediatedata".
      LOG.warn("Max job attempts set to 1 since encrypted intermediate "
          + "data spill is enabled");
    }

    // Apply the -libjars, -files and -archives command line options to the
    // job configuration, and upload/configure the job's files, libjars,
    // job jar and archives.
    copyAndConfigureFiles(job, submitJobDir);

    // Location of the job configuration file inside the submit directory
    // (i.e. new Path(jobSubmitDir, "job.xml")).
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job. The number of splits is the number of
    // map tasks.
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // Reject the job when the computed number of maps exceeds the configured
    // (or default) limit; a negative limit disables the check.
    int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
        MRJobConfig.DEFAULT_JOB_MAX_MAP);
    if (maxMaps >= 0 && maxMaps < maps) {
      throw new IllegalArgumentException("The number of map tasks " + maps +
          " exceeded limit " + maxMaps);
    }

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids: identifiers that can be used to correlate a
      // token's usage across multiple client sessions.
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        // Fix: decodeIdentifier() may return null when the identifier cannot
        // be constructed (e.g. unknown token kind); skip such tokens instead
        // of failing the whole submission with a NullPointerException.
        TokenIdentifier identifier = t.decodeIdentifier();
        if (identifier != null) {
          trackingIds.add(identifier.getTrackingId());
        }
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[0]));
    }

    // Set reservation info if it exists. The reservation id is null when the
    // job has no associated reservation.
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    // Hand the job over to the submit client (YARN or local). submitJob
    // itself is covered in part 3 of this Hadoop-MapReduce source-analysis
    // series.
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      // Status of the job right after submission.
      return status;
    } else {
      // The job failed to launch.
      throw new IOException("Could not launch job");
    }
  } finally {
    // status is still null on every failure path (exception or null result),
    // so the partially populated submit directory is removed.
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null) {
        // Recursively delete the job's submit directory.
        jtFs.delete(submitJobDir, true);
      }
    }
  }
}