在Flink
中,由用戶代碼生成調(diào)度層圖結(jié)構(gòu)蚓土,可以分成3
步走:通過Stream API
編寫的用戶代碼 -> StreamGraph
-> JobGraph
-> ExecutionGraph
宙橱。
-
StreamGraph
:根據(jù)用戶通過Stream API
編寫的代碼生成的最初的圖姨俩,用來表示程序的拓?fù)浣Y(jié)構(gòu)蘸拔。 -
JobGraph
:StreamGraph
經(jīng)過算子連接等優(yōu)化后生成的圖师郑,它是提交給JobManager
的數(shù)據(jù)結(jié)構(gòu)环葵。 -
ExecutionGraph
:JobManager
根據(jù)JobGraph
生成的分布式執(zhí)行圖,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)宝冕。
以SocketWindowWordCount
為例张遭,其執(zhí)行圖的演變過程如下圖所示:
詳解三部曲
Step1:通過Stream API
編寫的用戶代碼 —> StreamGraph
本小節(jié)主要介紹Flink
是如何根據(jù)用戶用Stream API
編寫的程序,構(gòu)造出一個(gè)代表拓?fù)浣Y(jié)構(gòu)的StreamGraph
的地梨。
找突破口
StreamGraph
的相關(guān)代碼主要在org.apache.flink.streaming.api.graph
包中菊卷。主要邏輯集中在StreamGraphGenerator
類,入口函數(shù)是StreamGraphGenerator.generate(env, transformations)
宝剖,該函數(shù)由觸發(fā)程序執(zhí)行的方法StreamExecutionEnvironment.execute()
調(diào)用到洁闰。
理關(guān)鍵點(diǎn)
- 根據(jù)用戶通過
Stream API
編寫的程序,構(gòu)建transformations
參數(shù)万细。 - 遍歷
transformations
集合扑眉,創(chuàng)建StreamNode
和StreamEdge
,構(gòu)造StreamGraph
赖钞。
構(gòu)建過程
StreamGraphGenerator.generate()
的一個(gè)關(guān)鍵參數(shù)是transformations
腰素,它是env
的成員變量之一,用List<StreamTransformation<? >>
來保存雪营。其中弓千,StreamTransformation
代表了從一個(gè)或多個(gè)DataStream
生成新DataStream
的操作。
DataStream
上常見的transformation
有map
献起、flatmap
洋访、filter
等。這些transformation
會(huì)構(gòu)造出一棵StreamTransformation
樹谴餐,通過這棵樹轉(zhuǎn)換成StreamGraph
姻政。
DataStream
上的每一個(gè)transformation
都對(duì)應(yīng)了一個(gè)StreamOperator
,StreamOperator
是運(yùn)行時(shí)的具體實(shí)現(xiàn)总寒,會(huì)決定UDF(User-Defined Funtion)
的調(diào)用方式扶歪。
以dataStream.map
為例,用戶編寫的UDF(User-Defined Funtion)
構(gòu)造出StreamTransformation
的過程如下圖所示:
從上圖可以看出摄闸,map
轉(zhuǎn)換善镰,首先是將用戶自定義的函數(shù)MapFunction
包裝到StreamMap
這個(gè)StreamOperator
中;然后是將StreamMap
包裝到OneInputTransformation
中年枕,并建立與上游的關(guān)系炫欺;最后將transformation
存到env
的transformations
集合中。
我們看一下SocketWindowWordCount
示例熏兄,其transformations
樹的結(jié)構(gòu)如下圖所示品洛,其中符號(hào)*
為input
指針树姨,指向上游的transformation
,從而形成了一顆transformations
樹桥状。
當(dāng)調(diào)用env.execute()
時(shí)帽揪,會(huì)觸發(fā)StreamGraphGenerator.generate(env, transformations)
遍歷其中的transformations
集合構(gòu)造出StreamGraph
。自底向上遞歸調(diào)用每一個(gè)transformation
辅斟,所以真正的處理順序是Source->Flat Map->Hash(keyBy)->TriggerWindow->Sink
转晰。
入口:
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
return new StreamGraphGenerator(env).generateInternal(transformations);
}
真正的構(gòu)建過程:
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
for (StreamTransformation<?> transformation: transformations) {
transform(transformation);
}
return streamGraph;
}
遍歷transformations
集合,并對(duì)其每一個(gè)StreamTransformation
調(diào)用transform()
方法士飒。
private Collection<Integer> transform(StreamTransformation<?> transform) {
...
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
...
}
從上述代碼可以看出查邢,針對(duì)具體的某一種類型的StreamTransformation
,會(huì)調(diào)用其相應(yīng)的transformXXX()
函數(shù)進(jìn)行轉(zhuǎn)換酵幕。transformXXX()
首先會(huì)對(duì)transform
的上游transform
進(jìn)行遞歸轉(zhuǎn)換扰藕,確保上游的都已經(jīng)完成了轉(zhuǎn)化。然后通過addOperator()
方法構(gòu)造出StreamNode
芳撒,通過addEdge()
方法與上游的transform
進(jìn)行連接邓深,構(gòu)造出StreamEdge
。
注意:
對(duì)邏輯轉(zhuǎn)換(partition番官、union
等)的處理庐完,不會(huì)生成具體的StreamNode
和StreamEdge
,而是通過streamGraph.addVirtualXXXNode()
方法添加一個(gè)虛擬節(jié)點(diǎn)徘熔。當(dāng)下游transform
添加edge
時(shí)门躯,會(huì)把虛擬節(jié)點(diǎn)信息寫入到StreamEdge
中。
示例講解:
在SocketWindowWordCount
示例中酷师,Flat Map
操作會(huì)被封裝到OneInputTransformation
類中讶凉,我們可以看一看transformOneInputTransform
的實(shí)現(xiàn):
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
// 遞歸對(duì)transformation的直接上游transformation進(jìn)行轉(zhuǎn)換,獲取直接上游id集合
Collection<Integer> inputIds = transform(transform.getInput());
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
// 構(gòu)建StreamNode的入口
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getOperator(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());
if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
streamGraph.setParallelism(transform.getId(), transform.getParallelism());
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
// 構(gòu)建StreamEdge的入口
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
類似的山孔,在SocketWindowWordCount
示例中懂讯,keyBy
操作會(huì)被封裝到PartitionTransformation
類中,最后我們?cè)僖?code>transformPartition為例看下對(duì)邏輯轉(zhuǎn)換的處理台颠。
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
StreamTransformation<T> input = partition.getInput();
List<Integer> resultIds = new ArrayList<>();
// 遞歸對(duì)該transformation的直接上游transformation進(jìn)行轉(zhuǎn)換褐望,獲得直接上游id集合
Collection<Integer> transformedIds = transform(input);
for (Integer transformedId: transformedIds) {
int virtualId = StreamTransformation.getNewNodeId();
// 添加一個(gè)虛擬分區(qū)節(jié)點(diǎn)VirtualPartitionNode,不會(huì)生成StreamNode
streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
resultIds.add(virtualId);
}
return resultIds;
}
從transformPartition
函數(shù)的實(shí)現(xiàn)可以看出串前,對(duì)transformPartition
的轉(zhuǎn)換沒有生成具體的StreamNode
和StreamEdge
瘫里,而是通過streamGraph.addVirtualPartitionNode()
方法添加了一個(gè)虛擬節(jié)點(diǎn)。當(dāng)partition
的下游transform
添加edge
時(shí)(調(diào)用streamGraph.addEdge()
)荡碾,會(huì)把partition
信息寫入到StreamEdge
中谨读。
最后的最后我們?cè)倏聪?code>streamGraph.addEdgeInternal()的實(shí)現(xiàn):
private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag) {
// 當(dāng)上游是sideOutput時(shí),遞歸調(diào)用坛吁,并傳入sideOutput信息
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
}
// 當(dāng)上游是select時(shí)劳殖,遞歸調(diào)用铐尚,并傳入select信息
else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
}
// 當(dāng)上游是partition時(shí),遞歸調(diào)用哆姻,并傳入partitioner信息
else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
}
// 不是以上邏輯轉(zhuǎn)換的情況宣增,真正構(gòu)建StreamEdge
else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// 沒有指定partitioner時(shí),會(huì)為其選擇forward或者rebalance分區(qū)
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
// 創(chuàng)建StreamEdge填具,并將該StreamEdge添加到上游的輸出统舀,下游的輸入
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
一句話總結(jié):
首先匆骗,根據(jù)用戶通過Stream API
自定義UDF
編寫的程序劳景,即在DataStream
上做的一系列轉(zhuǎn)換(map、shuffle
碉就、window
等)盟广,我們可以得到StreamTransformation
集合。然后通過調(diào)用streamGraphGenerator.generate(env, transformations)
瓮钥,遍歷transformations
集合筋量,并對(duì)其每一個(gè)StreamTransformation
調(diào)用transform()
方法,構(gòu)造出StreamNode
碉熄,并通過StreamEdge
與上游的transformation
進(jìn)行連接桨武,此處需要特別注意對(duì)邏輯轉(zhuǎn)換(partition
等)的處理,最后構(gòu)造出StreamGraph
锈津。
Step2:StreamGraph
—> JobGraph
本小節(jié)主要介紹Flink
是如何將StreamGraph
轉(zhuǎn)換成JobGraph
的呀酸。該轉(zhuǎn)換的關(guān)鍵在于,將多個(gè)符合條件的StreamNode
節(jié)點(diǎn)chain
在一起作為一個(gè)JobVertex
節(jié)點(diǎn)琼梆,這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗性誉。
找突破口
JobGraph
的相關(guān)數(shù)據(jù)結(jié)構(gòu)主要在flink-runtime
模塊下的org.apache.flink.runtime.jobgraph
包中。構(gòu)造JobGraph
的代碼主要集中在StreamingJobGraphGenerator
類中茎杂,開啟構(gòu)建之路的入口函數(shù)是StreamingJobGraphGenerator(streamGraph).createJobGraph()
错览。
理關(guān)鍵點(diǎn)
判斷算子chain
,合并創(chuàng)建JobVertex
煌往,并生成JobEdge
倾哺。JobVertex
和JobEdge
之間通過創(chuàng)建IntermediateDataSet
來連接」舨保可以簡(jiǎn)單分為3
個(gè)關(guān)鍵點(diǎn):
-
chain
的判斷 - 生成
JobVertex
- 創(chuàng)建
JobEdge
和IntermediateDataSet
構(gòu)建過程
入口:
private JobGraph createJobGraph() {
...
setChaining(); //遞歸創(chuàng)建JobVertex羞海、JobEdge、IntermediateDataSet曾棕,用以構(gòu)建JobGraph
...
return jobGraph;
}
真正的構(gòu)建過程:
void setChaining(){
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(sourceNodeId, sourceNodeId, ...);
}
}
private List<StreamEdge> createChain(Integer startNodeId, Integer currentNodeId, ...){
...
// 將當(dāng)前節(jié)點(diǎn)的出邊分為兩類:chainableOutputs和nonChainableOutputs
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
// 分別遍歷chainableOutputs和nonChainableOutputs扣猫,遞歸調(diào)用自身方法creatChain()
// 并將nonChainableOutputs的邊或者chainableOutputs調(diào)用createChain()的返回值添加到transitiveOutEdges中
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), ...));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), ...);
}
// 如果當(dāng)前節(jié)點(diǎn)是起始節(jié)點(diǎn),則直接創(chuàng)建JobVertex翘地,否則返回一個(gè)空的SteamConfig
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, ...) : new StreamConfig(new Configuration());
// 將StreamNode中的配置信息序列化到StreamConfig中
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
// 再次判斷申尤,如果是chain的起始節(jié)點(diǎn)癌幕,執(zhí)行connect()方法,創(chuàng)建JobEdge和IntermediateDataSet昧穿;否則將當(dāng)前節(jié)點(diǎn)的StreamConfig添加到chainedConfig中
if (currentNodeId.equals(startNodeId)) {
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
} else {
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
...
return transitiveOutEdges;
}
setChaining()
會(huì)依次對(duì)source
調(diào)用createChain()
方法勺远,該方法會(huì)遞歸調(diào)用其下游節(jié)點(diǎn),從而構(gòu)建出node chains
时鸵。createChain()
會(huì)分析當(dāng)前節(jié)點(diǎn)的出邊胶逢,根據(jù)Operator Chains
中的條件進(jìn)行判斷isChainable()
,并將出邊分成兩類:chainalbeOutputs
和noChainableOutputs
饰潜,接著分別遞歸調(diào)用自身方法初坠。之后會(huì)將StreamNode
中的配置信息序列化到StreamConfig
中。如果當(dāng)前不是chain
中的子節(jié)點(diǎn)彭雾,則會(huì)構(gòu)建JobVertex
和JobEdge
相連碟刺。如果是chain
中的子節(jié)點(diǎn),則會(huì)將StreamConfig
添加到該chain
的chainedConfigs
集合中薯酝。
示例講解:
同樣地半沽,以SocketWindowWordCount
為例,我們分析下其創(chuàng)建過程:
如上圖所示吴菠,我們先給4
個(gè)StreamNode
節(jié)點(diǎn)進(jìn)行編號(hào)者填,Source
用1
表示,Flat Map
用2
表示做葵,Trigger Window
用3
表示占哟,Sink
用4
表示;相應(yīng)地蜂挪,3
條StreamEdge
則分別用1->2
重挑,2->3
,3->4
表示棠涮。
遞歸調(diào)用過程如下:
- 遞歸始于
Source
谬哀,調(diào)用createChain(1, 1)
,當(dāng)前節(jié)點(diǎn)1
的出邊為1->2
严肪,不可chain
史煎,將邊1->2
直接加入transitiveOutEdges
; - 然后遞歸調(diào)用
createChain(2, 2)
驳糯,當(dāng)前節(jié)點(diǎn)2
的出邊為2->3
篇梭,同樣的,不可chain
酝枢,將邊2->3
直接加入transitiveOutEdges
恬偷; - 繼續(xù)遞歸調(diào)用
createChain(3, 3)
,當(dāng)前節(jié)點(diǎn)3
的出邊為3->4
帘睦,要注意了袍患,可chain
坦康,等著將下游createChain()
的返回值加入transitiveOutEdges
; - 此處遞歸調(diào)用
createChain(3, 4)
诡延,當(dāng)前節(jié)點(diǎn)4
沒有出邊滞欠,遞歸終止。
遞歸結(jié)束條件:
- 當(dāng)前節(jié)點(diǎn)不再有出邊集合肆良,即
streamGraph.getStreamNode(currentId).getOutEdges()
為空 - 當(dāng)前節(jié)點(diǎn)已經(jīng)轉(zhuǎn)換完成筛璧,即
builtVertices.contains(startNodeId)
為false
遞歸調(diào)用過程中各種操作以及變量情況一覽表如下:
creatChain() |
getOutEdges() |
chainAble |
nonChainAble |
transitiveOutEdges |
JobVertex |
connect() |
---|---|---|---|---|---|---|
(1, 1) |
1->2 |
無 | 1->2 |
1->2 |
JobVertex |
Y |
(2, 2) |
2->3 |
無 | 2->3 |
2->3 |
JobVertex |
Y |
(3, 3) |
3->4 |
3->4 |
無 | 無 | JobVertex |
Y |
(3, 4) |
無 | 無 | 無 | 無 | StreamConfig |
N |
關(guān)鍵方法分析:
isChainable()
,用來判斷StreamNode chain
惹恃,一共有9
個(gè)條件:
- 下游節(jié)點(diǎn)的入邊為
1
-
StreamEdge
的下游節(jié)點(diǎn)對(duì)應(yīng)的算子不為null
-
StreamEdge
的上游節(jié)點(diǎn)對(duì)應(yīng)的算子不為null
-
StreamEdge
的上下游節(jié)點(diǎn)擁有相同的slotSharingGroup
夭谤,默認(rèn)都是default
- 下游算子的連接策略為
ALWAYS
- 上游算子的連接策略為
ALWAYS
或者HEAD
-
StreamEdge
的分區(qū)類型為ForwardPartitioner
- 上下游節(jié)點(diǎn)的并行度一致
- 當(dāng)前
StreamGraph
允許做chain
createJobVertex()
,用來創(chuàng)建JobVertex
節(jié)點(diǎn)座舍,并返回StreamConfig
沮翔。
createJobVertex()
傳入的參數(shù)為StreamNode
。首先會(huì)通過new JobVertex()
構(gòu)造出JobVertex
節(jié)點(diǎn)曲秉,然后通過JobVertex.setInvokableClass(streamNode.getJobVertexClass())
設(shè)置運(yùn)行時(shí)執(zhí)行類,再通過jobVertex.setParallelism(parallelism)
設(shè)置并行度疲牵,最后返回StreamConfig
承二。
connect()
,用來創(chuàng)建JobEdge
和IntermediateDataSet
纲爸,連接上下游JobVertex
節(jié)點(diǎn)亥鸠。
遍歷transitiveOutEdges
,并將每一條StreamEdge
邊作為參數(shù)傳入connect( )
函數(shù)中识啦。接下來就是依據(jù)StreamEdge
得到上下游JobVertex
節(jié)點(diǎn)负蚊;然后,通過StreamEdge.getPartitioner()
方法得到StreamPartitioner
屬性颓哮,對(duì)于ForwardPartitioner
和RescalePartitioner
兩種分區(qū)方式建立DistributionPattern.POINTWISE
類型的JobEdge
和IntermediateDataSet
家妆,而其他的分區(qū)方式則是DistributionPattern.ALL_TO_ALL
類型。至此已經(jīng)建立好上下游JobVertex
節(jié)點(diǎn)間的聯(lián)系冕茅。
一句話總結(jié):
首先伤极,通過streamGraph.getSourceIDs()
拿到source
節(jié)點(diǎn)集合,緊接著依次從source
節(jié)點(diǎn)開始遍歷姨伤,判斷StreamNode Chain
哨坪,遞歸創(chuàng)建JobVertex
,所以乍楚,其真正的處理順序其實(shí)是從sink
開始的当编。然后通過connect()
遍歷當(dāng)前節(jié)點(diǎn)的物理出邊transitiveOutEdges
集合,創(chuàng)建JobEdge
徒溪,建立當(dāng)前節(jié)點(diǎn)與下游節(jié)點(diǎn)的聯(lián)系忿偷,即JobVertex
與IntermediateDataSet
之間拧篮。
Step3:JobGraph
—> ExecutionGraph
本小節(jié)主要介紹Flink
是如何將JobGraph
轉(zhuǎn)換成ExecutionGraph
的。簡(jiǎn)單來說牵舱,就是并行化JobGraph
串绩,為調(diào)度做好準(zhǔn)備。
找突破口
ExecutionGraph
的相關(guān)數(shù)據(jù)結(jié)構(gòu)主要在org.apache.flink.runtime.executiongraph
包中芜壁,構(gòu)造ExecutionGraph
的代碼集中在ExecutionGraphBuilder
類和ExecutionGraph
類中礁凡,入口函數(shù)是ExecutionGraphBuilder.buildGraph(executionGraph, jobGraph, ...)
。
理關(guān)鍵點(diǎn)
- 客戶端提交
JobGraph
給JobManager
- 構(gòu)建ExecutionGraph對(duì)象
- 將
JobGraph
進(jìn)行拓?fù)渑判蚧弁玫?code>sortedTopology頂點(diǎn)集合 - 將
JobVertex
封裝成ExecutionJobVertex
- 把
ExecutionVertex
節(jié)點(diǎn)通過ExecutionEdge
連接起來
- 將
構(gòu)建過程
1顷牌、JobClient
提交JobGraph
給JobManager
一個(gè)程序的JobGraph
真正被提交始于對(duì)JobClient
的submitJobAndWait()
方法的調(diào)用,而且submitJobAndWait()
方法會(huì)觸發(fā)基于Akka
的Actor
之間的消息通信塞淹。JobClient
在這其中起到了“橋接”的作用窟蓝,它連接了同步的方法調(diào)用和異步的消息通信。
在submitJobAndWait()
方法中饱普,首先會(huì)創(chuàng)建一個(gè)JobClientActor
的ActorRef
运挫,并向其發(fā)送一個(gè)包含JobGraph
實(shí)例的SubmitJobAndWait
消息。該SubmitJobAndWait
消息被JobClientActor
接收后套耕,調(diào)用trySubmitJob()
方法觸發(fā)真正的提交動(dòng)作谁帕,即通過jobManager.tell( )
的方式給JobManager Actor
發(fā)送封裝JobGraph
的SubmitJob
消息。隨后冯袍,JobManager Actor
會(huì)接收到來自JobClientActor
的該SubmitJob
消息匈挖,進(jìn)而觸發(fā)submitJob()
方法。
由此可見康愤,一個(gè)JobGraph
從提交開始會(huì)經(jīng)過多個(gè)對(duì)象層層遞交儡循,各個(gè)對(duì)象之間的交互關(guān)系如下圖所示:
2、
構(gòu)建ExecutionGraph
對(duì)象
入口:
JobManager
作為Actor
征冷,在handleMessage()
方法中择膝,針對(duì)SubmitJob
消息調(diào)用submitJob()
方法。
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
...
executionGraph = ExecutionGraphBuilder.buildGraph(executionGraph, jobGraph, ...)
...
}
真正的構(gòu)建過程:
public static ExecutionGraph buildGraph(){
...
//對(duì)JobGraph中的JobVertex節(jié)點(diǎn)進(jìn)行拓?fù)渑判蜃手眩玫絃ist<JobVertex>
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology); //構(gòu)建ExecutionGraph的核心方法
...
}
由上可知调榄,attachJobGraph()
方法是構(gòu)建ExecutionGraph
圖結(jié)構(gòu)的核心方法。
public void attachJobGraph(List<JobVertex> topologiallySorted){
...
for (JobVertex jobVertex : topologiallySorted) {
...
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
ejv.connectToPredecessors(this.intermediateResults);
...
}
...
}
下面詳細(xì)分析下呵扛,attachJobGraph()
方法主要完成的兩件事情:
- 將
JobVertex
封裝成ExecutionJobVertex
- 把節(jié)點(diǎn)通過
ExecutionEdge
連接
關(guān)鍵方法分析:
new ExecutionJobVertex()
方法每庆,用來將一個(gè)個(gè)JobVertex
封裝成ExecutionJobVertex
,并依次創(chuàng)建ExecutionVertex
今穿、Execution
缤灵、IntermediateResult
、IntermediateResultPartition
,用于豐富ExecutionGraph
腮出。
在ExecutionJobVertex
的構(gòu)造函數(shù)中帖鸦,首先是依據(jù)對(duì)應(yīng)的JobVertex
的并發(fā)度,生成對(duì)應(yīng)個(gè)數(shù)的ExecutionVertex
胚嘲。其中作儿,一個(gè)ExecutionVertex
代表著一個(gè)ExecutionJobVertex
的并發(fā)子task
。然后是將原來JobVertex
的中間結(jié)果IntermediateDataSet
轉(zhuǎn)化為ExecutionGraph
中的IntermediateResult
馋劈。
類似的攻锰,ExecutionVertex
的構(gòu)造函數(shù)中,首先會(huì)創(chuàng)建IntermediateResultPartition
妓雾,并通過IntermediateResult.setPartition( )
建立IntermediateResult
和IntermediateResultPartition
之間的關(guān)系娶吞;然后生成Execution
,并配置資源相關(guān)械姻。
新創(chuàng)建的ExecutionJobVertex
調(diào)用ejv.connectToPredecessor()
方法妒蛇,按照不同的分發(fā)策略連接上游,其參數(shù)為上游生成的IntermediateResult
集合楷拳。其中绣夺,根據(jù)JobEdge
中兩種不同的DistributionPattern
屬性,分別調(diào)用connectPointWise()
或者connectAllToAll( )
方法唯竹,創(chuàng)建ExecutionEdge
乐导,將ExecutionVertex
和上游的IntermediateResultPartition
連接起來。
其中浸颓,SocketWindowWordCount
示例中,就是采用了connectAllToAll()
的方式建立與上游的關(guān)系旺拉。
接下來产上,我們?cè)敿?xì)介紹下connectPointWise()
方法的實(shí)現(xiàn),即DistributionPattern.POINTWISE
策略蛾狗,該策略用來連接當(dāng)前ExecutionVertex
與上游的IntermediateResultPartition
晋涣。首先,獲取上游IntermediateResult
的partition
數(shù)沉桌,用numSources
表示谢鹊,以及此ExecutionJobVertex
的并發(fā)度,用parallelism
表示留凭;然后佃扼,根據(jù)其并行度的不同,分別創(chuàng)建ExecutionEdge
蔼夜。共分3
種情況:
(1) 如果并發(fā)數(shù)等于partition
數(shù)兼耀,則一對(duì)一進(jìn)行連接。如下圖所示:
即numSources == parallelism
(2) 如果并發(fā)數(shù)大于partition
數(shù),則一對(duì)多進(jìn)行連接瘤运。如下圖所示:
即numSources < parallelism
窍霞,且parallelism % numSources == 0
即
numSources < parallelism
,且parallelism % numSources != 0
(3) 如果并發(fā)數(shù)小于partition
數(shù)拯坟,則多對(duì)一進(jìn)行連接但金。如下圖所示:
即numSources > parallelism
,且numSources % parallelism == 0
即
numSources > parallelism
郁季,且numSources % parallelism != 0
一句話總結(jié):
將JobGraph
按照拓?fù)渑判蚝蟮玫揭粋€(gè)JobVertex
集合冷溃,遍歷該JobVertex
集合,即從source
開始巩踏,將JobVertex
封裝成ExecutionJobVertex
秃诵,并依次創(chuàng)建ExecutionVertex
、Execution
塞琼、IntermediateResult
和IntermediateResultPartition
菠净。然后通過ejv.connectToPredecessor()
方法,創(chuàng)建ExecutionEdge
彪杉,建立當(dāng)前節(jié)點(diǎn)與其上游節(jié)點(diǎn)之間的聯(lián)系毅往,即連接ExecutionVertex
和IntermediateResultPartition
。
終章
構(gòu)建好ExecutionGraph
派近,接下來會(huì)基于ExecutionGraph
觸發(fā)Job
的調(diào)度攀唯,申請(qǐng)Slot
,真正的部署任務(wù)渴丸,這是Task
被執(zhí)行的前提:
if (leaderElectionService.hasLeadership) {
log.info(s"Scheduling job $jobId ($jobName).")
executionGraph.scheduleForExecution() // 將生成好的ExecutionGraph進(jìn)行調(diào)度
} else {
self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
"this. I am not scheduling the job for execution.")
}
下一篇文章將介紹Flink
的Schedule
機(jī)制侯嘀,敬請(qǐng)期待~