Flink-1.10 源碼筆記 StreamGraph生成

Flink 1.10 源碼 -- SreamGraph

生成JobGraph

Flink的執(zhí)行計(jì)劃分為四層

img

StreamGraph
用戶層面上的, 用戶根據(jù)API編寫(xiě)的程序構(gòu)造出一個(gè)代表拓補(bǔ)圖結(jié)構(gòu)的StreamGraph
相關(guān)代碼在flink-streaming-java項(xiàng)目下StreamGraphGenerator類(lèi)的generate方法中,該方法會(huì)在StreamExecutionEnvironment.execute()方法調(diào)用到,也就是說(shuō)StreamGraph 是在Client端構(gòu)造的,意味著我們可以通過(guò)本地調(diào)試觀察StreamGraph的構(gòu)造過(guò)程

在構(gòu)建StreamGraph之前先了解一下Transformation

Transformation

在Flink流處理中,算子最終會(huì)將算子轉(zhuǎn)換成一個(gè)Transformation, 每個(gè)Transformation表示 一個(gè)DataStream或多個(gè)DataStream轉(zhuǎn)換成新DataStream的操作, 比如,map,filter,union等

我們根據(jù)map看一下轉(zhuǎn)換的一個(gè)過(guò)程,這是DataStream的map方法,接收用戶傳入的函數(shù),將函數(shù)包裝為StreamMap類(lèi)型,調(diào)用transform方法

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    }

跟進(jìn)transform方法 這里將map封裝為StreamOperatorFactory

    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        //這里將operator封裝入operator factory
        return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
    }

進(jìn)入doTransform方法,在這里將 operator封裝成了一個(gè)OneInputTransformation對(duì)象,也就是transformation對(duì)象

1591167501761.png
protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // 檢查輸出類(lèi)型是否為MissingTypeInfo,如果是拋出異常,
        transformation.getOutputType();

        //創(chuàng)建OneInputTransformation
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            transformation,          //input   --上游的 transformation
                operatorName,
                operatorFactory,     //需要進(jìn)行轉(zhuǎn)換操作的
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        //多個(gè)級(jí)聯(lián)的map和filter操作會(huì)被transform成為一連串的OneInputTransformation形纺。
        // 后一個(gè)transformation的input指向前一個(gè)transformation
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

最終通過(guò)getExecutionEnvironment().addOperator添加到列表中

getExecutionEnvironment獲取執(zhí)行環(huán)境,
addOperator添加列表中
到這里算子就被封裝為transformation對(duì)象,保存到了StreamExecutionEnvironment中


    public StreamExecutionEnvironment getExecutionEnvironment() {
        return environment;
    }
    
    protected final List<Transformation<?>> transformations = new ArrayList<>();
    
    @Internal
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }

StreamOperator

DataStream上的每一個(gè)transformation都對(duì)應(yīng)一個(gè)StreamOperator,StreamOperator是運(yùn)行時(shí)具體的實(shí)現(xiàn),會(huì)決定User-Defined Funtion(udf),的調(diào)用方式,可以看出托呕,所有實(shí)現(xiàn)類(lèi)都繼承了AbstractStreamOperator沫换。另外除了 project 操作席爽,其他所有可以執(zhí)行UDF代碼的實(shí)現(xiàn)類(lèi)都繼承自AbstractUdfStreamOperator,該類(lèi)是封裝了UDF的StreamOperator闽寡。UDF就是實(shí)現(xiàn)了Function接口的類(lèi)霸株,如MapFunction,FilterFunction

StreamOperator.png

transformation的類(lèi)型

1591168455176.png

OneInputTransformation
顧名思義OneInputTransformation只有一個(gè)輸入。代表的算子(單數(shù)據(jù)流)為:map狮辽,flatMap,fliter巢寡,process,assignTimestamps等椰苟。

TwoInputTransformation
TwoInputTransformation具有兩個(gè)輸入抑月。ConnectedStream的算子為雙流運(yùn)算,它的算子會(huì)被轉(zhuǎn)換為T(mén)woInputTransformation舆蝴。

