Flink Source Code Analysis Series
For the full list of articles, see: Flink Source Code Analysis Series Index
What is StreamGraph
StreamGraph is the encapsulation of the topology of a Flink job's execution flow. On the Flink client side, the environment's execute() method converts the user's data processing pipeline into a StreamGraph.
What each operator eventually becomes
Each operator in a Flink streaming program is stored as one of a series of transformations. For details, see the earlier article in this series, "Flink Source Code: Basic Operators".
The map method of DataStream serves as the example below.
The map method of DataStream:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
// Get the return type of the mapper function
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
// Wrap the map operation in a transformation and return a SingleOutputStreamOperator
return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
The transform method:
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
// Wrap the operator in an operator factory
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
The doTransform method:
private <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// If the output type of this stream's (input) transformation is MissingTypeInfo, an exception is thrown
transformation.getOutputType();
// Construct the new transformation
// A map transformation has exactly one input, so it is a OneInputTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
// Build the stream to return, so that further operators can be chained onto it
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
// Register the transformation with the ExecutionEnvironment
// The ExecutionEnvironment keeps an ArrayList named transformations that stores all transformations
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
At this point the map operator has been wrapped in a transformation and stored in the ExecutionEnvironment.
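The following minimal sketch shows the structure doTransform produces. The class names ToyTransformation, ToyEnvironment and ToyStream are hypothetical simplifications, not Flink classes. Each call wraps the operator in a new transformation whose input is the current one, registers it in the environment's list, and returns a new stream around it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-ins; these are NOT Flink's Transformation,
// DataStream or StreamExecutionEnvironment classes.
class ToyTransformation {
    final int id;
    final String name;
    final ToyTransformation input; // null for a source

    ToyTransformation(int id, String name, ToyTransformation input) {
        this.id = id;
        this.name = name;
        this.input = input;
    }
}

class ToyEnvironment {
    // Plays the role of the environment's transformations list
    final List<ToyTransformation> transformations = new ArrayList<>();
    private int nextId = 1;

    int newId() {
        return nextId++;
    }

    void addOperator(ToyTransformation t) {
        transformations.add(t);
    }

    // In this sketch the source is not put in the list; it is reachable through input links
    ToyStream fromSource(String name) {
        return new ToyStream(this, new ToyTransformation(newId(), name, null));
    }
}

class ToyStream {
    final ToyEnvironment env;
    final ToyTransformation transformation;

    ToyStream(ToyEnvironment env, ToyTransformation transformation) {
        this.env = env;
        this.transformation = transformation;
    }

    // Same shape as doTransform: a new one-input transformation that points at the
    // current one is registered with the environment and wrapped in a new stream
    ToyStream transform(String operatorName) {
        ToyTransformation result = new ToyTransformation(env.newId(), operatorName, transformation);
        env.addOperator(result);
        return new ToyStream(env, result);
    }
}
```

Chaining calls in this sketch builds a backward-pointing chain of the kind the StreamGraph generator later walks.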
Types of transformation
OneInputTransformation
As the name suggests, a OneInputTransformation has only one input. Representative operators (on a single stream) include map, flatMap, filter, process and assignTimestamps.
TwoInputTransformation
A TwoInputTransformation has two inputs. Operators on ConnectedStreams work on two streams at once and are converted into TwoInputTransformations.
SourceTransformation
Configuring a data source on env creates a DataStreamSource, which is the head of the DataStream. The DataStreamSource constructor creates a SourceTransformation.
SinkTransformation
Similarly to SourceTransformation, calling addSink on a DataStream creates a DataStreamSink object, and constructing that object also creates a SinkTransformation.
UnionTransformation
This transformation merges multiple inputs into one stream. The representative operator is union.
SplitTransformation
Calling split on a DataStream creates a SplitStream, and initializing the SplitStream builds a SplitTransformation.
SelectTransformation
Calling the select operator on a SplitStream creates a SelectTransformation.
FeedbackTransformation
This transformation is used when an IterativeStream is created.
CoFeedbackTransformation
Similar to FeedbackTransformation; it is used when a ConnectedIterativeStreams is created.
PartitionTransformation
Every operator that controls where records are routed is a PartitionTransformation, for example shuffle, forward, rebalance, broadcast, rescale, global, partitionCustom and keyBy.
SideOutputTransformation
Calling getSideOutput (to retrieve a side output) creates a SideOutputTransformation.
Building the StreamGraph from Transformations
We begin our analysis of how the StreamGraph is generated with the execute method of StreamExecutionEnvironment.
Source code of StreamExecutionEnvironment's execute method:
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
return execute(getStreamGraph(jobName));
}
This method calls getStreamGraph, which is shown below:
public StreamGraph getStreamGraph(String jobName) {
// Create a StreamGraphGenerator, set its parameters, and call generate to produce the StreamGraph
return getStreamGraphGenerator().setJobName(jobName).generate();
}
private StreamGraphGenerator getStreamGraphGenerator() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
// Pass the transformations and the other configuration to the StreamGraphGenerator
return new StreamGraphGenerator(transformations, config, checkpointCfg)
.setStateBackend(defaultStateBackend)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout);
}
Next, we follow into the generate method of StreamGraphGenerator:
public StreamGraph generate() {
// Create the StreamGraph object, passing in the execution config and the checkpoint config
streamGraph = new StreamGraph(executionConfig, checkpointConfig);
// Set the state backend
streamGraph.setStateBackend(stateBackend);
// Set whether operator chaining is enabled (an optimization setting)
streamGraph.setChaining(chaining);
// Set the schedule mode, which decides whether tasks are scheduled lazily or eagerly
streamGraph.setScheduleMode(scheduleMode);
// The cacheFile of StreamExecutionEnvironment is passed in through this variable
// cacheFile holds user files that must be distributed to every TaskManager
streamGraph.setUserArtifacts(userArtifacts);
// Set the time characteristic: event time, processing time or ingestion time
streamGraph.setTimeCharacteristic(timeCharacteristic);
// Set the job name
streamGraph.setJobName(jobName);
// Set whether the connections between chains are blocking
streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);
// Stores the transformations that have already been processed
alreadyTransformed = new HashMap<>();
// Process the transformations one by one
for (Transformation<?> transformation: transformations) {
transform(transformation);
}
// Keep a reference to the StreamGraph that was built
final StreamGraph builtStreamGraph = streamGraph;
// Clear the intermediate state
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
So the key is the line transform(transformation);.
The transform method:
private Collection<Integer> transform(Transformation<?> transform) {
// If this transformation has already been processed, return the cached result directly
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
// If the transformation's max parallelism is not set but the global max parallelism is, apply the global value to the transformation
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
// Check the transformation's output type; an exception is thrown if it is MissingTypeInfo
transform.getOutputType();
Collection<Integer> transformedIds;
// Dispatch to a different handler depending on the concrete transformation type
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
// If this transformation has not been recorded yet, add it to the processed map
// Processing a transformation first processes its inputs (there may be none, one or several), so transform is called recursively.
// Checking before and after the dispatch whether the transformation was already processed ensures it is never processed twice under recursion
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
// Set the network buffer timeout
if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
} else {
streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
}
// Set the uid
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
// Set the UserProvidedNodeHash
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
// If automatic UID generation is disabled and neither a UserProvidedNodeHash nor a uid is set, throw an exception
if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
if (transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
throw new IllegalStateException("Auto generated UIDs have been disabled " +
"but no UID or hash has been assigned to operator " + transform.getName());
}
}
// Set the minimum and preferred resource requirements of the transformation
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
return transformedIds;
}
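The two checks in transform(), one on entry and one before recording, amount to a memoized depth-first traversal. The minimal sketch below uses a hypothetical ToyNode type with a list of inputs and ignores iterations and feedback edges. It shows that an input shared by several transformations is processed exactly once, and always before the transformations that consume it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy node: an id plus its input nodes (not a Flink class)
class ToyNode {
    final int id;
    final List<ToyNode> inputs;

    ToyNode(int id, ToyNode... inputs) {
        this.id = id;
        this.inputs = Arrays.asList(inputs);
    }
}

class ToyGenerator {
    private final Map<ToyNode, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<Integer> processingOrder = new ArrayList<>();

    Collection<Integer> transform(ToyNode node) {
        // Entry check: return the cached result if this node was already handled
        Collection<Integer> cached = alreadyTransformed.get(node);
        if (cached != null) {
            return cached;
        }
        // Inputs are transformed first, recursively (depth-first)
        for (ToyNode input : node.inputs) {
            transform(input);
        }
        processingOrder.add(node.id);
        Collection<Integer> ids = Collections.singleton(node.id);
        // Exit check: only record the result if recursion has not recorded one already
        alreadyTransformed.putIfAbsent(node, ids);
        return ids;
    }

    // Mirrors the loop in generate(): every registered node is passed to transform()
    void generate(List<ToyNode> registered) {
        for (ToyNode node : registered) {
            transform(node);
        }
    }
}
```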
Next, we analyze StreamGraph generation starting from the transformSource method.
private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
// Determine the slot sharing group; for a source with no explicit group this is "default"
String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
// Add the data source to the StreamGraph
streamGraph.addSource(source.getId(),
slotSharingGroup,
source.getCoLocationGroupKey(),
source.getOperatorFactory(),
null,
source.getOutputType(),
"Source: " + source.getName());
// If the operator factory is an InputFormatOperatorFactory, register its InputFormat
if (source.getOperatorFactory() instanceof InputFormatOperatorFactory) {
streamGraph.setInputFormat(source.getId(),
((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat());
}
// Set the parallelism
int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
source.getParallelism() : executionConfig.getParallelism();
streamGraph.setParallelism(source.getId(), parallelism);
streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
return Collections.singleton(source.getId());
}
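The parallelism handling comes down to two fallback rules. The first is the ternary in transformSource, which transformOneInputTransform repeats. The second is the max-parallelism fallback at the start of transform(). The hypothetical helper below reproduces both rules. The value -1 for the default marker is an assumption intended to match ExecutionConfig.PARALLELISM_DEFAULT.

```java
// Hypothetical helper reproducing two fallback rules from the code above; not a Flink API.
final class ParallelismResolver {
    // Assumption: mirrors ExecutionConfig.PARALLELISM_DEFAULT, taken to be -1
    static final int PARALLELISM_DEFAULT = -1;

    private ParallelismResolver() {}

    // transformSource / transformOneInputTransform: use the transformation's own
    // parallelism unless it is the default marker, else fall back to the job-wide value
    static int resolveParallelism(int transformationParallelism, int jobParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : jobParallelism;
    }

    // transform(): an unset (<= 0) max parallelism takes the job-wide value, if that one is set (> 0)
    static int resolveMaxParallelism(int transformationMaxParallelism, int globalMaxParallelism) {
        if (transformationMaxParallelism <= 0 && globalMaxParallelism > 0) {
            return globalMaxParallelism;
        }
        return transformationMaxParallelism;
    }
}
```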
The addSource method of StreamGraph:
public <IN, OUT> void addSource(Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
sources.add(vertexID);
}
This method contains little logic. It first adds the operator, then records the SourceTransformation's vertex ID in the StreamGraph's sources set.
The addOperator method:
public <IN, OUT> void addOperator(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
// Check whether this is a stream source: a source's SourceFunction is wrapped in a StreamSource object, so this returns true for sources
if (operatorFactory.isStreamSource()) {
addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
} else {
addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
}
// Build serializers for the input and output types
TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;
// Set the serializers
setSerializers(vertexID, inSerializer, null, outSerializer);
// Set the output and input types on the operatorFactory, if it accepts them
if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
// sets the output type which must be known at StreamGraph creation time
operatorFactory.setOutputType(outTypeInfo, executionConfig);
}
if (operatorFactory.isInputTypeConfigurable()) {
operatorFactory.setInputType(inTypeInfo, executionConfig);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexID);
}
}
The addNode method:
protected StreamNode addNode(Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
// Create a StreamNode and add it to the streamNodes map
StreamNode vertex = new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
new ArrayList<OutputSelector<?>>(),
vertexClass);
streamNodes.put(vertexID, vertex);
return vertex;
}
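The bookkeeping done by addSource, addOperator and addNode fits in a small sketch. ToyStreamGraph and ToyStreamNode are hypothetical. The task classes are represented by their names as strings rather than by Class objects.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy vertex; the task class is kept as a plain string
class ToyStreamNode {
    final int id;
    final String operatorName;
    final String taskClassName;

    ToyStreamNode(int id, String operatorName, String taskClassName) {
        this.id = id;
        this.operatorName = operatorName;
        this.taskClassName = taskClassName;
    }
}

class ToyStreamGraph {
    final Map<Integer, ToyStreamNode> streamNodes = new HashMap<>();
    final Set<Integer> sources = new LinkedHashSet<>();

    // addNode: reject duplicate vertex IDs, then register a new vertex
    ToyStreamNode addNode(int vertexId, String operatorName, String taskClassName) {
        if (streamNodes.containsKey(vertexId)) {
            throw new RuntimeException("Duplicate vertexID " + vertexId);
        }
        ToyStreamNode node = new ToyStreamNode(vertexId, operatorName, taskClassName);
        streamNodes.put(vertexId, node);
        return node;
    }

    // addOperator: the task class depends only on whether the operator is a stream source
    void addOperator(int vertexId, String operatorName, boolean isStreamSource) {
        addNode(vertexId, operatorName, isStreamSource ? "SourceStreamTask" : "OneInputStreamTask");
    }

    // addSource: add the vertex, then remember its ID as a source
    void addSource(int vertexId, String operatorName) {
        addOperator(vertexId, operatorName, true);
        sources.add(vertexId);
    }
}
```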
The logic of transformSource is fairly simple. It adds a node to the StreamGraph and registers that node as one of the graph's sources. Next, we look at transformOneInputTransform, which is used far more often.
A OneInputTransformation has one input, which points to the transformation before it. Together the transformations form a linked-list structure, as shown below:
The code of transformOneInputTransform:
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
// First process this transformation's input transformation
Collection<Integer> inputIds = transform(transform.getInput());
// the recursive call might have already transformed this
// The recursive call above may already have processed this transformation; check to avoid processing it twice
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
// The following steps are similar to transformSource, up to the point where edges are added
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getCoLocationGroupKey(),
transform.getOperatorFactory(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());
if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
transform.getParallelism() : executionConfig.getParallelism();
streamGraph.setParallelism(transform.getId(), parallelism);
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
// This is the key step: add an edge that connects the vertex built from this OneInputTransformation to the vertex built from its input transformation
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
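The core of transformOneInputTransform is a fixed order of steps. It transforms the input first, adds this vertex, and then adds an edge from the input vertex to it. The hypothetical sketch below models only that ordering. It uses string edges such as "1->2" instead of StreamEdge objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy transformation with at most one input (null for a source)
class OneInput {
    final int id;
    final OneInput input;

    OneInput(int id, OneInput input) {
        this.id = id;
        this.input = input;
    }
}

class ToyEdgeBuilder {
    private final Map<OneInput, Integer> alreadyTransformed = new HashMap<>();
    final List<Integer> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    int transform(OneInput t) {
        Integer cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached;
        }
        if (t.input == null) {
            // Source: only a vertex, no incoming edge
            nodes.add(t.id);
        } else {
            // 1. transform the input first (recursion)
            int inputId = transform(t.input);
            // 2. add this transformation's vertex
            nodes.add(t.id);
            // 3. connect the input vertex to this vertex, in the spirit of streamGraph.addEdge(inputId, id, 0)
            edges.add(inputId + "->" + t.id);
        }
        alreadyTransformed.put(t, t.id);
        return t.id;
    }
}
```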
Continuing into streamGraph.addEdge: addEdge in turn calls addEdgeInternal, shown below:
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {
// These branches are analyzed later
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else {
// Get the upstream and downstream nodes
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
// Partitioner selection, explained below
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
if (shuffleMode == null) {
shuffleMode = ShuffleMode.UNDEFINED;
}
// Create a new StreamEdge
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
// Attach the new edge to the upstream and downstream nodes
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
At the start of the method we see the handling of virtualSideOutputNodes, virtualSelectNodes and virtualPartitionNodes. These kinds of transformations are turned into virtual nodes. What is a virtual node? Side output, select and partitioning operations do not take user-defined processing logic (a userFunction), so these transformations become virtual nodes. Strictly speaking, a virtual node is not a StreamNode and contains no physical processing logic.
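Virtual nodes do not appear in the StreamGraph's processing flow. When an edge is added and its upstream node is virtual, addEdgeInternal recursively follows the upstream chain until it finds a non-virtual node, and only then adds the edge. A virtual node is attached to a non-virtual node through its originalId, which is stored as the f0 field of its entry in the corresponding map.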
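The virtual-node resolution in addEdgeInternal is a recursive lookup. The sketch below models only the virtualPartitionNodes branch, and it uses hypothetical names and string partitioners. It shows that the partitioner of the virtual node closest to the downstream operator wins. It also shows that an edge is recorded only once a physical upstream ID is reached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy: only the virtual-partition branch of addEdgeInternal is modeled
class ToyVirtualGraph {
    private static final class VirtualNode {
        final int originalId;      // plays the role of f0
        final String partitioner;  // plays the role of f1

        VirtualNode(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    private final Map<Integer, VirtualNode> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualNode(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId, String partitioner) {
        VirtualNode virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // A partitioner already chosen closer to the downstream side wins
            String effective = partitioner != null ? partitioner : virtual.partitioner;
            // Recurse with the original upstream ID, which may itself be virtual
            addEdge(virtual.originalId, downstreamId, effective);
        } else {
            // Physical upstream reached: record the edge ("unset" = decided later by the parallelism rule)
            edges.add(upstreamId + "->" + downstreamId + " [" + (partitioner != null ? partitioner : "unset") + "]");
        }
    }
}
```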
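The partitioner also needs some explanation. If no partitioner is specified and the upstream and downstream parallelism are equal, ForwardPartitioner is used, which pushes data directly to the local downstream operator instance. If the upstream and downstream parallelism differ, RebalancePartitioner is used, which distributes data to the downstream channels in round-robin fashion.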
ForwardPartitioner cannot be used when the upstream and downstream parallelism differ; if it is, an UnsupportedOperationException is thrown.
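The default-partitioner rule and the forward check can be written as one small function. This is a hypothetical helper that uses strings in place of ForwardPartitioner and RebalancePartitioner instances.

```java
// Hypothetical helper mirroring the partitioner logic of addEdgeInternal; not a Flink API.
final class PartitionerChooser {
    private PartitionerChooser() {}

    static String choose(String partitioner, int upstreamParallelism, int downstreamParallelism) {
        // No partitioner given: forward when parallelism matches, rebalance otherwise
        if (partitioner == null) {
            partitioner = upstreamParallelism == downstreamParallelism ? "forward" : "rebalance";
        }
        // An explicit forward partitioner is still rejected if the parallelism differs
        if (partitioner.equals("forward") && upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism");
        }
        return partitioner;
    }
}
```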
Summary in one diagram
Take the following program as an example:
val stream = env.fromElements("hello", "world")
stream.map((_, 0)).keyBy(0).countWindow(1).process(new ProcessWindowFunction[(String, Int), String, Tuple, GlobalWindow] {
override def process(key: Tuple, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
for (elem <- elements) {
out.collect(elem._1)
}
}
}).print()
The transformation flow of this program is shown in the figure:
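As a rough cross-check of the example, the toy model below derives StreamGraph edges for the program above. It rests on four assumptions, which are also labeled in the code. First, fromElements produces a non-parallel source with parallelism 1. Second, every other operator runs with a hypothetical job parallelism of 4. Third, keyBy becomes a virtual partition node carrying a hash partitioner, shown as "HASH". Fourth, the window process and print calls become ordinary one-input and sink transformations. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy step standing for one transformation of the example program
final class ToyStep {
    final String name;
    final ToyStep input;       // upstream step, null for the source
    final boolean virtual;     // true for a PartitionTransformation such as keyBy
    final String partitioner;  // only meaningful for virtual steps
    final int parallelism;

    ToyStep(String name, ToyStep input, boolean virtual, String partitioner, int parallelism) {
        this.name = name;
        this.input = input;
        this.virtual = virtual;
        this.partitioner = partitioner;
        this.parallelism = parallelism;
    }
}

final class ExampleGraph {
    private ExampleGraph() {}

    // Returns StreamGraph edges as "up->down [PARTITIONER]"; virtual steps never become vertices
    static List<String> edges(List<ToyStep> steps) {
        List<String> result = new ArrayList<>();
        for (ToyStep step : steps) {
            // The source has no input, and virtual steps only contribute a partitioner
            if (step.virtual || step.input == null) {
                continue;
            }
            ToyStep upstream = step.input;
            String partitioner = null;
            // Walk past virtual steps to the physical upstream; the closest partitioner wins
            while (upstream.virtual) {
                if (partitioner == null) {
                    partitioner = upstream.partitioner;
                }
                upstream = upstream.input;
            }
            // Default rule from addEdgeInternal: forward if parallelism matches, else rebalance
            if (partitioner == null) {
                partitioner = upstream.parallelism == step.parallelism ? "FORWARD" : "REBALANCE";
            }
            result.add(upstream.name + "->" + step.name + " [" + partitioner + "]");
        }
        return result;
    }
}
```

Under these assumptions the keyBy step leaves no vertex of its own; it only decides the partitioner on the Map to Window edge.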