Flink 作業(yè)生成②:StreamGraph -> JobGraph

由前文我們知道,StreamGraph 表示一個流任務的邏輯拓撲宵统,可以用一個 DAG 來表示(代碼實現(xiàn)上沒有一個 DAG 結(jié)構(gòu))姐扮,DAG 的頂點是 StreamNode,邊是 StreamEdge硫戈,邊包含了由哪個 StreamNode 依賴哪個 StreamNode。本文我們主要介紹一個 StreamGraph 是如何轉(zhuǎn)換成一個 JobGraph下硕。

一丁逝、JobGraph 概述

  • JobGraph 將會在原來的基礎上做相應的優(yōu)化(主要是算子的 Chain 操作汁胆,Chain 在一起的算子將會在同一個 task 上運行,會極大減少 shuffle 的開銷)
  • JobGraph 用來由 JobClient 提交給 JobManager霜幼,是由頂點(JobVertex)沦泌、中間結(jié)果(IntermediateDataSet)和邊(JobEdge)組成的 DAG 圖
  • JobGraph 定義作業(yè)級別的配置,而每個頂點和中間結(jié)果定義具體操作和中間數(shù)據(jù)的設置

為什么要有 StreamGraph 和 JobGraph 兩層的 Graph辛掠,最主要的原因是為兼容 batch process谢谦,Streaming process 最初產(chǎn)生的是 StreamGraph,而 batch process 產(chǎn)生的則是 OptimizedPlan萝衩,但是它們最后都會轉(zhuǎn)換為 JobGraph

1.1回挽、JobVertex

JobVertex 相當于是 JobGraph 的頂點,跟 StreamNode 的區(qū)別是猩谊,它是 Operator Chain 之后的頂點千劈,會包含多個 StreamNode。主要成員:

  • List<OperatorIDPair> operatorIDs:該 job 節(jié)點包含的所有 operator ids牌捷,以深度優(yōu)先方式存儲 ids
  • ArrayList<JobEdge> inputs:帶輸入數(shù)據(jù)的邊列表
  • ArrayList<IntermediateDataSet> results:job 節(jié)點計算出的中間結(jié)果

1.2墙牌、IntermediateDataSet

它是由一個 Operator(可能是 source,也可能是某個中間算子)產(chǎn)生的一個中間數(shù)據(jù)集暗甥。中間數(shù)據(jù)集可能會被其他 operators 讀取喜滨,物化或丟棄。主要成員:

  • JobVertex producer:該中間結(jié)果的生產(chǎn)者
  • List<JobEdge> consumers:該中間結(jié)果消費邊撤防,通過消費邊指向消費的節(jié)點
  • ResultPartitionType resultType:中間結(jié)果的分區(qū)類型
    • 流水線的(有界的或無界的):一旦產(chǎn)生數(shù)據(jù)就向下游發(fā)送虽风,可能是逐個發(fā)送的,有界或無界的記錄流寄月。
    • 阻塞:僅在生成完整結(jié)果時向下游發(fā)送數(shù)據(jù)辜膝。

1.3、JobEdge

它相當于是 JobGraph 中的邊(連接通道)漾肮,這個邊連接的是一個 IntermediateDataSet 跟一個要消費的 JobVertex厂抖。主要成員:

  • IntermediateDataSet sourc:邊的源
  • JobVertex target:邊的目標
  • DistributionPattern distributionPattern:決定了在上游節(jié)點(生產(chǎn)者)的子任務和下游節(jié)點(消費者)之間的連接模式
    • ALL_TO_ALL:每個生產(chǎn)子任務都連接到消費任務的每個子任務
    • POINTWISE:每個生產(chǎn)子任務都連接到使用任務的一個或多個子任務

二、Create Job Graph 主要流程

2.1克懊、核心步驟

2.2、setChaining

從 Source StreamNode 實例開始設置 task chain保檐,它將會遞歸地創(chuàng)建所有的 JobVertex 實例

這個方法首先從會遍歷這個 StreamGraph 的所有 source 節(jié)點崔梗,然后選擇從 source 節(jié)點開始執(zhí)行 createChain() 方法夜只,在具體的實現(xiàn)里蒜魄,主要邏輯如下

總結(jié)下這個流程:

  1. 從輸入節(jié)點開始场躯,判斷邊的輸出節(jié)點能否加入到該 chain
    • 如果可以,則繼續(xù)從輸出節(jié)點執(zhí)行擴展該 chain
    • 否則踢关,當前 chain 結(jié)束,以輸出節(jié)點為初始節(jié)點粘茄,遞歸創(chuàng)建新的 chain
  2. 如果當前節(jié)點為 chain 的首節(jié)點签舞,那么就創(chuàng)建一個 JobVertex,否則創(chuàng)建 StreamConfig柒瓣,記錄到 chainedConfigs(由于調(diào)用鏈上后面的節(jié)點先創(chuàng)建儒搭,因此創(chuàng)建首節(jié)點的 JobVertex 時,就可以使用 chainedConfigs 記錄的信息了)

