This article was first published on my personal blog: HTMADAO's Blog. Please credit the source when reposting.
Job Submission
In the driver class of a MapReduce program, we build a Configuration object and obtain the corresponding Job object from it. After specifying properties such as the Mapper class, Reducer class, and main class on the Job object, we submit the job and wait for it to finish via waitForCompletion(true). The boolean parameter decides whether to monitor and print the job's progress.
public class CheckJob {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Wrong number of arguments, expected 2: <inputPath> <outputPath>");
            System.exit(1);
        } else {
            // Input directory on HDFS for this program
            String inputPath = args[0];
            // Output directory on HDFS for the results; note: it must not already exist
            String outputPath = args[1];
            if (inputPath == null || inputPath.isEmpty()
                    || outputPath == null || outputPath.isEmpty()) {
                System.out.println("Bad arguments, go fix your configuration");
                System.exit(1);
            } else {
                // Build the Job object
                Job checkRandomJob = Job.getInstance(HadoopUtil.getRemoteHadoopConf());
                // Set the job name on the Job object
                checkRandomJob.setJobName("check Random app");
                // Set the main class
                checkRandomJob.setJarByClass(CheckJob.class);
                // Set the job's Mapper and its output key/value types
                checkRandomJob.setMapperClass(CheckMapper.class);
                checkRandomJob.setMapOutputKeyClass(Text.class);
                checkRandomJob.setMapOutputValueClass(LongWritable.class);
                // Set the job's output key/value types, i.e. the Reducer's output types
                checkRandomJob.setReducerClass(CheckReducer.class);
                checkRandomJob.setOutputKeyClass(Text.class);
                checkRandomJob.setOutputValueClass(LongWritable.class);
                // Path of the HDFS files to process
                FileInputFormat.addInputPath(checkRandomJob, new Path(inputPath));
                // Path of the final output
                FileOutputFormat.setOutputPath(checkRandomJob, new Path(outputPath));
                // Wait for the job to finish, then exit with its status
                System.exit(checkRandomJob.waitForCompletion(true) ? 0 : 1);
            }
        }
    }
}
The job.waitForCompletion() method
Stepping into waitForCompletion() in the Job class: it takes a single boolean parameter. The method first checks the job state and, if it is in the DEFINE state, submits the job via submit(). It then decides, based on the parameter, whether to monitor and print the job's progress.
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // First check the job state; if it is DEFINE, submit the job to the
  // cluster via submit()
  if (state == JobState.DEFINE) {
    submit();
  }
  // If the parameter is true, monitor and print the job's progress
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
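When verbose is false, the client simply sleeps and polls isComplete() until the job finishes; the poll interval is read from mapreduce.client.completion.pollinterval, which defaults to 5000 ms.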
The submit() method
This method submits the job to the cluster. It first re-checks the job state; if the state is not DEFINE, the job cannot be submitted. setUseNewAPI() marks the job as using the new MapReduce API, i.e. the Mapper and Reducer under the org.apache.hadoop.mapreduce package rather than the classes in the old mapred package.
Two important calls happen inside submit(). The first, connect(), initializes the Job's Cluster-typed member, which encapsulates the cluster information supplied through the Configuration; internally it creates the actual communication protocol object that will be used for the final job submission.
getJobSubmitter() then uses the information wrapped in the cluster (here, the file system and the client) to obtain a JobSubmitter object, which is responsible for actually submitting the job to the cluster and reporting its progress. Finally, submitter.submitJobInternal(Job.this, cluster) submits the current job to the cluster and stores the resulting job status in the status member; this method is the core of JobSubmitter. After a successful submission, JobState is set to RUNNING, meaning the job has entered the running phase, and the console prints a URL for tracking the job's progress.
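Neither connect() nor the Cluster internals are listed in this post, so here is a condensed sketch, paraphrased from the Hadoop 2.x source (logging, synchronization details, and some error handling are omitted, and field names may differ slightly across versions): connect() lazily builds the Cluster, and Cluster's initialize() walks the ClientProtocolProvider implementations discovered via ServiceLoader, keeping the first one that can create a communication client.

// Job#connect(): lazily create the Cluster the job will be submitted to.
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
      public Cluster run() throws IOException, InterruptedException,
              ClassNotFoundException {
        return new Cluster(getConfiguration());
      }
    });
  }
}

// Cluster#initialize(): pick the first ClientProtocolProvider that can
// build a client. On YARN (mapreduce.framework.name=yarn) this yields a
// YARNRunner; in local mode, a LocalJobRunner.
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
        throws IOException {
  for (ClientProtocolProvider provider :
          ServiceLoader.load(ClientProtocolProvider.class)) {
    ClientProtocol clientProtocol = (jobTrackAddr == null)
            ? provider.create(conf)
            : provider.create(jobTrackAddr, conf);
    if (clientProtocol != null) {
      clientProtocolProvider = provider;
      client = clientProtocol;
      break;
    }
  }
  if (client == null) {
    throw new IOException("Cannot initialize Cluster.");
  }
}

The submit() source itself: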
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  // Use the information wrapped in the cluster (here, the file system and
  // the client) to obtain the JobSubmitter that will actually submit the
  // job to the cluster and report its progress
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
The submitJobInternal() method
This is the method by which the job submitter (JobSubmitter) finally submits the job to the cluster.
First, checkSpecs(job) verifies that the job's output path is configured and does not yet exist. The correct state is configured but absent; the output path is set through the mapreduce.output.fileoutputformat.outputdir parameter.
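checkSpecs() itself is not listed here; for the new-API case it instantiates the configured OutputFormat reflectively and calls its checkOutputSpecs(). For FileOutputFormat, that check is what enforces the configured-but-absent rule, roughly as follows (condensed from the Hadoop source; the delegation-token lines are omitted):

// FileOutputFormat#checkOutputSpecs(), condensed: this is where a missing
// or pre-existing output directory makes the submission fail fast.
public void checkOutputSpecs(JobContext job)
        throws FileAlreadyExistsException, IOException {
  Path outDir = getOutputPath(job);  // mapreduce.output.fileoutputformat.outputdir
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");
  }
  if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
    throw new FileAlreadyExistsException(
        "Output directory " + outDir + " already exists");
  }
}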
It then fetches the Configuration object wrapped in the job and adds MAPREDUCE_APPLICATION_FRAMEWORK_PATH (the application framework path) to the distributed cache.
The static method JobSubmissionFiles.getStagingDir() returns the staging path where the job's resources are placed during execution. The default is /tmp/hadoop-yarn/staging/<user>/.staging (here /tmp/hadoop-yarn/staging/root/.staging, the job having been submitted as root).
The InetAddress code then obtains the IP of the host submitting the job and stores the IP, hostname, and related information in the Configuration object.
A job ID is generated and set on the job object, and the job submission path is built from it. A series of permission-related operations on that path follows, which we skip over here.
writeConf() writes the job's configuration file (job.xml) to the submission directory on HDFS; the job jar itself is uploaded earlier by copyAndConfigureFiles().
(Important) writeSplits() writes the split data file job.split and the split metadata file job.splitmetainfo into the submission directory, and computes the number of map tasks.
submitClient.submitJob() performs the real submission of the job to the cluster and returns the job status into the status member. submitClient was constructed earlier, when the Cluster object was initialized.
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  //validate the jobs output specs: check that the output path is
  //configured and does not already exist
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  //obtain the IP of the submitting host and store the IP, hostname and
  //related information in the Configuration object
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  //generate the job ID
  JobID jobId = submitClient.getNewJobID();
  //set the job ID on the job
  job.setJobID(jobId);
  //build the submission path: jobStagingArea followed by /jobID
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir: a series of permission-related
    // settings on the submission path
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
          "data spill is enabled");
    }

    //copy and configure the related files (including the job jar)
    copyAndConfigureFiles(job, submitJobDir);

    //get the path of the job configuration file
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    //writeSplits() writes the split data file job.split and the split
    //metadata file job.splitmetainfo, and computes the number of map tasks
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
        MRJobConfig.DEFAULT_JOB_MAX_MAP);
    if (maxMaps >= 0 && maxMaps < maps) {
      throw new IllegalArgumentException("The number of map tasks " + maps +
          " exceeded limit " + maxMaps);
    }

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file (job.xml) to submit dir on HDFS
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
The writeSplits() method
When the new API is used, writeNewSplits() is called:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  //if the new API is used, call writeNewSplits()
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
The writeNewSplits() method
writeNewSplits() uses reflection to instantiate the InputFormat object input from the configured inputFormat.class, then calls its getSplits() method on the job. Once the split information has been obtained, JobSplitWriter.createSplitFiles() writes it to the submitJobDir/job.split file.
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
The getSplits() method in FileInputFormat
Pay particular attention to this piece of code:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
  bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
             blkLocations[blkIndex].getHosts(),
             blkLocations[blkIndex].getCachedHosts()));
}
The condition of this while loop checks whether the ratio of the remaining bytes to the configured split size exceeds the threshold SPLIT_SLOP, a constant defined in FileInputFormat with the value 1.1. In other words, as long as the remainder is larger than 110% of the split size, the file keeps being cut into full-size splits; once the remainder falls to 110% or below, the whole remainder becomes a single final split.
private static final double SPLIT_SLOP = 1.1;
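To make the 110% rule concrete, here is a minimal standalone simulation of the loop above; the 128 MB split size and 260 MB file length are made-up example inputs, not values from the Hadoop source:

public class SplitSlopDemo {
  private static final double SPLIT_SLOP = 1.1;  // same constant as FileInputFormat

  public static void main(String[] args) {
    long splitSize = 128L * 1024 * 1024;  // assume a 128 MB split size
    long length = 260L * 1024 * 1024;     // hypothetical 260 MB input file

    long bytesRemaining = length;
    int splits = 0;
    // Mirrors the while loop in getSplits(): keep cutting full-size splits
    // while the remainder exceeds 110% of splitSize.
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.printf("split %d: %d MB%n", splits++, splitSize >> 20);
      bytesRemaining -= splitSize;
    }
    // The remainder (here 132 MB, only ~103% of splitSize) becomes one last split.
    if (bytesRemaining != 0) {
      System.out.printf("split %d: %d MB%n", splits, bytesRemaining >> 20);
    }
    // Prints: split 0: 128 MB, then split 1: 132 MB -- two splits, not three.
  }
}

Without the slop factor, this 260 MB file would be cut into three splits (128 + 128 + 4 MB), the last one wastefully small; with it, the tail is folded into a single 132 MB split.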
The full getSplits() method:
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        if (LOG.isDebugEnabled()) {
          // Log only if the file is big enough to be splitted
          if (length > Math.min(file.getBlockSize(), minSize)) {
            LOG.debug("File is not splittable so no parallelization "
                + "is possible: " + file.getPath());
          }
        }
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
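For completeness, the splitSize used above is produced by FileInputFormat.computeSplitSize(), which clamps the file's block size between the configured minimum and maximum split sizes:

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  // minSize comes from mapreduce.input.fileinputformat.split.minsize,
  // maxSize from mapreduce.input.fileinputformat.split.maxsize; with the
  // defaults the result is simply the HDFS block size.
  return Math.max(minSize, Math.min(maxSize, blockSize));
}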