本文主要講解一下spark2.0版本Spark-StandAlone模式下executor的分配過程和分配機(jī)制绍载。
跟蹤這一塊的源代碼應(yīng)該從SparkContext類開始氢伟。當(dāng)用戶new SparkContext時(shí)资溃,會執(zhí)行該類中定義在class body中的代碼。這部分代碼會執(zhí)行spark作業(yè)初始化之類的操作勃黍,比如校驗(yàn)參數(shù)奠支、初始化spark history server、初始化blockManager等等贵白。我們需要關(guān)注的是他構(gòu)建TaskScheduler這一部分的代碼率拒。這也是executor初始化的切入點(diǎn),如下圖:
點(diǎn)開createTaskScheduler方法可以看到禁荒,他基于spark master的類型構(gòu)建了SchedulerBackend俏橘。而TaskScheduler的類型基本上都是TaskSchedulerImpl。
匹配spark master類型的正則表達(dá)式如下圈浇,感興趣的可以了解一下:
然后我們一路ctrl + alt +左箭頭返回SparkContext類寥掐,看到_taskScheduler.start()靴寂,點(diǎn)進(jìn)去。然后看到backend.start()點(diǎn)進(jìn)去召耘。其中backend對象就是之前在createTaskScheduler方法中創(chuàng)建的StandaloneSchedulerBackend百炬。
下圖中command對象封裝的是在worker端啟動(dòng)executor的命令。而initialExecutorLimit是spark-standalone模式提供的dynamic allocation機(jī)制污它,他可以在executor閑置一段時(shí)間后就將其移除剖踊。對應(yīng)參數(shù)spark.dynamicAllocation.enabled,默認(rèn)為false衫贬。
然后看一下client.start()方法德澈,點(diǎn)進(jìn)去。然后看到new ClientEndpoint固惯,再點(diǎn)進(jìn)去梆造。
然后會調(diào)用到這個(gè)類中的onStart方法,有人會問為什么會調(diào)到這個(gè)方法呢葬毫,這里只是將其new出來了镇辉,并沒有執(zhí)行任何操作啊L瘛忽肛?答案就在前邊的rpcEnv.setupEndpoint方法。在該方法中通過回調(diào)機(jī)制會重新調(diào)用到onStart方法烂斋。點(diǎn)進(jìn)去屹逛,跟到NettyRpcEnv的setupEndpoint方法
然后繼續(xù)跟蹤到dispatcher.registerRpcEndpoint方法,
然后點(diǎn)進(jìn)new EndpointData方法汛骂,然后跟蹤到new Inbox罕模。Inbox在通信機(jī)制中起到一個(gè)消息存取的作用。
在Inbox中的class body中有如下一段
然后看到OnStart event的處理
?
點(diǎn)開ClientEndpoint后就會看到我們接下來要執(zhí)行到的onStart()方法了香缺。
然后再看這個(gè)類中的onStart()方法,registerWithMaster方法點(diǎn)進(jìn)去歇僧,然后再看到tryRegisterAllMasters方法點(diǎn)進(jìn)去图张。在這個(gè)方法中會看到構(gòu)建了一個(gè)與master通信的RPC終端,并發(fā)送了RegisterApplication事件诈悍,
在發(fā)送過程中祸轮,會將這條消息序列化,如下:
在整個(gè)spark core中搜索一下這個(gè)case class侥钳,然后在Master的receive方法中找到了對這類消息的處理邏輯如下:
在這部分代碼中createApplication方法會根據(jù)當(dāng)前時(shí)間創(chuàng)建appId适袜,appId格式為:
app-yyyyMMddHHmmss-nextAppNumber
nextAppNumber是隨著app的提交而遞增的。
除此之外還會構(gòu)建ApplicationInfo對象舷夺,需要注意的是該對象的desc.command中存放了在worker端啟動(dòng)Executor的命令苦酱。
然后看一下scheduler()方法售貌。該方法中實(shí)現(xiàn)了啟動(dòng)driver的邏輯,以及啟動(dòng)executor的入口方法疫萤。該部分代碼的邏輯請參考圖中的注釋颂跨。
然后看一下startExecutorsOnWorkers方法,該方法對多個(gè)app采用FIFO策略分配executor扯饶。首先選出內(nèi)存和cpu核數(shù)滿足條件的worker恒削,作為候選worker。然后按照scheduleExecutorsOnWorkers方法實(shí)現(xiàn)的策略分配executor尾序。然后通過allocateWorkerResourceToExecutors方法在指定的worker上通過事件啟動(dòng)該executor钓丰。方法講解參考備注。
?
接下來我們重點(diǎn)的看一下scheduleExecutorsOnWorkers方法每币。該方法實(shí)現(xiàn)了在worker集群中分配executor的策略携丁。
首先看一下局部方法canLaunchExecutor,該方法用來判斷指定序號的worker能否啟動(dòng)executor脯爪。判斷的兩個(gè)主要因素是內(nèi)存和cpu core數(shù)是否滿足單個(gè)executor的最小要求则北,除此之外還會考慮app要求的總資源數(shù),以及單個(gè)worker上能否啟動(dòng)多個(gè)executor的配置痕慢。具體的實(shí)現(xiàn)邏輯請參考截圖中的代碼注釋尚揣。
然后看一下scheduleExecutorsOnWorkers方法中的其他代碼:
這部分代碼是用來實(shí)現(xiàn)executor分配機(jī)制的。這段代碼比較清晰掖举,master會逐個(gè)遍歷當(dāng)前可用的worker快骗,如果該worker可用,直接為其分配一個(gè)executor基數(shù)的core塔次,然后會讀取一個(gè)系統(tǒng)參數(shù)spark.deploy.spreadOut方篮。當(dāng)該參數(shù)配置為false,master會將該worker上的core一直分配給這個(gè)app励负,直到當(dāng)前worker沒有足夠資源藕溅,或者app的要求已經(jīng)滿足。如果配置為true继榆,則每個(gè)worker在分配完一次資源后巾表,會跳轉(zhuǎn)到下一個(gè)worker繼續(xù)再分配,直到下一次對整個(gè)workers集群的遍歷重新開始略吨。這樣做的意義是盡可能的將executor分配到更多的worker上去執(zhí)行集币,有利于計(jì)算時(shí)的本地化計(jì)算,否則在計(jì)算時(shí)計(jì)算所需的數(shù)據(jù)不在當(dāng)前節(jié)點(diǎn)翠忠,就需要占用網(wǎng)絡(luò)資源拉取數(shù)據(jù)鞠苟。系統(tǒng)默認(rèn)配置為true。
我在閱讀這段代碼時(shí)有一個(gè)疑問。Executor的分配結(jié)果不僅是core当娱,還包含了mem和某個(gè)worker上啟動(dòng)executor的個(gè)數(shù)吃既。而方法的返回值只有某個(gè)worker上分配的core數(shù),那么如何判斷executor的個(gè)數(shù)呢趾访。對于不了解計(jì)算邏輯的人會認(rèn)為oneExecutorPerWorker模式下一個(gè)executor可能會被分配了minCorePerExecutor+個(gè)core态秧,此時(shí)如果單純通過worker上實(shí)際分配的core個(gè)數(shù)除以minCorePerExecutor就無法正確計(jì)算出executor個(gè)數(shù)了。這時(shí)就會想為什么不在方法返回值中帶有executor個(gè)數(shù)呢扼鞋?
繼續(xù)往下看就會找到答案申鱼,在實(shí)際啟動(dòng)executor時(shí),是根據(jù)用戶是否配置了coresPerExecutor來決定executor個(gè)數(shù)的云头,在沒有配置(也就是在OneExecutorPerWorker模式下)的情況下executor個(gè)數(shù)是固定為1的捐友,其他情況下是用已分配core數(shù)/ coresPerExecutor計(jì)算的。
然后點(diǎn)進(jìn)launchExecutor方法看一下溃槐,在master端會向worker端發(fā)送啟動(dòng)executor的命令LaunchExecutor匣砖,命令包含在exec.application.desc的command中。創(chuàng)建完成后會向driver端發(fā)送ExecutorAdded event昏滴,driver接收到后會打印日志猴鲫。
然后worker端在收到LaunchExecutor命令后,會實(shí)例化一個(gè)ExecutorRunner谣殊,然后調(diào)用其start方法拂共,在該方法中啟動(dòng)了一個(gè)workerThread線程,其run方法的實(shí)現(xiàn)邏輯為方法fetchAndRunExecutor姻几。這個(gè)方法就比較直觀了宜狐,取出創(chuàng)建Executor的命令appDesc.command封裝成ProcessBuilder類,用它來執(zhí)行啟動(dòng)Executor的命令蛇捌。
沒錯(cuò)抚恒,啟動(dòng)的命令就是前文中提到的command對象: