I. Conclusions first (the Map phase)
1. Job.waitForCompletion():
Checks whether the job is already in the RUNNING state; if so, a duplicate submission is avoided.
If the state is DEFINE, the job is submitted by calling submit(). (A minimal driver entry point is sketched after this list.)
2. submit():
Creates a Cluster object, an abstraction of the cluster the job runs on; it holds the job runner (client) and the file system.
Obtains a JobSubmitter from the Cluster and submits the job with it.
3. Before submission:
① Determine the staging (job) directory for the current job.
② Compute the input splits and write job.split / job.splitmetainfo into the job directory.
③ Set the number of MapTasks to launch according to the number of splits.
④ Generate the job.xml file (containing all configuration parameters).
4. Submission:
The Job object is rebuilt on the LocalJobRunner side.
start() is called, launching a thread.
5. Job.run():
① From the split information, build the array of split meta-info objects and, from it, List<RunnableWithThrowable> mapTaskRunnables.
② Based on the configured numReduceTasks (default 1), build List<RunnableWithThrowable> reduceTaskRunnables.
③ Create thread pools and start multiple threads that run the MapTasks and ReduceTasks.
6. Each MapTaskRunnable creates a MapTask;
the MapTask calls runNewMapper(), which executes the core logic of the Map phase.
7. Each ReduceTaskRunnable creates a ReduceTask;
the ReduceTask calls runNewReducer(), which executes the core logic of the Reduce phase.
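To make the entry point concrete, here is a minimal driver sketch; WordCountDriver, WordCountMapper and WordCountReducer are hypothetical illustration classes, not part of the traced source. The call to job.waitForCompletion(true) is where the whole flow above starts.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);       // hypothetical user Mapper
        job.setReducerClass(WordCountReducer.class);     // hypothetical user Reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // waitForCompletion(true) is the method traced in this article
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}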
二俯在、各階段源碼
- Job.waitForCompletion
public boolean waitForCompletion(boolean verbose) throws IOException,
InterruptedException, ClassNotFoundException {
// Only submit when the job is still in the DEFINE state, to avoid re-submitting a job that is already running
if (state == JobState.DEFINE) {
//The core of this method is submit()
submit();
}
if (verbose) { // The verbose flag decides whether the job's progress is printed
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
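The polling branch above sleeps between isComplete() checks. As a hedged aside, the interval returned by Job.getCompletionPollInterval() is believed to come from the property below (default 5000 ms), so it can be tuned in the client configuration:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Property name believed to back Job.getCompletionPollInterval(); verify against your Hadoop version
conf.setInt("mapreduce.client.completion.pollinterval", 2000);  // poll every 2 seconds instead of 5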
- submit()
2.1 The submit() method
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
//Make sure the job is still in the DEFINE state
ensureState(JobState.DEFINE);
//Decide whether the new API is used
setUseNewAPI();
// Create the Cluster object; a Cluster is the abstraction of the cluster and holds the job runner and the file system
connect();
// Create the job submitter from the Cluster
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),cluster.getClient());
// Use the submitter to submit the job and obtain its status
//ugi is an instance of UserGroupInformation, which represents user and group information in Hadoop; it wraps a JAAS Subject
//and provides methods to determine the user's name and groups, supporting the Windows, Unix and Kerberos login modules
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
// Change the state to RUNNING
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
2.2 setUseNewAPI()
private void setUseNewAPI() throws IOException {
// Number of reduce tasks, set via mapreduce.job.reduces; the default is 1
int numReduces = conf.getNumReduceTasks();
......// A series of consistency checks follows; not detailed here
}
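As a hedged aside, the reduce-task count that this check reads can be set either in code or on the command line; a minimal sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

Job job = Job.getInstance(new Configuration(), "reduce-count-demo");  // throws IOException
job.setNumReduceTasks(2);     // same effect as -D mapreduce.job.reduces=2 on the command line
// job.setNumReduceTasks(0);  // map-only job: no sort phase and no ReduceTasks at all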
- JobSubmitter submits the Job
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
// Get the OutputFormat and check the output specs: the output path must be set and must not already exist
checkSpecs(job);
Configuration conf = job.getConfiguration();
//Add the MapReduce framework archive (if configured) to the distributed cache
addMRFrameworkToDistributedCache(conf);
// Get the staging directory used while the job runs
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
//Get the IP address of the submitting (local) host
submitHostAddress = ip.getHostAddress();
//Get the host name of the local host
submitHostName = ip.getHostName();
//Store submitHostName and submitHostAddress in the configuration
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
// Generate the job ID
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// Create a sub-directory of the staging area named after the job ID
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try{
......//Permission checks omitted here
// Create the submit directory for this job and copy the job's resources into it
copyAndConfigureFiles(job, submitJobDir);
// Get the path of the job's consolidated configuration file job.xml (it merges the information from all eight default/site configuration files)
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// Generate the split information for this job: job.split (the splits) and job.splitmetainfo (the split metadata)
// job.split records how many split objects there are and, for each split, which part of which file it covers
// job.splitmetainfo records, for each split, which hosts (the DataNodes holding the blocks) it should preferably be read from
int maps = writeSplits(job, submitJobDir);
// Set how many MapTasks should be launched, according to the number of splits
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
......//Permission checks omitted here
// Write job file to submit dir
// Write this job's configuration into job.xml in the submit directory
writeConf(conf, submitJobFile);
// Now, actually submit the job (using the submit name)
printTokens(jobId, job.getCredentials());
// The actual submission
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
}finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
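The map count returned by writeSplits() above is decided by the InputFormat's getSplits(). As a hedged aside, for FileInputFormat the split size normally follows the rule sketched below (illustrative values, not the code traced in this article):

// splitSize = max(minSize, min(maxSize, blockSize))
// minSize: mapreduce.input.fileinputformat.split.minsize (default 1)
// maxSize: mapreduce.input.fileinputformat.split.maxsize (default Long.MAX_VALUE)
long blockSize = 128L * 1024 * 1024;   // e.g. a 128 MB HDFS block
long minSize = 1L;
long maxSize = Long.MAX_VALUE;
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // -> 128 MB per split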
4.1 Submitting the job
submitJob() is an abstract method of the ClientProtocol interface (an RPC protocol). Following RPC semantics, when the client-side proxy submitClient calls submitJob(), the method is executed on the server side. It has two implementations: LocalJobRunner (local mode) and YARNRunner (YARN mode); a hedged sketch of how the implementation is chosen follows the local-mode code below.
//Job submission in local mode
public org.apache.hadoop.mapreduce.JobStatus submitJob(
org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
Credentials credentials) throws IOException {
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir); //jumps to stage 4.2
job.job.setCredentials(credentials);
return job.status;
}
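Which of the two implementations the Cluster hands back when connect() runs is driven by configuration. A hedged sketch (the property is believed to be mapreduce.framework.name):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local");   // connect() -> LocalJobRunner
// conf.set("mapreduce.framework.name", "yarn"); // connect() -> YARNRunner
// conf.set("fs.defaultFS", "file:///");         // local mode usually runs against the local file system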
4.2 New Job
//Rebuild the Job object
public Job(JobID jobid, String jobSubmitDir) throws IOException {
…… //Various settings that rebuild the Job so it runs on the LocalJobRunner
// Start a thread
this.start();
}
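start() launches a thread because, as far as I can tell from the source, LocalJobRunner's inner Job class is declared roughly as private class Job extends Thread implements TaskUmbilicalProtocol. A tiny stand-alone sketch of that Thread pattern (not Hadoop code; starting a thread from a constructor is normally discouraged, but it mirrors what the traced constructor does):

public class StartsItself extends Thread {
    public StartsItself() {
        this.start();          // like the constructor above: start() schedules run() on a new thread
    }
    @Override
    public void run() {
        System.out.println("running on thread " + Thread.currentThread().getName());
    }
    public static void main(String[] args) {
        new StartsItself();    // constructing the object already launches the thread
    }
}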
- Job.run()
//Run the job
public void run() {
JobID jobId = profile.getJobID();
JobContext jContext = new JobContextImpl(job, jobId);
org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
try {
outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
} catch (Exception e) {
LOG.info("Failed to createOutputCommitter", e);
return;
}
try {
// From the split meta info, build all the split and split-property objects
TaskSplitMetaInfo[] taskSplitMetaInfos =
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
int numReduceTasks = job.getNumReduceTasks();
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
// Use a Map to record, for each MapTask, the files in which its output is eventually stored
Map<TaskAttemptID, MapOutputFile> mapOutputFiles = Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
//Build the collection of runnables for the MapTasks: one thread is started per split
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles);
initCounters(mapRunnables.size(), numReduceTasks);
// Create the thread pool
ExecutorService mapService = createMapExecutor();
// Start running the map-phase threads
//Note: MapReduce uses a thread pool here (tasks are put in a queue and executed at some point in the future)
runTasks(mapRunnables, mapService, "map");
try {
if (numReduceTasks > 0) {
//Build the runnables for the reduce tasks
List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
jobId, mapOutputFiles);
//Create the reduce thread pool
ExecutorService reduceService = createReduceExecutor();
//Start running the reduce-phase threads; that stage is not covered in this article (a later article will cover it)
runTasks(reduceRunnables, reduceService, "reduce");
}
} finally {
for (MapOutputFile output : mapOutputFiles.values()) {
output.removeAll();
}
}
// delete the temporary directory in output directory
outputCommitter.commitJob(jContext);
status.setCleanupProgress(1.0f);
if (killed) {
this.status.setRunState(JobStatus.KILLED);
} else {
this.status.setRunState(JobStatus.SUCCEEDED);
}
JobEndNotifier.localRunnerNotification(job, status);
} catch (Throwable t) {
try {
outputCommitter.abortJob(jContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
} catch (IOException ioe) {
LOG.info("Error cleaning up job:" + id);
}
status.setCleanupProgress(1.0f);
if (killed) {
this.status.setRunState(JobStatus.KILLED);
} else {
this.status.setRunState(JobStatus.FAILED);
}
LOG.warn(id, t);
JobEndNotifier.localRunnerNotification(job, status);
} finally {
try {
fs.delete(systemJobFile.getParent(), true); // delete submit dir
localFs.delete(localJobFile, true); // delete local copy
// Cleanup distributed cache
localDistributedCacheManager.close();
} catch (IOException e) {
LOG.warn("Error cleaning up "+id+": "+e);
}
}
}
6. The runTasks() method of the Job class
private void runTasks(List<RunnableWithThrowable> runnables,
ExecutorService service, String taskType) throws Exception {
// Start populating the executor with work units.
// They may begin running immediately (in other threads).
for (Runnable r : runnables) {
//Submit to the thread pool, which executes the map or reduce work
service.submit(r);
}
...
}
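The elided part of runTasks() waits for the pool to drain before returning. A generic, hedged sketch of the same thread-pool pattern (not the exact Hadoop code):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

void runAll(List<Runnable> runnables) throws InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(4);  // pool size is illustrative
    for (Runnable r : runnables) {
        service.submit(r);               // queued; may start running immediately in another thread
    }
    service.shutdown();                  // stop accepting new work
    service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); // wait for every task to finish
}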
- MapTaskRunnable.run()
public void run() {
try {
// Generate the attempt ID for this task
TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, taskId), 0);
LOG.info("Starting task: " + mapId);
mapIds.add(mapId);
// Create the MapTask object; it executes the map-phase logic of this thread (node)
MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,info.getSplitIndex(), 1);
map.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
//Set up the local working directories
//mapId is the attempt id; in this debugging session its value was attempt_localXXX_0001_m_000000_0
//localConf holds the configuration loaded from the config files
setupChildMapredLocalDirs(map, localConf);
//Create a map output file holder and set its configuration
// This object records where the data produced by the current MapTask is stored
MapOutputFile mapOutput = new MROutputFiles();
mapOutput.setConf(localConf);
mapOutputFiles.put(mapId, mapOutput);
//Refers to a job_localXXX_0001.xml file
map.setJobFile(localJobFile.toString());
localConf.setUser(map.getUser());
map.localizeConfiguration(localConf);
map.setConf(localConf);
try {
map_tasks.getAndIncrement();
//launchMap() records that this map task has been launched (local metrics)
myMetrics.launchMap(mapId);
map.run(localConf, Job.this); // This calls MapTask.run()
myMetrics.completeMap(mapId);
} finally {
map_tasks.getAndDecrement();
}
LOG.info("Finishing task: " + mapId);
} catch (Throwable e) {
this.storedException = e;
}
}
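For reference, the attempt id built at the top of run() follows the attempt_<identifier>_<job>_m_<task>_<attempt> pattern. A small stand-alone sketch (the identifier string is made up):

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;

public class TaskIdDemo {
    public static void main(String[] args) {
        JobID jobId = new JobID("local1956068477", 1);   // made-up local identifier
        TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
        System.out.println(mapId);   // prints attempt_local1956068477_0001_m_000000_0
    }
}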
- MapTask.run()
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// Sorting of the key-value pairs only happens when there is a reduce phase
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
if (useNewApi) {
//Entry point of the core processing of the Map phase
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
9. MapTask core logic: runNewMapper()
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
// Create the task attempt context
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper: instantiate the Mapper object
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format: instantiate the InputFormat
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split: a split records which file the current slice belongs to and which part of that file it covers
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
// Writes the MapTask's output directly to the final output directory
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
// Collects the output produced by the MapTask so it can be handed to the ReduceTasks
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext);
//Runs the user-defined Mapper instance
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
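Note that the Mapper and InputFormat instantiated above by reflection are exactly the classes the driver configured. Continuing the hypothetical driver sketch from Part I (org.apache.hadoop.mapreduce.lib.input.TextInputFormat is the new-API default when nothing is set):

job.setMapperClass(WordCountMapper.class);        // returned later by taskContext.getMapperClass()
job.setInputFormatClass(TextInputFormat.class);   // returned later by taskContext.getInputFormatClass()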
10. Mapper.run()
//Run the user-defined Mapper
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
//This is where the user's own map() code starts running
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
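For completeness, a minimal user-defined Mapper sketch whose map() is what the loop above calls once per key-value pair (WordCountMapper is the hypothetical class from the driver sketch in Part I):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // With TextInputFormat, key is the byte offset of the line and value is the line itself
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);   // collected by NewOutputCollector when reducers exist
        }
    }
}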
References:
MapReduce Job local submission process: source code tracing and analysis. https://blog.csdn.net/lemonZhaoTao/article/details/72943618