Flink 源碼之StreamGraph生成

Flink源碼分析系列文檔目錄

請(qǐng)點(diǎn)擊:Flink 源碼分析系列文檔目錄

什么是StreamGraph

StreamGraph是Flink任務(wù)執(zhí)行流程拓?fù)鋱D的封裝。在Flink的client端,Environment執(zhí)行execute()方法的時(shí)候艘狭,用戶編寫的數(shù)據(jù)處理流程會(huì)轉(zhuǎn)變?yōu)镾treamGraph护赊。

各個(gè)算子最終會(huì)變成什么

Flink流處理的各個(gè)算子會(huì)被當(dāng)做一系列transformation儲(chǔ)存起來(lái)。具體請(qǐng)參見(jiàn)Flink 源碼之基本算子酪碘。
下面以DataStream的Map方法為例說(shuō)明。
DataStream的map方法:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    // 獲取mapper函數(shù)的返回類型
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);
    // 對(duì)map操作封裝為transformation,并返回SingleOutputStreamOperator
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

transform方法的代碼:

public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    // 這里將operator封裝入operator factory
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

doTransform方法:

private <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    // 如果transformation的輸出類型為MissingTypeInfo的話管呵,程序會(huì)拋異常
    transformation.getOutputType();

    // 構(gòu)造新的transformation
    // map類型的transformation只有一個(gè)輸入,因此它輸入OneInputTransformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operatorFactory,
            outTypeInfo,
            environment.getParallelism());

    // 構(gòu)造返回的stream哺窄,供后續(xù)的算子鏈?zhǔn)秸{(diào)用
    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    // 將transformation寫入ExecutionEnvironment中
    // ExecutionEnvironment維護(hù)了一個(gè)叫做transformations的ArrayList對(duì)象捐下,用于儲(chǔ)存所有的transformation
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

執(zhí)行到此,map算子已被封裝為transformation萌业,存儲(chǔ)到了ExecutionEnvironment中坷襟。

幾種transformation的類型

OneInputTransformation

顧名思義OneInputTransformation只有一個(gè)輸入。代表的算子(單數(shù)據(jù)流)為:map生年,flatMap婴程,fliter,process抱婉,assignTimestamps等档叔。

TwoInputTransformation

TwoInputTransformation具有兩個(gè)輸入桌粉。ConnectedStream的算子為雙流運(yùn)算,它的算子會(huì)被轉(zhuǎn)換為TwoInputTransformation衙四。

SourceTransformation

在env中配置數(shù)據(jù)源的時(shí)候會(huì)創(chuàng)建出一個(gè)DataStreamSource铃肯。該對(duì)象為dataStream的源頭。DataStreamSource的構(gòu)造函數(shù)中會(huì)創(chuàng)建一個(gè)SourceTransformation传蹈。

SinkTransformation

和SourceTransformation類似押逼,在dataStream調(diào)用addSink方法的時(shí)候會(huì)生成一個(gè)DataStreamSink對(duì)象。該對(duì)象在創(chuàng)建的時(shí)候會(huì)同時(shí)構(gòu)造一個(gè)SinkTransformation惦界。

UnionTransformation

該transformation為合并多個(gè)input到一個(gè)流中宴胧。代表算子為union。

SplitTransformation

DataStream調(diào)用split的時(shí)候會(huì)創(chuàng)建SplitStream表锻。SplitStream初始化時(shí)會(huì)構(gòu)建一個(gè)SplitTransformation恕齐。

SelectTransformation

SplitStream在調(diào)用select算子的時(shí)候會(huì)創(chuàng)建SelectTransformation。

FeedbackTransformation

創(chuàng)建IterativeStream的時(shí)候會(huì)使用到該transformation瞬逊。

CoFeedbackTransformation

和FeedbackTransformation類似显歧,創(chuàng)建ConnectedIterativeStream的時(shí)候會(huì)使用到。

PartitionTransformation

涉及到控制數(shù)據(jù)流向的算子都屬于PartitionTransformation确镊,例如shuffle士骤,forward,rebalance蕾域,broadcast拷肌,rescale,global旨巷,partitionCustom和keyBy等巨缘。

SideOutputTransformation

調(diào)用getSideOutput(獲取旁路輸出)的時(shí)候,SideOutputTransformation會(huì)發(fā)生作用采呐。

根據(jù)Transformation構(gòu)建Stream Graph

我們從StreamExecutionEnvironment的execute方法開(kāi)始分析stream graph的生成過(guò)程若锁。

