Spark篇運行時消息通信

Spark運行時消息通信

這里主要說明一下哄啄,當你launch一個Application之后,啟動Main方法時稻励,初始化SparkContext到注冊Application,注冊Executor以及TaskSchedulerImpl分配完Task之后交由Executor來進行執(zhí)行的這個過程绞愚。這里我都是以Standalone形式就行閱讀代碼的,其他的如Yarn, Mesos, Local需看對應(yīng)的代碼部分秧耗。其時序圖如下:


消息通信.png

流程

(1)執(zhí)行應(yīng)用程序首先執(zhí)行Main方法,啟動SparkContext, 在SparkContext初始化中會先實例化StandaloneScheduleBackend對象
StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with StandaloneAppClientListener,在該對象啟動start過程中會繼承DriverEndpoint和創(chuàng)建AppClient的ClientEndpoint(實際上是StandaloneAppClient)的兩個終端點舶治。SparkContext初始化會先new TaskSchedulerImpl分井,其會調(diào)用backend.start(),實際上 就是調(diào)用了CoarseGrainedSchedulerBackend的start方法,再次方法中:

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = createDriverEndpointRef(properties)
  }

  protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }

  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    new DriverEndpoint(rpcEnv, properties)
  }

可以知道創(chuàng)建了Driver的終端點并進行了注冊霉猛。而StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend繼承關(guān)系導(dǎo)致了StandaloneSchedulerBackend會繼承driverEndpoint尺锚。
后面執(zhí)行了

client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

創(chuàng)建了StandaloneAppClient

(2)在StandaloneAppClient通過tryResisterAllMasters來實現(xiàn)Application向Master的注冊

(3)當Master收到注冊請求之后進行處理, 注冊完畢之后會發(fā)送注冊成功消息給StandaloneApplClient, 然后調(diào)用startExecutorsOnWorkers方法運行應(yīng)用惜浅。
(4)Executor注冊過程
a)調(diào)用startExecutorsOnWorkers會分配資源來運行應(yīng)用程序瘫辩, 調(diào)用allcateWorkerResourceToExecutors實現(xiàn)在Worker中啟動Executor,allcateWorkerResourceToExecutors里面有個lanchExecutor方法坛悉,這里面會調(diào)用send(LaunchTask)給Worker伐厌, Worker收到后會實例化ExecutorRunner對象,在ExecutorRunner創(chuàng)建進程生成器ProcessBuilder裸影,然后此生成器根據(jù)ApplicationInfo中的command創(chuàng)建CoarseGrainedExecutorBackend對象挣轨,也就是Executor運行的容器, 最后Worker向Master發(fā)送ExecutorStateChanged通知Executor容器創(chuàng)建完畢轩猩,

b)進程生成器創(chuàng)建CoarseGrainedExecutorBackend對象時卷扮,調(diào)用了start方法荡澎,其半生對象會注冊Executor終端點,會觸發(fā)onStart方法晤锹,會發(fā)送注冊Executor消息RegisterExecutor到Driverpoint,如果注冊成功Driverpoint會返回RegisteredExecutor消息給ExecutorEndppoint摩幔。當ExecutorEndppoint實際上是CoarseGrainedExecutorBackend收到注冊成功, 則會創(chuàng)建Executor對象鞭铆。

c)DriverEndpoint會創(chuàng)建一個守護線程或衡,監(jiān)聽是否有taskSets過來

private val reviveThread =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

    override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }

這時候會去調(diào)用makeOffers()

d)makeoffers會調(diào)用launchTasks

// Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

進而轉(zhuǎn)向

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

進而CoarseGrainedExecutorBackend終端點(ExecutorEndpoint)接收到LaunchTask

case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

launchTask會初始化TaskRunner(它實際上是個Runnable對象),然后通過threadPool將其加入線程池執(zhí)行衔彻。

def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

TaskRunner的run方法體內(nèi)就會執(zhí)行Task薇宠, 當執(zhí)行完畢時會向Driver匯報此Task在Executor上執(zhí)行完畢了。

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

------------------------------------------------------------------------------------
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末艰额,一起剝皮案震驚了整個濱河市澄港,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌柄沮,老刑警劉巖回梧,帶你破解...
    沈念sama閱讀 222,865評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異祖搓,居然都是意外死亡狱意,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,296評論 3 399
  • 文/潘曉璐 我一進店門拯欧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來详囤,“玉大人,你說我怎么就攤上這事镐作〔亟悖” “怎么了?”我有些...
    開封第一講書人閱讀 169,631評論 0 364
  • 文/不壞的土叔 我叫張陵该贾,是天一觀的道長羔杨。 經(jīng)常有香客問我,道長杨蛋,這世上最難降的妖魔是什么兜材? 我笑而不...
    開封第一講書人閱讀 60,199評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮逞力,結(jié)果婚禮上曙寡,老公的妹妹穿的比我還像新娘。我一直安慰自己寇荧,他們只是感情好卵皂,可當我...
    茶點故事閱讀 69,196評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著砚亭,像睡著了一般灯变。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上捅膘,一...
    開封第一講書人閱讀 52,793評論 1 314
  • 那天添祸,我揣著相機與錄音,去河邊找鬼寻仗。 笑死刃泌,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的署尤。 我是一名探鬼主播耙替,決...
    沈念sama閱讀 41,221評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼曹体!你這毒婦竟也來了俗扇?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,174評論 0 277
  • 序言:老撾萬榮一對情侶失蹤箕别,失蹤者是張志新(化名)和其女友劉穎铜幽,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體串稀,經(jīng)...
    沈念sama閱讀 46,699評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡除抛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,770評論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了母截。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片到忽。...
    茶點故事閱讀 40,918評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖清寇,靈堂內(nèi)的尸體忽然破棺而出喘漏,到底是詐尸還是另有隱情,我是刑警寧澤颗管,帶...
    沈念sama閱讀 36,573評論 5 351
  • 正文 年R本政府宣布陷遮,位于F島的核電站,受9級特大地震影響垦江,放射性物質(zhì)發(fā)生泄漏帽馋。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,255評論 3 336
  • 文/蒙蒙 一比吭、第九天 我趴在偏房一處隱蔽的房頂上張望绽族。 院中可真熱鬧,春花似錦衩藤、人聲如沸吧慢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,749評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽检诗。三九已至匈仗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間逢慌,已是汗流浹背悠轩。 一陣腳步聲響...
    開封第一講書人閱讀 33,862評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留攻泼,地道東北人火架。 一個月前我還...
    沈念sama閱讀 49,364評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像忙菠,于是被迫代替她去往敵國和親何鸡。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,926評論 2 361

推薦閱讀更多精彩內(nèi)容