SourceTransformation
在env中配置數(shù)據(jù)源的時(shí)候會(huì)創(chuàng)建出一個(gè)DataStreamSource谦絮。該對(duì)象為dataStream的源頭。DataStreamSource的構(gòu)造函數(shù)中會(huì)創(chuàng)建一個(gè)SourceTransformation洁仗。

SinkTransformation
和SourceTransformation類(lèi)似层皱,在dataStream調(diào)用addSink方法的時(shí)候會(huì)生成一個(gè)DataStreamSink對(duì)象。該對(duì)象在創(chuàng)建的時(shí)候會(huì)同時(shí)構(gòu)造一個(gè)SinkTransformation赠潦。

UnionTransformation
該transformation為合并多個(gè)input到一個(gè)流中叫胖。代表算子為union。

SplitTransformation
DataStream調(diào)用split的時(shí)候會(huì)創(chuàng)建SplitStream她奥。SplitStream初始化時(shí)會(huì)構(gòu)建一個(gè)SplitTransformation瓮增。

SelectTransformation
SplitStream在調(diào)用select算子的時(shí)候會(huì)創(chuàng)建SelectTransformation怎棱。

FeedbackTransformation
創(chuàng)建IterativeStream的時(shí)候會(huì)使用到該transformation。

CoFeedbackTransformation
和FeedbackTransformation類(lèi)似绷跑,創(chuàng)建ConnectedIterativeStream的時(shí)候會(huì)使用到拳恋。

PartitionTransformation
涉及到控制數(shù)據(jù)流向的算子都屬于PartitionTransformation,例如shuffle砸捏,forward谬运,rebalance,broadcast垦藏,rescale梆暖,global,partitionCustom和keyBy等膝藕。

SideOutputTransformation
調(diào)用getSideOutput(獲取旁路輸出)的時(shí)候式廷,SideOutputTransformation會(huì)發(fā)生作用。

構(gòu)建StreamGraph

知道了算子轉(zhuǎn)換后的操作后,進(jìn)入到生成StreamGraph的邏輯,上面已經(jīng)知道了,生說(shuō)StreamGraph的邏輯在StreamExecutionEnvironment的execute方法中,在execute方法中調(diào)用調(diào)用的getStreamGraph方法生成StreamGraph

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        
        //調(diào)用 execute方法, 需要傳入一個(gè)StreamGraph 
        // getStreamGraph 獲取StreamGraph
        return execute(getStreamGraph(jobName));
    }

進(jìn)入getStreamGraph,這里調(diào)用了同名方法

@Internal
public StreamGraph getStreamGraph(String jobName) {   
    return getStreamGraph(jobName, true);
}

繼續(xù)追進(jìn)getStreamGraph方法

    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
        // getStreamGraphGenerator 創(chuàng)建一個(gè)StreamGraphGenerator對(duì)象
        // generate 生成StreamGraph
        // 生成StreamGraph
        StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }
        return streamGraph;
    }

getStreamGraphGenerator方法,這個(gè)方法會(huì)生成一個(gè)StreamGraphGenerator對(duì)象,StreamGraph是通過(guò)這個(gè)類(lèi)進(jìn)行生成的

    private StreamGraphGenerator getStreamGraphGenerator() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        // 創(chuàng)建 StreamGraphGenerator對(duì)象 并設(shè)置一系列的參數(shù)
        return new StreamGraphGenerator(transformations, config, checkpointCfg)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
    }

