Flink之用戶代碼生成調(diào)度層圖結(jié)構(gòu)

Flink中,由用戶代碼生成調(diào)度層圖結(jié)構(gòu)蚓土,可以分成3步走:通過Stream API編寫的用戶代碼 -> StreamGraph -> JobGraph -> ExecutionGraph宙橱。

  • StreamGraph:根據(jù)用戶通過Stream API編寫的代碼生成的最初的圖姨俩,用來表示程序的拓?fù)浣Y(jié)構(gòu)蘸拔。
  • JobGraphStreamGraph經(jīng)過算子連接等優(yōu)化后生成的圖师郑,它是提交給JobManager的數(shù)據(jù)結(jié)構(gòu)环葵。
  • ExecutionGraphJobManager根據(jù)JobGraph生成的分布式執(zhí)行圖,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)宝冕。

SocketWindowWordCount為例张遭,其執(zhí)行圖的演變過程如下圖所示:

轉(zhuǎn)換演示圖

詳解三部曲

Step1:通過Stream API編寫的用戶代碼 —> StreamGraph

本小節(jié)主要介紹Flink是如何根據(jù)用戶用Stream API編寫的程序,構(gòu)造出一個(gè)代表拓?fù)浣Y(jié)構(gòu)的StreamGraph的地梨。

找突破口

StreamGraph的相關(guān)代碼主要在org.apache.flink.streaming.api.graph包中菊卷。主要邏輯集中在StreamGraphGenerator類,入口函數(shù)是StreamGraphGenerator.generate(env, transformations)宝剖,該函數(shù)由觸發(fā)程序執(zhí)行的方法StreamExecutionEnvironment.execute()調(diào)用到洁闰。

理關(guān)鍵點(diǎn)
  • 根據(jù)用戶通過Stream API編寫的程序,構(gòu)建transformations參數(shù)万细。
  • 遍歷transformations集合扑眉,創(chuàng)建StreamNodeStreamEdge,構(gòu)造StreamGraph赖钞。
構(gòu)建過程

StreamGraphGenerator.generate()的一個(gè)關(guān)鍵參數(shù)是transformations腰素,它是env的成員變量之一,用List<StreamTransformation<? >>來保存雪营。其中弓千,StreamTransformation代表了從一個(gè)或多個(gè)DataStream生成新DataStream的操作。

DataStream上常見的transformationmap献起、flatmap洋访、filter等。這些transformation會(huì)構(gòu)造出一棵StreamTransformation樹谴餐,通過這棵樹轉(zhuǎn)換成StreamGraph姻政。

DataStream上的每一個(gè)transformation都對(duì)應(yīng)了一個(gè)StreamOperatorStreamOperator是運(yùn)行時(shí)的具體實(shí)現(xiàn)总寒,會(huì)決定UDF(User-Defined Funtion)的調(diào)用方式扶歪。

dataStream.map為例,用戶編寫的UDF(User-Defined Funtion)構(gòu)造出StreamTransformation的過程如下圖所示:

StreamTransformation

從上圖可以看出摄闸,map轉(zhuǎn)換善镰,首先是將用戶自定義的函數(shù)MapFunction包裝到StreamMap這個(gè)StreamOperator中;然后是將StreamMap包裝到OneInputTransformation中年枕,并建立與上游的關(guān)系炫欺;最后將transformation存到envtransformations集合中。

我們看一下SocketWindowWordCount示例熏兄,其transformations樹的結(jié)構(gòu)如下圖所示品洛,其中符號(hào)*input指針树姨,指向上游的transformation,從而形成了一顆transformations樹桥状。

transformations.png

當(dāng)調(diào)用env.execute()時(shí)帽揪,會(huì)觸發(fā)StreamGraphGenerator.generate(env, transformations)遍歷其中的transformations集合構(gòu)造出StreamGraph。自底向上遞歸調(diào)用每一個(gè)transformation辅斟,所以真正的處理順序是Source->Flat Map->Hash(keyBy)->TriggerWindow->Sink转晰。

入口:

public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
    return new StreamGraphGenerator(env).generateInternal(transformations);
}

真正的構(gòu)建過程:

private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
    for (StreamTransformation<?> transformation: transformations) {
        transform(transformation);
    }
    return streamGraph;
}