其中 JobEdge 是通過下游 JobVertex 的 connectNewDataSetAsInput 方法來創(chuàng)建的芙贫,在創(chuàng)建 JobEdge 之前搂鲫,會先用上游 JobVertex 創(chuàng)建一個 IntermediateDataSet 實例,用來作為上游 JobVertex 的結(jié)果輸出磺平,然后作為 JobEdge 的輸入魂仍,構(gòu)建JobEdge實例,具體實現(xiàn)如下:

public JobEdge connectNewDataSetAsInput(
      JobVertex input,
      DistributionPattern distPattern,
      ResultPartitionType partitionType) {
   /** 創(chuàng)建輸入JobVertex的輸出數(shù)據(jù)集合 */
   IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
   /** 構(gòu)建 JobEdge 實例 */
   JobEdge edge = new JobEdge(dataSet, this, distPattern);
   /** 將 JobEdge 實例拣挪,作為當前 JobVertex 的輸入 */
   this.inputs.add(edge);
   /** 設置中間結(jié)果集合 dataSet 的消費者是上面創(chuàng)建的 JobEdge */
   dataSet.addConsumer(edge);
   return edge;
}

通過上述的構(gòu)建過程擦酌,就可以實現(xiàn)上下游 JobVertex 的連接,上游 JobVertex ——> 中間結(jié)果集合 IntermediateDataSet ——> JobEdge ——> 下游 JobVertex菠劝。其中:

  • IntermediateDataSet 和 JobEdge 是用來建立上下游 JobVertex 之間連接的配置
  • 一個 IntermediateDataSet 有一個 producer仑氛,可以有多個消費者 JobEdge
  • 一個 JobEdge 則有一個數(shù)據(jù)源 IntermediateDataSet,一個目標JobVertex
  • 一個 JobVertex 可以產(chǎn)生多個輸出 IntermediateDataSet闸英,也可以接受來自多個 JobEdge 的數(shù)據(jù)

2.3锯岖、算子 Chainable 的依據(jù)

isChainable() 的判斷依據(jù)如下:

return downStreamVertex.getInEdges().size() == 1 // 
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 對應的 slotSharingGroup 一樣
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // out operator 允許 chain 操作
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || // head Operator 允許跟后面的 chain 在一起
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner) // partitioner 是 ForwardPartitioner 類型
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // 并發(fā)相等
            && streamGraph.isChainingEnabled(); // StreamGraph 允許 Chain 在一起

2.3.1、slotSharingGroup

一個 StreamNode 的 SlotSharingGroup 會按照下面這個邏輯來確定:

  1. 如果用戶指定了 SlotSharingGroup甫何,直接使用這個 SlotSharingGroup name出吹;
  2. 如果所有的 input 都是同一個 group name,使用這個即可辙喂;
  3. 否則使用 default group捶牢;

2.3.2、edge.getPartitioner()

StreamPartitioner 的實現(xiàn)

用戶可以在自己的代碼中調(diào)用 DataStream API (比如:broadcast()巍耗、shuffle() 等)配置相應的 StreamPartitioner秋麸,如果這個沒有指定 StreamPartitioner 的話,則會走下面的邏輯創(chuàng)建默認的 StreamPartitioner:

//org.apache.flink.streaming.api.graph.StreamGraph
//note: 未指定 partitioner 的話炬太,會為其選擇 forward(并發(fā)設置相同時) 或 rebalance(并發(fā)設置不同時)
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

三灸蟆、參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市亲族,隨后出現(xiàn)的幾起案子炒考,更是在濱河造成了極大的恐慌可缚,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件斋枢,死亡現(xiàn)場離奇詭異帘靡,居然都是意外死亡,警方通過查閱死者的電腦和手機瓤帚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來戈次,“玉大人,你說我怎么就攤上這事朝扼。” “怎么了榛斯?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵驮俗,是天一觀的道長。 經(jīng)常有香客問我王凑,道長聋丝,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任百姓,我火速辦了婚禮,結(jié)果婚禮上垒拢,老公的妹妹穿的比我還像新娘火惊。我一直安慰自己,他們只是感情好屹耐,可當我...
    茶點故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般鸵贬。 火紅的嫁衣襯著肌膚如雪脖捻。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天地沮,我揣著相機與錄音摩疑,去河邊找鬼。 笑死雷袋,一個胖子當著我的面吹牛辞居,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播瓦灶,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼贼陶,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了碉怔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤庙楚,失蹤者是張志新(化名)和其女友劉穎趴樱,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體涣楷,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了狡忙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡呢袱,死狀恐怖羞福,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情治专,我是刑警寧澤遭顶,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站喘批,受9級特大地震影響铣揉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜老速,卻給世界環(huán)境...
    茶點故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望额湘。 院中可真熱鬧旁舰,春花似錦、人聲如沸箭窜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽竹捉。三九已至,卻和暖如春块差,著一層夾襖步出監(jiān)牢的瞬間倔丈,已是汗流浹背状蜗。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留宏邮,地道東北人。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓蜀铲,卻偏偏與公主長得像属百,于是被迫代替她去往敵國和親变姨。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容