Flink源碼(一):Actor系統(tǒng)創(chuàng)建流程1

利用碎片時(shí)間閱讀了一下Flink的源碼侦鹏,選擇Flink主要出發(fā)點(diǎn)還是了解一個(gè)穩(wěn)定的分布式計(jì)算系統(tǒng)的實(shí)現(xiàn)适袜,另外也是由于Flink相對(duì)更加成熟的Spark有其獨(dú)到的優(yōu)勢(shì)苞氮,相信其在下一代分布式計(jì)算中也會(huì)占有重要的地位盼产。Flink的主要概念可以在官網(wǎng)了解

Flink系統(tǒng)作業(yè)的提交和調(diào)度都是利用AKKA的Actor通信,因此也是由此作為切入點(diǎn)礼华,首先理清整個(gè)系統(tǒng)的啟動(dòng)以及作業(yè)提交的流程和數(shù)據(jù)流咐鹤。

flink basic concepts

圖中可以看到,一個(gè)完整的Flink系統(tǒng)由三個(gè)Actor System構(gòu)成卓嫂,包括Client慷暂、JobManager(JM)以及TaskManager(TM)。下面對(duì)三個(gè)Actor系統(tǒng)的創(chuàng)建進(jìn)行分析。

JM ActorSystem

JM是Flink系統(tǒng)的調(diào)度中心行瑞,這部分除了會(huì)看到JM ActorSystem的創(chuàng)建奸腺,還會(huì)了解到整個(gè)Flink系統(tǒng)的各個(gè)模塊的初始化與運(yùn)行。

先找程序入口血久,從啟動(dòng)腳本可以追溯到突照,每一個(gè)啟動(dòng)腳本最終都會(huì)運(yùn)行flink_deamon.sh 腳本,查看該腳本:

...
...

case $DAEMON in
    (jobmanager)
        CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
    ;;

    (taskmanager)
        CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac


$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
...
...

由此找到JM的程序入口:org.apache.flink.runtime.jobmanager.JobManager.scala氧吐,代碼中可以找到main函數(shù)讹蘑,調(diào)用runJobManager方法:

def runJobManager(
      configuration: Configuration,
      executionMode: JobManagerMode,
      listeningAddress: String,
      listeningPort: Int)
    : Unit = {


    //startActorSystemAndJobManagerActors返回jobManagerSystem
    val (jobManagerSystem, _, _, webMonitorOption, _) = startActorSystemAndJobManagerActors(
      configuration,
      executionMode,
      listeningAddress,
      listeningPort,
      classOf[JobManager],
      classOf[MemoryArchivist],
      Option(classOf[StandaloneResourceManager])
    )

    // 阻塞,直到系統(tǒng)退出
    jobManagerSystem.awaitTermination()

    webMonitorOption.foreach{
      webMonitor =>
        try {
          webMonitor.stop()
        } catch {
          case t: Throwable =>
            LOG.warn("Could not properly stop the web monitor.", t)
        }
    }
  }

runJobManager方法邏輯比較簡(jiǎn)單筑舅,調(diào)用startActorSystemAndJobManagerActors方法中創(chuàng)建ActorSystem和JMActor座慰,然后阻塞等待系統(tǒng)退出,看具體的JM創(chuàng)建過(guò)程:

def startActorSystemAndJobManagerActors(
      configuration: Configuration,
      executionMode: JobManagerMode,
      listeningAddress: String,
      listeningPort: Int,
      jobManagerClass: Class[_ <: JobManager],
      archiveClass: Class[_ <: MemoryArchivist],
      resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
    : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {

    LOG.info("Starting JobManager")

    // Bring up the job manager actor system first, bind it to the given address.
    val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
    LOG.info(s"Starting JobManager actor system at $hostPortUrl")

    val jobManagerSystem = try {
      val akkaConfig = AkkaUtils.getAkkaConfig(
        configuration,
        Some((listeningAddress, listeningPort))
      )
      if (LOG.isDebugEnabled) {
        LOG.debug("Using akka configuration\n " + akkaConfig)
      }
      
      AkkaUtils.createActorSystem(akkaConfig)//創(chuàng)建ActorSystem全局僅有一個(gè)
    }
    catch {
       ...
       ...
    }

    ...
    ...//此處省略webMonitor的創(chuàng)建
    
    try {
      // bring up the job manager actor
      LOG.info("Starting JobManager actor")
      val (jobManager, archive) = startJobManagerActors(
        configuration,
        jobManagerSystem,
        jobManagerClass,
        archiveClass)

      // start a process reaper that watches the JobManager. If the JobManager actor dies,
      // the process reaper will kill the JVM process (to ensure easy failure detection)
      LOG.debug("Starting JobManager process reaper")
      jobManagerSystem.actorOf(
        Props(
          classOf[ProcessReaper],
          jobManager,
          LOG.logger,
          RUNTIME_FAILURE_RETURN_CODE),
        "JobManager_Process_Reaper")

      // bring up a local task manager, if needed
      if (executionMode == JobManagerMode.LOCAL) {
        LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")

        val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
          configuration,
          ResourceID.generate(),
          jobManagerSystem,
          listeningAddress,
          Some(TaskManager.TASK_MANAGER_NAME),
          None,
          localTaskManagerCommunication = true,
          classOf[TaskManager])

        LOG.debug("Starting TaskManager process reaper")
        jobManagerSystem.actorOf(
          Props(
            classOf[ProcessReaper],
            taskManagerActor,
            LOG.logger,
            RUNTIME_FAILURE_RETURN_CODE),
          "TaskManager_Process_Reaper")
      }
      ...
      ...

      (jobManagerSystem, jobManager, archive, webMonitor, resourceManager)
    }
    ...
    ...
  }