遍歷transformations集合,并對(duì)其每一個(gè)StreamTransformation調(diào)用transform()方法士飒。

private Collection<Integer> transform(StreamTransformation<?> transform) {
    ...
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }
    ...
}

從上述代碼可以看出查邢,針對(duì)具體的某一種類型的StreamTransformation,會(huì)調(diào)用其相應(yīng)的transformXXX()函數(shù)進(jìn)行轉(zhuǎn)換酵幕。transformXXX()首先會(huì)對(duì)transform的上游transform進(jìn)行遞歸轉(zhuǎn)換扰藕,確保上游的都已經(jīng)完成了轉(zhuǎn)化。然后通過addOperator()方法構(gòu)造出StreamNode芳撒,通過addEdge()方法與上游的transform進(jìn)行連接邓深,構(gòu)造出StreamEdge

注意:

對(duì)邏輯轉(zhuǎn)換(partition番官、union等)的處理庐完,不會(huì)生成具體的StreamNodeStreamEdge,而是通過streamGraph.addVirtualXXXNode()方法添加一個(gè)虛擬節(jié)點(diǎn)徘熔。當(dāng)下游transform添加edge時(shí)门躯,會(huì)把虛擬節(jié)點(diǎn)信息寫入到StreamEdge中。

示例講解:

SocketWindowWordCount示例中酷师,Flat Map操作會(huì)被封裝到OneInputTransformation類中讶凉,我們可以看一看transformOneInputTransform的實(shí)現(xiàn):

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    // 遞歸對(duì)transformation的直接上游transformation進(jìn)行轉(zhuǎn)換,獲取直接上游id集合
    Collection<Integer> inputIds = transform(transform.getInput());
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    // 構(gòu)建StreamNode的入口
    streamGraph.addOperator(transform.getId(), 
            slotSharingGroup, 
            transform.getOperator(), 
            transform.getInputType(), 
            transform.getOutputType(), 
            transform.getName());
    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }
    streamGraph.setParallelism(transform.getId(), transform.getParallelism());
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
    // 構(gòu)建StreamEdge的入口
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }
    return Collections.singleton(transform.getId());
}

類似的山孔,在SocketWindowWordCount示例中懂讯,keyBy操作會(huì)被封裝到PartitionTransformation類中,最后我們?cè)僖?code>transformPartition為例看下對(duì)邏輯轉(zhuǎn)換的處理台颠。

private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    StreamTransformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();
    // 遞歸對(duì)該transformation的直接上游transformation進(jìn)行轉(zhuǎn)換褐望,獲得直接上游id集合
    Collection<Integer> transformedIds = transform(input);
    for (Integer transformedId: transformedIds) {
        int virtualId = StreamTransformation.getNewNodeId();
        // 添加一個(gè)虛擬分區(qū)節(jié)點(diǎn)VirtualPartitionNode,不會(huì)生成StreamNode
        streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
        resultIds.add(virtualId);
    }
  return resultIds;
}

transformPartition函數(shù)的實(shí)現(xiàn)可以看出串前,對(duì)transformPartition的轉(zhuǎn)換沒有生成具體的StreamNodeStreamEdge瘫里,而是通過streamGraph.addVirtualPartitionNode()方法添加了一個(gè)虛擬節(jié)點(diǎn)。當(dāng)partition的下游transform添加edge時(shí)(調(diào)用streamGraph.addEdge())荡碾,會(huì)把partition信息寫入到StreamEdge中谨读。

最后的最后我們?cè)倏聪?code>streamGraph.addEdgeInternal()的實(shí)現(xiàn):

private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag) {
    // 當(dāng)上游是sideOutput時(shí),遞歸調(diào)用坛吁,并傳入sideOutput信息
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
    }
    // 當(dāng)上游是select時(shí)劳殖,遞歸調(diào)用铐尚,并傳入select信息 
    else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
    }
    // 當(dāng)上游是partition時(shí),遞歸調(diào)用哆姻,并傳入partitioner信息 
    else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
    }
    // 不是以上邏輯轉(zhuǎn)換的情況宣增,真正構(gòu)建StreamEdge 
    else {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);
        // 沒有指定partitioner時(shí),會(huì)為其選擇forward或者rebalance分區(qū)
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }
        // 創(chuàng)建StreamEdge填具,并將該StreamEdge添加到上游的輸出统舀,下游的輸入
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

