Flink源碼解析之JobManager啟動

JobManager職責

JobManager的職責主要是接收Flink作業(yè)匾委,調(diào)度Task破花,收集作業(yè)狀態(tài)和管理TaskManager。它包含一個Actor慰照,并且接收如下信息:

  • RegisterTaskManager: 它由想要注冊到JobManager的TaskManager發(fā)送剃盾。注冊成功會通過AcknowledgeRegistration消息進行Ack腺占。
  • SubmitJob: 由提交作業(yè)到系統(tǒng)的Client發(fā)送。提交的信息是JobGraph形式的作業(yè)描述信息痒谴。
  • CancelJob: 請求取消指定id的作業(yè)衰伯。成功會返回CancellationSuccess,否則返回CancellationFailure积蔚。
  • UpdateTaskExecutionState: 由TaskManager發(fā)送意鲸,用來更新執(zhí)行節(jié)點(ExecutionVertex)的狀態(tài)。成功則返回true尽爆,否則返回false怎顾。
  • RequestNextInputSplit: TaskManager上的Task請求下一個輸入split,成功則返回NextInputSplit漱贱,否則返回null槐雾。
  • JobStatusChanged: 它意味著作業(yè)的狀態(tài)(RUNNING, CANCELING, FINISHED,等)發(fā)生變化。這個消息由ExecutionGraph發(fā)送幅狮。

JobManager啟動過程

代碼在org.apache.flink.runtime.jobmanager.JobManager.scala文件中募强,入口是main方法,通過腳本啟動JobManager時彪笼,調(diào)用的就是main方法钻注。main方法是通過調(diào)用runJobManager方法來啟動JobManager的蚂且,下面我們主要看下runJobManager方法配猫。

def runJobManager(
                     configuration: Configuration,
                     executionMode: JobManagerMode,
                     listeningAddress: String,
                     listeningPort: Int)
  : Unit = {

    ....

    // 首先啟動JobManager的ActorSystem,因為如果端口號是0杏死,它決定了使用哪個端口泵肄,并會更新相應的配置捆交。
    val jobManagerSystem = startActorSystem(
      configuration,
      listeningAddress,
      listeningPort)

    // 創(chuàng)建高可靠的服務,比如通過ZooKeeper配置了多個JobManager
    val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
      configuration,
      ioExecutor,
      AddressResolution.NO_ADDRESS_RESOLUTION)
    
    ...
 
    //啟動JobManager的所有組件腐巢,包括library緩存品追,實例管理和調(diào)度器,最終啟動 JobManager Actor冯丙。
    val (_, _, webMonitorOption, _) = try {
      startJobManagerActors(
        jobManagerSystem,
        configuration,
        executionMode,
        listeningAddress,
        futureExecutor,
        ioExecutor,
        highAvailabilityServices,
        metricRegistry,
        classOf[JobManager],
        classOf[MemoryArchivist],
        Option(classOf[StandaloneResourceManager])
      )
    } catch {
      case t: Throwable =>
        futureExecutor.shutdownNow()
        ioExecutor.shutdownNow()

        throw t
    }

    // 阻塞肉瓦,直到系統(tǒng)退出
    jobManagerSystem.awaitTermination()

    webMonitorOption.foreach {
      webMonitor =>
        try {
          webMonitor.stop()
        } catch {
          case t: Throwable =>
            LOG.warn("Could not properly stop the web monitor.", t)
        }
    }

    try {
      highAvailabilityServices.close()
    } catch {
      case t: Throwable =>
        LOG.warn("Could not properly stop the high availability services.", t)
    }

    try {
      metricRegistry.shutdown()
    } catch {
      case t: Throwable =>
        LOG.warn("Could not properly shut down the metric registry.", t)
    }

    ExecutorUtils.gracefulShutdown(
      timeout.toMillis,
      TimeUnit.MILLISECONDS,
      futureExecutor,
      ioExecutor)
  }

JobManager高可用性

目前JobManager的高可用性模式分為兩種:

  • NONE:意味著沒有高可用性,只有一個JobManager節(jié)點胃惜。
  • ZooKeeper:通過ZooKeeper實現(xiàn)高可用性泞莉,多個JobManager節(jié)點組成一個集群,通過ZooKeeper選舉出master節(jié)點船殉,由master節(jié)點提供服務鲫趁,其它節(jié)點作為備份。

當使用NONE模式時利虫,只有一個JobManager節(jié)點提供服務挨厚,且JobManager不會保存提交的jar包信息,將Checkpoint和metadata信息保存在Java堆或者本地文件系統(tǒng)中糠惫,因此意味著沒有搞可用性疫剃。