這里可以看到startActorSystemAndJobManagerActors方法中利用AkkaUtils和flinkConfig創(chuàng)建了全局的ActorSystem翠拣,AkkaUtils也是對(duì)Actor創(chuàng)建的簡(jiǎn)單封裝版仔,這里不再贅述。緊接著利用剛創(chuàng)建的jobManagerSystem和jobManager的類名:jobManagerClass創(chuàng)建jobManager误墓。除了jobManager以外蛮粮,該方法中還創(chuàng)建了Flink的其他重要模塊,從返回值中可以清楚看到谜慌。另外本地模式啟動(dòng)方式下然想,還會(huì)創(chuàng)建本地的啟動(dòng)本地的taskManagerActor。繼續(xù)深入到startJobManagerActors欣范,該方法接收jobManagerSystem等參數(shù)变泄,創(chuàng)建jobManager和archive并返回:

def startJobManagerActors(
      configuration: Configuration,
      actorSystem: ActorSystem,
      jobManagerActorName: Option[String],
      archiveActorName: Option[String],
      jobManagerClass: Class[_ <: JobManager],
      archiveClass: Class[_ <: MemoryArchivist])
    : (ActorRef, ActorRef) = {

    val (executorService: ExecutorService,
    instanceManager,
    scheduler,
    libraryCacheManager,
    restartStrategy,
    timeout,
    archiveCount,
    leaderElectionService,
    submittedJobGraphs,
    checkpointRecoveryFactory,
    savepointStore,
    jobRecoveryTimeout,
    metricsRegistry) = createJobManagerComponents(
      configuration,
      None)

    val archiveProps = Props(archiveClass, archiveCount)

    // start the archiver with the given name, or without (avoid name conflicts)
    val archive: ActorRef = archiveActorName match {
      case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
      case None => actorSystem.actorOf(archiveProps)
    }

    val jobManagerProps = Props(
      jobManagerClass,
      configuration,
      executorService,
      instanceManager,
      scheduler,
      libraryCacheManager,
      archive,
      restartStrategy,
      timeout,
      leaderElectionService,
      submittedJobGraphs,
      checkpointRecoveryFactory,
      savepointStore,
      jobRecoveryTimeout,
      metricsRegistry)

    val jobManager: ActorRef = jobManagerActorName match {
      case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
      case None => actorSystem.actorOf(jobManagerProps)
    }

    (jobManager, archive)
 }

這里首先createJobManagerComponents方法創(chuàng)建了jobManager的重要組成模塊,包括了存儲(chǔ)熙卡、備份等策略的組件實(shí)現(xiàn)杖刷,還包括以后會(huì)遇到的scheduler励饵、submittedJobGraphs驳癌,分別負(fù)責(zé)job的調(diào)度和作業(yè)的提交,這里暫不深入役听。
jobManagerActor已經(jīng)成功創(chuàng)建颓鲜,但是Scala中一個(gè)Actor會(huì)繼承Actor類,并重寫receive方法接受信息并處理典予,由此可以發(fā)現(xiàn).JobManager類繼承FlinkActor甜滨,再看FlinkActor:

trait FlinkActor extends Actor {
  val log: Logger

  override def receive: Receive = handleMessage

  /** Handle incoming messages
    * @return
    */
  def handleMessage: Receive 

  def decorateMessage(message: Any): Any = {
    message
  }
}

可以看到receive方法被重寫,并賦值為handleMessage瘤袖,所以處理消息的操作被放在FlinkActor子類Jobmanager的handleMessage方法中:

override def handleMessage: Receive = {

    ...
    ...
    case SubmitJob(jobGraph, listeningBehaviour) =>
      val client = sender()

      val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
        jobGraph.getSessionTimeout)

      submitJob(jobGraph, jobInfo)

    ...
    ...