一句話總結(jié):
首先匆骗,根據(jù)用戶通過Stream API自定義UDF編寫的程序劳景,即在DataStream上做的一系列轉(zhuǎn)換(map、shuffle碉就、window等)盟广,我們可以得到StreamTransformation集合。然后通過調(diào)用streamGraphGenerator.generate(env, transformations)瓮钥,遍歷transformations集合筋量,并對(duì)其每一個(gè)StreamTransformation調(diào)用transform()方法,構(gòu)造出StreamNode碉熄,并通過StreamEdge與上游的transformation進(jìn)行連接桨武,此處需要特別注意對(duì)邏輯轉(zhuǎn)換(partition等)的處理,最后構(gòu)造出StreamGraph锈津。

Step2:StreamGraph —> JobGraph

本小節(jié)主要介紹Flink是如何將StreamGraph轉(zhuǎn)換成JobGraph的呀酸。該轉(zhuǎn)換的關(guān)鍵在于,將多個(gè)符合條件的StreamNode節(jié)點(diǎn)chain在一起作為一個(gè)JobVertex節(jié)點(diǎn)琼梆,這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗性誉。

找突破口

JobGraph的相關(guān)數(shù)據(jù)結(jié)構(gòu)主要在flink-runtime模塊下的org.apache.flink.runtime.jobgraph包中。構(gòu)造JobGraph的代碼主要集中在StreamingJobGraphGenerator類中茎杂,開啟構(gòu)建之路的入口函數(shù)是StreamingJobGraphGenerator(streamGraph).createJobGraph()错览。

理關(guān)鍵點(diǎn)

判斷算子chain,合并創(chuàng)建JobVertex煌往,并生成JobEdge倾哺。JobVertexJobEdge之間通過創(chuàng)建IntermediateDataSet來連接」舨保可以簡(jiǎn)單分為3個(gè)關(guān)鍵點(diǎn):

  • chain的判斷
  • 生成JobVertex
  • 創(chuàng)建JobEdgeIntermediateDataSet
構(gòu)建過程

入口:

private JobGraph createJobGraph() {

    ...
    setChaining();  //遞歸創(chuàng)建JobVertex羞海、JobEdge、IntermediateDataSet曾棕,用以構(gòu)建JobGraph
    ...
    return jobGraph;
}

真正的構(gòu)建過程:

void setChaining(){
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, ...);
    }
}

private List<StreamEdge> createChain(Integer startNodeId, Integer currentNodeId, ...){
    ...
    // 將當(dāng)前節(jié)點(diǎn)的出邊分為兩類:chainableOutputs和nonChainableOutputs
    for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
        if (isChainable(outEdge, streamGraph)) {
            chainableOutputs.add(outEdge);
        } else {
            nonChainableOutputs.add(outEdge);
        }
    }
    // 分別遍歷chainableOutputs和nonChainableOutputs扣猫,遞歸調(diào)用自身方法creatChain()
    // 并將nonChainableOutputs的邊或者chainableOutputs調(diào)用createChain()的返回值添加到transitiveOutEdges中
    for (StreamEdge chainable : chainableOutputs) {
        transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), ...));
    }
    for (StreamEdge nonChainable : nonChainableOutputs) {
        transitiveOutEdges.add(nonChainable);
        createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), ...);
    }
    // 如果當(dāng)前節(jié)點(diǎn)是起始節(jié)點(diǎn),則直接創(chuàng)建JobVertex翘地,否則返回一個(gè)空的SteamConfig
    StreamConfig config = currentNodeId.equals(startNodeId)
        ? createJobVertex(startNodeId, ...) : new StreamConfig(new Configuration());
    // 將StreamNode中的配置信息序列化到StreamConfig中
    setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
    // 再次判斷申尤,如果是chain的起始節(jié)點(diǎn)癌幕,執(zhí)行connect()方法,創(chuàng)建JobEdge和IntermediateDataSet昧穿;否則將當(dāng)前節(jié)點(diǎn)的StreamConfig添加到chainedConfig中
    if (currentNodeId.equals(startNodeId)) {
        for (StreamEdge edge : transitiveOutEdges) {
            connect(startNodeId, edge);
        }
    } else {
        chainedConfigs.get(startNodeId).put(currentNodeId, config);
    }
    ...
    return transitiveOutEdges;
}

