APACHE YARN 2.6.0 CODE REVIEW

Introduction

1. What communication protocols do the YARN modules use, and which gateway objects serve them?
2. How is the lifecycle of a YARN app implemented, from client launch to completion?
3. Implementation details of CapacityScheduler and FairScheduler? //TODO:
4. LinuxContainerExecutor vs Docker? //TODO:

YARN 2.6.0 does not support GPUs yet, and unless DRF's DominantResourceCalculator is enabled, the default DefaultResourceCalculator carves up and allocates Containers based on memory alone.
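A minimal sketch of why this matters: on a node with 8 GB and 4 vcores, requests for 1 GB / 1 vcore fit 8 times if only memory is compared, but only 4 times once vcores are also checked, which is roughly what a DRF-style calculator does. The class and method names are hypothetical; this models the arithmetic only and is not Hadoop's ResourceCalculator API.

```java
/**
 * Hypothetical, simplified model of container-count arithmetic on one node.
 * Not Hadoop's ResourceCalculator API; the names are illustrative only.
 */
final class ResourceCalcSketch {

    private ResourceCalcSketch() {}

    /** Memory-only view: the vcore arguments are deliberately ignored. */
    static long fitMemoryOnly(long availMb, int availVcores, long reqMb, int reqVcores) {
        return availMb / reqMb;
    }

    /** DRF-style view: every dimension must fit, so the scarcest one bounds the count. */
    static long fitDominant(long availMb, int availVcores, long reqMb, int reqVcores) {
        return Math.min(availMb / reqMb, (long) availVcores / reqVcores);
    }

    public static void main(String[] args) {
        // Example node: 8 GB and 4 vcores; each request asks for 1 GB and 1 vcore.
        System.out.println(fitMemoryOnly(8192, 4, 1024, 1)); // 8
        System.out.println(fitDominant(8192, 4, 1024, 1));   // 4
    }
}
```

With the memory-only view a CPU-heavy node can be oversubscribed on vcores. For the CapacityScheduler, the calculator is chosen through the yarn.scheduler.capacity.resource-calculator property.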


1. Communication protocols between components

  • Framework-level protocols
ApplicationClientProtocol [ClientRMService] // clients -> RM
ContainerManagementProtocol [ContainerManagerImpl] // AM|RM -> NM
ApplicationMasterProtocol [ApplicationMasterService] // AM -> RM
ResourceTracker [ResourceTrackerService] // NM -> RM
  • Application-level protocols
MR: MRClientProtocol [MRClientService] // client -> AM

2. Lifecycle of an on-YARN app

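The Client asks the RM for a new application ID (AM_ID below), wraps the job in an ApplicationSubmissionContext and submits it to the RM. While handling NM heartbeats, the RM schedules a container for the AM and has that NM launch it. The AM then registers with the RM, requests resources through its heartbeats, and once they are granted asks the NMs to start the worker Containers.
The Client below follows the MR client, but the AM side uses the simpler distributedshell example; the MR version will be filled in later.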
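The ID handed back to the client is built on the RM side from the RM's start timestamp plus an auto-incremented counter (the BuilderUtils.newApplicationId step in 2.1), and the MR JobID reuses both parts. Below is a minimal sketch of that scheme. The class and its methods are hypothetical, the timestamp is an arbitrary example value, and the application_<timestamp>_<seq> / job_<timestamp>_<seq> layout with the sequence zero-padded to four digits is my reading of the usual YARN/MR ID strings, so check it against ApplicationId.toString and JobID.toString before relying on it.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of RM-side ID generation; not YARN's ApplicationId/JobID classes. */
final class AppIdSketch {

    private final long clusterTimestamp;                                  // RM start time in ms
    private final AtomicInteger applicationCounter = new AtomicInteger(); // starts at 0

    AppIdSketch(long clusterTimestamp) {
        this.clusterTimestamp = clusterTimestamp;
    }

    /** Mirrors applicationCounter.incrementAndGet(): the first sequence number is 1. */
    String newApplicationId() {
        return String.format("application_%d_%04d", clusterTimestamp, applicationCounter.incrementAndGet());
    }

    /** The MR JobID reuses the timestamp and sequence number of the application ID. */
    String jobIdFor(String applicationId) {
        String[] parts = applicationId.split("_"); // [application, timestamp, seq]
        return "job_" + parts[1] + "_" + parts[2];
    }

    public static void main(String[] args) {
        AppIdSketch rm = new AppIdSketch(1420070400000L);
        String appId = rm.newApplicationId();
        System.out.println(appId);              // application_1420070400000_0001
        System.out.println(rm.jobIdFor(appId)); // job_1420070400000_0001
    }
}
```

The AtomicInteger plays the role of applicationCounter, so concurrent getNewApplication calls never receive the same sequence number, and the timestamp keeps IDs from different RM restarts apart.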


2.1 Client initializes, submits the Job [MR] to the RM, monitors its status and prints progress logs
org.apache.hadoop.examples.WordCount.main
    ->new Job(conf, "word count");
    ->job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);
    ->FileInputFormat.addInputPath(job, new Path(otherArgs[i]))->FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]))
    ->job.waitForCompletion(true)
        ->org.apache.hadoop.mapreduce.Job.submit();
            ->org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job.this, cluster);
                ->JobSubmissionFiles.getStagingDir(cluster, conf);//initialize the staging dir (.../staging/<user>/.staging)
                ->org.apache.hadoop.mapred.ResourceMgrDelegate.getNewJobID
                    ->client.createApplication().getApplicationSubmissionContext()//ask the RM for a new AM_ID (application ID)
                        ->ApplicationClientProtocol.getNewApplication
                            ->[RM]ClientRMService.getNewApplication
                                ->[RM]org.apache.hadoop.yarn.server.utils.BuilderUtils.newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(), applicationCounter.incrementAndGet()) //i.e. the RM start timestamp + an auto-incremented ID
                    ->new org.apache.hadoop.mapred.JobID(identifier, appID.getId());//build the Job_ID from the AM_ID
                ->conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, new Path(jobStagingArea, jobId.toString()));//Configuring job with submitJobDir as the submit dir
                ->get delegation token for the dir
                ->JobResourceUploader.uploadFiles(job, jobSubmitDir);//Upload and configure files, libjars, jobjars, and archives pertaining to the passed job
                ->JobSubmitter.writeSplits(job, submitJobDir);
                    ->org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(conf, splits, out);
                    ->org.apache.hadoop.mapreduce.split.JobSplitWriter.writeJobSplitMetaInfo
                ->conf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());//write "queue admins of the queue to which job is being submitted"
                ->writeConf(conf, submitJobFile);//Write job file to submit dir
                ->org.apache.hadoop.mapred.YARNRunner.submitJob(jobId, submitJobDir.toString(), job.getCredentials())
                    ->YARNRunner.createApplicationSubmissionContext
                        ->capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB));capability.setVirtualCores;
                        ->localResources.put(MRJobConfig.JOB_CONF_FILE,createApplicationResource(defaultFileContext,jobConfPath, LocalResourceType.FILE))
                        ->localResources.put(MRJobConfig.JOB_JAR, createApplicationResource(FileContext.getFileContext(jobJarPath.toUri(), jobConf),jobJarPath,LocalResourceType.PATTERN)) //jar
                        ->localResources.put(MRJobConfig.JOB_SUBMIT_DIR + "/" + (MRJobConfig.JOB_SPLIT||MRJobConfig.JOB_SPLIT_METAINFO),createApplicationResource(defaultFileContext,new Path(jobSubmitDir, s), LocalResourceType.FILE)//split info
                        ->/bin/java org.apache.hadoop.mapreduce.v2.app.MRAppMaster ..>>ApplicationConstants.LOG_DIR_EXPANSION_VAR  //Setup the command to run the AM
                        ->MRApps.setClasspath(environment, conf);
                        ->//Setup the environment
                        ->ContainerLaunchContext amContainer =ContainerLaunchContext.newInstance(localResources, environment,vargsFinal, null, securityTokens, acls);
                        ->appContext.setApplicationId(applicationId); appContext.setQueue();appContext.setApplicationName
                        ->appContext.setAMContainerSpec(amContainer); // AM Container
                    ->[RM]ClientRMService.submitApplication(appContext)
                        ->[RM]RMAppManager.submitApplication
                            ->[RM]new RMAppImpl() // Create RMApp
                            ->[RM]rmContext.getRMApps().putIfAbsent(applicationId, application)//add to rmContext
                            ->[RMAppState.NEW->RMAppState.NEW_SAVING]RMAppImpl.RMAppNewlySavingTransition.transition()
                                ->StoreAppTransition.transition
                                ->storeApplicationStateInternal(appId, appState);//RMAppEventType.APP_NEW_SAVED
                            ->[RMAppState.NEW_SAVING->RMAppState.SUBMITTED]AddApplicationToSchedulerTransition
                                ->FifoScheduler.handle(new AppAddedSchedulerEvent())//APP_ADDED
                                    ->FifoScheduler.addApplication->AbstractYarnScheduler.applications.put(applicationId, application);
                            ->[RMAppState.SUBMITTED->RMAppState.ACCEPTED] StartAppAttemptTransition
                                ->RMAppImpl.createAndStartNewAttempt
                                    ->RMAppImpl.createNewAttempt
                                        ->new RMAppAttemptImpl
                                        ->RMAppImpl.attempts.put(appAttemptId, attempt);
                                ->[RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED]AttemptStartedTransition
                                    ->ApplicationMasterService.registerAppAttempt(appAttempt.applicationAttemptId)
                                        ->ApplicationMasterService.responseMap.put(attemptId, new AllocateResponseLock(response));
                                    ->FifoScheduler.handle(new AppAttemptAddedSchedulerEvent())//APP_ATTEMPT_ADDED
                                        ->FifoScheduler.addApplicationAttempt
                                            ->[RMAppAttemptState.SUBMITTED, RMAppAttemptState.SCHEDULED] ScheduleTransition
                                                ->appAttempt.amReq.setNumContainers(1);setPriority(AM_CONTAINER_PRIORITY);setResourceName(ResourceRequest.ANY);setRelaxLocality(true);//set up the AM ResourceRequest
                                                ->appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,Collections.singletonList(appAttempt.amReq),EMPTY_CONTAINER_RELEASE_LIST,amBlacklist.getAdditions(),amBlacklist.getRemovals())
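                                                    ->FifoScheduler.allocate //Note: nothing is allocated synchronously here; the request is only recorded, and the returned Allocation holds containers that were allocated asynchronously earlier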
                                                        ->scheduler.SchedulerApplicationAttempt.pullNewlyAllocatedContainersAndNMTokens
                                                            ->rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),RMContainerEventType.ACQUIRED)) //send an ACQUIRED event to each of the app's newlyAllocatedContainers (placed by the scheduler on an NM heartbeat)
                                                                ->[RMContainerState.ALLOCATED -> RMContainerState.ACQUIRED] AcquiredTransition
                                                                    ->container.containerAllocationExpirer.register(container.getContainerId());
                                                                    ->[RMAppState.ACCEPTED->RMAppState.ACCEPTED] AppRunningOnNodeTransition
                                                                        ->rmAppImpl.ranNodes.add(nodeAddedEvent.getNodeId());
                                                            ->return new ContainersAndNMTokensAllocation(SchedulerApplicationAttempt.newlyAllocatedContainers, nmTokens);//return the Containers granted to this application [TODO://how newlyAllocatedContainers gets populated, i.e. where the scheduler fills it in]
                    ->[RM]ClientRMService.getApplicationReport
                        ->[RM]RMAppImpl.createAndGetApplicationReport
                            ->RMAppImpl.currentAttempt //read the currentAttempt state and return it
        ->org.apache.hadoop.mapreduce.Job.monitorAndPrintJob();//connect to the AM, read the application status and print logs
            ->while (!isComplete() || !reportedAfterCompletion) 
                ->mapreduce.Job.updateStatus //update the MR progress
                    ->YARNRunner.getJobStatus(status.getJobID())
                        ->mapred.ClientServiceDelegate.getJobStatus
                            ->JobReport report = ((GetJobReportResponse) invoke("getJobReport",GetJobReportRequest.class, request)).getJobReport()
                                ->mapred.ClientServiceDelegate.invoke
                                    ->mapred.ClientServiceDelegate.getProxy
                                        ->MRClientProtocol MRClientProxy=application.getTrackingUrl();
                                        ->serviceAddr = NetUtils.createSocketAddrForHost(application.getHost(), application.getRpcPort());//get the AM host and RPC port
                                        ->instantiateAMProxy(serviceAddr)//build the client->AM proxy
                                ->[AM]MRClientService.getJobReport //the AM reads the job status
                                    ->org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.getReport
                ->mapreduce.Job.getTaskCompletionEvents 
                    ->[AM]org.apache.hadoop.mapreduce.v2.app.client.MRClientService.MRClientProtocolHandler.getTaskAttemptCompletionEvents
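The RMApp transitions traced above (NEW -> NEW_SAVING -> SUBMITTED -> ACCEPTED, then RUNNING once the AM registers) can be modeled as a small table-driven state machine. This is a hypothetical sketch, not YARN's StateMachineFactory: the state names come from the trace, the event names are assumed to correspond to RMAppEventType values, and only the happy path is modeled.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical table-driven model of the RMApp happy path; not YARN's StateMachineFactory. */
final class RmAppLifecycleSketch {

    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING }

    enum Event { START, APP_NEW_SAVED, APP_ACCEPTED, ATTEMPT_REGISTERED }

    // state -> (event -> next state)
    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);

    static {
        addTransition(State.NEW, Event.START, State.NEW_SAVING);               // RMAppNewlySavingTransition
        addTransition(State.NEW_SAVING, Event.APP_NEW_SAVED, State.SUBMITTED); // AddApplicationToSchedulerTransition
        addTransition(State.SUBMITTED, Event.APP_ACCEPTED, State.ACCEPTED);    // StartAppAttemptTransition
        addTransition(State.ACCEPTED, Event.ATTEMPT_REGISTERED, State.RUNNING); // after the AM registers
    }

    private static void addTransition(State from, Event on, State to) {
        TABLE.computeIfAbsent(from, k -> new EnumMap<>(Event.class)).put(on, to);
    }

    private State current = State.NEW;

    State current() {
        return current;
    }

    /** Applies an event; an event with no entry for the current state is rejected. */
    State handle(Event event) {
        State next = TABLE.getOrDefault(current, Map.of()).get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + current);
        }
        current = next;
        return current;
    }

    public static void main(String[] args) {
        RmAppLifecycleSketch app = new RmAppLifecycleSketch();
        for (Event e : Event.values()) {
            System.out.println(e + " -> " + app.handle(e));
        }
    }
}
```

The real RMAppImpl table also covers rejection, kill, failure and recovery paths; keeping transitions in a table rather than in scattered if/else blocks is what lets each dispatcher event be validated against the current state.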

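The note on FifoScheduler.allocate in 2.1, together with the NODE_UPDATE and AMContainerAllocatedTransition steps in 2.2, describes a pull model: allocate() only records the asks, a later NODE_UPDATE places containers into newlyAllocatedContainers, and the next allocate() call drains them. A minimal sketch under that reading follows; the class is hypothetical, counts slots instead of real resources, and is not YARN's scheduler code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of pull-style allocation; not YARN's scheduler classes. */
final class PullAllocationSketch {

    private int outstandingAsks;                                   // recorded, not yet served
    private final List<String> newlyAllocated = new ArrayList<>(); // filled on node heartbeats
    private int nextContainerId = 1;

    /** Records new asks and drains whatever was allocated since the previous call. */
    synchronized List<String> allocate(int newAsks) {
        outstandingAsks += newAsks;
        List<String> pulled = new ArrayList<>(newlyAllocated);
        newlyAllocated.clear();
        return pulled;
    }

    /** A NODE_UPDATE serves outstanding asks up to the node's free slots. */
    synchronized void nodeUpdate(String node, int freeSlots) {
        int n = Math.min(outstandingAsks, freeSlots);
        for (int i = 0; i < n; i++) {
            newlyAllocated.add("container_" + (nextContainerId++) + "@" + node);
        }
        outstandingAsks -= n;
    }

    public static void main(String[] args) {
        PullAllocationSketch s = new PullAllocationSketch();
        System.out.println(s.allocate(1)); // [] : the ask is only recorded
        s.nodeUpdate("nm1", 4);            // a heartbeat from nm1 serves the ask
        System.out.println(s.allocate(0)); // [container_1@nm1]
    }
}
```

This is why the AM request made in ScheduleTransition comes back empty at first: the AM container only appears in an Allocation fetched after a node heartbeat has triggered scheduling.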
2.2 NM starts and heartbeats to the RM; the Scheduler [FifoScheduler] allocates resources and the AM container is launched
org.apache.hadoop.yarn.server.nodemanager.NodeManager.main
    ->new NodeManager().initAndStartNodeManager(conf, false) //CompositeService類
        ->NodeManager.serviceInit;
            ->initAndStartRecoveryStore//recovery-related code is skipped for now
            ->this.aclsManager = new ApplicationACLsManager(conf);//token-related code is skipped as well
            ->ContainerExecutor exec = ReflectionUtils.newInstance(conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,DefaultContainerExecutor.class, ContainerExecutor.class), conf);
            ->ContainerExecutor.init
                ->nodemanager.LinuxContainerExecutor.init || DockerContainerExecutor
                    ->container-executor --checksetup //runs the container-executor binary to verify its setup
            ->addService(DeletionService)//service that cleans up NM local files and logs
            ->new NodeHealthCheckerService();//health-check service
            ->new LocalDirsHandlerService()//local directory checks
            ->NodeManager.context=createNMContext(containerTokenSecretManager,nmTokenSecretManager, nmStore)
            ->NodeManager.nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);[TODO://]
                ->NodeStatusUpdaterImpl.NodeStatusUpdaterImpl()
                ->NodeStatusUpdaterImpl.serviceInit
                    ->NodeStatusUpdaterImpl.totalResource = Resource.newInstance(memoryMb, virtualCores) //compute this node's total resources
                ->NodeStatusUpdaterImpl.serviceStart
                    ->NodeStatusUpdaterImpl.resourceTracker = getRMClient();
                    ->NodeStatusUpdaterImpl.registerWithRM
                        ->RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
                            ->[RM]org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService.registerNodeManager //everything below runs inside the RM
                                ->[RM]this.nodesListManager.isValidNode(host)//Check if this node is a 'valid' node
                                ->[RM]RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,resolve(host), capability, nodeManagerVersion)
                                ->[RM]this.rmContext.getDispatcher().getEventHandler().handle(new RMNodeStartedEvent)
                                    ->[NodeState.NEW, NodeState.RUNNING]AddNodeTransition
                                ->[RM]resourcemanager.ResourceTrackerService.handleNMContainerStatus(status, nodeId);
                                    ->if (container.getContainerState() == ContainerState.RUNNING) { RMNodeImpl.launchedContainers.add(container.getContainerId()) }//add the RUNNING containers reported by the NM to launchedContainers
                                    ->RMNodeImpl.handleRunningAppOnNode(rmNode, rmNode.context, startEvent.getRunningApplications(), rmNode.nodeId)//process the running applications and dispatch RMAppRunningOnNodeEvent
                                        ->getDispatcher().getEventHandler().handle(new RMAppRunningOnNodeEvent)
                                            ->[ ->RMAppState.RUNNING]AppRunningOnNodeTransition
                                    ->context.getDispatcher().getEventHandler().handle(new NodeAddedSchedulerEvent())
                                        ->FifoScheduler.handle(NODE_ADDED)
                                            ->FifoScheduler.addNode
                                                ->FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(rMNode, usePortForNodeName);
                                                ->FifoScheduler.nodes.put(nodeManager.getNodeID(), schedulerNode);
                                                ->Resources.addTo(FifoScheduler.clusterResource, nodeManager.getTotalCapability());//add this NM's resources to the cluster total
                                                ->AbstractYarnScheduler.updateMaximumAllocation//update the cluster-wide maximum allocation
                                            ->AbstractYarnScheduler.recoverContainersOnNode //recoverContainers on that NM
                                    ->context.getDispatcher().getEventHandler().handle(new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE, rmNode))
                                        ->resourcemanager.NodesListManager.handle(NODE_USABLE)
                                            ->rmContext.getDispatcher().getEventHandler().handle(new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,RMAppNodeUpdateType.NODE_USABLE)
                                                ->[RMAppState.* -> RMAppState.*] RMAppNodeUpdateTransition
                                                    ->RMAppImpl.processNodeUpdate
                                                        ->RMAppImpl.updatedNodes.add(node);//tell RMAppImpl about the NM state change
                    ->NodeStatusUpdaterImpl.startStatusUpdater //start a thread that sends heartbeats, which drive the RM Scheduler
                        ->response = resourceTracker.nodeHeartbeat(request);
                            ->↓[RM]↓ ResourceTrackerService.nodeHeartbeat//everything below happens inside the RM
                                ->this.nodesListManager.isValidNode()//1. Check if it's a valid (i.e. not excluded) node
                                ->RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);//2. Check if it's a registered node
                                ->ResourceTrackerService.nmLivelinessMonitor.receivedPing(nodeId);//record the heartbeat
                                ->//3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
                                ->RMNodeImpl.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
                                    ->response.addAllContainersToCleanup(new ArrayList<ContainerId>(this.containersToClean));response.addAllApplicationsToCleanup(this.finishedApplications);response.addContainersToBeRemovedFromNM(new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
                                ->rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent)//dispatch an RMNodeEventType.STATUS_UPDATE event
                                    ->[NodeState.RUNNING, NodeState.RUNNING]StatusUpdateWhenHealthyTransition
                                        ->RMNodeImpl.handleContainerStatus(statusEvent.getContainers());//update the state held in RMNodeImpl
                                            ->RMNodeImpl.nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,completedContainers))//queue the container changes in nodeUpdateQueue for the Scheduler's next step
                                        ->context.getDispatcher().getEventHandler().handle(new NodeUpdateSchedulerEvent(rmNode))//the node update triggers Scheduler allocation
                                            ->FifoScheduler.handle(NODE_UPDATE)
                                                ->FifoScheduler.nodeUpdate
                                                    ->RMNodeImpl.pullContainerUpdates //drains RMNodeImpl.nodeUpdateQueue filled by the earlier heartbeat
                                                    ->AbstractYarnScheduler.containerLaunchedOnNode
                                                        ->rmContainer.handle(new RMContainerEvent(containerId,RMContainerEventType.LAUNCHED));// Processing the newly launched containers: send the Container a LAUNCHED event [TODO://]
                                                    ->FifoScheduler.completedContainer(RMContainerEventType.FINISHED)
                                                        ->application.containerCompleted(rmContainer, containerStatus, event);// Inform the application
                                                            ->rmContainer.handle(new RMContainerFinishedEvent(containerId,containerStatus, event));
                                                                ->[RMContainerState.RUNNING, RMContainerState.COMPLETED] FinishedTransition
                                                                    ->eventHandler.handle(new RMAppAttemptContainerFinishedEvent(container.appAttemptId, finishedEvent.getRemoteContainerStatus(),container.getAllocatedNode())//update the attempt
                                                                        ->[RMAppAttemptState.RUNNING -> RMAppAttemptState.*]RMAppAttemptImpl.ContainerFinishedTransition.transition [TODO://trace the Container's effect on the APP later]
                                                        ->FiCaSchedulerNode.releaseContainer(container);
                                                    ->FifoScheduler.assignContainers(node);//finally, resources are assigned on this node
                                                        ->//take the apps in FIFO order, then allocate by priority
                                                        ->FifoScheduler.assignContainersOnNode //satisfy node-local, then rack-local, then off-switch (any) requests in turn
                                                            ->FifoScheduler.assignNodeLocalContainers 
                                                            ->FifoScheduler.assignRackLocalContainers
                                                            ->FifoScheduler.assignOffSwitchContainers
                                                                ->FifoScheduler.assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, int assignableContainers, ResourceRequest request, NodeType type)
                                                                    ->Container container =BuilderUtils.newContainer(containerId, nodeId, node.getRMNode().getHttpAddress(), capability, priority, null);// new capability Container 
                                                                    ->FiCaSchedulerApp.allocate(type, node, priority, request, container) 
                                                                        ->RMContainer rmContainer = new RMContainerImpl(container, this.getApplicationAttemptId(), node.getNodeID(),appSchedulingInfo.getUser(), this.rmContext)
                                                                            ->//update SchedulerApplicationAttempt.newlyAllocatedContainers and SchedulerApplicationAttempt.liveContainers
                                                                            ->rmContainer.handle(new RMContainerEvent(container.getId(), RMContainerEventType.START)
                                                                                ->[RMContainerState.NEW->RMContainerState.ALLOCATED] ContainerStartedTransition
                                                                                    ->eventHandler.handle(new RMAppAttemptContainerAllocatedEvent()) //RMAppAttemptEventType.CONTAINER_ALLOCATED
                                                                                        ->[RMAppAttemptState.SCHEDULED->RMAppAttemptState.ALLOCATED_SAVING] AMContainerAllocatedTransition
                                                                                            ->Allocation amContainerAllocation =appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,null);//fetch the newly allocated container
                                                                                            ->appAttempt.setMasterContainer() // Set the masterContainer
                                                                                            ->appAttempt.storeAttempt();
                                                                                                ->RMStateStore.storeNewApplicationAttempt
                                                                                                    ->dispatcher.getEventHandler().handle(new RMStateStoreAppAttemptEvent(attemptState))
                                                                                                        ->RMStateStore.StoreAppAttemptTransition
                                                                                                            ->RMStateStore.notifyApplicationAttempt(new RMAppAttemptEvent(attemptState.getAttemptId(),RMAppAttemptEventType.ATTEMPT_NEW_SAVED))
                                                                                                                ->[RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED]AttemptStoredTransition() 
                                                                                                                    ->RMAppAttemptImpl.launchAttempt -> eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));//emit the event that launches the AM
                                                                                                                        ->ApplicationMasterLauncher.handle(LAUNCH)->ApplicationMasterLauncher.createRunnableLauncher -> new AMLauncher
                                                                                                                        ->the ApplicationMasterLauncher thread keeps taking events from masterEvents and starts an AMLauncher thread for each
                                                                                                                            ->resourcemanager.amlauncher.AMLauncher.run
                                                                                                                                ->resourcemanager.amlauncher.AMLauncher.launch();
                                                                                                                                    ->StartContainersResponse response = containerMgrProxy.startContainers(allRequests);
                                                                                                                                        ->[NM]ContainerManagerImpl.startContainers //the NM launches the AM Container
                                                                                                                                ->handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),RMAppAttemptEventType.LAUNCHED)) //the RM has now told the NM to start the AM; it waits for the AM to register, which moves the AppAttempt to RUNNING [TODO://see the follow-up flow below]

                                                                    ->FiCaSchedulerNode.allocateContainer(rmContainer)
                        ->response.getNodeAction()->SHUTDOWN | RESYNC //should the NM shut down or resync?
                        ->removeOrTrackCompletedContainersFromContext(response.getContainersToBeRemovedFromNM());//remove the Containers the RM no longer needs
                        ->dispatcher.getEventHandler().handle(new CMgrCompletedContainersEvent(containersToCleanup,CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER))
                            ->dispatcher.getEventHandler().handle(new ContainerKillEvent(container,ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,"Container Killed by ResourceManager")) //dispatch a ContainerKillEvent
                        ->heartbeatMonitor.wait(nextHeartBeatInterval);
            ->NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();//later YARN versions monitor node resources in a dedicated thread
            ->NodeManager.containerManager = createContainerManager(context, exec, del, nodeStatusUpdater,this.aclsManager, dirsHandler) [TODO://]
                ->ContainerManagerImpl.ContainerManagerImpl() //構(gòu)造函數(shù)
                    ->ContainerManagerImpl.resourceLocalizationService = createResourceLocalizationService(exec, deletionContext, context);
                        ->ResourceLocalizationService.serviceInit
                            ->ResourceLocalizationService.publicRsrc = new LocalResourcesTrackerImpl
                            ->cleanUpLocalDirs;initializeLocalDirs;initializeLogDirs;//initialize the local dirs
                            ->localizationServerAddress= "localizer.address"//Address where the localizer IPC is
                            ->localizerTracker = createLocalizerTracker(conf)
                                ->LocalizerTracker.LocalizerTracker()
                                    ->this.publicLocalizer = new PublicLocalizer(conf); 
                                    ->this.privLocalizers = privLocalizers; //a newly created map
                                ->LocalizerTracker.serviceStart
                                    ->publicLocalizer.start()
                                        ->ResourceLocalizationService.PublicLocalizer.run()
                                            ->//loop over the finished downloads in PublicLocalizer.queue and emit a ResourceLocalizedEvent for each, which drives the LocalizedResource state transition
                                                ->[ResourceState.DOWNLOADING, ResourceState.LOCALIZED] FetchSuccessTransition
                    ->ContainerManagerImpl.containersLauncher = createContainersLauncher(context, exec);
                    ->ContainerManagerImpl.auxiliaryServices = new AuxServices();[TODO://AuxServices shuffle 后面再看]
                        ->
                    ->ContainerManagerImpl.containersMonitor = new ContainersMonitorImpl(exec, dispatcher, this.context);//what is actually enforced appears to be memory only [TODO://look at the monitoring details later]
                        ->ContainersMonitorImpl.serviceInit//initializes a set of CPU and memory settings, e.g. maxPmemAllottedForContainers, maxVCoresAllottedForContainers
                        ->ContainersMonitorImpl.serviceStart
                            ->MonitoringThread.run
                                ->new ContainerKillEvent //if a container is found to exceed its resource limits, a kill event is emitted
                                    ->[->ContainerState.KILLING] KillTransition
                ->ContainerManagerImpl.serviceInit
                    ->new LogAggregationService(this.dispatcher, context,deletionService, dirsHandler)[TODO://LogAggregation相關(guān)的后續(xù)再看]
                ->ContainerManagerImpl.serviceStart
                    ->ContainerManagerImpl.server = rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, serverConf, this.context.getNMTokenSecretManager(),conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));//create the NM's IPC server
            ->WebServer webServer = createWebServer(context, containerManager.getContainersMonitor(), this.aclsManager, dirsHandler); //start the NM web server, port 8042 by default
        ->NodeManager.serviceStart;
            ->super.serviceStart(); //delegates to CompositeService.serviceStart, which starts the child services added during serviceInit
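FifoScheduler.assignContainersOnNode, as traced above, serves an application's requests on the heartbeating node in locality order: node-local, then rack-local, then off-switch (ANY). The sketch below models that order for one application at one priority with memory-only sizing; the class and the host/rack names are hypothetical. It assumes, as I understand AppSchedulingInfo, that the ANY request carries the total count and that a node-local grant also decrements the rack and ANY counts, so one container is never counted twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of FIFO locality order; one app, one priority, not the real scheduler. */
final class FifoLocalitySketch {

    static final String ANY = "*"; // stands in for ResourceRequest.ANY

    /** Outstanding container counts keyed by resource name; ANY holds the total. */
    final Map<String, Integer> requests = new HashMap<>();
    private final int containerMb;

    FifoLocalitySketch(int containerMb) {
        this.containerMb = containerMb;
    }

    /** Returns one locality label per container placed on this node in a NODE_UPDATE. */
    List<String> assignOnNode(String host, String rack, int freeMb) {
        List<String> assigned = new ArrayList<>();
        freeMb = assignAt(host, new String[] {host, rack, ANY}, "NODE_LOCAL", freeMb, assigned);
        freeMb = assignAt(rack, new String[] {rack, ANY}, "RACK_LOCAL", freeMb, assigned);
        assignAt(ANY, new String[] {ANY}, "OFF_SWITCH", freeMb, assigned);
        return assigned;
    }

    /**
     * Serves the request at one locality level, bounded by the outstanding total and by
     * free memory, then decrements that level and every coarser level it also satisfies.
     */
    private int assignAt(String level, String[] toDecrement, String type, int freeMb, List<String> out) {
        int wanted = Math.min(requests.getOrDefault(level, 0), requests.getOrDefault(ANY, 0));
        int n = Math.min(wanted, freeMb / containerMb);
        for (int i = 0; i < n; i++) {
            out.add(type);
        }
        for (String name : toDecrement) {
            requests.computeIfPresent(name, (k, v) -> Math.max(0, v - n));
        }
        return freeMb - n * containerMb;
    }

    public static void main(String[] args) {
        FifoLocalitySketch app = new FifoLocalitySketch(1024);
        app.requests.put("host1", 2);
        app.requests.put("/rack1", 3);
        app.requests.put(ANY, 4);
        // [NODE_LOCAL, NODE_LOCAL, RACK_LOCAL, OFF_SWITCH]
        System.out.println(app.assignOnNode("host1", "/rack1", 8192));
    }
}
```

Trying the most specific level first is what gives FIFO its data locality, while the ANY bound keeps the total number of grants equal to what the application actually asked for.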
2.3 NM launches the container
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.startContainers
    ->ContainerManagerImpl.startContainerInternal
        ->Container container = new ContainerImpl(getConfig(), this.dispatcher,launchContext, credentials, metrics, containerTokenIdentifier,context);
        ->Application application = new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
        ->dispatcher.getEventHandler().handle(new ApplicationInitEvent(applicationID, appAcls,logAggregationContext))//if this NM does not know this APP_ID yet, initialize it
            ->[ApplicationState.NEW->ApplicationState.INITING]AppInitTransition
                ->dispatcher.getEventHandler().handle(new LogHandlerAppStartedEvent())
                    ->logaggregation.LogAggregationService.handle(APPLICATION_STARTED) [TODO://log aggregation is skipped for now]
        ->this.context.getNMStateStore().storeContainer(containerId, request);
        ->dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(container))
            ->[ApplicationState.INITING->ApplicationState.INITING]InitContainerTransition
                ->ApplicationImpl.containers.put(container.getContainerId(), container);
                ->dispatcher.getEventHandler().handle(new ContainerInitEvent(container.getContainerId())); //ContainerEventType.INIT_CONTAINER
                    ->[ContainerState.NEW->ContainerState.LOCALIZING]RequestResourcesTransition
                        ->dispatcher.getEventHandler().handle(new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT, container));[TODO://AuxServices are skipped for now]
                        ->container.pendingResources are split into ContainerImpl.publicRsrcs, ContainerImpl.privateRsrcs and ContainerImpl.appRsrcs
                        ->dispatcher.getEventHandler().handle(new ContainerLocalizationRequestEvent(container, req)) //the resource download requests are wrapped in a ContainerLocalizationRequestEvent
                            ->nodemanager.containermanager.localizer.ResourceLocalizationService.handle(INIT_CONTAINER_RESOURCES)
                                ->ResourceLocalizationService.handleInitContainerResources
                                    ->LocalizerContext ctxt = new LocalizerContext(c.getUser(), c.getContainerId(), c.getCredentials(), statCache);
                                    ->LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),c.getContainerId().getApplicationAttemptId().getApplicationId()) //PUBLIC uses the single global tracker; PRIVATE and APPLICATION look up the matching instance in a map
                                    ->tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
                                        ->[ResourceState.INIT, ResourceState.DOWNLOADING]FetchResourceTransition
                                            ->LocalizedResource.dispatcher.getEventHandler().handle(new LocalizerResourceRequestEvent))
                                                ->containermanager.localizer.ResourceLocalizationService.LocalizerTracker.handle(REQUEST_RESOURCE_LOCALIZATION)
                                                    ->PUBLIC:
                                                        ->PublicLocalizer.addResource(req);
                                                    ->PRIVATE || APPLICATION
                                                        ->if (null == localizer) {localizer = new LocalizerRunner(req.getContext(), locId);localizer.start()};
                                                            ->nodemanager.LinuxContainerExecutor.startLocalizer //calls container-executor to start the ContainerLocalizer
                                                            ->delService.delete(context.getUser(),null, paths.toArray(new Path[paths.size()])) //the downloaded files are handed to delService
                                                            ->localizer.addResource(req);->LocalizerRunner.pending.add(..)
                                                                ->scheduled.put(nextRsrc, evt);//the ContainerLocalizer started above keeps polling and performs the downloads
                                                                    ->getLocalResourcesTracker(req.getVisibility(), user, applicationId).handle(new ResourceLocalizedEvent()) //ResourceEventType.LOCALIZED
                                                                        ->[ResourceState.DOWNLOADING -> ResourceState.LOCALIZED]FetchSuccessTransition
                                                                            ->dispatcher.getEventHandler().handle(new ContainerResourceLocalizedEvent())
                                                                                ->[ContainerState.LOCALIZING -> ContainerState.LOCALIZED] LocalizedTransition
                                                                                    ->ContainerImpl.sendLaunchEvent();
                                                                                        ->dispatcher.getEventHandler().handle(new ContainersLauncherEvent()) //ContainersLauncherEventType.LAUNCH_CONTAINER
                                                                                            ->launcher.ContainersLauncher.handle(LAUNCH_CONTAINER)
                                                                                                ->ContainerLaunch launch = new ContainerLaunch(context, getConfig(), dispatcher, exec, app,event.getContainer(), dirsHandler, containerManager);
                                                                                                ->containerLauncher.submit(launch);//submit to the thread pool
                                                                                                    ->ContainerLaunch.call
                                                                                                        ->
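                                                                                                        ->dispatcher.getEventHandler().handle(new ContainerEvent(containerID,ContainerEventType.CONTAINER_LAUNCHED))
                                                                                                        ->LinuxContainerExecutor.activateContainer(containerID, pidFilePath);//marks the container as active (records its pid file); the process itself is started by launchContainer on the next line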
                                                                                                        ->int ret = LinuxContainerExecutor.launchContainer(container, nmPrivateContainerScriptPath,nmPrivateTokensPath, user, appIdStr, containerWorkDir,localDirs, logDirs)
                                                                                                        ->CASE(Throwable)
                                                                                                            ->dispatcher.getEventHandler().handle(new ContainerExitEvent(containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,e.getMessage()))//an exception triggers CONTAINER_EXITED_WITH_FAILURE
                                                                                                        ->CASE(ExitCode.FORCE_KILLED || ExitCode.TERMINATED)
                                                                                                            ->dispatcher.getEventHandler().handle(new ContainerExitEvent(containerID,ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret,"Container exited with a non-zero exit code " + ret))
                                                                                                        ->CASE(0)
                                                                                                            ->dispatcher.getEventHandler().handle(new ContainerEvent(containerID,ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS))
                                                                                                                ->[ContainerState.RUNNING->ContainerState.EXITED_WITH_SUCCESS]ExitedWithSuccessTransition
                                                                                                                    ->dispatcher.getEventHandler().handle(new ContainersLauncherEvent(container,ContainersLauncherEventType.CLEANUP_CONTAINER))
                                                                                                                        ->nodemanager.containermanager.launcher.ContainerLaunch.cleanupContainer //clean up the Container once execution has finished [TODO://]
                                                                                                ->ContainersLauncher.running.put(containerId, launch);
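
Two decisions in the trace above lend themselves to a small model: which localizer a resource request is routed to (one shared localizer for PUBLIC, one LocalizerRunner per container for PRIVATE and APPLICATION), and which container event the launch result turns into. The sketch below is a toy in plain Java, not NodeManager code. The class and method names are hypothetical, the exit codes 137 and 143 are my recollection of ContainerExecutor.ExitCode.FORCE_KILLED and TERMINATED (verify against the source), and the generic non-zero branch, which the trace does not show, is assumed to report a failure as well.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of two decisions in the NodeManager trace above.
// All names are hypothetical stand-ins, not real NodeManager types.
final class NmLaunchSketch {

    // Resource visibility levels, as named in the trace.
    enum Visibility { PUBLIC, PRIVATE, APPLICATION }

    // Stand-in for a localizer (PublicLocalizer / LocalizerRunner in the trace).
    static final class Localizer {
        final String owner;

        Localizer(String owner) {
            this.owner = owner;
        }
    }

    // One shared localizer for PUBLIC resources.
    private final Localizer publicLocalizer = new Localizer("public");
    // PRIVATE and APPLICATION resources: one localizer per container ID.
    private final Map<String, Localizer> perContainer = new HashMap<>();

    // Route a request: PUBLIC goes to the shared localizer; otherwise reuse the
    // container's localizer or create it on first use ("if (null == localizer)").
    Localizer route(Visibility visibility, String containerId) {
        if (visibility == Visibility.PUBLIC) {
            return publicLocalizer;
        }
        return perContainer.computeIfAbsent(containerId, Localizer::new);
    }

    // Assumed values of ContainerExecutor.ExitCode.FORCE_KILLED / TERMINATED.
    static final int FORCE_KILLED = 137;
    static final int TERMINATED = 143;

    // Map the launch outcome to a container event name, following the CASE
    // branches at the end of ContainerLaunch.call.
    static String eventFor(int exitCode, Throwable error) {
        if (error != null) {
            return "CONTAINER_EXITED_WITH_FAILURE";
        }
        if (exitCode == FORCE_KILLED || exitCode == TERMINATED) {
            return "CONTAINER_KILLED_ON_REQUEST";
        }
        if (exitCode != 0) {
            // Generic non-zero exit: not shown in the trace (assumption).
            return "CONTAINER_EXITED_WITH_FAILURE";
        }
        return "CONTAINER_EXITED_WITH_SUCCESS";
    }

    public static void main(String[] args) {
        NmLaunchSketch nm = new NmLaunchSketch();
        System.out.println(nm.route(Visibility.PUBLIC, "c1").owner);   // public
        System.out.println(nm.route(Visibility.PRIVATE, "c1").owner);  // c1
        System.out.println(eventFor(143, null));  // CONTAINER_KILLED_ON_REQUEST
        System.out.println(eventFor(0, null));    // CONTAINER_EXITED_WITH_SUCCESS
    }
}
```

Keying the per-container localizers by container ID is what lets PRIVATE and APPLICATION requests from the same container share one runner, while PUBLIC downloads are shared across all containers on the node.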

2.4 AM [distributedshell.ApplicationMaster instead of MRAppMaster]: heartbeats to the RM, contacts NMs to launch containers, and monitors their state
distributedshell.ApplicationMaster.main
    ->distributedshell.ApplicationMaster.init//YARN hands container context (e.g. the container ID) to the process through environment variables: envs=System.getenv()
        ->appAttemptID = containerId.getApplicationAttemptId()
        ->shellCommand = readContent(shellCommandPath);//files like this come from HDFS; the client stages them when it submits the job
    ->distributedshell.ApplicationMaster.run
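        ->amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); //AMRMClientAsync.CallbackHandler: container management on the NM and resource allocation on the RM are both asynchronous, so the AM must supply callbacks to handle the results (1000 is the heartbeat interval in ms)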
        ->nmClientAsync = new NMClientAsyncImpl(containerListener); //NMCallbackHandler
        ->//Setup local RPC Server to accept status requests directly from clients. In principle the AM should expose an application-level RPC protocol so that clients can fetch logs, status, etc.
        ->amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort,appMasterTrackingUrl);
            ->[RM]ApplicationMasterService.registerApplicationMaster //register the AM with the RM
                ->rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptRegistrationEvent())
                    ->[RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING] AMRegisteredTransition //the attempt moves to RUNNING; the FINISHING, KILLED and FAILED paths will be covered later
                        ->eventHandler.handle(new RMAppEvent(appAttempt.getAppAttemptId().getApplicationId(),RMAppEventType.ATTEMPT_REGISTERED))
                            ->[RMAppState.ACCEPTED->RMAppState.RUNNING] //do nothing
        ->distributedshell.ApplicationMaster.RMCallbackHandler.onContainersAllocated //once the AM's resource requests are satisfied, the AM contacts the NMs directly to launch the containers
            ->ApplicationMaster.LaunchContainerRunnable.run
                ->ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(localResources, myShellEnv, commands, null, allTokens.duplicate())//build the ContainerLaunchContext
                ->nmClientAsync.startContainerAsync(container, ctx)//ask the NM to start the Container
                    ->[NM]org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.startContainers
        ->ApplicationMaster.NMCallbackHandler.NMCallbackHandler//receives container status and events from the NM [TODO://]
    ->distributedshell.ApplicationMaster.finish
        ->unregisterApplicationMaster(appStatus, appMessage, null);
            ->[RM]ApplicationMasterService.finishApplicationMaster //notify the RM that the application has finished
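
The heartbeat-plus-callback pattern described above can be modelled without Hadoop: a background thread calls the RM at a fixed interval and hands any newly granted containers to a callback, which then "launches" them. This is a toy under stated assumptions. FakeRm, AllocCallback and runUntilAllocated are hypothetical names, not the AMRMClientAsync API, and for brevity the callback runs on the heartbeat thread itself, whereas AMRMClientAsync (as far as I know) dispatches callbacks from a separate handler thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Toy model of the AM's asynchronous allocate/callback loop (hypothetical names).
final class AsyncAmSketch {

    // Callback invoked when a heartbeat returns newly granted containers.
    interface AllocCallback {
        void onContainersAllocated(List<String> containerIds);
    }

    // Fake RM: grants at most perHeartbeat containers per allocate() call
    // until the requested total has been handed out.
    static final class FakeRm {
        private int remaining;
        private final int perHeartbeat;
        private int nextId = 1;

        FakeRm(int requested, int perHeartbeat) {
            this.remaining = requested;
            this.perHeartbeat = perHeartbeat;
        }

        synchronized List<String> allocate() {
            List<String> granted = new ArrayList<>();
            while (remaining > 0 && granted.size() < perHeartbeat) {
                granted.add("container_" + nextId++);
                remaining--;
            }
            return granted;
        }
    }

    // Heartbeat loop: polls the RM every intervalMs on a daemon thread and passes
    // any grants to the callback (which runs on that same thread here).
    static ScheduledExecutorService startHeartbeat(FakeRm rm, long intervalMs, AllocCallback cb) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "am-heartbeat");
            t.setDaemon(true);
            return t;
        });
        ses.scheduleAtFixedRate(() -> {
            List<String> granted = rm.allocate();
            if (!granted.isEmpty()) {
                cb.onContainersAllocated(granted);
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
        return ses;
    }

    // Requests `requested` containers and blocks until every one has been "launched".
    static List<String> runUntilAllocated(int requested, int perHeartbeat) {
        List<String> launched = new CopyOnWriteArrayList<>();
        CountDownLatch allLaunched = new CountDownLatch(requested);
        ScheduledExecutorService hb = startHeartbeat(new FakeRm(requested, perHeartbeat), 10, ids -> {
            for (String id : ids) {
                launched.add(id);          // stand-in for asking an NM to start the container
                allLaunched.countDown();
            }
        });
        try {
            if (!allLaunched.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for allocations");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            hb.shutdownNow();
        }
        return new ArrayList<>(launched);
    }

    public static void main(String[] args) {
        System.out.println(runUntilAllocated(3, 2)); // [container_1, container_2, container_3]
    }
}
```

The fixed polling interval plays the role of the 1000 ms passed to createAMRMClientAsync in the trace; the latch exists only so the demo knows when every requested container has come back.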

III. Schedulers

Implementing a scheduler means dealing with a few key concerns: locality, reservation, preemption, and node labels.
TODO:// to be filled in later
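
Of the concerns listed above, locality is the easiest to sketch. The toy below classifies a node heartbeat against a request's preferred hosts and racks, then applies a simplified delay-scheduling rule: take a node-local placement at once, and relax to rack-local or off-switch only after the request has been passed over a given number of times. The NodeType names echo the three locality levels YARN schedulers distinguish, but the thresholds and the shouldAssign rule are hypothetical and much simpler than what CapacityScheduler or FairScheduler actually do.

```java
import java.util.Set;

// Toy locality check for one resource request against one node heartbeat.
// The delay thresholds are hypothetical, not real scheduler settings.
final class LocalitySketch {

    // Three locality levels, from best to worst.
    enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

    // Classify the heartbeating node relative to the request's preferences.
    static NodeType classify(Set<String> preferredHosts, Set<String> preferredRacks,
                             String host, String rack) {
        if (preferredHosts.contains(host)) {
            return NodeType.NODE_LOCAL;
        }
        if (preferredRacks.contains(rack)) {
            return NodeType.RACK_LOCAL;
        }
        return NodeType.OFF_SWITCH;
    }

    // Delay scheduling, simplified: accept node-local immediately; accept worse
    // locality only after `missed` skipped offers reach that level's threshold.
    static boolean shouldAssign(NodeType type, int missed, int rackDelay, int offSwitchDelay) {
        switch (type) {
            case NODE_LOCAL:
                return true;
            case RACK_LOCAL:
                return missed >= rackDelay;
            default:
                return missed >= offSwitchDelay;
        }
    }

    public static void main(String[] args) {
        Set<String> hosts = Set.of("host1");
        Set<String> racks = Set.of("/rack1");
        NodeType t = classify(hosts, racks, "host2", "/rack1");
        System.out.println(t);                         // RACK_LOCAL
        System.out.println(shouldAssign(t, 1, 3, 6));  // false: keep waiting for host1
        System.out.println(shouldAssign(t, 3, 3, 6));  // true
    }
}
```

Waiting a few heartbeats for a better node trades a little latency for less cross-rack traffic; the off-switch threshold is set higher than the rack one so that a request gives up on its rack last.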

IV. References

V. Next article

APACHE KAFKA 0.10.0 CODE REVIEW