StreamExecutionEnvironment的execute方法源碼:

public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    return execute(getStreamGraph(jobName));
}

該方法中調(diào)用了getStreamGraph方法。找到這個(gè)方法斧吐,如下所示:

public StreamGraph getStreamGraph(String jobName) {
    // 創(chuàng)建一個(gè)StreamGraphGenerator對(duì)象又固,設(shè)置參數(shù),并調(diào)用generate方法生成stream graph
    return getStreamGraphGenerator().setJobName(jobName).generate();
}

private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    // 此處向StreamGraphGenerator傳入transformations煤率,以及其他的配置
    return new StreamGraphGenerator(transformations, config, checkpointCfg)
        .setStateBackend(defaultStateBackend)
        .setChaining(isChainingEnabled)
        .setUserArtifacts(cacheFile)
        .setTimeCharacteristic(timeCharacteristic)
        .setDefaultBufferTimeout(bufferTimeout);
}

接著跟蹤到StreamGraphGeneratorgenerate方法仰冠,如下所示:

public StreamGraph generate() {
    // 生成StreamGraph對(duì)象,傳入執(zhí)行配置和檢查點(diǎn)配置
    streamGraph = new StreamGraph(executionConfig, checkpointConfig);
    // 設(shè)置狀態(tài)后端
    streamGraph.setStateBackend(stateBackend);
    // 設(shè)置級(jí)聯(lián)配置蝶糯,為一項(xiàng)優(yōu)化配置
    streamGraph.setChaining(chaining);
    // 設(shè)置調(diào)度方式洋只,決定task延遲調(diào)度還是立刻調(diào)度
    streamGraph.setScheduleMode(scheduleMode);
    // StreamExecutionEnvironment的cacheFile會(huì)傳入該變量
    // cacheFile為需要分發(fā)到各個(gè)task manager的用戶文件
    streamGraph.setUserArtifacts(userArtifacts);
    // 設(shè)置時(shí)間特征,是event time,processing time還是ingestion time
    streamGraph.setTimeCharacteristic(timeCharacteristic);
    // 設(shè)置作業(yè)名稱
    streamGraph.setJobName(jobName);
    // 設(shè)置各個(gè)級(jí)聯(lián)之間是否采用blocking 連接
    streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

    // 儲(chǔ)存已經(jīng)被處理的transformation
    alreadyTransformed = new HashMap<>();

    // 逐個(gè)處理transformation
    for (Transformation<?> transformation: transformations) {
        transform(transformation);
    }

    // 獲取已生成的streamGraph
    final StreamGraph builtStreamGraph = streamGraph;

    // 清空中間變量
    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}

這樣看來(lái)木张,重點(diǎn)就在transform(transformation);這一行代碼了。
transform方法:

    private Collection<Integer> transform(Transformation<?> transform) {

                // 檢查該transformation是否已被處理端三,如果已處理直接返回
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

                // 如果transformation的最大并行度沒(méi)有設(shè)置舷礼,全局的最大并行度已設(shè)置,將全局最大并行度設(shè)置給transformation
        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
                // 檢查transformation的輸出類型郊闯,如果是MissingTypeInfo則程序拋出異常
        transform.getOutputType();

        Collection<Integer> transformedIds;
                // 依照transformation的具體類型妻献,提供不同的處理方法
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        // 如果該transformation沒(méi)有被處理,則加入已處理列表
        // 處理每個(gè)transformation的時(shí)候會(huì)先處理它的input(可能沒(méi)有input团赁,也可能有一個(gè)或多個(gè))育拨,transform方法會(huì)遞歸調(diào)用。
        // 在transform方法執(zhí)行前后雙重檢查transformation是否已被處理可以確保在遞歸調(diào)用的情況下不會(huì)被重復(fù)處理
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        // 設(shè)置network的buffer超時(shí)時(shí)間
        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
        }

         // 設(shè)置uid
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        // 設(shè)置UserProvidedNodeHash
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        // 如果自動(dòng)設(shè)置uid功能被關(guān)閉欢摄,同時(shí)又沒(méi)有指定UserProvidedNodeHash和uid熬丧,程序拋出異常
        if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
            if (transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
                throw new IllegalStateException("Auto generated UIDs have been disabled " +
                    "but no UID or hash has been assigned to operator " + transform.getName());
            }
        }

        // 設(shè)置transformation的最小和最佳資源要求
        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

