What is a StreamGraph?
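As the name suggests, a StreamGraph is a stream graph. It describes the topology of a data stream and contains all the information needed to create a JobGraph. A StreamGraph is made up of StreamNodes and StreamEdges:
- a StreamNode describes an operator in the streaming program together with its properties;
- a StreamEdge connects two StreamNodes and represents the data stream flowing between them.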
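To make the node/edge structure concrete, here is a minimal sketch of a graph made of nodes and directed edges. The classes `MiniStreamNode`, `MiniStreamEdge` and `MiniStreamGraph` are hypothetical stand-ins invented for illustration. They are not Flink's classes, which carry far more properties such as parallelism, serializers and partitioners.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a StreamNode: an id plus the operator it runs.
final class MiniStreamNode {
    final int id;
    final String operatorName;
    final List<MiniStreamEdge> inEdges = new ArrayList<>();
    final List<MiniStreamEdge> outEdges = new ArrayList<>();

    MiniStreamNode(int id, String operatorName) {
        this.id = id;
        this.operatorName = operatorName;
    }
}

// Hypothetical stand-in for a StreamEdge: a directed data stream between two nodes.
final class MiniStreamEdge {
    final MiniStreamNode source;
    final MiniStreamNode target;

    MiniStreamEdge(MiniStreamNode source, MiniStreamNode target) {
        this.source = source;
        this.target = target;
    }

    @Override
    public String toString() {
        return source.operatorName + " -> " + target.operatorName;
    }
}

// Hypothetical stand-in for a StreamGraph: nodes keyed by id, plus the edges between them.
final class MiniStreamGraph {
    private final Map<Integer, MiniStreamNode> nodes = new LinkedHashMap<>();
    private final List<MiniStreamEdge> edges = new ArrayList<>();

    void addNode(int id, String operatorName) {
        nodes.put(id, new MiniStreamNode(id, operatorName));
    }

    void addEdge(int sourceId, int targetId) {
        MiniStreamNode source = nodes.get(sourceId);
        MiniStreamNode target = nodes.get(targetId);
        if (source == null || target == null) {
            throw new IllegalArgumentException("Unknown node id in edge " + sourceId + " -> " + targetId);
        }
        MiniStreamEdge edge = new MiniStreamEdge(source, target);
        // Register the edge on both endpoints so the graph can be walked in either direction.
        source.outEdges.add(edge);
        target.inEdges.add(edge);
        edges.add(edge);
    }

    int nodeCount() {
        return nodes.size();
    }

    List<MiniStreamEdge> edges() {
        return edges;
    }
}
```

Example usage: after `addNode(1, "Source")`, `addNode(2, "Map")` and `addEdge(1, 2)`, the graph holds two nodes and one edge printed as `Source -> Map`.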
Why is a StreamGraph needed?
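Flink is a stream processing engine that supports two execution modes: stream processing and batch processing. At runtime the two modes are unified and not treated differently, because both are described by a JobGraph. Before that point, however, each mode is described by its own data structure: StreamGraph for streaming programs and Plan for batch programs. The corresponding JobGraph is then generated from each of them.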
How is a StreamGraph created?
Before describing how a StreamGraph is created, let us first introduce some core concepts and data structures related to it.
StreamExecutionEnvironment
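The execution environment of a streaming program. It holds the ExecutionConfig, the CheckpointConfig, the list of StreamTransformations, the StateBackend and so on. It is also the entry point for generating a StreamGraph: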
public StreamGraph getStreamGraph() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return StreamGraphGenerator.generate(this, transformations);
}
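The environment is lazy. It only records transformations, and nothing is built until the graph is requested. The sketch below mimics that pattern together with the empty-topology guard.

`SketchEnvironment` and `SketchGenerator` are hypothetical classes. In this sketch, a transformation is represented by its name only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical environment: it only records transformations; nothing is built until asked.
final class SketchEnvironment {
    // Stand-in for the list of StreamTransformations (here just their names).
    private final List<String> transformations = new ArrayList<>();

    SketchEnvironment addOperator(String transformationName) {
        transformations.add(transformationName);
        return this;
    }

    // Same guard as getStreamGraph(): an empty topology cannot be executed.
    List<String> getStreamGraph() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return SketchGenerator.generate(transformations);
    }
}

// Hypothetical generator: in this sketch the "graph" is just an immutable copy of the names.
final class SketchGenerator {
    static List<String> generate(List<String> transformations) {
        return Collections.unmodifiableList(new ArrayList<>(transformations));
    }
}
```

Keeping construction in a separate generator class, as Flink does with StreamGraphGenerator, keeps the environment a simple collector of user-defined steps.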
StreamGraphGenerator
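As the name suggests, this is the StreamGraph generator, the class that actually builds the StreamGraph. It takes the StreamTransformations produced while the stream topology was being defined and translates them into the corresponding StreamNodes and StreamEdges. It then assembles those nodes and edges into a StreamGraph.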
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
    for (StreamTransformation<?> transformation: transformations) {
        transform(transformation);
    }
    return streamGraph;
}
StreamTransformation
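A StreamTransformation represents an operation that creates a DataStream, and every DataStream has an underlying StreamTransformation. However, not every StreamTransformation corresponds to a physical operation at runtime. For example, union, split, select, partition and side-output transformations only affect how records are routed between operators, so they do not become runtime operators of their own.
In the figure above, the left side shows the stream graph as built from the topology, and the right side shows the operator graph at runtime.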
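One way to handle a transformation that has no runtime operator is to give it a virtual id instead of a physical node. When an edge is later added from that virtual id, it is resolved back to the real upstream node, and the routing information is attached to the edge. Flink's StreamGraph follows this approach for partitioning.

The code below is only a hypothetical simplification. `VirtualNodeGraph` and its methods are invented for illustration, partitioners are plain strings, and the `"FORWARD"` default is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a partitioning step gets a *virtual* id instead of a physical node.
final class VirtualNodeGraph {
    // An edge between two physical nodes, carrying the partitioner that routes its records.
    record Edge(int sourceId, int targetId, String partitioner) {}

    private final Map<Integer, String> physicalNodes = new HashMap<>();
    private final Map<Integer, Integer> virtualUpstream = new HashMap<>();   // virtual id -> upstream id
    private final Map<Integer, String> virtualPartitioner = new HashMap<>(); // virtual id -> partitioner
    private final List<Edge> edges = new ArrayList<>();

    void addOperator(int id, String name) {
        physicalNodes.put(id, name);
    }

    void addVirtualPartitionNode(int upstreamId, int virtualId, String partitioner) {
        virtualUpstream.put(virtualId, upstreamId);
        virtualPartitioner.put(virtualId, partitioner);
    }

    void addEdge(int sourceId, int targetId) {
        addEdgeInternal(sourceId, targetId, null);
    }

    private void addEdgeInternal(int sourceId, int targetId, String partitioner) {
        if (virtualUpstream.containsKey(sourceId)) {
            // Unwrap the virtual node: keep the partitioner closest to the target, then
            // continue with the real upstream id (virtual nodes may be chained).
            String p = partitioner != null ? partitioner : virtualPartitioner.get(sourceId);
            addEdgeInternal(virtualUpstream.get(sourceId), targetId, p);
        } else {
            // Assumption of this sketch: without an explicit partitioner, use "FORWARD".
            edges.add(new Edge(sourceId, targetId, partitioner != null ? partitioner : "FORWARD"));
        }
    }

    int physicalNodeCount() {
        return physicalNodes.size();
    }

    List<Edge> edges() {
        return edges;
    }
}
```

For a source (id 1), a keyBy-like partition step (virtual id 2, `"HASH"`) and a map (id 3), connecting 2 to 3 yields two physical nodes and the single edge `Edge[sourceId=1, targetId=3, partitioner=HASH]`.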
StreamTransformation is an abstract type with many concrete implementations:
- SourceTransformation
- OneInputTransformation
- SideOutputTransformation
- SelectTransformation
- PartitionTransformation
- CoFeedbackTransformation
- FeedbackTransformation
- UnionTransformation
- SplitTransformation
- TwoInputTransformation
- SinkTransformation
Each type of StreamTransformation has its own translation logic. The generator inspects the concrete type and creates the corresponding StreamNodes and StreamEdges. As mentioned earlier, though, not every type of StreamTransformation creates a StreamNode and a StreamEdge. The translation works as follows:
private Collection<Integer> transform(StreamTransformation<?> transform) {
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    LOG.debug("Transforming " + transform);
    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }
    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();
    Collection<Integer> transformedIds;
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }
    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }
    if (transform.getBufferTimeout() > 0) {
        streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
    }
    if (transform.getUid() != null) {
        streamGraph.setTransformationUID(transform.getId(), transform.getUid());
    }
    if (transform.getUserProvidedNodeHash() != null) {
        streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
    }
    if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
        streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
    }
    return transformedIds;
}
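Three ideas in `transform` are worth isolating:
- the `alreadyTransformed` cache, so that an upstream shared by several downstream transformations is translated only once;
- the fallback to the job-wide max parallelism;
- the dispatch on the concrete transformation type.

The sketch below reproduces these ideas on a hypothetical two-type hierarchy (`Tx`, `SourceTx`, `OneInputTx`, `MemoizingGenerator`, none of which are Flink classes). It uses pattern matching for `instanceof`, so it assumes Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stripped-down transformation hierarchy (not Flink's classes).
abstract class Tx {
    private static int nextId = 1;
    final int id = nextId++;
    int maxParallelism = -1; // <= 0 means "not set"
}

final class SourceTx extends Tx {}

final class OneInputTx extends Tx {
    final Tx input;

    OneInputTx(Tx input) {
        this.input = input;
    }
}

final class MemoizingGenerator {
    final Map<Tx, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<Integer> visitOrder = new ArrayList<>(); // records each translation, for inspection
    private final int globalMaxParallelism;

    MemoizingGenerator(int globalMaxParallelism) {
        this.globalMaxParallelism = globalMaxParallelism;
    }

    Collection<Integer> transform(Tx t) {
        // 1. Memoization: a shared upstream is translated only once.
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached;
        }
        // 2. Fall back to the job-wide max parallelism if none was set on the transformation.
        if (t.maxParallelism <= 0 && globalMaxParallelism > 0) {
            t.maxParallelism = globalMaxParallelism;
        }
        // 3. Dispatch on the concrete type.
        Collection<Integer> ids;
        if (t instanceof OneInputTx oneInput) {
            transform(oneInput.input); // translate the input first (recursion)
            ids = Collections.singleton(t.id);
        } else if (t instanceof SourceTx) {
            ids = Collections.singleton(t.id);
        } else {
            throw new IllegalStateException("Unknown transformation: " + t);
        }
        visitOrder.add(t.id);
        alreadyTransformed.putIfAbsent(t, ids);
        return ids;
    }
}
```

With one source feeding two map-like transformations, translating both maps visits three transformations in total. The source comes first and is translated only once.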
Taking OneInputTransformation as an example, let us look at how StreamNodes and StreamEdges are created.
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    Collection<Integer> inputIds = transform(transform.getInput());
    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getOperator(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());
    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }
    streamGraph.setParallelism(transform.getId(), transform.getParallelism());
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }
    return Collections.singleton(transform.getId());
}
Step 1: translate the input of the OneInputTransformation and obtain the node ids of that input. The input is itself a StreamTransformation, so this is a recursive process that works from the last StreamTransformation back to the first one.
Collection<Integer> inputIds = transform(transform.getInput());
Step 2: create a StreamNode and add it to the StreamGraph.
streamGraph.addOperator(transform.getId(),
        slotSharingGroup,
        transform.getOperator(),
        transform.getInputType(),
        transform.getOutputType(),
        transform.getName());
Step 3: add edges to the StreamGraph that connect each input node to the new node.
for (Integer inputId: inputIds) {
    streamGraph.addEdge(inputId, transform.getId(), 0);
}
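The three steps can be condensed into a small sketch. It covers recursively translating the input, adding the node, and connecting every input id to it.

The classes `Step` and `OneInputGraphBuilder` are hypothetical. Nodes are just id-to-name entries and edges are `"sourceId->targetId"` strings. Slot sharing groups, key selectors and parallelism are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal transformation: an id, a name and (optionally) one input.
final class Step {
    final int id;
    final String name;
    final Step input; // null for a source

    Step(int id, String name, Step input) {
        this.id = id;
        this.name = name;
        this.input = input;
    }
}

final class OneInputGraphBuilder {
    final Map<Integer, String> nodes = new LinkedHashMap<>(); // id -> operator name, in creation order
    final List<String> edges = new ArrayList<>();             // "sourceId->targetId"
    private final Map<Step, Collection<Integer>> alreadyTransformed = new HashMap<>();

    Collection<Integer> transform(Step step) {
        Collection<Integer> cached = alreadyTransformed.get(step);
        if (cached != null) {
            return cached;
        }
        // Step 1: recursively translate the input first (a source has no input).
        Collection<Integer> inputIds = step.input == null
                ? Collections.<Integer>emptyList()
                : transform(step.input);
        // Step 2: add the node for this transformation.
        nodes.put(step.id, step.name);
        // Step 3: connect every input node to the new node.
        for (Integer inputId : inputIds) {
            edges.add(inputId + "->" + step.id);
        }
        Collection<Integer> ids = Collections.singleton(step.id);
        alreadyTransformed.put(step, ids);
        return ids;
    }
}
```

For a chain Source(1) → Map(2) → Sink(3), calling `transform` on the sink alone is enough. The recursion reaches the map and the source, producing nodes 1, 2 and 3 and the edges `1->2` and `2->3`.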
Once all StreamTransformations have been traversed, the corresponding StreamGraph is complete.