setChaining()會(huì)依次對(duì)source調(diào)用createChain()方法勺远,該方法會(huì)遞歸調(diào)用其下游節(jié)點(diǎn),從而構(gòu)建出node chains时鸵。createChain()會(huì)分析當(dāng)前節(jié)點(diǎn)的出邊胶逢,根據(jù)Operator Chains中的條件進(jìn)行判斷isChainable(),并將出邊分成兩類:chainalbeOutputsnoChainableOutputs饰潜,接著分別遞歸調(diào)用自身方法初坠。之后會(huì)將StreamNode中的配置信息序列化到StreamConfig中。如果當(dāng)前不是chain中的子節(jié)點(diǎn)彭雾,則會(huì)構(gòu)建JobVertexJobEdge相連碟刺。如果是chain中的子節(jié)點(diǎn),則會(huì)將StreamConfig添加到該chainchainedConfigs集合中薯酝。

示例講解:

同樣地半沽,以SocketWindowWordCount為例,我們分析下其創(chuàng)建過程:

StreamGraph

如上圖所示吴菠,我們先給4個(gè)StreamNode節(jié)點(diǎn)進(jìn)行編號(hào)者填,Source1表示,Flat Map2表示做葵,Trigger Window3表示占哟,Sink4表示;相應(yīng)地蜂挪,3StreamEdge則分別用1->2重挑,2->33->4表示棠涮。

遞歸調(diào)用過程如下:

  • 遞歸始于Source谬哀,調(diào)用createChain(1, 1),當(dāng)前節(jié)點(diǎn)1的出邊為1->2严肪,不可chain史煎,將邊1->2直接加入transitiveOutEdges
  • 然后遞歸調(diào)用createChain(2, 2)驳糯,當(dāng)前節(jié)點(diǎn)2的出邊為2->3篇梭,同樣的,不可chain酝枢,將邊2->3直接加入transitiveOutEdges恬偷;
  • 繼續(xù)遞歸調(diào)用createChain(3, 3),當(dāng)前節(jié)點(diǎn)3的出邊為3->4帘睦,要注意了袍患,可chain坦康,等著將下游createChain()的返回值加入transitiveOutEdges
  • 此處遞歸調(diào)用createChain(3, 4)诡延,當(dāng)前節(jié)點(diǎn)4沒有出邊滞欠,遞歸終止。

遞歸結(jié)束條件:

  • 當(dāng)前節(jié)點(diǎn)不再有出邊集合肆良,即streamGraph.getStreamNode(currentId).getOutEdges()為空
  • 當(dāng)前節(jié)點(diǎn)已經(jīng)轉(zhuǎn)換完成筛璧,即builtVertices.contains(startNodeId)false

遞歸調(diào)用過程中各種操作以及變量情況一覽表如下:

creatChain() getOutEdges() chainAble nonChainAble transitiveOutEdges JobVertex connect()
(1, 1) 1->2 1->2 1->2 JobVertex Y
(2, 2) 2->3 2->3 2->3 JobVertex Y
(3, 3) 3->4 3->4 JobVertex Y
(3, 4) StreamConfig N

關(guān)鍵方法分析:

isChainable(),用來判斷StreamNode chain惹恃,一共有9個(gè)條件:

  • 下游節(jié)點(diǎn)的入邊為1
  • StreamEdge的下游節(jié)點(diǎn)對(duì)應(yīng)的算子不為null
  • StreamEdge的上游節(jié)點(diǎn)對(duì)應(yīng)的算子不為null
  • StreamEdge的上下游節(jié)點(diǎn)擁有相同的slotSharingGroup夭谤,默認(rèn)都是default
  • 下游算子的連接策略為ALWAYS
  • 上游算子的連接策略為ALWAYS或者HEAD
  • StreamEdge的分區(qū)類型為ForwardPartitioner
  • 上下游節(jié)點(diǎn)的并行度一致
  • 當(dāng)前StreamGraph允許做chain

createJobVertex(),用來創(chuàng)建JobVertex節(jié)點(diǎn)座舍,并返回StreamConfig沮翔。