StreamGraphGeneratorgenerate方法,在該方法中會(huì)設(shè)置一些參數(shù),以及調(diào)用transform方法,最終獲取生成的StreamGraph進(jìn)行返回

    public StreamGraph generate() {
        // 生成StreamGraph對(duì)象芭挽,傳入執(zhí)行配置和檢查點(diǎn)配置
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        // 設(shè)置狀態(tài)后端
        streamGraph.setStateBackend(stateBackend);
        // 設(shè)置級(jí)聯(lián)配置滑废,為一項(xiàng)優(yōu)化配置  -- 是否可以將算子 chain在一起
        streamGraph.setChaining(chaining);
        // 設(shè)置調(diào)度方式,決定task延遲調(diào)度還是立刻調(diào)度
        streamGraph.setScheduleMode(scheduleMode);
        // StreamExecutionEnvironment的cacheFile會(huì)傳入該變量
        // cacheFile為需要分發(fā)到各個(gè)task manager的用戶文件
        // todo 用于分布式緩存的
        streamGraph.setUserArtifacts(userArtifacts);
        streamGraph.setTimeCharacteristic(timeCharacteristic);
        streamGraph.setJobName(jobName);
        // 設(shè)置各個(gè)級(jí)聯(lián)之間是否采用blocking 連接
        streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);
        // 儲(chǔ)存已經(jīng)被處理的transformation
        alreadyTransformed = new HashMap<>();

        // 逐個(gè)處理transformation
        for (Transformation<?> transformation: transformations) {
        //該方法會(huì)進(jìn)行遞歸調(diào)用,主要邏輯也在這里
            transform(transformation);
        }

        // 獲取已生成的streamGraph
        final StreamGraph builtStreamGraph = streamGraph;

        // 清空中間變量
        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

追進(jìn)transform方法,主要處理所里都在這個(gè)方法中,

這個(gè)方法是一個(gè)遞歸的方法,在獲取transformedIds的邏輯中會(huì)遞歸的調(diào)用該方法, 也就是說(shuō)每一個(gè)transform都會(huì)遍歷去尋找上游,保證上游全部處理完
這個(gè)方法會(huì)將用戶編寫(xiě)代碼轉(zhuǎn)換成StreamGraph,在這之前已經(jīng)將算子轉(zhuǎn)為一個(gè)transform并添加到了env的transformas(List集合)中,這里會(huì)將遍歷transforms集合,將transform構(gòu)建成StreamNode以及StreamEdge
主要邏輯都在處理不同的transform方法中, 這里會(huì)對(duì)每個(gè)transform進(jìn)行判斷實(shí)例類(lèi)型,不同的transform實(shí)例會(huì)調(diào)用不同方法進(jìn)行處理


private Collection<Integer> transform(Transformation<?> transform) { //transform 為每一個(gè)算子

        //如果轉(zhuǎn)換后的map中包含當(dāng)前要轉(zhuǎn)換的 transform 那么說(shuō)明已經(jīng)被處理過(guò)了,直接返回
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);


        // 如果transformation的最大并行度沒(méi)有設(shè)置袜爪,全局的最大并行度已設(shè)置蠕趁,將全局最大并行度設(shè)置給transformation
        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        // 檢查transformation的輸出類(lèi)型,如果是MissingTypeInfo則程序拋出異常
        transform.getOutputType();

        Collection<Integer> transformedIds;
        // 根據(jù)不同的 transform 調(diào)用不同的處理方法
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof AbstractMultipleInputTransformation<?>) {
            transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation<?>) transform);
        } else if (transform instanceof SourceTransformation) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof LegacySourceTransformation<?>) {
            transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        // 如果該transformation沒(méi)有被處理辛馆,則加入已處理列表
        // 處理每個(gè)transformation的時(shí)候會(huì)先處理它的input(可能沒(méi)有input俺陋,也可能有一個(gè)或多個(gè)),transform方法會(huì)遞歸調(diào)用昙篙。
        // 在transform方法執(zhí)行前后雙重檢查transformation是否已被處理可以確保在遞歸調(diào)用的情況下不會(huì)被重復(fù)處理
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        // 設(shè)置 transform(network)的 緩沖器超時(shí)時(shí)間, 如果沒(méi)有設(shè)置則使用的默認(rèn)的  100L
        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
        }

        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }

        // 設(shè)置TransformationUserHash
        // transform.getUserProvidedNodeHash  獲取uid的hash, 在用戶編寫(xiě)代碼的時(shí)候在算子后面調(diào)用setUidHash設(shè)置的
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        // 如果自動(dòng)設(shè)置uid功能被關(guān)閉腊状,同時(shí)又沒(méi)有指定UserProvidedNodeHash和uid,程序拋出異常
        if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
            if (transform instanceof PhysicalTransformation &&
                    transform.getUserProvidedNodeHash() == null &&
                    transform.getUid() == null) {
                throw new IllegalStateException("Auto generated UIDs have been disabled " +
                    "but no UID or hash has been assigned to operator " + transform.getName());
            }
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        // 設(shè)置托管內(nèi)存權(quán)重
        // 設(shè)置transformation的最小和最佳資源要求
        streamGraph.setManagedMemoryWeight(transform.getId(), transform.getManagedMemoryWeight());

        return transformedIds;
    }