而使用ZooKeeper模式時,有一個Master和多個Standby節(jié)點硼讽,當Master故障時慌申,Standby節(jié)點會通過選舉產(chǎn)生新的Master節(jié)點。這樣不會產(chǎn)生單點故障理郑,只要有新的Master生成蹄溉,程序可以繼續(xù)執(zhí)行。Standby JobManager和Master JobManager實例之間沒有明確區(qū)別您炉。 每個JobManager可以成為Master或Standby節(jié)點柒爵。

舉例,使用三個JobManager節(jié)點的情況下赚爵,進行以下設(shè)置:


使用NONE或ZooKeeper模式棉胀,通過如下配置進行設(shè)置:

  high-availability: none/zookeeper

高可用服務創(chuàng)建過程

public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution) throws Exception {

        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

        switch(highAvailabilityMode) {
            case NONE:     //NONE模式
                final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

                final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    JobMaster.JOB_MANAGER_NAME,
                    addressResolution,
                    configuration);
                final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    ResourceManager.RESOURCE_MANAGER_NAME,
                    addressResolution,
                    configuration);
                final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                    hostnamePort.f0,
                    hostnamePort.f1,
                    Dispatcher.DISPATCHER_NAME,
                    addressResolution,
                    configuration);

                return new StandaloneHaServices(
                    resourceManagerRpcUrl,
                    dispatcherRpcUrl,
                    jobManagerRpcUrl);
            case ZOOKEEPER:     //ZOOKEEPER模式
                //存儲JobManager Metadata 數(shù)據(jù)的Service
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

                //基于ZooKeeper的JobManager 高可用Service
                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(configuration),
                    executor,
                    configuration,
                    blobStoreService);
            default:
                throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市冀膝,隨后出現(xiàn)的幾起案子唁奢,更是在濱河造成了極大的恐慌,老刑警劉巖窝剖,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件麻掸,死亡現(xiàn)場離奇詭異,居然都是意外死亡赐纱,警方通過查閱死者的電腦和手機脊奋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門熬北,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人诚隙,你說我怎么就攤上這事讶隐。” “怎么了久又?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵巫延,是天一觀的道長。 經(jīng)常有香客問我地消,道長烈评,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任犯建,我火速辦了婚禮讲冠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘适瓦。我一直安慰自己竿开,他們只是感情好,可當我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布玻熙。 她就那樣靜靜地躺著否彩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪嗦随。 梳的紋絲不亂的頭發(fā)上列荔,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天,我揣著相機與錄音枚尼,去河邊找鬼贴浙。 笑死,一個胖子當著我的面吹牛署恍,可吹牛的內(nèi)容都是我干的崎溃。 我是一名探鬼主播,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼盯质,長吁一口氣:“原來是場噩夢啊……” “哼袁串!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起呼巷,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤囱修,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后王悍,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體破镰,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了啤咽。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡渠脉,死狀恐怖宇整,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情芋膘,我是刑警寧澤鳞青,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站为朋,受9級特大地震影響臂拓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜习寸,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一胶惰、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧霞溪,春花似錦孵滞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至殴蓬,卻和暖如春匿级,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背染厅。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工痘绎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人肖粮。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓简逮,卻偏偏與公主長得像,于是被迫代替她去往敵國和親尿赚。 傳聞我的和親對象是個殘疾皇子散庶,可洞房花燭夜當晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 簡單之美 | Apache Flink:特性、概念、組件棧轻腺、架構(gòu)及原理分析http://shiyanjun.cn/...
    葡萄喃喃囈語閱讀 7,360評論 0 27
  • Flink初體驗 安裝 官網(wǎng):http://flink.apache.org/downloads.html 可以看...
    it_zzy閱讀 29,788評論 0 10
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理乐疆,服務發(fā)現(xiàn),斷路器贬养,智...
    卡卡羅2017閱讀 134,600評論 18 139
  • 聽遠方參軍的兄弟講挤土,他們?nèi)ソM織海上實彈打靶,早上一大早就去安裝海上浮靶误算,可裝完回來仰美,警戒人員準備去開始實施警戒時,...
  • 說到碰瓷很多人的感覺可能是既熟悉又陌生儿礼,說他熟悉是因為我們身邊有無數(shù)人說過咖杂,電視廣播也是時有出現(xiàn),說他陌生是...
    嫩水無聲閱讀 275評論 0 0