createJobVertex()傳入的參數(shù)為StreamNode。首先會(huì)通過new JobVertex()構(gòu)造出JobVertex節(jié)點(diǎn)曲秉,然后通過JobVertex.setInvokableClass(streamNode.getJobVertexClass())設(shè)置運(yùn)行時(shí)執(zhí)行類,再通過jobVertex.setParallelism(parallelism)設(shè)置并行度疲牵,最后返回StreamConfig承二。

connect(),用來創(chuàng)建JobEdgeIntermediateDataSet纲爸,連接上下游JobVertex節(jié)點(diǎn)亥鸠。

遍歷transitiveOutEdges,并將每一條StreamEdge邊作為參數(shù)傳入connect( )函數(shù)中识啦。接下來就是依據(jù)StreamEdge得到上下游JobVertex節(jié)點(diǎn)负蚊;然后,通過StreamEdge.getPartitioner()方法得到StreamPartitioner屬性颓哮,對(duì)于ForwardPartitionerRescalePartitioner兩種分區(qū)方式建立DistributionPattern.POINTWISE類型的JobEdgeIntermediateDataSet家妆,而其他的分區(qū)方式則是DistributionPattern.ALL_TO_ALL類型。至此已經(jīng)建立好上下游JobVertex節(jié)點(diǎn)間的聯(lián)系冕茅。

一句話總結(jié):

首先伤极,通過streamGraph.getSourceIDs()拿到source節(jié)點(diǎn)集合,緊接著依次從source節(jié)點(diǎn)開始遍歷姨伤,判斷StreamNode Chain哨坪,遞歸創(chuàng)建JobVertex,所以乍楚,其真正的處理順序其實(shí)是從sink開始的当编。然后通過connect()遍歷當(dāng)前節(jié)點(diǎn)的物理出邊transitiveOutEdges集合,創(chuàng)建JobEdge徒溪,建立當(dāng)前節(jié)點(diǎn)與下游節(jié)點(diǎn)的聯(lián)系忿偷,即JobVertexIntermediateDataSet之間拧篮。

Step3:JobGraph —> ExecutionGraph

本小節(jié)主要介紹Flink是如何將JobGraph轉(zhuǎn)換成ExecutionGraph的。簡(jiǎn)單來說牵舱,就是并行化JobGraph串绩,為調(diào)度做好準(zhǔn)備。

找突破口

ExecutionGraph的相關(guān)數(shù)據(jù)結(jié)構(gòu)主要在org.apache.flink.runtime.executiongraph包中芜壁,構(gòu)造ExecutionGraph的代碼集中在ExecutionGraphBuilder類和ExecutionGraph類中礁凡,入口函數(shù)是ExecutionGraphBuilder.buildGraph(executionGraph, jobGraph, ...)

理關(guān)鍵點(diǎn)
  • 客戶端提交JobGraphJobManager
  • 構(gòu)建ExecutionGraph對(duì)象
    • JobGraph進(jìn)行拓?fù)渑判蚧弁玫?code>sortedTopology頂點(diǎn)集合
    • JobVertex封裝成ExecutionJobVertex
    • ExecutionVertex節(jié)點(diǎn)通過ExecutionEdge連接起來
構(gòu)建過程

1顷牌、JobClient提交JobGraphJobManager

一個(gè)程序的JobGraph真正被提交始于對(duì)JobClientsubmitJobAndWait()方法的調(diào)用,而且submitJobAndWait()方法會(huì)觸發(fā)基于AkkaActor之間的消息通信塞淹。JobClient在這其中起到了“橋接”的作用窟蓝,它連接了同步的方法調(diào)用和異步的消息通信。

submitJobAndWait()方法中饱普,首先會(huì)創(chuàng)建一個(gè)JobClientActorActorRef运挫,并向其發(fā)送一個(gè)包含JobGraph實(shí)例的SubmitJobAndWait消息。該SubmitJobAndWait消息被JobClientActor接收后套耕,調(diào)用trySubmitJob()方法觸發(fā)真正的提交動(dòng)作谁帕,即通過jobManager.tell( )的方式給JobManager Actor發(fā)送封裝JobGraphSubmitJob消息。隨后冯袍,JobManager Actor會(huì)接收到來自JobClientActor的該SubmitJob消息匈挖,進(jìn)而觸發(fā)submitJob()方法。