在處理transform的時(shí)候,會(huì)根據(jù)不同的transform進(jìn)行不同的處理,大部分處理邏輯都是類(lèi)似,這里查看一下transformOneInputTransform,SourceTransformation處理邏輯

transformOneInputTransform 該transform 方法的主要作用是在會(huì)遞歸的轉(zhuǎn)換input,在Graph中創(chuàng)建一個(gè)新的StreamNode,并將input連接到這個(gè)新節(jié)點(diǎn) (input -> 上游節(jié)點(diǎn)<StreamNode>)

在這個(gè)方法中主要的邏輯
1. 遞歸遍歷他的上游input,確保上游處理完畢并添加edge,并判斷如果遞歸過(guò)程中已經(jīng)處理完,那么直接返回
2. 調(diào)用appOperator方法,將transform轉(zhuǎn)換成StreamNode并添加到StreamNodes列表中
3. 如果該transform需要進(jìn)行keySelector進(jìn)行分區(qū),則會(huì)設(shè)置keySelector和序列化器
4. 調(diào)用addEdge方法將講個(gè)StreamNode連接

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        //首先會(huì)遞歸遍歷他的每一個(gè)上游input苔可,保證上游全部處理完畢缴挖。然后添加Edge
        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        // 在遞歸的過(guò)程中,已經(jīng)轉(zhuǎn)換完畢, 直接返回,防止重復(fù)處理
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        // 獲取 slot 共享組  --  因?yàn)橛锌赡芏鄠€(gè)算子會(huì)被chain在一起 這樣可以提高slot的資源利用率
        // 用于后面調(diào)度task的
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
        // 在 streamGraph中 添加算子
        // 方法會(huì)將 operator(transform) 轉(zhuǎn)換成StreamNode,并添加到 StreamNodes列表中
        streamGraph.addOperator(transform.getId(), //transform的唯一id
                slotSharingGroup,   // slot 共享組
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(), //獲取transform的StreamOperatorFactory
                transform.getInputType(), //獲取輸入類(lèi)型
                transform.getOutputType(), // 獲取輸出類(lèi)型
                transform.getName()); // 獲取 該轉(zhuǎn)換的name

        // 如果該轉(zhuǎn)換需要進(jìn)行keySelector進(jìn)行分區(qū) 需要對(duì)key進(jìn)行序列話及設(shè)置key的eKeySelector
        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        //獲取 transform的平行度
        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            transform.getParallelism() : executionConfig.getParallelism();
        //設(shè)置 transformId的并行度
        streamGraph.setParallelism(transform.getId(), parallelism);
        //設(shè)置 最大并行度
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        //循環(huán)上游的input, 保證上游全部處理完
        // 這一步是關(guān)鍵,添加一個(gè)edge對(duì)象焚辅,將此oneInputTransformation轉(zhuǎn)換成的vertex和input transformation轉(zhuǎn)換為的vertex連接起來(lái)
        for (Integer inputId: inputIds) {
            //將 input 添加到 edge
            // 兩個(gè)StreamNode 會(huì)通過(guò)StreamEdge連接在一起,   每一個(gè)transform會(huì)生成一個(gè)StreamNode
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

現(xiàn)在看一下 appOperator和addEdge方法

先看一下appOperator方法
該法最主要的是調(diào)用了addNode

            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName,
            Class<? extends AbstractInvokable> invokableClass) {
        // 將transform 轉(zhuǎn)換成StreamNode 并添加到StreamNodes列表中
        addNode(vertexID, slotSharingGroup, coLocationGroup, invokableClass, operatorFactory, operatorName);
        // 添加序列化
        setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));

        //設(shè)置operatorFactory輸出類(lèi)型
        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            // sets the output type which must be know at StreamGraph creation time
            operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }
        //設(shè)置operatorFactory輸入類(lèi)型
        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexID);
        }
    }

