This article is based on Flink 1.9. Running a Flink job involves three graphs: StreamGraph, JobGraph and ExecutionGraph. The StreamGraph and JobGraph are generated on the client side, while the ExecutionGraph is built in the JobMaster.
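- StreamGraph is the original execution graph generated from the user code, i.e. a direct translation of the user's logic.
- JobGraph is an optimized version of the StreamGraph; for example, it decides which operators can be chained together to reduce network overhead.
- ExecutionGraph is the execution graph used for job scheduling; it adds the notion of parallelism to the JobGraph.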
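This article starts with how the StreamGraph is generated.
1. Generating transformations
Flink offers many operators, such as map, flatMap and join, and each of them produces a transformation. Taking flatMap as an example, let's follow the source code of DataStream#flatMap: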
```java
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
            getType(), Utils.getCallLocationName(), true);

    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
```
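- It first obtains the type information of the return value (outType).
- It then calls transform. Following that method further into the source, it first builds a OneInputTransformation object and then adds it to the transformations list of the StreamExecutionEnvironment.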
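The registration step described above can be modeled with a toy example. The classes below (`TransformationRegistrySketch`, `Env`, `Stream`, `Transformation`) are hypothetical stand-ins, not Flink's API. They only show the shape of the mechanism: each operator call wraps the current transformation as the input of a new one and appends the new one to the environment's list. The source is created but not appended; later, it is reached by recursing from downstream transformations.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical classes, not Flink's API) of how operator calls on a
// stream create transformations and register them with the environment.
class TransformationRegistrySketch {

    // Stand-in for Flink's Transformation: an id, a name and an optional input.
    static final class Transformation {
        final int id;
        final String name;
        final Transformation input; // null for a source

        Transformation(int id, String name, Transformation input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    // Stand-in for StreamExecutionEnvironment: hands out ids and collects transformations.
    static final class Env {
        final List<Transformation> transformations = new ArrayList<>();
        private int nextId = 1;

        Transformation newTransformation(String name, Transformation input) {
            return new Transformation(nextId++, name, input);
        }

        void addOperator(Transformation t) {
            transformations.add(t);
        }

        // The source is created but NOT registered in the list.
        Stream fromSource(String name) {
            return new Stream(this, newTransformation(name, null));
        }
    }

    // Stand-in for DataStream.
    static final class Stream {
        final Env env;
        final Transformation transformation;

        Stream(Env env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        // Same shape as DataStream#transform: build a one-input transformation on top of
        // the current one, register it with the environment, and return a new stream.
        Stream transform(String operatorName) {
            Transformation result = env.newTransformation(operatorName, transformation);
            env.addOperator(result);
            return new Stream(env, result);
        }

        Stream flatMap() {
            return transform("Flat Map");
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        env.fromSource("Source").flatMap().transform("Map");
        for (Transformation t : env.transformations) {
            System.out.println(t.id + " " + t.name + " <- " + t.input.name);
        }
    }
}
```

Running `main` prints `2 Flat Map <- Source` followed by `3 Map <- Flat Map`. Only the two downstream operators end up in the list, and each one holds a reference to its input.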
2. Entry point for StreamGraph generation: StreamGraphGenerator#generate()
```java
public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig);
    streamGraph.setStateBackend(stateBackend);
    streamGraph.setChaining(chaining);
    streamGraph.setScheduleMode(scheduleMode);
    streamGraph.setUserArtifacts(userArtifacts);
    streamGraph.setTimeCharacteristic(timeCharacteristic);
    streamGraph.setJobName(jobName);
    streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

    alreadyTransformed = new HashMap<>();

    for (Transformation<?> transformation: transformations) {
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}
```
The generate method converts each element of transformations in turn. Next, let's look at the transform logic:
```java
/**
 * Transforms one {@code Transformation}.
 *
 * <p>This checks whether we already transformed it and exits early in that case. If not it
 * delegates to one of the transformation specific methods.
 */
private Collection<Integer> transform(Transformation<?> transform) {

    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    if (transform.getMaxParallelism() <= 0) {

        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();

    Collection<Integer> transformedIds;
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    if (transform.getBufferTimeout() >= 0) {
        streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
    } else {
        streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
    }

    if (transform.getUid() != null) {
        streamGraph.setTransformationUID(transform.getId(), transform.getUid());
    }
    if (transform.getUserProvidedNodeHash() != null) {
        streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
    }

    if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
        if (transform instanceof PhysicalTransformation &&
                transform.getUserProvidedNodeHash() == null &&
                transform.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled " +
                "but no UID or hash has been assigned to operator " + transform.getName());
        }
    }

    if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
        streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
    }

    return transformedIds;
}
```
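The core of generate() and transform() is a memoized depth-first traversal: an early exit through alreadyTransformed, then dispatch on the transformation type, with inputs transformed before the node itself. The sketch below models only that skeleton. The `Kind` enum, `MemoizedTransformSketch` and its `createNode` helper are hypothetical simplifications, and none of the buffer-timeout, UID or resource handling is included. The demo uses a shared source feeding two operators to show that the source is turned into a node only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of StreamGraphGenerator#generate/#transform:
// a memoized depth-first traversal that dispatches on the transformation kind.
class MemoizedTransformSketch {

    enum Kind { SOURCE, ONE_INPUT, SINK }

    static final class Transformation {
        final int id;
        final Kind kind;
        final Transformation input; // null for SOURCE

        Transformation(int id, Kind kind, Transformation input) {
            this.id = id;
            this.kind = kind;
            this.input = input;
        }
    }

    // Plays the role of alreadyTransformed in StreamGraphGenerator.
    private final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    // Records the order in which nodes are created, for illustration only.
    private final List<Integer> creationOrder = new ArrayList<>();

    List<Integer> generate(List<Transformation> transformations) {
        for (Transformation t : transformations) {
            transform(t);
        }
        alreadyTransformed.clear(); // like generate(), drop traversal state at the end
        return creationOrder;
    }

    private Collection<Integer> transform(Transformation t) {
        // Early exit: a transformation reachable along several paths is handled once.
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached;
        }
        Collection<Integer> ids;
        switch (t.kind) {
            case SOURCE:
                ids = createNode(t);
                break;
            case ONE_INPUT:
            case SINK:
                transform(t.input); // recurse into the input before creating this node
                ids = createNode(t);
                break;
            default:
                throw new IllegalStateException("Unknown transformation: " + t.kind);
        }
        alreadyTransformed.put(t, ids);
        return ids;
    }

    private Collection<Integer> createNode(Transformation t) {
        creationOrder.add(t.id);
        return Collections.singleton(t.id);
    }

    // Source(1) feeds Map-A(2) and Map-B(3); Sink(4) consumes Map-A.
    // The source itself is not in the registered list.
    static List<Integer> demo() {
        Transformation source = new Transformation(1, Kind.SOURCE, null);
        Transformation mapA = new Transformation(2, Kind.ONE_INPUT, source);
        Transformation mapB = new Transformation(3, Kind.ONE_INPUT, source);
        Transformation sink = new Transformation(4, Kind.SINK, mapA);
        return new MemoizedTransformSketch().generate(Arrays.asList(mapA, mapB, sink));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 3, 4]
    }
}
```

Although the source is reachable from both Map-A and Map-B, it appears exactly once in the creation order. Without the memoization map, the second visit would try to create a duplicate node.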
As the source shows, transform handles several types of transformation, such as Source, Sink, OneInput and Split. flatMap, for example, produces a OneInputTransformation. Below, the common case transformOneInputTransform is used as the example.
```java
/**
 * Transforms a {@code OneInputTransformation}.
 *
 * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
 * wired the inputs to this new node.
 */
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

    Collection<Integer> inputIds = transform(transform.getInput());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getCoLocationGroupKey(),
            transform.getOperatorFactory(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    return Collections.singleton(transform.getId());
}
```
- Recursively transform all input nodes of the operator.
- Add the operator to the streamGraph, which actually creates a StreamNode object and adds it to the streamGraph:
  1. First, streamGraph.addOperator is called.
  2. addOperator then calls the addNode method:
```java
protected StreamNode addNode(Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperatorFactory<?> operatorFactory,
        String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    StreamNode vertex = new StreamNode(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            operatorName,
            new ArrayList<OutputSelector<?>>(),
            vertexClass);

    streamNodes.put(vertexID, vertex);

    return vertex;
}
```
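The addNode method turns the operator into a StreamNode and adds it to the StreamGraph; vertexID corresponds to transform.getId(). If a node with the same vertexID already exists, it throws a RuntimeException.
- Add a StreamEdge between each input of the operator and the operator itself.
Through StreamNode and StreamEdge, all the nodes and edges of the DAG and the connections between them are built, and with them the topology.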
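The node-and-edge construction above can be modeled as a small graph structure. In this model, nodes are keyed by transformation ID, a duplicate ID is rejected as in addNode, and each edge is recorded on both endpoints. `MiniStreamGraph` and its methods are hypothetical simplifications; the real StreamNode and StreamEdge carry much more (partitioner, slot sharing group, serializers, and so on).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mini StreamGraph: nodes keyed by transformation id, edges stored
// on both endpoints. A simplified model, not Flink's actual classes.
class MiniStreamGraph {

    static final class StreamEdge {
        final int sourceId;
        final int targetId;

        StreamEdge(int sourceId, int targetId) {
            this.sourceId = sourceId;
            this.targetId = targetId;
        }

        @Override
        public String toString() {
            return sourceId + "->" + targetId;
        }
    }

    static final class StreamNode {
        final int id;
        final String operatorName;
        final List<StreamEdge> inEdges = new ArrayList<>();
        final List<StreamEdge> outEdges = new ArrayList<>();

        StreamNode(int id, String operatorName) {
            this.id = id;
            this.operatorName = operatorName;
        }
    }

    final Map<Integer, StreamNode> streamNodes = new HashMap<>();

    // Mirrors the duplicate-id check in StreamGraph#addNode.
    StreamNode addNode(int vertexId, String operatorName) {
        if (streamNodes.containsKey(vertexId)) {
            throw new RuntimeException("Duplicate vertexID " + vertexId);
        }
        StreamNode node = new StreamNode(vertexId, operatorName);
        streamNodes.put(vertexId, node);
        return node;
    }

    // Registers the edge on both endpoints so the topology can be walked in either direction.
    void addEdge(int upstreamId, int downstreamId) {
        StreamEdge edge = new StreamEdge(upstreamId, downstreamId);
        streamNodes.get(upstreamId).outEdges.add(edge);
        streamNodes.get(downstreamId).inEdges.add(edge);
    }

    // Shape of transformOneInputTransform once its inputs have been transformed:
    // create the node, then connect every input id to it.
    void addOneInputOperator(int id, String name, List<Integer> inputIds) {
        addNode(id, name);
        for (int inputId : inputIds) {
            addEdge(inputId, id);
        }
    }

    public static void main(String[] args) {
        MiniStreamGraph g = new MiniStreamGraph();
        g.addNode(1, "Source");
        g.addOneInputOperator(2, "Flat Map", Collections.singletonList(1));
        g.addOneInputOperator(3, "Sink", Collections.singletonList(2));
        StreamNode flatMap = g.streamNodes.get(2);
        System.out.println(flatMap.inEdges + " " + flatMap.outEdges); // [1->2] [2->3]
    }
}
```

Storing each edge on both its source and its target is what lets later stages, such as the JobGraph's chaining logic, navigate from any node to its upstream and downstream neighbors.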
3. Summary
The StreamGraph is converted from the transformations in the user code: a StreamEdge represents the connection between transformations, and a StreamNode represents a concrete operator.
- Traversal starts from the sink nodes.
- For each transformation, a new StreamNode is created in the StreamGraph, and a StreamEdge is added between the newly created StreamNode and each of its inputs.
- Partitioning, split/select and union do not add a real StreamNode to the StreamGraph. Instead, a virtual node carrying a special property (such as the partitioner or selector) is created; in effect, the property is attached to the edge.
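The virtual-node idea in the last bullet can be illustrated with a simplified model. A partitioning step gets its own ID, but that ID only maps to (real upstream ID, partitioner) and never becomes a StreamNode. When an edge is later created from the virtual ID, it is rewired to the real upstream node, and the partitioner is attached to the edge. Everything here is hypothetical: the `VirtualNodeSketch` class, the string partitioner names and the method names. The fallback rule (forward when parallelisms match, otherwise rebalance) follows Flink's default for edges without an explicit partitioner, but the model remains a sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of virtual partition nodes: a virtual id maps to
// (real upstream id, partitioner), and edges from it are rewired to the real node.
class VirtualNodeSketch {

    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;

        Edge(int sourceId, int targetId, String partitioner) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
        }

        @Override
        public String toString() {
            return sourceId + " -[" + partitioner + "]-> " + targetId;
        }
    }

    static final class VirtualPartition {
        final int originalId;
        final String partitioner;

        VirtualPartition(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    // Parallelism of each real node; virtual ids never appear here.
    final Map<Integer, Integer> parallelism = new HashMap<>();
    final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();

    void addNode(int id, int nodeParallelism) {
        parallelism.put(id, nodeParallelism);
    }

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualPartition(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Resolve the virtual id to the real upstream node, carrying its partitioner along.
            addEdgeInternal(virtual.originalId, downstreamId, virtual.partitioner);
            return;
        }
        if (partitioner == null) {
            // No explicit partitioning: forward if parallelism matches, otherwise rebalance.
            partitioner = parallelism.get(upstreamId).equals(parallelism.get(downstreamId))
                    ? "FORWARD" : "REBALANCE";
        }
        edges.add(new Edge(upstreamId, downstreamId, partitioner));
    }

    public static void main(String[] args) {
        VirtualNodeSketch g = new VirtualNodeSketch();
        g.addNode(1, 2);                          // Map-1, parallelism 2
        g.addVirtualPartitionNode(1, 4, "HASH");  // hash partitioning after Map-1, virtual id 4
        g.addNode(3, 2);                          // Map-3, parallelism 2
        g.addEdge(4, 3);                          // edge from the virtual node
        g.addNode(5, 4);                          // another consumer of Map-1, parallelism 4
        g.addEdge(1, 5);                          // no explicit partitioning
        g.edges.forEach(System.out::println);     // 1 -[HASH]-> 3, then 1 -[REBALANCE]-> 5
    }
}
```

The virtual ID 4 never shows up as a node or as an edge endpoint. Only its partitioner survives, as a property on the rewired edge 1 -> 3.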