Flink1.10基于工廠模式的任務(wù)提交與SPI機制

本文僅為筆者平日學(xué)習(xí)記錄之用彤敛,侵刪
原文:https://mp.weixin.qq.com/s/t9Z2d1Pay4EiJBIU1dNrsw

Flink任務(wù)執(zhí)行模式包含了yarn-session鳖宾、standalone称鳞、per-job、local, 在1.10中又增加k8s的執(zhí)行模式蹲姐,那么在任務(wù)提交過程中如何根據(jù)不同的執(zhí)行模式進行任務(wù)提交呢扛芽?主要通過兩個接口來實現(xiàn):PipelineExecutorFactory 與PipelineExecutor。PipelineExecutorFactory用于在不同模式下創(chuàng)建不同的PipelineExecutor, 用于提交任務(wù)帆精,PipelineExecutorFactory表示的一個創(chuàng)建執(zhí)行器工廠接口,PipelineExecutor 表示一個執(zhí)行器接口隧魄,正如你所想這里使用的就是經(jīng)典的工廠設(shè)計模式卓练,在任務(wù)提交過程中會根據(jù)不同的提交模式, 使用不同的PipelineExecutorFactory創(chuàng)建不同的PipelineExecutor购啄。

public interface PipelineExecutorFactory {
   /**
    * Returns the name of the executor that this factory creates.
    */
   String getName();
   /**
      根據(jù)configuration判斷是否滿足當(dāng)前的factory
    */
   boolean isCompatibleWith(final Configuration configuration);
   /**
    * 獲取對應(yīng)模式下的executor
    */
   PipelineExecutor getExecutor(final Configuration configuration);
}

PipelineExecutorFactory幾個實現(xiàn)分別為:

  1. LocalExecutorFactory(local)
  2. RemoteExecutorFactory(standalone)
  3. YarnJobClusterExecutorFactory(per-job)
  4. YarnSessionClusterExecutorFactory(yarn-session)
public interface PipelineExecutor {

   /**
    * 執(zhí)行任務(wù)
    */
   CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
}

PipelineExecutor對應(yīng)實現(xiàn):

  1. RemoteExecutor(standalone)

  2. LocalExecutor(local)

  3. YarnJobClusterExecutor(per-job)

  4. YarnSessionClusterExecutor(yarn-session)

那么具體是如何選擇factory呢襟企?由PipelineExecutorServiceLoader接口來完成,其只有一個實現(xiàn)類DefaultExecutorServiceLoader狮含, 透過命名你可能會才想到這里面用到了ServiceLoader顽悼,你的猜想是正確的,它就是通過SPI機制去加載flink所提供的不同factory辉川,在META-INF.services 下可以找到其對應(yīng)的配置:

DefaultExecutorServiceLoader.java部分源碼

//SPI機制
private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);

//獲取對應(yīng)的factory
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
   checkNotNull(configuration);

   final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
   final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();
   while (factories.hasNext()) {
      try {
         final PipelineExecutorFactory factory = factories.next();
         //判斷標(biāo)準(zhǔn) 根據(jù)任務(wù)啟動配置
         if (factory != null && factory.isCompatibleWith(configuration)) {
            compatibleFactories.add(factory);
         }
      } catch (Throwable e) {
         if (e.getCause() instanceof NoClassDefFoundError) {
            LOG.info("Could not load factory due to missing dependencies.");
         } else {
            throw e;
         }
      }
   }
   //只能有一個factory符合要求
   if (compatibleFactories.size() > 1) {
      final String configStr =
            configuration.toMap().entrySet().stream()
                  .map(e -> e.getKey() + "=" + e.getValue())
                  .collect(Collectors.joining("\n"));

      throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
   }
   return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
}

ServiceLoader.load(PipelineExecutorFactory.class) 會從類路徑的META-INF.services下找到PipelineExecutorFactory的全路徑文件拴测,然后實例化出所有的factory集索,通過PipelineExecutorFactory.isCompatibleWith找到匹配的factory妆距。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市忌穿,隨后出現(xiàn)的幾起案子掠剑,更是在濱河造成了極大的恐慌朴译,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件坟乾,死亡現(xiàn)場離奇詭異甚侣,居然都是意外死亡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門窒盐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人葡粒,你說我怎么就攤上這事∷越唬” “怎么了伯铣?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長轮纫。 經(jīng)常有香客問我腔寡,道長,這世上最難降的妖魔是什么掌唾? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任放前,我火速辦了婚禮,結(jié)果婚禮上糯彬,老公的妹妹穿的比我還像新娘凭语。我一直安慰自己,他們只是感情好撩扒,可當(dāng)我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布似扔。 她就那樣靜靜地躺著,像睡著了一般搓谆。 火紅的嫁衣襯著肌膚如雪炒辉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天泉手,我揣著相機與錄音黔寇,去河邊找鬼。 笑死斩萌,一個胖子當(dāng)著我的面吹牛缝裤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播颊郎,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼憋飞,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了姆吭?” 一聲冷哼從身側(cè)響起榛做,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎猾编,沒想到半個月后瘤睹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡答倡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了驴党。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瘪撇。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出倔既,到底是詐尸還是另有隱情恕曲,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布渤涌,位于F島的核電站佩谣,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏实蓬。R本人自食惡果不足惜茸俭,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望安皱。 院中可真熱鬧调鬓,春花似錦、人聲如沸酌伊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽居砖。三九已至虹脯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間奏候,已是汗流浹背归形。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留鼻由,地道東北人暇榴。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像蕉世,于是被迫代替她去往敵國和親蔼紧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,446評論 2 348