進(jìn)入到addNode, 這個(gè)方法中會(huì)將transform轉(zhuǎn)換成StreamNode,并添加到Nodes列表中

    protected StreamNode addNode(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends AbstractInvokable> vertexClass,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName) {

        //如果StreamNodes中存在該 node 拋出異常
        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        //創(chuàng)建一個(gè) StreamNode  --> transform 轉(zhuǎn)換成 StreamNode 在轉(zhuǎn)換成Vertex --> JobVertex
        StreamNode vertex = new StreamNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorFactory,
                operatorName,
                new ArrayList<OutputSelector<?>>(),
                vertexClass);

        // 將 StreamNode添加到集合
        streamNodes.put(vertexID, vertex);

        return vertex;
    }

在看一下addEdge方法

    public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
        addEdgeInternal(upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                null,
                new ArrayList<String>(),
                null,
                null);

    }

進(jìn)入addEdgeInternal方法,這個(gè)方法是添加Edge的邏輯

在這個(gè)方法中主要的邏輯,判斷上游是否為一下幾種transform ,virtualSideOutputNodes,virtualSelectNodes,virtualPartitionNodes, 如果是則將他們寫(xiě)入到虛擬節(jié)點(diǎn)中,然后創(chuàng)建StremaEdge,將兩個(gè)StreamNode進(jìn)行連接,并將虛擬節(jié)點(diǎn)的信息寫(xiě)入到StreamEdge中,否則直接將兩個(gè)StreamNode通過(guò)StreamEdge進(jìn)行連接

虛擬節(jié)點(diǎn)
這幾類(lèi)的transform:irtualSideOutputNodes映屋,virtualSelectNodes和virtualPartitionNodes,這幾類(lèi)transform會(huì)被處理成虛擬節(jié)點(diǎn), 虛擬節(jié)點(diǎn)是什么時(shí)候生成的,是在對(duì)不同的transform處理邏輯中生成的,在這幾種的transform處理邏輯中會(huì)將transform添加到虛擬節(jié)點(diǎn), 嚴(yán)格的說(shuō)虛擬節(jié)點(diǎn)并不屬于StreamNode,不包含邏輯邏輯轉(zhuǎn)換,虛擬節(jié)點(diǎn)不會(huì)出現(xiàn)在StreamGraph圖中,在進(jìn)行edge過(guò)程中,會(huì)判斷上游是否為虛擬節(jié)點(diǎn),這里是通過(guò)遞歸的方式去獲取上游節(jié)點(diǎn)信息,直到找到非虛擬節(jié)點(diǎn)即StreamNode,會(huì)執(zhí)行edge邏輯,并將虛擬節(jié)點(diǎn)的信息記錄在StreamEdge中

    private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            ShuffleMode shuffleMode) {

        /**
         * todo virtualSideOutputNodes,virtualSelectNodes,virtualPartitionNodes
         *  這幾類(lèi)transform都會(huì)被處理成虛擬節(jié)點(diǎn),當(dāng)下游生成StreamNode后,發(fā)現(xiàn)上游為虛擬節(jié)點(diǎn)
         *  會(huì)找到虛擬節(jié)點(diǎn)的上游,并創(chuàng)建StreamEdge與虛擬節(jié)點(diǎn)上游的transform進(jìn)行連接.并把
         *  虛擬節(jié)點(diǎn)的信息寫(xiě)入到StreamEdge中
         */
        // 當(dāng)上有是 SideOutput, 遞歸調(diào)用,并傳入SideOutput信息, 下面同理
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
        } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
            if (outputNames.isEmpty()) {
                // selections that happen downstream override earlier selections
                outputNames = virtualSelectNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            shuffleMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
        } else {
            // 獲取上下游 StreamNode
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            // 如果沒(méi)有指定分區(qū)器,并且上游和下游操作符的并行度匹配使用forward同蜻,則使用rebalance

            //                                      上游并行度        ==   下游并行度
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                //  使用 ForwardPartitioner 分區(qū)
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) { //否則使用 該分區(qū)
                partitioner = new RebalancePartitioner<Object>();
            }


            //進(jìn)行判斷 如果使用ForwardPartitioner分區(qū)  并且上下游并行度不相等 則不能使用該分區(qū)策略,會(huì)拋出異常
            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow " +
                            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

            // 如果沒(méi)有指定shuffle模式 則由框架決定shuffle模式
            if (shuffleMode == null) {
                shuffleMode = ShuffleMode.UNDEFINED;
            }

            // 創(chuàng)建一個(gè)新的StreamEdge
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

            // 連接剛剛創(chuàng)建的edge到上下游節(jié)點(diǎn)
            // getStreamNode  獲取到指定id的StreamNode
            // addOutEdge和 addInEdge  將edge添加到獲取的StreamNode中
            // 在 SteamNode類(lèi)中, 由兩個(gè)list 用于收集此egde
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }

