Spark Runtime Message Communication
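This post explains what happens after you launch an Application: from the moment its main method starts, through SparkContext initialization, Application registration and Executor registration, until TaskSchedulerImpl has assigned Tasks and handed them to Executors for execution. I read the code for Standalone mode throughout; for other modes such as YARN, Mesos or Local, see the corresponding parts of the code. The sequence diagram is as follows: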
Flow
(1) Running the application first executes its main method, which starts a SparkContext. During SparkContext initialization, a StandaloneSchedulerBackend object is instantiated.
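StandaloneSchedulerBackend is declared as StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with StandaloneAppClientListener. When it starts, it ends up with two endpoints: the DriverEndpoint, inherited from CoarseGrainedSchedulerBackend, and the ClientEndpoint created by the AppClient (which is actually StandaloneAppClient). SparkContext initialization first creates a TaskSchedulerImpl (the backend is created right after it and attached to it). Starting the TaskSchedulerImpl calls backend.start(), and StandaloneSchedulerBackend.start() first calls super.start(), that is, the start method of CoarseGrainedSchedulerBackend: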
override def start() {
  // Collect every "spark.*" setting from SparkConf; they are handed to the DriverEndpoint
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }
  // TODO (prashant) send conf instead of properties
  driverEndpoint = createDriverEndpointRef(properties)
}

// Registers the DriverEndpoint with the RpcEnv under ENDPOINT_NAME and returns its reference
protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}
So this creates the Driver endpoint and registers it with the RpcEnv. Because StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend inherits driverEndpoint.
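To make the registration step concrete, here is a minimal Scala sketch of what setupEndpoint does conceptually: store an endpoint under a unique name, run its onStart hook, and return a reference that other code can send messages to. ToyRpcEnv, ToyEndpoint, ToyEndpointRef and ToyDriverEndpoint are hypothetical stand-ins, not Spark classes. Messages are delivered synchronously here, whereas Spark's RpcEnv dispatches them asynchronously, and the endpoint name is used only for illustration.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's RpcEndpoint, RpcEndpointRef and RpcEnv.
trait ToyEndpoint {
  def onStart(): Unit = ()
  def receive(msg: Any): Unit
}

final class ToyEndpointRef(val name: String, target: ToyEndpoint) {
  // Delivered synchronously in this sketch; Spark's send() is asynchronous fire-and-forget.
  def send(msg: Any): Unit = target.receive(msg)
}

final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, ToyEndpoint]

  // Register under a unique name, run onStart, and return a reference others can send to.
  def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpointRef = {
    require(!endpoints.contains(name), s"endpoint $name is already registered")
    endpoints(name) = endpoint
    endpoint.onStart()
    new ToyEndpointRef(name, endpoint)
  }

  def names: Set[String] = endpoints.keySet.toSet
}

// Hypothetical driver endpoint that keeps its spark.* properties and records messages.
final class ToyDriverEndpoint(val properties: Seq[(String, String)]) extends ToyEndpoint {
  val received = mutable.Buffer.empty[Any]
  def receive(msg: Any): Unit = received += msg
}

// Same filter as start(): only keys beginning with "spark." are passed on.
val conf = Seq("spark.app.name" -> "demo", "spark.executor.cores" -> "2", "java.home" -> "/opt/jdk")
val properties = conf.filter { case (key, _) => key.startsWith("spark.") }

val rpcEnv = new ToyRpcEnv
val driverEndpoint = new ToyDriverEndpoint(properties)
val driverRef = rpcEnv.setupEndpoint("CoarseGrainedScheduler", driverEndpoint)
driverRef.send("ReviveOffers")
```

Registering a second endpoint under the same name fails, which is why each endpoint can later be looked up unambiguously by name.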
Next, StandaloneSchedulerBackend.start() runs:
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
This creates the StandaloneAppClient and starts it.
(2) StandaloneAppClient registers the Application with the Master through tryRegisterAllMasters, which sends a RegisterApplication message to every configured Master.
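(3) When the Master receives the registration request, it processes it. Once registration is complete, it sends a RegisteredApplication message back to the StandaloneAppClient and then calls schedule(), which calls startExecutorsOnWorkers to run the application.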
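Steps (2) and (3) amount to a request/reply handshake. The sketch below assumes a two-Master HA setup in which only the active Master answers and a standby Master ignores the request; it shows the client broadcasting its registration, the active Master replying and then calling its scheduling hook. All classes and the app ID format are hypothetical simplifications, not Spark's own.

```scala
import scala.collection.mutable

// Hypothetical, simplified messages; Spark's real ones carry more fields.
final case class RegisterApplication(appName: String, replyTo: ToyAppClient)
final case class RegisteredApplication(appId: String, masterUrl: String)

final class ToyMaster(val url: String, val alive: Boolean) {
  private var nextAppNumber = 0
  val apps = mutable.Buffer.empty[String]
  var scheduleCalls = 0

  def receive(msg: Any): Unit = msg match {
    case RegisterApplication(_, replyTo) if alive =>
      val appId = f"app-$nextAppNumber%04d" // illustrative ID format only
      nextAppNumber += 1
      apps += appId
      replyTo.receive(RegisteredApplication(appId, url)) // tell the client it is registered
      schedule()                                         // then try to place executors
    case RegisterApplication(_, _) => ()                 // a standby master sends no reply
    case _ => ()
  }

  // Stands in for schedule(), which calls startExecutorsOnWorkers().
  private def schedule(): Unit = scheduleCalls += 1
}

final class ToyAppClient(appName: String) {
  var registered: Option[RegisteredApplication] = None

  // Stand-in for tryRegisterAllMasters: send the request to every known master.
  def tryRegisterAllMasters(masters: Seq[ToyMaster]): Unit =
    masters.foreach(_.receive(RegisterApplication(appName, this)))

  def receive(msg: Any): Unit = msg match {
    case r: RegisteredApplication => registered = Some(r)
    case _ => ()
  }
}

val masters = Seq(new ToyMaster("spark://m1:7077", alive = false), new ToyMaster("spark://m2:7077", alive = true))
val client = new ToyAppClient("demo")
client.tryRegisterAllMasters(masters)
```

Broadcasting to every Master means the client does not need to know in advance which one is active.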
(4) Executor registration
a) startExecutorsOnWorkers allocates resources to run the application. It calls allocateWorkerResourceToExecutors to start Executors on Workers, which in turn calls launchExecutor; launchExecutor sends a LaunchExecutor message to the Worker. When the Worker receives it, it instantiates an ExecutorRunner. The ExecutorRunner creates a ProcessBuilder, which uses the command in the ApplicationInfo to start a CoarseGrainedExecutorBackend process, the container in which the Executor runs. Finally, the Worker sends ExecutorStateChanged to the Master to report that the Executor container has been created.
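The Worker side of step a) boils down to turning a command description into an operating-system process. As a sketch under stated assumptions, the code below builds a java.lang.ProcessBuilder for the executor's main class without starting it. ExecutorCommand and buildExecutorProcess are hypothetical names, and the argument values are placeholders rather than exactly what Spark passes.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for the command carried by the application description.
final case class ExecutorCommand(mainClass: String, arguments: Seq[String], javaOpts: Seq[String])

// Sketch of what ExecutorRunner does: turn the command into a java.lang.ProcessBuilder.
// Only the builder is created here; start() is never called.
def buildExecutorProcess(javaBin: String, cmd: ExecutorCommand, workDir: java.io.File): java.lang.ProcessBuilder = {
  val argv = Seq(javaBin) ++ cmd.javaOpts ++ Seq(cmd.mainClass) ++ cmd.arguments
  val builder = new java.lang.ProcessBuilder(argv.asJava)
  builder.directory(workDir) // run the child process inside the given working directory
  builder
}

val cmd = ExecutorCommand(
  mainClass = "org.apache.spark.executor.CoarseGrainedExecutorBackend",
  arguments = Seq("--executor-id", "0", "--hostname", "worker-1", "--cores", "2"), // placeholder values
  javaOpts = Seq("-Xmx1g"))
val builder = buildExecutorProcess("java", cmd, new java.io.File("."))
```

Calling builder.start() would fork a separate JVM, which is why the Executor lives in its own process rather than inside the Worker.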
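b) When the process builder starts CoarseGrainedExecutorBackend, its main method (defined in the companion object) registers the Executor endpoint, which triggers onStart. onStart sends a RegisterExecutor message to the DriverEndpoint; if registration succeeds, the DriverEndpoint replies with a RegisteredExecutor message to the ExecutorEndpoint. When the ExecutorEndpoint (which is actually CoarseGrainedExecutorBackend) receives this success message, it creates the Executor object.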
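The registration exchange in step b) can be modeled as two endpoints passing messages. This minimal sketch assumes synchronous delivery and simplified message types, including a hypothetical failure reply for a duplicate executor ID. ToyDriver and ToyExecutorBackend are illustrative stand-ins for DriverEndpoint and CoarseGrainedExecutorBackend, and setting a flag stands in for creating the Executor.

```scala
import scala.collection.mutable

// Hypothetical, simplified versions of the registration messages.
final case class RegisterExecutor(executorId: String, hostname: String, cores: Int, executorRef: ToyExecutorBackend)
case object RegisteredExecutor
final case class RegisterExecutorFailed(message: String)

// Stand-in for DriverEndpoint: remembers each executor's free cores (cf. executorDataMap).
final class ToyDriver {
  val executorDataMap = mutable.Map.empty[String, Int]

  def receive(msg: Any): Unit = msg match {
    case RegisterExecutor(id, _, cores, ref) =>
      if (executorDataMap.contains(id)) ref.receive(RegisterExecutorFailed(s"Duplicate executor ID: $id"))
      else {
        executorDataMap(id) = cores
        ref.receive(RegisteredExecutor)
      }
    case _ => ()
  }
}

// Stand-in for CoarseGrainedExecutorBackend (the ExecutorEndpoint).
final class ToyExecutorBackend(id: String, host: String, cores: Int, driver: ToyDriver) {
  var executorCreated = false
  var failure: Option[String] = None

  // Runs once the endpoint is registered: announce this executor to the driver.
  def onStart(): Unit = driver.receive(RegisterExecutor(id, host, cores, this))

  def receive(msg: Any): Unit = msg match {
    case RegisteredExecutor => executorCreated = true // the real backend creates an Executor here
    case RegisterExecutorFailed(message) => failure = Some(message)
    case _ => ()
  }
}

val driver = new ToyDriver
val first = new ToyExecutorBackend("1", "worker-1", 2, driver)
first.onStart()
val duplicate = new ToyExecutorBackend("1", "worker-2", 4, driver)
duplicate.onStart()
```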
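c) DriverEndpoint creates a daemon thread (driver-revive-thread) that, every spark.scheduler.revive.interval (1s by default), sends a ReviveOffers message to the DriverEndpoint itself. Pending tasks are thus offered resources again periodically, which lets delay scheduling work: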
private val reviveThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

override def onStart() {
  // Periodically revive offers to allow delay scheduling to work
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
On receiving ReviveOffers, DriverEndpoint calls makeOffers().
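The revive loop is a producer/consumer pattern: a scheduled daemon thread only posts a ReviveOffers message, and the endpoint handles it on its own thread. The sketch below reproduces that with plain java.util.concurrent classes. reviveDemo is a hypothetical helper, the interval is shortened from the 1s default, and incrementing a counter stands in for makeOffers().

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, ThreadFactory, TimeUnit}

// Hypothetical sketch of DriverEndpoint's revive loop: the scheduled thread only
// enqueues "ReviveOffers" (like send()), and the endpoint's own loop handles it.
def reviveDemo(rounds: Int, reviveIntervalMs: Long): Int = {
  val inbox = new LinkedBlockingQueue[String]()

  // One daemon thread, in the spirit of ThreadUtils.newDaemonSingleThreadScheduledExecutor.
  val reviveThread = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "driver-revive-thread")
      t.setDaemon(true)
      t
    }
  })
  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = inbox.put("ReviveOffers")
  }, 0L, reviveIntervalMs, TimeUnit.MILLISECONDS)

  var makeOffersCalls = 0
  try {
    while (makeOffersCalls < rounds) {
      inbox.poll(5, TimeUnit.SECONDS) match {
        case "ReviveOffers" => makeOffersCalls += 1 // the real handler calls makeOffers()
        case null => throw new IllegalStateException("no ReviveOffers within 5 seconds")
        case _ => ()
      }
    }
  } finally {
    reviveThread.shutdownNow()
  }
  makeOffersCalls
}

val calls = reviveDemo(rounds = 3, reviveIntervalMs = 20L)
```

Keeping the timer thread limited to enqueueing means all scheduling state is still touched by a single consumer, so no extra locking is needed around it.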
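d) makeOffers builds a WorkerOffer from each live executor's free cores, lets scheduler.resourceOffers assign tasks to those offers, and passes the result to launchTasks: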
// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
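What makeOffers and resourceOffers do with cores can be shown with a few lines of bookkeeping. This is a deliberately rough sketch, assuming one core per task and ignoring data locality and the shuffling of offers that the real TaskSchedulerImpl performs. It spreads pending tasks round-robin over the live executors' free cores and then, like launchTasks, subtracts the cores from each chosen executor. All types here are hypothetical simplifications.

```scala
import scala.collection.mutable

// Hypothetical simplified types; Spark's WorkerOffer and ExecutorData carry more fields.
final case class WorkerOffer(executorId: String, host: String, cores: Int)
final case class ExecutorData(host: String, var freeCores: Int, alive: Boolean)
final case class TaskAssignment(taskId: Long, executorId: String)

val CPUS_PER_TASK = 1 // the value of spark.task.cpus, 1 by default

// Like makeOffers(): one offer per live executor, advertising its free cores.
def makeOffers(executors: collection.Map[String, ExecutorData]): Seq[WorkerOffer] =
  executors.toSeq.sortBy(_._1).collect {
    case (id, data) if data.alive => WorkerOffer(id, data.host, data.freeCores)
  }

// Rough stand-in for resourceOffers(): hand out pending tasks round-robin,
// at most one task per offer per pass, while cores remain. No locality logic.
def resourceOffers(offers: Seq[WorkerOffer], pending: mutable.Queue[Long]): List[TaskAssignment] = {
  val free = mutable.Map.from(offers.map(o => o.executorId -> o.cores))
  val assigned = mutable.ListBuffer.empty[TaskAssignment]
  var progress = true
  while (pending.nonEmpty && progress) {
    progress = false
    for (o <- offers) {
      if (pending.nonEmpty && free(o.executorId) >= CPUS_PER_TASK) {
        assigned += TaskAssignment(pending.dequeue(), o.executorId)
        free(o.executorId) -= CPUS_PER_TASK
        progress = true
      }
    }
  }
  assigned.toList
}

// Like launchTasks(): reserve the cores; the real code then sends LaunchTask to the executor.
def launchTasks(tasks: Seq[TaskAssignment], executors: collection.Map[String, ExecutorData]): Unit =
  tasks.foreach(t => executors(t.executorId).freeCores -= CPUS_PER_TASK)

val executors = mutable.Map(
  "1" -> ExecutorData("worker-1", freeCores = 2, alive = true),
  "2" -> ExecutorData("worker-2", freeCores = 1, alive = true),
  "3" -> ExecutorData("worker-3", freeCores = 4, alive = false))
val pending = mutable.Queue(0L, 1L, 2L, 3L)
val offers = makeOffers(executors)
val assigned = resourceOffers(offers, pending)
launchTasks(assigned, executors)
```

Task 3 stays pending because the live executors run out of cores. It is picked up by a later offer round, for example the next ReviveOffers tick.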
Inside launchTasks, each assigned task is serialized and sent to its executor:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
The CoarseGrainedExecutorBackend endpoint (the ExecutorEndpoint) then receives LaunchTask:
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
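The task travels as bytes: the driver serializes it, and the executor's LaunchTask handler deserializes it before running it. To show that round trip, the sketch below uses a hand-written DataOutputStream/DataInputStream codec for a hypothetical ToyTaskDescription. It is not the serializer used in the code above, only an illustration of encoding fields in a fixed order and reading them back in the same order. handleLaunchTask is a hypothetical helper that mirrors the null-executor check.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

// Hypothetical, cut-down task description (not Spark's TaskDescription).
final case class ToyTaskDescription(taskId: Long, attemptNumber: Int, name: String, payload: Array[Byte])

// Driver side: turn the description into bytes before wrapping it in a LaunchTask message.
def encode(t: ToyTaskDescription): ByteBuffer = {
  val bos = new ByteArrayOutputStream()
  val out = new DataOutputStream(bos)
  out.writeLong(t.taskId)
  out.writeInt(t.attemptNumber)
  out.writeUTF(t.name)
  out.writeInt(t.payload.length)
  out.write(t.payload)
  out.flush()
  ByteBuffer.wrap(bos.toByteArray)
}

// Executor side: read the fields back in exactly the same order.
def decode(buf: ByteBuffer): ToyTaskDescription = {
  val bytes = new Array[Byte](buf.remaining())
  buf.duplicate().get(bytes) // copy without moving the caller's buffer position
  val in = new DataInputStream(new ByteArrayInputStream(bytes))
  val taskId = in.readLong()
  val attempt = in.readInt()
  val name = in.readUTF()
  val payload = new Array[Byte](in.readInt())
  in.readFully(payload)
  ToyTaskDescription(taskId, attempt, name, payload)
}

// Mirrors the LaunchTask handler: refuse to run if the Executor was never created.
def handleLaunchTask(executorReady: Boolean, data: ByteBuffer): Either[String, String] =
  if (!executorReady) Left("Received LaunchTask command but executor was null")
  else Right("Got assigned task " + decode(data).taskId)

val original = ToyTaskDescription(7L, 0, "task 7.0 in stage 1.0", Array[Byte](1, 2, 3))
val wire = encode(original)
val back = decode(wire)
```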
launchTask creates a TaskRunner (which is really a Runnable), records it in runningTasks, and submits it to threadPool for execution.
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
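The launchTask pattern (wrap the work in a Runnable, register it in a concurrent map, hand it to a thread pool, remove it when done) can be sketched with plain java.util.concurrent. ToyExecutor, TaskRunner and RecordingBackend here are hypothetical simplifications: the task body is just a success or a thrown exception, and status updates are strings instead of TaskState values.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, CountDownLatch, Executors, TimeUnit}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for ExecutorBackend: records (taskId, state) pairs.
final class RecordingBackend {
  val updates = new ConcurrentLinkedQueue[(Long, String)]()
  def statusUpdate(taskId: Long, state: String): Unit = updates.add((taskId, state))
}

// Simplified Executor: each TaskRunner is a Runnable, tracked in runningTasks while it runs.
final class ToyExecutor(threads: Int) {
  val runningTasks = new ConcurrentHashMap[Long, TaskRunner]()
  private val threadPool = Executors.newFixedThreadPool(threads)

  final class TaskRunner(backend: RecordingBackend, val taskId: Long, shouldFail: Boolean, done: CountDownLatch)
      extends Runnable {
    override def run(): Unit =
      try {
        backend.statusUpdate(taskId, "RUNNING")
        if (shouldFail) throw new RuntimeException(s"task $taskId failed") // stands in for running the task
        backend.statusUpdate(taskId, "FINISHED") // the real code also ships the serialized result
      } catch {
        case _: Exception => backend.statusUpdate(taskId, "FAILED")
      } finally {
        runningTasks.remove(taskId)
        done.countDown()
      }
  }

  def launchTask(backend: RecordingBackend, taskId: Long, shouldFail: Boolean, done: CountDownLatch): Unit = {
    val tr = new TaskRunner(backend, taskId, shouldFail, done)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  def stop(): Unit = threadPool.shutdown()
}

val backend = new RecordingBackend
val executor = new ToyExecutor(threads = 2)
val done = new CountDownLatch(2)
executor.launchTask(backend, 1L, shouldFail = false, done)
executor.launchTask(backend, 2L, shouldFail = true, done)
val allDone = done.await(5, TimeUnit.SECONDS)
executor.stop()
```

Registering the runner before execute() guarantees it is visible in runningTasks for its whole lifetime, so it can be found, for example to kill it, while it runs.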
TaskRunner's run method executes the Task. When it finishes, it reports to the Driver that the Task has completed on this Executor:
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
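This statusUpdate is implemented in CoarseGrainedExecutorBackend. It wraps the update in a StatusUpdate message and sends it to the Driver, or drops it with a warning if the connection to the Driver has not been established yet: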
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}
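Because driver is an Option that stays None until the executor has connected, updates sent too early are dropped rather than queued. Here is a minimal sketch of that behavior, with hypothetical ToyDriverRef and ToyExecutorBackend classes standing in for the RpcEndpointRef and the backend, and a warnings buffer standing in for logWarning:

```scala
import scala.collection.mutable

// Hypothetical simplified message and driver reference.
final case class StatusUpdate(executorId: String, taskId: Long, state: String, data: Array[Byte])

final class ToyDriverRef {
  val inbox = mutable.Buffer.empty[StatusUpdate]
  def send(msg: StatusUpdate): Unit = inbox += msg
}

final class ToyExecutorBackend(executorId: String) {
  // None until the executor has connected to the driver.
  var driver: Option[ToyDriverRef] = None
  val warnings = mutable.Buffer.empty[String]

  def statusUpdate(taskId: Long, state: String, data: Array[Byte]): Unit = {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => warnings += s"Drop $msg because has not yet connected to driver"
    }
  }
}

val backend = new ToyExecutorBackend("1")
backend.statusUpdate(0L, "FINISHED", Array.emptyByteArray) // dropped: no driver yet
val driverRef = new ToyDriverRef
backend.driver = Some(driverRef)
backend.statusUpdate(1L, "FINISHED", Array[Byte](42))
```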