由此可見康愤,一個(gè)JobGraph從提交開始會(huì)經(jīng)過多個(gè)對(duì)象層層遞交儡循,各個(gè)對(duì)象之間的交互關(guān)系如下圖所示:

Actor交互

2、構(gòu)建ExecutionGraph對(duì)象

入口:
JobManager作為Actor征冷,在handleMessage()方法中择膝,針對(duì)SubmitJob消息調(diào)用submitJob()方法。

private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
    ...
    executionGraph = ExecutionGraphBuilder.buildGraph(executionGraph, jobGraph, ...)
    ...
}

真正的構(gòu)建過程:

public static ExecutionGraph buildGraph(){
    ...
    //對(duì)JobGraph中的JobVertex節(jié)點(diǎn)進(jìn)行拓?fù)渑判蜃手眩玫絃ist<JobVertex>
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    executionGraph.attachJobGraph(sortedTopology);  //構(gòu)建ExecutionGraph的核心方法
    ...
}

由上可知调榄,attachJobGraph()方法是構(gòu)建ExecutionGraph圖結(jié)構(gòu)的核心方法。

public void attachJobGraph(List<JobVertex> topologiallySorted){
    ...
    for (JobVertex jobVertex : topologiallySorted) {
        ...
        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
        ejv.connectToPredecessors(this.intermediateResults);
        ...
    }
    ...
}

下面詳細(xì)分析下呵扛,attachJobGraph()方法主要完成的兩件事情:

  • JobVertex封裝成ExecutionJobVertex
  • 把節(jié)點(diǎn)通過ExecutionEdge連接

關(guān)鍵方法分析:

new ExecutionJobVertex()方法每庆,用來將一個(gè)個(gè)JobVertex封裝成ExecutionJobVertex,并依次創(chuàng)建ExecutionVertex今穿、Execution缤灵、IntermediateResultIntermediateResultPartition,用于豐富ExecutionGraph腮出。

ExecutionJobVertex的構(gòu)造函數(shù)中帖鸦,首先是依據(jù)對(duì)應(yīng)的JobVertex的并發(fā)度,生成對(duì)應(yīng)個(gè)數(shù)的ExecutionVertex胚嘲。其中作儿,一個(gè)ExecutionVertex代表著一個(gè)ExecutionJobVertex的并發(fā)子task。然后是將原來JobVertex的中間結(jié)果IntermediateDataSet轉(zhuǎn)化為ExecutionGraph中的IntermediateResult馋劈。

類似的攻锰,ExecutionVertex的構(gòu)造函數(shù)中,首先會(huì)創(chuàng)建IntermediateResultPartition妓雾,并通過IntermediateResult.setPartition( )建立IntermediateResultIntermediateResultPartition之間的關(guān)系娶吞;然后生成Execution,并配置資源相關(guān)械姻。

新創(chuàng)建的ExecutionJobVertex調(diào)用ejv.connectToPredecessor()方法妒蛇,按照不同的分發(fā)策略連接上游,其參數(shù)為上游生成的IntermediateResult集合楷拳。其中绣夺,根據(jù)JobEdge中兩種不同的DistributionPattern屬性,分別調(diào)用connectPointWise()或者connectAllToAll( )方法唯竹,創(chuàng)建ExecutionEdge乐导,將ExecutionVertex和上游的IntermediateResultPartition連接起來。

其中浸颓,SocketWindowWordCount示例中,就是采用了connectAllToAll()的方式建立與上游的關(guān)系旺拉。

接下來产上,我們?cè)敿?xì)介紹下connectPointWise()方法的實(shí)現(xiàn),即DistributionPattern.POINTWISE策略蛾狗,該策略用來連接當(dāng)前ExecutionVertex與上游的IntermediateResultPartition晋涣。首先,獲取上游IntermediateResultpartition數(shù)沉桌,用numSources表示谢鹊,以及此ExecutionJobVertex的并發(fā)度,用parallelism表示留凭;然后佃扼,根據(jù)其并行度的不同,分別創(chuàng)建ExecutionEdge蔼夜。共分3種情況:

(1) 如果并發(fā)數(shù)等于partition數(shù)兼耀,則一對(duì)一進(jìn)行連接。如下圖所示:
numSources == parallelism