transformSource方法

    private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
        // todo 如果用戶指定了組名棚点,則按原樣執(zhí)行。如果沒(méi)有指定任何內(nèi)容,
        //  并且輸入操作都具有相同的組名稱(chēng)湾蔓,則使用此名稱(chēng)瘫析。否則,選擇默認(rèn)組。
        String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());

        // 為StreamGraph 添加Source
        streamGraph.addSource(source.getId(),  //每個(gè)transform都會(huì)獲得一個(gè)遞增的id, 該id在后面會(huì)轉(zhuǎn)換成 vertexID
                slotSharingGroup,
                source.getCoLocationGroupKey(),
                source.getOperatorFactory(),
                null,
                source.getOutputType(),
                "Source: " + source.getName());

        //獲取設(shè)置并行度
        int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
                source.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(source.getId(), parallelism);
        streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());

        return Collections.singleton(source.getId());
    }

進(jìn)入到addSource方法,這個(gè)方法做了兩件事,
一個(gè)是將Source生成StreamNode添加到StreamNodes列表
另一個(gè)是將soutce添加到了StreamGraph的sources(為set集合)列表中
可以看到addSource中并沒(méi)有生成edge 因?yàn)閟ource屬于圖中的頂點(diǎn),所以只會(huì)通過(guò)下游去連接source節(jié)點(diǎn)

    public <IN, OUT> void addSource(
            Integer vertexID,  // transform的id
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            SourceOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName) {
        // 方法會(huì)將 operator(transform) 轉(zhuǎn)換成StreamNode,并添加到 StreamNodes列表中
        addOperator(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                operatorFactory,
                inTypeInfo,
                outTypeInfo,
                operatorName,
                SourceOperatorStreamTask.class);
        // 將source(operator -> transform)的Id添加的sources的set集合中
        // todo source有可能多個(gè) set保證了不會(huì)重復(fù)
        sources.add(vertexID);
    }

通過(guò)下面代碼,看一下轉(zhuǎn)換的過(guò)程


1591323006811.png

通過(guò)Debug斷點(diǎn)到構(gòu)建StreamGraph得時(shí)候,可以看到transformations列表存有四個(gè)transform,為map,window,process以及sink的transform,每個(gè)transform的input都指向上游的transform,其中source,keyBy的transform并沒(méi)有添加到transformations列表中,等下會(huì)進(jìn)行解釋


transformations
在上面我們發(fā)現(xiàn)沒(méi)有有些算子的transform并沒(méi)有添加到tranfromtions列表中現(xiàn)在解釋一下

source, 在創(chuàng)建source的時(shí)候,會(huì)創(chuàng)建一個(gè)sourceOperator,根據(jù)sourceOperator創(chuàng)建DataStreamSouce,在DataStramSource中會(huì)將sourceOperator構(gòu)建成SourceTransformation,最終構(gòu)建到DataStream,將transform賦值給DataStream的成員變量transform,在下游算子中,會(huì)根據(jù)成員變量的transform構(gòu)建自己的transform,該transform的input就是source,最終將構(gòu)建好的transform添加到env的transformations列表中