接下來(lái)從transformSource方法開(kāi)始解析下StreamGraph的生成。

private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
    // 返回slotSharingGroup怀挠。此處返回“default”
    String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());

    // StreamGraph增加數(shù)據(jù)源
    streamGraph.addSource(source.getId(),
            slotSharingGroup,
            source.getCoLocationGroupKey(),
            source.getOperatorFactory(),
            null,
            source.getOutputType(),
            "Source: " + source.getName());
    // 設(shè)置輸入數(shù)據(jù)類型
    if (source.getOperatorFactory() instanceof InputFormatOperatorFactory) {
        streamGraph.setInputFormat(source.getId(),
                ((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat());
    }
    // 設(shè)置并行度
    int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        source.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(source.getId(), parallelism);
    streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
    return Collections.singleton(source.getId());
}

StreamGraph的addSource方法

public <IN, OUT> void addSource(Integer vertexID,
    @Nullable String slotSharingGroup,
    @Nullable String coLocationGroup,
    StreamOperatorFactory<OUT> operatorFactory,
    TypeInformation<IN> inTypeInfo,
    TypeInformation<OUT> outTypeInfo,
    String operatorName) {
    addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
    sources.add(vertexID);
}

這個(gè)方法邏輯不多析蝴,先增加operator,再把SourceTransformation設(shè)置為StreamGraph的source绿淋。

addOperator方法:

public <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

    // 判斷是否為StreamSource闷畸,如果是數(shù)據(jù)源的sourceFunction,它會(huì)被封裝入StreamSource對(duì)象吞滞,此處返回true
    if (operatorFactory.isStreamSource()) {
        addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
    } else {
        addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
    }

    // 構(gòu)建輸入輸出類型的序列化器
    TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;

    TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;

    // 設(shè)置序列化器
    setSerializers(vertexID, inSerializer, null, outSerializer);

    // 設(shè)置operatorFactory的輸入和輸出類型
    if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
        // sets the output type which must be know at StreamGraph creation time
        operatorFactory.setOutputType(outTypeInfo, executionConfig);
    }

    if (operatorFactory.isInputTypeConfigurable()) {
        operatorFactory.setInputType(inTypeInfo, executionConfig);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex: {}", vertexID);
    }
}

addNode方法:

protected StreamNode addNode(Integer vertexID,
    @Nullable String slotSharingGroup,
    @Nullable String coLocationGroup,
    Class<? extends AbstractInvokable> vertexClass,
    StreamOperatorFactory<?> operatorFactory,
    String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    // 此處創(chuàng)建一個(gè)StreamNode佑菩,加入到streamNodes集合中
    StreamNode vertex = new StreamNode(
        vertexID,
        slotSharingGroup,
        coLocationGroup,
        operatorFactory,
        operatorName,
        new ArrayList<OutputSelector<?>>(),
        vertexClass);

    streamNodes.put(vertexID, vertex);

    return vertex;
}

transformSource方法的邏輯相對(duì)簡(jiǎn)單。在StreamGraph中增加了一個(gè)節(jié)點(diǎn)裁赠,還有指定了stream的sources殿漠。下面我們?cè)傺芯肯掠玫谋容^多的transformOneInputTransform方法。
OneInputTransformation具有一個(gè)Input佩捞,指向它前一個(gè)transformation凸舵。如此可以形成一種鏈表結(jié)構(gòu),如下所示:

OneInputTransformation結(jié)構(gòu)

transformOneInputTransform代碼如下所示:

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

    // 先處理此Transformation的input transformation
    Collection<Integer> inputIds = transform(transform.getInput());

    // the recursive call might have already transformed this
    // 前一個(gè)遞歸調(diào)用中可能已經(jīng)將方法入口的transformation處理過(guò)了失尖,這里加以判斷啊奄,防止重復(fù)處理
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // 下面步驟和transformSource類似,直到增加edge
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getCoLocationGroupKey(),
            transform.getOperatorFactory(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    // 這一步是關(guān)鍵掀潮,添加一個(gè)edge對(duì)象菇夸,將此oneInputTransformation轉(zhuǎn)換成的vertex和input transformation轉(zhuǎn)換為的vertex連接起來(lái)
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    return Collections.singleton(transform.getId());
}

繼續(xù)跟蹤streamGraph.addEdge方法。addEdge方法又調(diào)用了addEdgeInternal方法仪吧,如下所示:

