由前文我們知道,StreamGraph 表示一個流任務的邏輯拓撲宵统,可以用一個 DAG 來表示(代碼實現(xiàn)上沒有一個 DAG 結(jié)構(gòu))姐扮,DAG 的頂點是 StreamNode,邊是 StreamEdge硫戈,邊包含了由哪個 StreamNode 依賴哪個 StreamNode。本文我們主要介紹一個 StreamGraph 是如何轉(zhuǎn)換成一個 JobGraph下硕。
一丁逝、JobGraph 概述
- JobGraph 將會在原來的基礎上做相應的優(yōu)化(主要是算子的 Chain 操作汁胆,Chain 在一起的算子將會在同一個 task 上運行,會極大減少 shuffle 的開銷)
- JobGraph 用來由 JobClient 提交給 JobManager霜幼,是由頂點(
JobVertex
)沦泌、中間結(jié)果(IntermediateDataSet
)和邊(JobEdge
)組成的 DAG 圖 - JobGraph 定義作業(yè)級別的配置,而每個頂點和中間結(jié)果定義具體操作和中間數(shù)據(jù)的設置
為什么要有 StreamGraph 和 JobGraph 兩層的 Graph辛掠,最主要的原因是為兼容 batch process谢谦,Streaming process 最初產(chǎn)生的是 StreamGraph,而 batch process 產(chǎn)生的則是 OptimizedPlan萝衩,但是它們最后都會轉(zhuǎn)換為 JobGraph
1.1回挽、JobVertex
JobVertex 相當于是 JobGraph 的頂點,跟 StreamNode 的區(qū)別是猩谊,它是 Operator Chain 之后的頂點千劈,會包含多個 StreamNode。主要成員:
-
List<OperatorIDPair> operatorIDs
:該 job 節(jié)點包含的所有 operator ids牌捷,以深度優(yōu)先方式存儲 ids -
ArrayList<JobEdge> inputs
:帶輸入數(shù)據(jù)的邊列表 -
ArrayList<IntermediateDataSet> results
:job 節(jié)點計算出的中間結(jié)果
1.2墙牌、IntermediateDataSet
它是由一個 Operator(可能是 source,也可能是某個中間算子)產(chǎn)生的一個中間數(shù)據(jù)集暗甥。中間數(shù)據(jù)集可能會被其他 operators 讀取喜滨,物化或丟棄。主要成員:
-
JobVertex producer
:該中間結(jié)果的生產(chǎn)者 -
List<JobEdge> consumers
:該中間結(jié)果消費邊撤防,通過消費邊指向消費的節(jié)點 -
ResultPartitionType resultType
:中間結(jié)果的分區(qū)類型- 流水線的(有界的或無界的):一旦產(chǎn)生數(shù)據(jù)就向下游發(fā)送虽风,可能是逐個發(fā)送的,有界或無界的記錄流寄月。
- 阻塞:僅在生成完整結(jié)果時向下游發(fā)送數(shù)據(jù)辜膝。
1.3、JobEdge
它相當于是 JobGraph 中的邊(連接通道)漾肮,這個邊連接的是一個 IntermediateDataSet 跟一個要消費的 JobVertex厂抖。主要成員:
-
IntermediateDataSet sourc
:邊的源 -
JobVertex target
:邊的目標 -
DistributionPattern distributionPattern
:決定了在上游節(jié)點(生產(chǎn)者)的子任務和下游節(jié)點(消費者)之間的連接模式-
ALL_TO_ALL
:每個生產(chǎn)子任務都連接到消費任務的每個子任務 -
POINTWISE
:每個生產(chǎn)子任務都連接到使用任務的一個或多個子任務
-
二、Create Job Graph 主要流程
2.1克懊、核心步驟
2.2、setChaining
從 Source StreamNode 實例開始設置 task chain保檐,它將會遞歸地創(chuàng)建所有的 JobVertex 實例
這個方法首先從會遍歷這個 StreamGraph 的所有 source 節(jié)點崔梗,然后選擇從 source 節(jié)點開始執(zhí)行 createChain()
方法夜只,在具體的實現(xiàn)里蒜魄,主要邏輯如下
總結(jié)下這個流程:
- 從輸入節(jié)點開始场躯,判斷邊的輸出節(jié)點能否加入到該 chain
- 如果可以,則繼續(xù)從輸出節(jié)點執(zhí)行擴展該 chain
- 否則踢关,當前 chain 結(jié)束,以輸出節(jié)點為初始節(jié)點粘茄,遞歸創(chuàng)建新的 chain
- 如果當前節(jié)點為 chain 的首節(jié)點签舞,那么就創(chuàng)建一個 JobVertex,否則創(chuàng)建 StreamConfig柒瓣,記錄到 chainedConfigs(由于調(diào)用鏈上后面的節(jié)點先創(chuàng)建儒搭,因此創(chuàng)建首節(jié)點的 JobVertex 時,就可以使用 chainedConfigs 記錄的信息了)
其中 JobEdge 是通過下游 JobVertex 的 connectNewDataSetAsInput
方法來創(chuàng)建的芙贫,在創(chuàng)建 JobEdge 之前搂鲫,會先用上游 JobVertex 創(chuàng)建一個 IntermediateDataSet 實例,用來作為上游 JobVertex 的結(jié)果輸出磺平,然后作為 JobEdge 的輸入魂仍,構(gòu)建JobEdge實例,具體實現(xiàn)如下:
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {
/** 創(chuàng)建輸入JobVertex的輸出數(shù)據(jù)集合 */
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
/** 構(gòu)建 JobEdge 實例 */
JobEdge edge = new JobEdge(dataSet, this, distPattern);
/** 將 JobEdge 實例拣挪,作為當前 JobVertex 的輸入 */
this.inputs.add(edge);
/** 設置中間結(jié)果集合 dataSet 的消費者是上面創(chuàng)建的 JobEdge */
dataSet.addConsumer(edge);
return edge;
}
通過上述的構(gòu)建過程擦酌,就可以實現(xiàn)上下游 JobVertex 的連接,上游 JobVertex ——> 中間結(jié)果集合 IntermediateDataSet ——> JobEdge ——> 下游 JobVertex
菠劝。其中:
- IntermediateDataSet 和 JobEdge 是用來建立上下游 JobVertex 之間連接的配置
- 一個 IntermediateDataSet 有一個 producer仑氛,可以有多個消費者 JobEdge
- 一個 JobEdge 則有一個數(shù)據(jù)源 IntermediateDataSet,一個目標JobVertex
- 一個 JobVertex 可以產(chǎn)生多個輸出 IntermediateDataSet闸英,也可以接受來自多個 JobEdge 的數(shù)據(jù)
2.3锯岖、算子 Chainable 的依據(jù)
isChainable()
的判斷依據(jù)如下:
return downStreamVertex.getInEdges().size() == 1 //
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // 對應的 slotSharingGroup 一樣
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // out operator 允許 chain 操作
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || // head Operator 允許跟后面的 chain 在一起
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner) // partitioner 是 ForwardPartitioner 類型
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // 并發(fā)相等
&& streamGraph.isChainingEnabled(); // StreamGraph 允許 Chain 在一起
2.3.1、slotSharingGroup
一個 StreamNode 的 SlotSharingGroup 會按照下面這個邏輯來確定:
- 如果用戶指定了 SlotSharingGroup甫何,直接使用這個 SlotSharingGroup name出吹;
- 如果所有的 input 都是同一個 group name,使用這個即可辙喂;
- 否則使用 default group捶牢;
2.3.2、edge.getPartitioner()
StreamPartitioner 的實現(xiàn)
用戶可以在自己的代碼中調(diào)用 DataStream API (比如:broadcast()
巍耗、shuffle()
等)配置相應的 StreamPartitioner秋麸,如果這個沒有指定 StreamPartitioner 的話,則會走下面的邏輯創(chuàng)建默認的 StreamPartitioner:
//org.apache.flink.streaming.api.graph.StreamGraph
//note: 未指定 partitioner 的話炬太,會為其選擇 forward(并發(fā)設置相同時) 或 rebalance(并發(fā)設置不同時)
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}