OneToOne

(2) 如果并發(fā)數(shù)大于partition數(shù),則一對(duì)多進(jìn)行連接瘤运。如下圖所示:
numSources < parallelism窍霞,且parallelism % numSources == 0

OneToMany-1

numSources < parallelism,且parallelism % numSources != 0
OneToMany-2

(3) 如果并發(fā)數(shù)小于partition數(shù)拯坟,則多對(duì)一進(jìn)行連接但金。如下圖所示:
numSources > parallelism,且numSources % parallelism == 0

ManyToOne-1

numSources > parallelism郁季,且numSources % parallelism != 0
ManyToOne-2

一句話總結(jié):

JobGraph按照拓?fù)渑判蚝蟮玫揭粋€(gè)JobVertex集合冷溃,遍歷該JobVertex集合,即從source開始巩踏,將JobVertex封裝成ExecutionJobVertex秃诵,并依次創(chuàng)建ExecutionVertexExecution塞琼、IntermediateResultIntermediateResultPartition菠净。然后通過ejv.connectToPredecessor()方法,創(chuàng)建ExecutionEdge彪杉,建立當(dāng)前節(jié)點(diǎn)與其上游節(jié)點(diǎn)之間的聯(lián)系毅往,即連接ExecutionVertexIntermediateResultPartition

終章

構(gòu)建好ExecutionGraph派近,接下來會(huì)基于ExecutionGraph觸發(fā)Job的調(diào)度攀唯,申請(qǐng)Slot,真正的部署任務(wù)渴丸,這是Task被執(zhí)行的前提:

if (leaderElectionService.hasLeadership) {
    log.info(s"Scheduling job $jobId ($jobName).")
    executionGraph.scheduleForExecution()  // 將生成好的ExecutionGraph進(jìn)行調(diào)度
} else {
    self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
    log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
              "this. I am not scheduling the job for execution.")
}

下一篇文章將介紹FlinkSchedule機(jī)制侯嘀,敬請(qǐng)期待~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市谱轨,隨后出現(xiàn)的幾起案子戒幔,更是在濱河造成了極大的恐慌,老刑警劉巖土童,帶你破解...
    沈念sama閱讀 206,602評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诗茎,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡献汗,警方通過查閱死者的電腦和手機(jī)敢订,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來罢吃,“玉大人楚午,你說我怎么就攤上這事∪恤铮” “怎么了醒叁?”我有些...
    開封第一講書人閱讀 152,878評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我把沼,道長啊易,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,306評(píng)論 1 279
  • 正文 為了忘掉前任饮睬,我火速辦了婚禮租谈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘捆愁。我一直安慰自己割去,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評(píng)論 5 373
  • 文/花漫 我一把揭開白布昼丑。 她就那樣靜靜地躺著呻逆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪菩帝。 梳的紋絲不亂的頭發(fā)上咖城,一...
    開封第一講書人閱讀 49,071評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音呼奢,去河邊找鬼宜雀。 笑死,一個(gè)胖子當(dāng)著我的面吹牛握础,可吹牛的內(nèi)容都是我干的辐董。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼禀综,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼简烘!你這毒婦竟也來了绊茧?” 一聲冷哼從身側(cè)響起馋艺,我...
    開封第一講書人閱讀 37,006評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎媳拴,沒想到半個(gè)月后依鸥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,512評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡悼沈,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評(píng)論 2 325
  • 正文 我和宋清朗相戀三年贱迟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片絮供。...
    茶點(diǎn)故事閱讀 38,094評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡衣吠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出壤靶,到底是詐尸還是另有隱情缚俏,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站忧换,受9級(jí)特大地震影響恬惯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜亚茬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評(píng)論 3 307
  • 文/蒙蒙 一酪耳、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧刹缝,春花似錦碗暗、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至颂砸,卻和暖如春噪奄,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背沾凄。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評(píng)論 1 262
  • 我被黑心中介騙來泰國打工梗醇, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人撒蟀。 一個(gè)月前我還...
    沈念sama閱讀 45,536評(píng)論 2 354
  • 正文 我出身青樓叙谨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親保屯。 傳聞我的和親對(duì)象是個(gè)殘疾皇子手负,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評(píng)論 2 345