private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {

    // 稍后分析這些
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        // 獲取上游和下游的節(jié)點(diǎn)
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        // 分區(qū)器設(shè)置庄新,后面說(shuō)明
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        // 創(chuàng)建一個(gè)新的StreamEdge
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

        // 連接剛剛創(chuàng)建的edge到上下游節(jié)點(diǎn)
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

在方法開(kāi)始我們看到了virtualSideOutputNodesvirtualSelectNodesvirtualPartitionNodes的處理邏輯。這幾類transformation會(huì)被處理為虛擬節(jié)點(diǎn)择诈。什么是虛擬節(jié)點(diǎn)呢械蹋?我們發(fā)現(xiàn)sideOutput,select和分區(qū)操作不需要用戶傳入自定義的處理邏輯羞芍,即userFunction哗戈。這些類型的變換會(huì)被處理成虛擬節(jié)點(diǎn)。虛擬節(jié)點(diǎn)嚴(yán)格來(lái)說(shuō)不是StreamNode類型荷科,不包含物理轉(zhuǎn)換邏輯唯咬。
虛擬節(jié)點(diǎn)的不會(huì)出現(xiàn)在StreamGraph的處理流中,在添加edge的時(shí)候如果上有節(jié)點(diǎn)為虛擬節(jié)點(diǎn)畏浆,會(huì)通過(guò)遞歸的方式尋找上游節(jié)點(diǎn)胆胰,直至找到一個(gè)非虛擬節(jié)點(diǎn),再執(zhí)行添加edge邏輯刻获。虛擬節(jié)點(diǎn)通過(guò)內(nèi)部的originalId屬性蜀涨,附著于非虛擬節(jié)點(diǎn)上。

還有Partitioner需要說(shuō)明蝎毡。如果沒(méi)有指定partitioner勉盅,并且上下游的并行度相同,則使用ForwardPartitioner顶掉,直接推數(shù)據(jù)到本地下游的operator草娜。如果上游和下游的并行度設(shè)置不相同,使用RebalancePartitioner痒筒。該P(yáng)artitioner通過(guò)輪詢的方式發(fā)送數(shù)據(jù)到下游通道宰闰。
不能在上下游并行度不同的時(shí)候使用ForwardPartitioner。否則程序會(huì)拋異常簿透。

一張圖總結(jié)

以如下程序?yàn)槔?/p>

val stream = env.fromElements("hello", "world")
stream.map((_, 0)).keyBy(0).countWindow(1).process(new ProcessWindowFunction[(String, Int), String, Tuple, GlobalWindow] {
    override def process(key: Tuple, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
        for (elem <- elements) {
            out.collect(elem._1)
        }
    }
}).print()

該程序的轉(zhuǎn)換流程如圖所示:


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末移袍,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子老充,更是在濱河造成了極大的恐慌葡盗,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件啡浊,死亡現(xiàn)場(chǎng)離奇詭異觅够,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)巷嚣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門喘先,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人廷粒,你說(shuō)我怎么就攤上這事窘拯『烨遥” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵涤姊,是天一觀的道長(zhǎng)暇番。 經(jīng)常有香客問(wèn)我,道長(zhǎng)思喊,這世上最難降的妖魔是什么壁酬? 我笑而不...
    開(kāi)封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮搔涝,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘和措。我一直安慰自己庄呈,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布派阱。 她就那樣靜靜地躺著诬留,像睡著了一般。 火紅的嫁衣襯著肌膚如雪贫母。 梳的紋絲不亂的頭發(fā)上文兑,一...
    開(kāi)封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音腺劣,去河邊找鬼绿贞。 笑死,一個(gè)胖子當(dāng)著我的面吹牛橘原,可吹牛的內(nèi)容都是我干的籍铁。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼趾断,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼拒名!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起芋酌,我...
    開(kāi)封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤增显,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后脐帝,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體同云,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年堵腹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了梢杭。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡秸滴,死狀恐怖武契,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤咒唆,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布届垫,位于F島的核電站,受9級(jí)特大地震影響全释,放射性物質(zhì)發(fā)生泄漏装处。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一浸船、第九天 我趴在偏房一處隱蔽的房頂上張望妄迁。 院中可真熱鬧,春花似錦李命、人聲如沸登淘。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)黔州。三九已至,卻和暖如春阔籽,著一層夾襖步出監(jiān)牢的瞬間流妻,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工笆制, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留绅这,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓在辆,卻偏偏與公主長(zhǎng)得像君躺,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子开缎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容