keyBy, 在keyBy中創(chuàng)建KeyedStream中,最終將keySelector賦值給了 keyedStream的變量中, KeySelector不會(huì)被添加到env的transformations中, 會(huì)將信息記錄在下游的算子中

keyBy算子,會(huì)創(chuàng)建一個(gè)keyedStream,并將keyBy傳入的值轉(zhuǎn)換成keyedSelector,賦值給keyedStream對(duì)象的成員變量中,在創(chuàng)建KyedStream的時(shí)候會(huì)創(chuàng)建一個(gè)PartitionTransformation,該transform會(huì)被賦值到DataStream的transformation變量中,在調(diào)用timeWindow的時(shí)候會(huì)將WindowAssigner以及this(即keyedStream)傳入創(chuàng)建WindowedStream,在aggregate算子中,會(huì)創(chuàng)建一個(gè)windowOperator,通過(guò)input.transform,將windowOperator轉(zhuǎn)換transform,添加到env的transformations中

keyBy會(huì)生成一個(gè) PartitionTransformation的 transform, 但是并不會(huì)添加到env的transformations的列表中,而是將信息記錄在了下游的transform中即window中,最終將window生成一個(gè)transform添加到列表

window生成 通過(guò)windowAssigner,keySelector,trigger和evictor等參數(shù)構(gòu)建windowOperator,最終通過(guò)transform方法轉(zhuǎn)換成一個(gè)transform并添加到env的transformations列表中,
在window中會(huì)包含他的上游input(即 keyBy<PartitionTransformation>)

當(dāng)所有transform都遍歷完,這時(shí)候構(gòu)建出StreamGraph,可以看到StreamNode中,已經(jīng)存在了除了keyBy的所有StreamNode,到這里StreamGraph就已經(jīng)構(gòu)建好了


StreamGraph生成后的StreamNodes
執(zhí)行圖

生成JobGraph

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末颁股,一起剝皮案震驚了整個(gè)濱河市么库,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌甘有,老刑警劉巖诉儒,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異亏掀,居然都是意外死亡忱反,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)滤愕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)温算,“玉大人,你說(shuō)我怎么就攤上這事间影∽⒏停” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵魂贬,是天一觀的道長(zhǎng)巩割。 經(jīng)常有香客問(wèn)我,道長(zhǎng)付燥,這世上最難降的妖魔是什么宣谈? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮键科,結(jié)果婚禮上闻丑,老公的妹妹穿的比我還像新娘。我一直安慰自己勋颖,他們只是感情好嗦嗡,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著饭玲,像睡著了一般酸钦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上咱枉,一...
    開(kāi)封第一講書(shū)人閱讀 51,146評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音徒恋,去河邊找鬼蚕断。 笑死,一個(gè)胖子當(dāng)著我的面吹牛入挣,可吹牛的內(nèi)容都是我干的亿乳。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼葛假!你這毒婦竟也來(lái)了障陶?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤聊训,失蹤者是張志新(化名)和其女友劉穎抱究,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體带斑,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鼓寺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了勋磕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片妈候。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖挂滓,靈堂內(nèi)的尸體忽然破棺而出苦银,到底是詐尸還是另有隱情,我是刑警寧澤赶站,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布幔虏,位于F島的核電站,受9級(jí)特大地震影響亲怠,放射性物質(zhì)發(fā)生泄漏所计。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一团秽、第九天 我趴在偏房一處隱蔽的房頂上張望主胧。 院中可真熱鬧,春花似錦习勤、人聲如沸踪栋。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)夷都。三九已至,卻和暖如春予颤,著一層夾襖步出監(jiān)牢的瞬間囤官,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工蛤虐, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留党饮,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓驳庭,卻偏偏與公主長(zhǎng)得像刑顺,于是被迫代替她去往敵國(guó)和親氯窍。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容