handleMessage方法中處理的消息很多衣摩,包括了諸如作業(yè)恢復(fù),leader決策捂敌,TM注冊(cè)艾扮,作業(yè)的提交既琴、恢復(fù)與取消,這里暫時(shí)只關(guān)注消息SubmitJob(jobGraph, listeningBehaviour)泡嘴,消息的定義很簡(jiǎn)單甫恩,不再追溯。而SubmitJob消息的主要獲取Client傳來(lái)的jobGraph以及l(fā)isteningBehaviour酌予。Flink的作業(yè)最后都會(huì)抽象為jobGraph交給JM處理磺箕。關(guān)于jobGraph的生成,會(huì)在后面的Job生成的過(guò)程中進(jìn)行分析抛虫。
JM對(duì)job的處理函數(shù)submitJob(jobGraph, jobInfo)松靡,參數(shù)jobInfo中包括了Client端的ActorRef,用以Job處理結(jié)果的返回建椰,該函數(shù)中實(shí)現(xiàn)了JM對(duì)作業(yè)的提交與處理的細(xì)節(jié)击困,為突出重點(diǎn),放在作業(yè)處理部分分析广凸。但從該方法的注釋來(lái)看:

 /**
   * Submits a job to the job manager. The job is registered at the libraryCacheManager which
   * creates the job's class loader. The job graph is appended to the corresponding execution
   * graph and the execution vertices are queued for scheduling.
   *
   * @param jobGraph representing the Flink job
   * @param jobInfo the job info
   * @param isRecovery Flag indicating whether this is a recovery or initial submission
   */

在該方法中將Job注冊(cè)到libraryCacheManager阅茶,并將Job執(zhí)行餓的DAG加入到調(diào)度隊(duì)列。

小結(jié)

這里僅僅就JM Actor的創(chuàng)建過(guò)程對(duì)flink的源碼進(jìn)行了分析谅海,主要了解到flink系統(tǒng)JM部分ActorSystem的組織方式脸哀,main函數(shù)最終創(chuàng)建JM 監(jiān)聽(tīng)客戶端的消息,并對(duì)作業(yè)進(jìn)行調(diào)度和Job容錯(cuò)處理扭吁,最終交由TaskManager進(jìn)行處理撞蜂。對(duì)于具體的調(diào)度和處理策略,JM和TM的通信會(huì)在以后進(jìn)行分析侥袜。接下來(lái)首先看Client端的邏輯蝌诡。

原創(chuàng)文章,原文到我的博客

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末枫吧,一起剝皮案震驚了整個(gè)濱河市浦旱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌九杂,老刑警劉巖颁湖,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異例隆,居然都是意外死亡甥捺,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門镀层,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)镰禾,“玉大人,你說(shuō)我怎么就攤上這事∥庹欤” “怎么了谷饿?”我有些...
    開(kāi)封第一講書人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)妈倔。 經(jīng)常有香客問(wèn)我博投,道長(zhǎng),這世上最難降的妖魔是什么盯蝴? 我笑而不...
    開(kāi)封第一講書人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任毅哗,我火速辦了婚禮,結(jié)果婚禮上捧挺,老公的妹妹穿的比我還像新娘虑绵。我一直安慰自己,他們只是感情好闽烙,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布翅睛。 她就那樣靜靜地躺著,像睡著了一般黑竞。 火紅的嫁衣襯著肌膚如雪捕发。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 50,050評(píng)論 1 291
  • 那天很魂,我揣著相機(jī)與錄音扎酷,去河邊找鬼。 笑死遏匆,一個(gè)胖子當(dāng)著我的面吹牛法挨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播幅聘,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼凡纳,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了帝蒿?” 一聲冷哼從身側(cè)響起荐糜,我...
    開(kāi)封第一講書人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎陵叽,沒(méi)想到半個(gè)月后狞尔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體丛版,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡巩掺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了页畦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片胖替。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出独令,到底是詐尸還是另有隱情端朵,我是刑警寧澤,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布燃箭,位于F島的核電站冲呢,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏招狸。R本人自食惡果不足惜敬拓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望裙戏。 院中可真熱鬧乘凸,春花似錦、人聲如沸累榜。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)壹罚。三九已至葛作,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間猖凛,已是汗流浹背进鸠。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留形病,地道東北人客年。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像漠吻,于是被迫代替她去往敵國(guó)和親量瓜。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容

  • 簡(jiǎn)單之美 | Apache Flink:特性试读、概念杠纵、組件棧、架構(gòu)及原理分析http://shiyanjun.cn/...
    葡萄喃喃囈語(yǔ)閱讀 7,389評(píng)論 0 27
  • 介紹 概述 Apache Flink是一個(gè)面向數(shù)據(jù)流處理和批量數(shù)據(jù)處理的可分布式的開(kāi)源計(jì)算框架钩骇,它基于同一個(gè)Fli...
    stephen_k閱讀 50,802評(píng)論 0 22
  • 說(shuō)后來(lái)也許有些遙遠(yuǎn)比藻。其實(shí)認(rèn)識(shí)你不過(guò)一周有余铝量。 只是相見(jiàn)恨晚。 當(dāng)然我從沒(méi)冒然的去問(wèn)過(guò)你银亲,是否也是如此慢叨,我怕你不回消...
    凌阿深閱讀 1,047評(píng)論 10 15
  • 作者 言嘉 四月的春正濃 樹樹櫻花熱烈地開(kāi)著 幾片飄飛的花瓣 完美地點(diǎn)綴了明媚的春 似少...
    言嘉閱讀 190評(píng)論 0 2