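Introduction
1. Which communication protocols connect the YARN modules, and which objects implement them (the gateways)?
2. How is the lifecycle of a YARN app implemented, from client launch to completion?
3. Implementation details of CapacityScheduler and FairScheduler? //TODO:
4. LinuxContainerExecutor vs Docker? //TODO: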
YARN 2.6.0 does not support GPUs yet. Unless DRF's DominantResourceCalculator is enabled, the default DefaultResourceCalculator divides and allocates Containers by memory alone.
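To make the difference concrete, here is a minimal Java sketch of how many equally sized containers fit into a node's free resources under each view. The classes Res and CalculatorSketch are hypothetical; the two formulas reflect my reading of computeAvailableContainers in the 2.6 DefaultResourceCalculator (memory only) and DominantResourceCalculator (minimum over memory and vcores) and should be checked against the source.

```java
// Hypothetical sketch, not the Hadoop ResourceCalculator classes: it mirrors
// the idea of computeAvailableContainers in DefaultResourceCalculator
// (memory only) and DominantResourceCalculator (memory and vcores).
final class Res {
    final int memoryMb;
    final int vcores;

    Res(int memoryMb, int vcores) {
        this.memoryMb = memoryMb;
        this.vcores = vcores;
    }
}

final class CalculatorSketch {
    // Memory only: the vcores field is ignored, so CPU can be oversubscribed.
    static int memoryOnlyFit(Res available, Res required) {
        return available.memoryMb / required.memoryMb;
    }

    // Every dimension must fit: the scarcest resource bounds the count.
    static int allDimensionsFit(Res available, Res required) {
        return Math.min(available.memoryMb / required.memoryMb,
                        available.vcores / required.vcores);
    }

    public static void main(String[] args) {
        Res node = new Res(8192, 4);      // 8 GB and 4 vcores free
        Res container = new Res(1024, 1); // each container asks for 1 GB and 1 vcore
        System.out.println(memoryOnlyFit(node, container));    // 8
        System.out.println(allDimensionsFit(node, container)); // 4
    }
}
```

With 8 GB and 4 vcores free, the memory-only view packs 8 one-vcore containers onto the node, twice as many as there are vcores; that CPU oversubscription is the practical consequence of the memory-only default.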
I. Communication protocols between components
- Framework-level protocols
ApplicationClientProtocol [ClientRMService] // clients -> RM
ContainerManagementProtocol[ContainerManagerImpl] // AM|RM -> NM
ApplicationMasterProtocol[ApplicationMasterService] // AM -> RM
ResourceTracker[ResourceTrackerService] // NM -> RM
- Application-level protocols
MR: MRClientProtocol [MRClientService] // client -> AM
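Read as a table, the list records who calls each protocol and which daemon hosts the server side. The hypothetical enum below (not a Hadoop type) encodes exactly those five rows, so a question like "which protocols does the RM serve?" becomes a one-line lookup:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical summary of the protocol list above; not a Hadoop type.
enum YarnProtocol {
    APPLICATION_CLIENT("ApplicationClientProtocol", "ClientRMService", "client", "RM"),
    CONTAINER_MANAGEMENT("ContainerManagementProtocol", "ContainerManagerImpl", "AM|RM", "NM"),
    APPLICATION_MASTER("ApplicationMasterProtocol", "ApplicationMasterService", "AM", "RM"),
    RESOURCE_TRACKER("ResourceTracker", "ResourceTrackerService", "NM", "RM"),
    MR_CLIENT("MRClientProtocol", "MRClientService", "client", "AM"); // application level

    final String protocol;   // RPC interface name
    final String serverImpl; // class implementing the server side
    final String caller;     // who initiates the calls
    final String server;     // daemon hosting the server side

    YarnProtocol(String protocol, String serverImpl, String caller, String server) {
        this.protocol = protocol;
        this.serverImpl = serverImpl;
        this.caller = caller;
        this.server = server;
    }

    // All protocols whose server side runs in the given daemon.
    static List<YarnProtocol> servedBy(String daemon) {
        List<YarnProtocol> result = new ArrayList<>();
        for (YarnProtocol p : values()) {
            if (p.server.equals(daemon)) {
                result.add(p);
            }
        }
        return result;
    }
}
```

For example, servedBy("RM") returns the three RM-side protocols, while servedBy("NM") returns only ContainerManagementProtocol.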
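II. On-YARN app lifecycle
The client asks the RM for an application ID (AM_ID), wraps the submission in an ApplicationSubmissionContext and submits it to the RM. When an NM heartbeats in, the RM schedules that NM to launch the AM. The AM registers with the RM and requests resources through its heartbeats; once it is granted resources, it asks NMs to start the worker Containers.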
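On the RM side this lifecycle is driven by the RMAppImpl state machine traced in 2.1 and 2.4. The sketch below models only the happy path NEW -> NEW_SAVING -> SUBMITTED -> ACCEPTED -> RUNNING as a plain transition table. It is a simplified stand-in for Hadoop's StateMachineFactory, the classes are hypothetical, and the event names START and APP_ACCEPTED are my recollection of RMAppEventType in 2.6 (APP_NEW_SAVED and ATTEMPT_REGISTERED appear in the trace itself).

```java
import java.util.EnumMap;
import java.util.Map;

// Simplified model of RMAppImpl's happy path; not Hadoop's StateMachineFactory.
enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING }

enum AppEvent { START, APP_NEW_SAVED, APP_ACCEPTED, ATTEMPT_REGISTERED }

final class RMAppSketch {
    // current state -> (event -> next state)
    private static final Map<AppState, Map<AppEvent, AppState>> TABLE =
            new EnumMap<>(AppState.class);

    static {
        add(AppState.NEW, AppEvent.START, AppState.NEW_SAVING);                // RMAppNewlySavingTransition
        add(AppState.NEW_SAVING, AppEvent.APP_NEW_SAVED, AppState.SUBMITTED);  // AddApplicationToSchedulerTransition
        add(AppState.SUBMITTED, AppEvent.APP_ACCEPTED, AppState.ACCEPTED);     // StartAppAttemptTransition
        add(AppState.ACCEPTED, AppEvent.ATTEMPT_REGISTERED, AppState.RUNNING); // AM registered with the RM
    }

    private static void add(AppState from, AppEvent event, AppState to) {
        TABLE.computeIfAbsent(from, k -> new EnumMap<AppEvent, AppState>(AppEvent.class))
             .put(event, to);
    }

    private AppState state = AppState.NEW;

    AppState state() {
        return state;
    }

    // Apply one event; events that are not valid in the current state are rejected.
    void handle(AppEvent event) {
        Map<AppEvent, AppState> row = TABLE.get(state);
        AppState next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + state);
        }
        state = next;
    }
}
```

The table-driven design matches the spirit of YARN's dispatcher: each event only says what happened, and the current state decides which transition (if any) runs.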
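The client traced below is the MR client, but the AM is the simpler distributedshell ApplicationMaster; the MR AM will be filled in later.
2.1 Client initialization: submit the Job [MR] to the RM, monitor its status and print progress logs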
org.apache.hadoop.examples.WordCount.main
->new Job(conf, "word count");
->job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);
->FileInputFormat.addInputPath(job, new Path(otherArgs[i]))->FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]))
->job.waitForCompletion(true)
->org.apache.hadoop.mapreduce.Job.submit();
->org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job.this, cluster);
->JobSubmissionFiles.getStagingDir(cluster, conf);//initialize the staging dir /staging/user/.staging
->org.apache.hadoop.mapred.ResourceMgrDelegate.getNewJobID
->client.createApplication().getApplicationSubmissionContext()//request an application ID (AM_ID) from the RM
->ApplicationClientProtocol.getNewApplication
->[RM]ClientRMService.getNewApplication
->[RM]org.apache.hadoop.yarn.server.utils.BuilderUtils.newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(), applicationCounter.incrementAndGet()) //i.e. the RM start timestamp + an auto-incremented counter
->new org.apache.hadoop.mapred.JobID(identifier, appID.getId());//build the JobID from the AM_ID
->conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, new Path(jobStagingArea, jobId.toString()));//Configuring job with submitJobDir as the submit dir
->get delegation token for the dir
->JobResourceUploader.uploadFiles(job, jobSubmitDir);//Upload and configure files, libjars, jobjars, and archives pertaining to the passed job
->JobSubmitter.writeSplits(job, submitJobDir);
->org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(conf, splits, out);
->org.apache.hadoop.mapreduce.split.JobSplitWriter.writeJobSplitMetaInfo
->conf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());//write "queue admins of the queue to which job is being submitted"
->writeConf(conf, submitJobFile);//Write job file to submit dir
->org.apache.hadoop.mapred.YARNRunner.submitJob(jobId, submitJobDir.toString(), job.getCredentials())
->YARNRunner.createApplicationSubmissionContext
->capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB));capability.setVirtualCores;
->localResources.put(MRJobConfig.JOB_CONF_FILE,createApplicationResource(defaultFileContext,jobConfPath, LocalResourceType.FILE))
->localResources.put(MRJobConfig.JOB_JAR, createApplicationResource(FileContext.getFileContext(jobJarPath.toUri(), jobConf),jobJarPath,LocalResourceType.PATTERN)) //jar
->localResources.put(MRJobConfig.JOB_SUBMIT_DIR + "/" + (MRJobConfig.JOB_SPLIT||MRJobConfig.JOB_SPLIT_METAINFO),createApplicationResource(defaultFileContext,new Path(jobSubmitDir, s), LocalResourceType.FILE)//split info
->/bin/java org.apache.hadoop.mapreduce.v2.app.MRAppMaster ..>>ApplicationConstants.LOG_DIR_EXPANSION_VAR //Setup the command to run the AM
->MRApps.setClasspath(environment, conf);
->//Setup the environment
->ContainerLaunchContext amContainer =ContainerLaunchContext.newInstance(localResources, environment,vargsFinal, null, securityTokens, acls);
->appContext.setApplicationId(applicationId); appContext.setQueue();appContext.setApplicationName
->appContext.setAMContainerSpec(amContainer); // AM Container
->[RM]ClientRMService.submitApplication(appContext)
->[RM]RMAppManager.submitApplication
->[RM]new RMAppImpl() // Create RMApp
->[RM]rmContext.getRMApps().putIfAbsent(applicationId, application)//add to rmContext
->[RMAppState.NEW->RMAppState.NEW_SAVING]RMAppImpl.RMAppNewlySavingTransition.transition()
->StoreAppTransition.transition
->storeApplicationStateInternal(appId, appState);//RMAppEventType.APP_NEW_SAVED
->[RMAppState.NEW_SAVING->RMAppState.SUBMITTED]AddApplicationToSchedulerTransition
->FifoScheduler.handle(new AppAddedSchedulerEvent())//APP_ADDED
->FifoScheduler.addApplication->AbstractYarnScheduler.applications.put(applicationId, application);
->[RMAppState.SUBMITTED->RMAppState.ACCEPTED] StartAppAttemptTransition
->RMAppImpl.createAndStartNewAttempt
->RMAppImpl.createNewAttempt
->new RMAppAttemptImpl
->RMAppImpl.attempts.put(appAttemptId, attempt);
->[RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED]AttemptStartedTransition
->ApplicationMasterService.registerAppAttempt(appAttempt.applicationAttemptId)
->ApplicationMasterService.responseMap.put(attemptId, new AllocateResponseLock(response));
->FifoScheduler.handle(new AppAttemptAddedSchedulerEvent())//APP_ATTEMPT_ADDED
->FifoScheduler.addApplicationAttempt
->[RMAppAttemptState.SUBMITTED, RMAppAttemptState.SCHEDULED] ScheduleTransition
->appAttempt.amReq.setNumContainers(1);setPriority(AM_CONTAINER_PRIORITY);setResourceName(ResourceRequest.ANY);setRelaxLocality(true);//set up the AM ResourceRequest
->appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,Collections.singletonList(appAttempt.amReq),EMPTY_CONTAINER_RELEASE_LIST,amBlacklist.getAdditions(),amBlacklist.getRemovals())
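->FifoScheduler.allocate //Note: nothing is allocated synchronously here; the request is only recorded, and the returned Allocation holds containers that were allocated asynchronously earlier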
->scheduler.SchedulerApplicationAttempt.pullNewlyAllocatedContainersAndNMTokens
->rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),RMContainerEventType.ACQUIRED)) //send an ACQUIRED event to each of the newlyAllocatedContainers the app obtained (NM->scheduler->app)
->[RMContainerState.ALLOCATED -> RMContainerState.ACQUIRED] AcquiredTransition
->container.containerAllocationExpirer.register(container.getContainerId());
->[RMAppState.ACCEPTED->RMAppState.ACCEPTED] AppRunningOnNodeTransition
->rmAppImpl.ranNodes.add(nodeAddedEvent.getNodeId());
->return new ContainersAndNMTokensAllocation(SchedulerApplicationAttempt.newlyAllocatedContainers, nmTokens);//return the Containers the application has obtained [TODO:// how newlyAllocatedContainers is populated, i.e. where the scheduler assigns it]
->[RM]ClientRMService.getApplicationReport
->[RM]RMAppImpl.createAndGetApplicationReport
->RMAppImpl.currentAttempt //read the currentAttempt's state and return it
->org.apache.hadoop.mapreduce.Job.monitorAndPrintJob();//connect to the AM, read the application status and print logs
->while (!isComplete() || !reportedAfterCompletion)
->mapreduce.Job.updateStatus //update MR progress
->YARNRunner.getJobStatus(status.getJobID())
->mapred.ClientServiceDelegate.getJobStatus
->JobReport report = ((GetJobReportResponse) invoke("getJobReport",GetJobReportRequest.class, request)).getJobReport()
->mapred.ClientServiceDelegate.invoke
->mapred.ClientServiceDelegate.getProxy
->MRClientProtocol MRClientProxy=application.getTrackingUrl();
->serviceAddr = NetUtils.createSocketAddrForHost(application.getHost(), application.getRpcPort());//get the AM's host and RPC port
->instantiateAMProxy(serviceAddr)//build the client->AM proxy
->[AM]MRClientService.getJobReport //the AM reads the job status
->org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.getReport
->mapreduce.Job.getTaskCompletionEvents
->[AM]org.apache.hadoop.mapreduce.v2.app.client.MRClientService.MRClientProtocolHandler.getTaskAttemptCompletionEvents
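The call tree above builds the application ID from the RM's start timestamp plus an auto-incremented counter and then derives the MR JobID from it. The sketch below reproduces that composition and the string forms commonly seen in logs (application_<timestamp>_<nnnn> and job_<timestamp>_<nnnn>). IdSketch is hypothetical, and the zero-padding to at least four digits reflects my understanding of ApplicationId.toString() and JobID.toString(), not something shown in the trace.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of how the IDs in 2.1 are composed; not Hadoop's
// ApplicationId / JobID classes.
final class IdSketch {
    private final long clusterTimestamp; // fixed when the RM starts
    private final AtomicInteger applicationCounter = new AtomicInteger();

    IdSketch(long clusterTimestamp) {
        this.clusterTimestamp = clusterTimestamp;
    }

    // Each getNewApplication call takes the next counter value.
    int nextId() {
        return applicationCounter.incrementAndGet();
    }

    // application_<clusterTimestamp>_<id, zero-padded to at least 4 digits>
    String applicationId(int id) {
        return String.format(Locale.ROOT, "application_%d_%04d", clusterTimestamp, id);
    }

    // The MR JobID reuses the same timestamp and counter.
    String jobId(int id) {
        return String.format(Locale.ROOT, "job_%d_%04d", clusterTimestamp, id);
    }
}
```

Because the timestamp is taken when the RM starts, IDs from two RM runs still differ even though this in-memory counter starts again at 1.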
2.2 NM startup: the NM heartbeats to the RM, the Scheduler [FifoScheduler] allocates resources, and the AM container is launched
org.apache.hadoop.yarn.server.nodemanager.NodeManager.main
->new NodeManager().initAndStartNodeManager(conf, false) //NodeManager is a CompositeService
->NodeManager.serviceInit;
->initAndStartRecoveryStore//recovery-related steps are skipped for now
->this.aclsManager = new ApplicationACLsManager(conf);//token-related steps are skipped as well
->ContainerExecutor exec = ReflectionUtils.newInstance(conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,DefaultContainerExecutor.class, ContainerExecutor.class), conf);
->ContainerExecutor.init
->nodemanager.LinuxContainerExecutor.init || DockerContainerExecutor
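->container-executor --checksetup //run the container-executor binary to check that it is set up correctly
->addService(DeletionService)//service that cleans up the NM's local files and logs
->new NodeHealthCheckerService();//health-check service
->new LocalDirsHandlerService()//checks the local directories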
->NodeManager.context=createNMContext(containerTokenSecretManager,nmTokenSecretManager, nmStore)
->NodeManager.nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);[TODO://]
->NodeStatusUpdaterImpl.NodeStatusUpdaterImpl()
->NodeStatusUpdaterImpl.serviceInit
->NodeStatusUpdaterImpl.totalResource = Resource.newInstance(memoryMb, virtualCores) //compute this node's total resources
->NodeStatusUpdaterImpl.serviceStart
->NodeStatusUpdaterImpl.resourceTracker = getRMClient();
->NodeStatusUpdaterImpl.registerWithRM
->RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
->[RM]org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService.registerNodeManager //everything below runs inside the RM
->[RM]this.nodesListManager.isValidNode(host)//Check if this node is a 'valid' node
->[RM]RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,resolve(host), capability, nodeManagerVersion)
->[RM]this.rmContext.getDispatcher().getEventHandler().handle(new RMNodeStartedEvent)
->[NodeState.NEW, NodeState.RUNNING]AddNodeTransition
->[RM]resourcemanager.ResourceTrackerService.handleNMContainerStatus(status, nodeId);
->if (container.getContainerState() == ContainerState.RUNNING) { RMNodeImpl.launchedContainers.add(container.getContainerId()) }//add the running containers reported by the NM to launchedContainers
->RMNodeImpl.handleRunningAppOnNode(rmNode, rmNode.context, startEvent.getRunningApplications(), rmNode.nodeId)//handle RunningApplications and dispatch RMAppRunningOnNodeEvent
->getDispatcher().getEventHandler().handle(new RMAppRunningOnNodeEvent)
->[ ->RMAppState.RUNNING]AppRunningOnNodeTransition
->context.getDispatcher().getEventHandler().handle(new NodeAddedSchedulerEvent())
->FifoScheduler.handle(NODE_ADDED)
->FifoScheduler.addNode
->FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(rMNode, usePortForNodeName);
->FifoScheduler.nodes.put(nodeManager.getNodeID(), schedulerNode);
->Resources.addTo(FifoScheduler.clusterResource, nodeManager.getTotalCapability());//add this NM's resources to the cluster resources
->AbstractYarnScheduler.updateMaximumAllocation//update the cluster's maximum allocation
->AbstractYarnScheduler.recoverContainersOnNode //recoverContainers on that NM
->context.getDispatcher().getEventHandler().handle(new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE, rmNode))
->resourcemanager.NodesListManager.handle(NODE_USABLE)
->rmContext.getDispatcher().getEventHandler().handle(new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,RMAppNodeUpdateType.NODE_USABLE)
->[RMAppState.* -> RMAppState.*] RMAppNodeUpdateTransition
->RMAppImpl.processNodeUpdate
->RMAppImpl.updatedNodes.add(node);//tell RMAppImpl about the NM state change
->NodeStatusUpdaterImpl.startStatusUpdater //start the thread that sends heartbeats, which in turn drive the RM Scheduler
->response = resourceTracker.nodeHeartbeat(request);
->↓[RM]↓ ResourceTrackerService.nodeHeartbeat//everything below runs inside the RM
->this.nodesListManager.isValidNode()//1. Check if it's a valid (i.e. not excluded) node
->RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);//2. Check if it's a registered node
->ResourceTrackerService.nmLivelinessMonitor.receivedPing(nodeId);//record the heartbeat
->//3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
->RMNodeImpl.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
->response.addAllContainersToCleanup(new ArrayList<ContainerId>(this.containersToClean));response.addAllApplicationsToCleanup(this.finishedApplications);response.addContainersToBeRemovedFromNM(new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
->rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent)//發(fā)送RMNodeEventType.STATUS_UPDATE事件
->[NodeState.RUNNING, NodeState.RUNNING]StatusUpdateWhenHealthyTransition
->RMNodeImpl.handleContainerStatus(statusEvent.getContainers());//update the state held in RMNodeImpl
->RMNodeImpl.nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, completedContainers))//queue the container changes in nodeUpdateQueue for the Scheduler to consume next
->context.getDispatcher().getEventHandler().handle(new NodeUpdateSchedulerEvent(rmNode))//the node update triggers Scheduler resource allocation
->FifoScheduler.handle(NODE_UPDATE)
->FifoScheduler.nodeUpdate
->RMNodeImpl.pullContainerUpdates //drains RMNodeImpl.nodeUpdateQueue, filled by the earlier heartbeat
->AbstractYarnScheduler.containerLaunchedOnNode
->rmContainer.handle(new RMContainerEvent(containerId,RMContainerEventType.LAUNCHED));// Processing the newly launched containers: send a LAUNCHED event to each Container [TODO://]
->FifoScheduler.completedContainer(RMContainerEventType.FINISHED)
->application.containerCompleted(rmContainer, containerStatus, event);// Inform the application
->rmContainer.handle(new RMContainerFinishedEvent(containerId,containerStatus, event));
->[RMContainerState.RUNNING, RMContainerState.COMPLETED] FinishedTransition
->eventHandler.handle(new RMAppAttemptContainerFinishedEvent(container.appAttemptId, finishedEvent.getRemoteContainerStatus(),container.getAllocatedNode()))//update the attempt
->[RMAppAttemptState.RUNNING -> RMAppAttemptState.*]RMAppAttemptImpl.ContainerFinishedTransition.transition [TODO:// trace the Container's effect on the APP later]
->FiCaSchedulerNode.releaseContainer(container);
->FifoScheduler.assignContainers(node);//finally, allocate resources on this node
->//take apps in FIFO order, then allocate by priority
->FifoScheduler.assignContainersOnNode //satisfy node-local, then rack-local, then off-switch requests, in that order
->FifoScheduler.assignNodeLocalContainers
->FifoScheduler.assignRackLocalContainers
->FifoScheduler.assignOffSwitchContainers
->FifoScheduler.assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, int assignableContainers, ResourceRequest request, NodeType type)
->Container container =BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, null);// new capability Container
->FiCaSchedulerApp.allocate(type, node, priority, request, container)
->RMContainer rmContainer = new RMContainerImpl(container, this.getApplicationAttemptId(), node.getNodeID(),appSchedulingInfo.getUser(), this.rmContext)
->//update SchedulerApplicationAttempt.newlyAllocatedContainers and SchedulerApplicationAttempt.liveContainers
->rmContainer.handle(new RMContainerEvent(container.getId(), RMContainerEventType.START))
->[RMContainerState.NEW->RMContainerState.ALLOCATED] ContainerStartedTransition
->eventHandler.handle(new RMAppAttemptContainerAllocatedEvent()) //RMAppAttemptEventType.CONTAINER_ALLOCATED
->[RMAppAttemptState.SCHEDULED->RMAppAttemptState.ALLOCATED_SAVING] AMContainerAllocatedTransition
->Allocation amContainerAllocation =appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,null);//fetch the newly allocated container
->appAttempt.setMasterContainer() // Set the masterContainer
->appAttempt.storeAttempt();
->RMStateStore.storeNewApplicationAttempt
->dispatcher.getEventHandler().handle(new RMStateStoreAppAttemptEvent(attemptState))
->RMStateStore.StoreAppAttemptTransition
->RMStateStore.notifyApplicationAttempt(new RMAppAttemptEvent(attemptState.getAttemptId(),RMAppAttemptEventType.ATTEMPT_NEW_SAVED))
->[RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED]AttemptStoredTransition()
->RMAppAttemptImpl.launchAttempt -> eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));//emit the launch-AM event
->ApplicationMasterLauncher.handle(LAUNCH)->ApplicationMasterLauncher.createRunnableLauncher -> new AMLauncher
->the ApplicationMasterLauncher thread keeps reading masterEvents and starts an AMLauncher thread for each
->resourcemanager.amlauncher.AMLauncher.run
->resourcemanager.amlauncher.AMLauncher.launch();
->StartContainersResponse response = containerMgrProxy.startContainers(allRequests);
->[NM]ContainerManagerImpl.startContainers //the NM launches the AM Container
->handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),RMAppAttemptEventType.LAUNCHED)) //At this point the RM has told the NM to start the AM; it now waits for the AM to register, which moves the AppAttempt to RUNNING [TODO:// continued below]
->FiCaSchedulerNode.allocateContainer(rmContainer)
->response.getNodeAction()->SHUTDOWN | RESYNC //whether the NM should shut down or resync
->removeOrTrackCompletedContainersFromContext(response.getContainersToBeRemovedFromNM());//remove the Containers the RM no longer needs
->dispatcher.getEventHandler().handle(new CMgrCompletedContainersEvent(containersToCleanup,CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER))
->dispatcher.getEventHandler().handle(new ContainerKillEvent(container,ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,"Container Killed by ResourceManager")) //send a ContainerKillEvent
->heartbeatMonitor.wait(nextHeartBeatInterval);
->NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();//later YARN versions monitor node resources directly in a dedicated thread
->NodeManager.containerManager = createContainerManager(context, exec, del, nodeStatusUpdater,this.aclsManager, dirsHandler) [TODO://]
->ContainerManagerImpl.ContainerManagerImpl() //constructor
->ContainerManagerImpl.resourceLocalizationService = createResourceLocalizationService(exec, deletionContext, context);
->ResourceLocalizationService.serviceInit
->ResourceLocalizationService.publicRsrc = new LocalResourcesTrackerImpl
->cleanUpLocalDirs;initializeLocalDirs;initializeLogDirs;//initialize local directories
->localizationServerAddress = "localizer.address" //Address where the localizer IPC is
->localizerTracker = createLocalizerTracker(conf)
->LocalizerTracker.LocalizerTracker()
->this.publicLocalizer = new PublicLocalizer(conf);
->this.privLocalizers = privLocalizers; //a newly created map
->LocalizerTracker.serviceStart
->publicLocalizer.start()
->ResourceLocalizationService.PublicLocalizer.run()
->//loop: take completed downloads from PublicLocalizer.queue and emit ResourceLocalizedEvent, which triggers the LocalizedResource state transition
->[ResourceState.DOWNLOADING, ResourceState.LOCALIZED] FetchSuccessTransition
->ContainerManagerImpl.containersLauncher = createContainersLauncher(context, exec);
->ContainerManagerImpl.auxiliaryServices = new AuxServices();[TODO:// AuxServices (shuffle) to be examined later]
->ContainerManagerImpl.containersMonitor = new ContainersMonitorImpl(exec, dispatcher, this.context);//in practice it seems to monitor only memory [TODO:// monitoring details later]
->ContainersMonitorImpl.serviceInit//initializes CPU and memory settings such as maxPmemAllottedForContainers and maxVCoresAllottedForContainers
->ContainersMonitorImpl.serviceStart
->MonitoringThread.run
->new ContainerKillEvent //if a container exceeds its resource limits, emit a kill event
->[->ContainerState.KILLING] KillTransition
->ContainerManagerImpl.serviceInit
->new LogAggregationService(this.dispatcher, context,deletionService, dirsHandler)[TODO:// LogAggregation to be examined later]
->ContainerManagerImpl.serviceStart
->ContainerManagerImpl.server = rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, serverConf, this.context.getNMTokenSecretManager(),conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));//create the NM's IPC server
->WebServer webServer = createWebServer(context, containerManager.getContainersMonitor(), this.aclsManager, dirsHandler); //start the NM web server (default port 8042)
->NodeManager.serviceStart;
->super.serviceStart(); //do nothing
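The node-update path ends in FifoScheduler.assignContainersOnNode, which tries node-local, rack-local and off-switch requests in that order. The sketch below models only that ordering, counting containers rather than Resource sizes and ignoring priorities. LocalitySketch is hypothetical; freeSlots stands in for "how many containers of this size still fit on the node", and the rule that a node-local allocation also consumes the matching rack and ANY counts is my understanding of AppSchedulingInfo's bookkeeping, not something shown in the trace.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the node-local -> rack-local -> off-switch order in
// FifoScheduler.assignContainersOnNode. Counts only; no Resource sizes or priorities.
final class LocalitySketch {
    static final String ANY = "*"; // the resource name ResourceRequest.ANY uses

    // Outstanding container counts keyed by resource name: host, rack or ANY.
    final Map<String, Integer> outstanding = new HashMap<>();

    LocalitySketch(Map<String, Integer> requests) {
        outstanding.putAll(requests);
    }

    private int count(String name) {
        return Math.max(0, outstanding.getOrDefault(name, 0));
    }

    private void consume(String name, int n) {
        outstanding.computeIfPresent(name, (k, v) -> v - n);
    }

    // Returns {nodeLocal, rackLocal, offSwitch} containers placed on this node.
    int[] assignOnNode(String host, String rack, int freeSlots) {
        // 1. Node-local also consumes the matching rack and ANY counts.
        int nodeLocal = Math.min(freeSlots, count(host));
        consume(host, nodeLocal);
        consume(rack, nodeLocal);
        consume(ANY, nodeLocal);
        freeSlots -= nodeLocal;

        // 2. Rack-local consumes the rack and ANY counts.
        int rackLocal = Math.min(freeSlots, count(rack));
        consume(rack, rackLocal);
        consume(ANY, rackLocal);
        freeSlots -= rackLocal;

        // 3. Off-switch consumes only the ANY count.
        int offSwitch = Math.min(freeSlots, count(ANY));
        consume(ANY, offSwitch);

        return new int[] {nodeLocal, rackLocal, offSwitch};
    }
}
```

For requests {host1: 2, rack1: 3, *: 5} and a heartbeat from host1 on rack1 with room for 4 containers, the sketch places 2 node-local, 1 rack-local and 1 off-switch container and leaves one ANY request for the next node.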
2.3 The NM launches a container
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.startContainers
->ContainerManagerImpl.startContainerInternal
->Container container = new ContainerImpl(getConfig(), this.dispatcher,launchContext, credentials, metrics, containerTokenIdentifier,context);
->Application application = new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
->dispatcher.getEventHandler().handle(new ApplicationInitEvent(applicationID, appAcls,logAggregationContext))//initialize the application if this NM does not yet know this APP_ID
->[ApplicationState.NEW->ApplicationState.INITING]AppInitTransition
->dispatcher.getEventHandler().handle(new LogHandlerAppStartedEvent())
->logaggregation.LogAggregationService.handle(APPLICATION_STARTED) [TODO:// log aggregation skipped for now]
->this.context.getNMStateStore().storeContainer(containerId, request);
->dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(container))
->[ApplicationState.INITING->ApplicationState.INITING]InitContainerTransition
->ApplicationImpl.containers.put(container.getContainerId(), container);
->dispatcher.getEventHandler().handle(new ContainerInitEvent(container.getContainerId())); //ContainerEventType.INIT_CONTAINER
->[ContainerState.NEW->ContainerState.LOCALIZING]RequestResourcesTransition
->dispatcher.getEventHandler().handle(new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT, container));[TODO:// AuxServices skipped for now]
->container.pendingResources are split into ContainerImpl.publicRsrcs, ContainerImpl.privateRsrcs and ContainerImpl.appRsrcs
->dispatcher.getEventHandler().handle(new ContainerLocalizationRequestEvent(container, req)) //the download request is wrapped in a ContainerLocalizationRequestEvent
->nodemanager.containermanager.localizer.ResourceLocalizationService.handle(INIT_CONTAINER_RESOURCES)
->ResourceLocalizationService.handleInitContainerResources
->LocalizerContext ctxt = new LocalizerContext(c.getUser(), c.getContainerId(), c.getCredentials(), statCache);
->LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),c.getContainerId().getApplicationAttemptId().getApplicationId()) //PUBLIC uses the single global tracker; PRIVATE and APPLICATION look up the matching instance in a map
->tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
->[ResourceState.INIT, ResourceState.DOWNLOADING]FetchResourceTransition
->LocalizedResource.dispatcher.getEventHandler().handle(new LocalizerResourceRequestEvent))
->containermanager.localizer.ResourceLocalizationService.LocalizerTracker.handle(REQUEST_RESOURCE_LOCALIZATION)
->PUBLIC:
->PublicLocalizer.addResource(req);
->PRIVATE || APPLICATION
->if (null == localizer) {localizer = new LocalizerRunner(req.getContext(), locId);localizer.start()};
->nodemanager.LinuxContainerExecutor.startLocalizer //invoke container-executor to start the ContainerLocalizer
->delService.delete(context.getUser(),null, paths.toArray(new Path[paths.size()])) //the downloaded files are handed to delService
->localizer.addResource(req);->LocalizerRunner.pending.add(..)
->scheduled.put(nextRsrc, evt);//the ContainerLocalizer started above keeps polling and performs the downloads
->getLocalResourcesTracker(req.getVisibility(), user, applicationId).handle(new ResourceLocalizedEvent()) //ResourceEventType.LOCALIZED
->[ResourceState.DOWNLOADING -> ResourceState.LOCALIZED]FetchSuccessTransition
->dispatcher.getEventHandler().handle(new ContainerResourceLocalizedEvent())
->[ContainerState.LOCALIZING -> ContainerState.LOCALIZED] LocalizedTransition
->ContainerImpl.sendLaunchEvent();
->dispatcher.getEventHandler().handle(new ContainersLauncherEvent()) //ContainersLauncherEventType.LAUNCH_CONTAINER
->launcher.ContainersLauncher.handle(LAUNCH_CONTAINER)
->ContainerLaunch launch = new ContainerLaunch(context, getConfig(), dispatcher, exec, app,event.getContainer(), dirsHandler, containerManager);
->containerLauncher.submit(launch);//submit to the thread pool
->ContainerLaunch.call
->dispatcher.getEventHandler().handle(new ContainerEvent(containerID,ContainerEventType.CONTAINER_LAUNCHED)))
->LinuxContainerExecutor.activateContainer(containerID, pidFilePath);//start running the container
->int ret = LinuxContainerExecutor.launchContainer(container, nmPrivateContainerScriptPath,nmPrivateTokensPath, user, appIdStr, containerWorkDir,localDirs, logDirs)
->CASE(Throwable)
->dispatcher.getEventHandler().handle(new ContainerExitEvent(containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,e.getMessage()))//an error triggers CONTAINER_EXITED_WITH_FAILURE
->CASE(ExitCode.FORCE_KILLED || ExitCode.TERMINATED)
->dispatcher.getEventHandler().handle(new ContainerExitEvent(containerID,ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret,"Container exited with a non-zero exit code " + ret)
->CASE(0)
->dispatcher.getEventHandler().handle(new ContainerEvent(containerID,ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS))
->[ContainerState.RUNNING->ContainerState.EXITED_WITH_SUCCESS]ExitedWithSuccessTransition
->dispatcher.getEventHandler().handle(new ContainersLauncherEvent(container,ContainersLauncherEventType.CLEANUP_CONTAINER)
->nodemanager.containermanager.launcher.ContainerLaunch.cleanupContainer //clean up the Container once it has finished [TODO://]
->ContainersLauncher.running.put(containerId, launch);
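The CASE branches at the end of ContainerLaunch.call amount to a small mapping from the executor's return code to a container event. The sketch below spells it out. LaunchOutcome and ExitCodeSketch are hypothetical names; 137 and 143 are my assumption for ExitCode.FORCE_KILLED and ExitCode.TERMINATED (128 plus the signal numbers of SIGKILL and SIGTERM), and I believe any other non-zero code also ends in CONTAINER_EXITED_WITH_FAILURE, even though the trace above lists only the exception path for that event.

```java
// Hypothetical sketch of the exit-code handling in ContainerLaunch.call;
// the enum mirrors ContainerEventType names but is not the Hadoop type.
enum LaunchOutcome {
    CONTAINER_EXITED_WITH_SUCCESS,
    CONTAINER_KILLED_ON_REQUEST,
    CONTAINER_EXITED_WITH_FAILURE
}

final class ExitCodeSketch {
    // Assumed values of ContainerExecutor.ExitCode:
    // FORCE_KILLED = 128 + SIGKILL(9), TERMINATED = 128 + SIGTERM(15).
    static final int FORCE_KILLED = 137;
    static final int TERMINATED = 143;

    static LaunchOutcome eventFor(int exitCode) {
        // A process ended by SIGKILL/SIGTERM is reported as killed, not failed.
        if (exitCode == FORCE_KILLED || exitCode == TERMINATED) {
            return LaunchOutcome.CONTAINER_KILLED_ON_REQUEST;
        }
        // Any other non-zero code is a failure.
        if (exitCode != 0) {
            return LaunchOutcome.CONTAINER_EXITED_WITH_FAILURE;
        }
        return LaunchOutcome.CONTAINER_EXITED_WITH_SUCCESS;
    }
}
```

Checking the kill codes before the generic non-zero test matters: otherwise a container stopped on request would be counted as a failed one.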
2.4 The AM [distributedshell.ApplicationMaster instead of MRAppMaster] heartbeats to the RM, asks NMs to start containers, and monitors their status
distributedshell.ApplicationMaster.main
->distributedshell.ApplicationMaster.init//YARN passes parameters to containers through environment variables: envs=System.getenv()
->appAttemptID = containerId.getApplicationAttemptId()
->shellCommand = readContent(shellCommandPath);//files such as this are read from HDFS; the client prepares them when submitting the job
->distributedshell.ApplicationMaster.run
->amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); //AMRMClientAsync.CallbackHandler: both NM container management and RM resource allocation are asynchronous, so callbacks are needed to handle the results
->nmClientAsync = new NMClientAsyncImpl(containerListener); //NMCallbackHandler
->//Setup local RPC Server to accept status requests directly from clients. In principle the AM should expose an application-level RPC interface through which clients can fetch logs, status and so on
->amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort,appMasterTrackingUrl);
->↓[RM]↓ApplicationMasterService.registerApplicationMaster //register the AM in the RM
->rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptRegistrationEvent())
->[RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING] AMRegisteredTransition //the attempt moves to RUNNING; FINISHING, KILLED and FAILED will be covered later
->eventHandler.handle(new RMAppEvent(appAttempt.getAppAttemptId().getApplicationId(),RMAppEventType.ATTEMPT_REGISTERED))
->[RMAppState.ACCEPTED->RMAppState.RUNNING] //do nothing
->distributedshell.ApplicationMaster.RMCallbackHandler.onContainersAllocated //once the AM's resource request is satisfied, the AM contacts the NMs directly to start containers
->ApplicationMaster.LaunchContainerRunnable.run
->ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(localResources, myShellEnv, commands, null, allTokens.duplicate(), null)//initialize the ContainerLaunchContext
->nmClientAsync.startContainerAsync(container, ctx)//ask the NM to launch the Container
->[NM]org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.startContainers
->ApplicationMaster.NMCallbackHandler.NMCallbackHandler//receives Container status and events from the NM [TODO://]
->distributedshell.ApplicationMaster.finish
->unregisterApplicationMaster(appStatus, appMessage, null);
->[RM]ApplicationMasterService.finishApplicationMaster //notify the RM that the application has finished
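The distributedshell AM above is callback-driven: heartbeats to the RM return allocated containers, and the allocation callback asks NMs to start them. The sketch below flattens that into a single-threaded loop so the control flow is easy to follow. AllocationCallback, FakeRm and SketchAppMaster are hypothetical stand-ins, not YARN APIs; the real AMRMClientAsync runs heartbeats and callbacks on its own threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the distributedshell AM pattern.
interface AllocationCallback {
    void onContainersAllocated(List<String> containerIds);
}

final class FakeRm {
    private final int grantsPerHeartbeat; // containers handed out per heartbeat
    private int pending;                  // requested but not yet granted
    private int nextId = 2;               // container 1 is the AM's own container

    FakeRm(int grantsPerHeartbeat) {
        if (grantsPerHeartbeat <= 0) {
            throw new IllegalArgumentException("grantsPerHeartbeat must be positive");
        }
        this.grantsPerHeartbeat = grantsPerHeartbeat;
    }

    void request(int n) {
        pending += n;
    }

    // One allocate() heartbeat: grant what is available and notify the callback.
    void heartbeat(AllocationCallback cb) {
        int n = Math.min(pending, grantsPerHeartbeat);
        pending -= n;
        List<String> granted = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            granted.add("container_" + (nextId++));
        }
        if (!granted.isEmpty()) {
            cb.onContainersAllocated(granted);
        }
    }
}

final class SketchAppMaster implements AllocationCallback {
    final List<String> launched = new ArrayList<>();

    @Override
    public void onContainersAllocated(List<String> containerIds) {
        // distributedshell would start one LaunchContainerRunnable per container here
        launched.addAll(containerIds);
    }

    // request -> heartbeat until every container is launched; returns the heartbeat count
    int run(FakeRm rm, int numTotalContainers) {
        rm.request(numTotalContainers);
        int heartbeats = 0;
        while (launched.size() < numTotalContainers) {
            rm.heartbeat(this);
            heartbeats++;
        }
        return heartbeats;
    }
}
```

With an RM that grants 2 containers per heartbeat, an AM needing 5 containers launches them over 3 heartbeats (2, 2, 1), after which the real AM would call unregisterApplicationMaster.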
III. Schedulers
Implementing a scheduler means dealing with the following: locality, reservation (reserve), preemption (preempt) and node labels (label).
TODO:// to be continued