Flink 1.10 Source Code -- StreamGraph
Flink's execution plan is divided into four layers. The first is the StreamGraph.
StreamGraph
This is the user-level layer: the program the user writes against the API is turned into a StreamGraph that represents the topology.
The relevant code is the generate method of the StreamGraphGenerator class in the flink-streaming-java module. It is reached from StreamExecutionEnvironment.execute(), which means the StreamGraph is built on the client side, so its construction can be observed by debugging locally.
Before building the StreamGraph, it helps to understand Transformation.
Transformation
In Flink stream processing, every operator is eventually converted into a Transformation. Each Transformation represents an operation that turns one or more DataStreams into a new DataStream, for example map, filter, or union.
Take map as an example. This is the map method of DataStream: it takes the user-supplied function, wraps it in a StreamMap, and calls transform.
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
Following the transform method: here the map operator is wrapped in a StreamOperatorFactory.
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
// wrap the operator in an operator factory
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
Next is doTransform, where the operator factory is wrapped in a OneInputTransformation object, that is, the transformation.
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// check whether the output type is MissingTypeInfo and throw an exception if it is
transformation.getOutputType();
// create the OneInputTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
transformation, // input: the upstream transformation
operatorName,
operatorFactory, // the operator factory that performs the conversion
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
// a series of chained map and filter calls becomes a series of OneInputTransformation objects;
// the input of each transformation points to the previous transformation
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
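Finally the transformation is added to a list through getExecutionEnvironment().addOperator:
getExecutionEnvironment returns the execution environment;
addOperator appends the transformation to the list.
At this point the operator has been wrapped in a transformation object and stored in the StreamExecutionEnvironment.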
public StreamExecutionEnvironment getExecutionEnvironment() {
return environment;
}
protected final List<Transformation<?>> transformations = new ArrayList<>();
@Internal
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
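The wrapping and registration steps above can be sketched with a small stand-alone model. This is not Flink code: MiniTransformation, MiniEnvironment, and MiniStream are hypothetical stand-ins that keep only the two things the walkthrough relies on, namely that each new transformation points at its upstream transformation and that it is appended to the environment's list.

```java
import java.util.ArrayList;
import java.util.List;

class TransformationChainSketch {

    /** Hypothetical miniature of Transformation: a name plus a link to its input. */
    static final class MiniTransformation {
        final String name;
        final MiniTransformation input; // null for a source

        MiniTransformation(String name, MiniTransformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical miniature of the environment: it only keeps the list. */
    static final class MiniEnvironment {
        final List<MiniTransformation> transformations = new ArrayList<>();

        void addOperator(MiniTransformation transformation) {
            if (transformation == null) {
                throw new NullPointerException("transformation must not be null.");
            }
            transformations.add(transformation);
        }
    }

    /** Hypothetical miniature of DataStream: the environment plus the current transformation. */
    static final class MiniStream {
        final MiniEnvironment environment;
        final MiniTransformation transformation;

        MiniStream(MiniEnvironment environment, MiniTransformation transformation) {
            this.environment = environment;
            this.transformation = transformation;
        }

        /** Same shape as doTransform: wrap, link to upstream, register, return a new stream. */
        MiniStream transform(String operatorName) {
            MiniTransformation result = new MiniTransformation(operatorName, transformation);
            environment.addOperator(result);
            return new MiniStream(environment, result);
        }
    }

    /** Follows the input links from a transformation back to the source. */
    static String lineage(MiniTransformation t) {
        StringBuilder sb = new StringBuilder(t.name);
        for (MiniTransformation cur = t.input; cur != null; cur = cur.input) {
            sb.append(" <- ").append(cur.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        MiniEnvironment env = new MiniEnvironment();
        // The source transformation is only held by the stream; it is not registered in the list.
        MiniStream source = new MiniStream(env, new MiniTransformation("Source", null));
        MiniStream result = source.transform("Map").transform("Filter");
        System.out.println(env.transformations.size());       // 2: Map and Filter
        System.out.println(lineage(result.transformation));   // Filter <- Map <- Source
    }
}
```

Only Map and Filter end up in the list, yet the source stays reachable through the input links, which is what the later recursive traversal depends on.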
StreamOperator
Each physical transformation on a DataStream (one that actually processes data) corresponds to a StreamOperator. The StreamOperator is the concrete runtime implementation and determines how the user-defined function (UDF) is invoked. All implementation classes extend AbstractStreamOperator. In addition, apart from the project operation, every implementation class that can execute UDF code extends AbstractUdfStreamOperator, which is a StreamOperator that wraps a UDF. A UDF is a class that implements the Function interface, such as MapFunction or FilterFunction.
Types of transformation
OneInputTransformation
As the name suggests, a OneInputTransformation has exactly one input. The single-stream operators it represents include map, flatMap, filter, process, and assignTimestamps.
TwoInputTransformation
A TwoInputTransformation has two inputs. Operators on ConnectedStreams work on two streams and are converted into a TwoInputTransformation.
SourceTransformation
Configuring a data source on the env creates a DataStreamSource, which is the head of the DataStream. The DataStreamSource constructor creates a SourceTransformation.
SinkTransformation
Similar to SourceTransformation: calling addSink on a DataStream creates a DataStreamSink, which constructs a SinkTransformation when it is created.
UnionTransformation
This transformation merges several inputs into one stream. The operator it represents is union.
SplitTransformation
Calling split on a DataStream creates a SplitStream, which builds a SplitTransformation when it is initialized.
SelectTransformation
Calling select on a SplitStream creates a SelectTransformation.
FeedbackTransformation
This transformation is used when an IterativeStream is created.
CoFeedbackTransformation
Similar to FeedbackTransformation; it is used when a ConnectedIterativeStream is created.
PartitionTransformation
All operators that control how data is routed belong to PartitionTransformation, for example shuffle, forward, rebalance, broadcast, rescale, global, partitionCustom, and keyBy.
SideOutputTransformation
SideOutputTransformation comes into play when getSideOutput is called to obtain a side output.
Building the StreamGraph
Now that we know what operators are converted into, we can move on to the logic that generates the StreamGraph. As mentioned above, it starts in the execute method of StreamExecutionEnvironment, which calls getStreamGraph to generate the StreamGraph.
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
// call execute, which takes a StreamGraph
// getStreamGraph builds the StreamGraph
return execute(getStreamGraph(jobName));
}
Step into getStreamGraph, which delegates to an overload of the same name.
@Internal
public StreamGraph getStreamGraph(String jobName) {
return getStreamGraph(jobName, true);
}
Continue into that getStreamGraph overload.
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
// getStreamGraphGenerator creates a StreamGraphGenerator object
// generate builds the StreamGraph
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {
this.transformations.clear();
}
return streamGraph;
}
The getStreamGraphGenerator method creates a StreamGraphGenerator object; the StreamGraph is generated by this class.
private StreamGraphGenerator getStreamGraphGenerator() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
// create the StreamGraphGenerator and set a series of parameters
return new StreamGraphGenerator(transformations, config, checkpointCfg)
.setStateBackend(defaultStateBackend)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout);
}
The generate method of StreamGraphGenerator sets a number of parameters, calls transform for each transformation, and finally returns the generated StreamGraph.
public StreamGraph generate() {
// create the StreamGraph object from the execution config, the checkpoint config, and the savepoint restore settings
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
// set the state backend
streamGraph.setStateBackend(stateBackend);
// set chaining, an optimization switch: whether operators may be chained together
streamGraph.setChaining(chaining);
// set the schedule mode, which decides whether tasks are scheduled lazily or eagerly
streamGraph.setScheduleMode(scheduleMode);
// the cacheFile of StreamExecutionEnvironment is passed in here;
// cacheFile holds the user files to be shipped to every task manager
// (used for the distributed cache)
streamGraph.setUserArtifacts(userArtifacts);
streamGraph.setTimeCharacteristic(timeCharacteristic);
streamGraph.setJobName(jobName);
// set the global data exchange mode, i.e. whether the connections between tasks are pipelined or blocking
streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);
// records the transformations that have already been processed
alreadyTransformed = new HashMap<>();
// process the transformations one by one
for (Transformation<?> transformation: transformations) {
// this method is called recursively; the main logic lives here
transform(transformation);
}
// take the StreamGraph that has been built
final StreamGraph builtStreamGraph = streamGraph;
// clear the intermediate state
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
Step into the transform method, where most of the processing happens.
The method is recursive: while computing transformedIds it calls itself, so every transform walks up to its upstream inputs and makes sure they have all been processed first.
This method turns the user's program into a StreamGraph. By this point each operator has already been converted into a transform and added to the env's transformations list; here that list is traversed and each transform is built into StreamNodes and StreamEdges.
The main logic sits in the per-type handler methods: the instance type of each transform is checked, and a different method is called for each type.
private Collection<Integer> transform(Transformation<?> transform) { // transform represents one operator
// if the map of processed transformations already contains this transform, it has been handled; return the cached result
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
// if the transformation has no max parallelism but a job-wide max parallelism is set, apply the job-wide value to the transformation
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
// check the output type of the transformation; an exception is thrown if it is MissingTypeInfo
transform.getOutputType();
Collection<Integer> transformedIds;
// dispatch to a different handler depending on the type of transform
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof AbstractMultipleInputTransformation<?>) {
transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation<?>) transform);
} else if (transform instanceof SourceTransformation) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof LegacySourceTransformation<?>) {
transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
// if the transformation has not been recorded yet, add it to the processed map
// handling a transformation first handles its inputs (none, one, or several), so transform is called recursively
// checking both before and after the handler guarantees a transformation is not processed twice during recursion
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
// set the (network) buffer timeout of the transform; if none is set, fall back to defaultBufferTimeout (100L by default)
if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
} else {
streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
}
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
// set the user-provided hash
// getUserProvidedNodeHash returns the hash the user assigned by calling setUidHash on the operator
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
// if auto-generated UIDs are disabled and neither a user-provided hash nor a uid is set, throw an exception
if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
if (transform instanceof PhysicalTransformation &&
transform.getUserProvidedNodeHash() == null &&
transform.getUid() == null) {
throw new IllegalStateException("Auto generated UIDs have been disabled " +
"but no UID or hash has been assigned to operator " + transform.getName());
}
}
// set the minimum and preferred resources of the transformation
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
// set the managed memory weight
streamGraph.setManagedMemoryWeight(transform.getId(), transform.getManagedMemoryWeight());
return transformedIds;
}
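The "upstream first, never twice" behaviour of transform can be sketched in isolation. The model below is hypothetical and much smaller than the real StreamGraphGenerator: MiniTransformation and the nodes and edges lists are illustrative names, and every transformation is treated as a physical node. It only demonstrates the memoized recursion through alreadyTransformed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RecursiveTransformSketch {

    /** Hypothetical miniature of Transformation with any number of inputs. */
    static final class MiniTransformation {
        final int id;
        final String name;
        final List<MiniTransformation> inputs;

        MiniTransformation(int id, String name, MiniTransformation... inputs) {
            this.id = id;
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    private final Map<MiniTransformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<String> nodes = new ArrayList<>(); // creation order of the nodes
    final List<String> edges = new ArrayList<>(); // "upstreamId->downstreamId"

    Collection<Integer> transform(MiniTransformation t) {
        // Already handled, possibly as the input of an earlier transformation.
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t);
        }
        // Recurse into the inputs first so that every upstream node exists.
        List<Integer> inputIds = new ArrayList<>();
        for (MiniTransformation input : t.inputs) {
            inputIds.addAll(transform(input));
        }
        // Create the node for this transformation and connect it to its inputs.
        nodes.add(t.name);
        for (Integer inputId : inputIds) {
            edges.add(inputId + "->" + t.id);
        }
        Collection<Integer> transformedIds = Collections.singleton(t.id);
        alreadyTransformed.put(t, transformedIds);
        return transformedIds;
    }

    /** Same shape as generate(): iterate over the registered list and transform each entry. */
    static RecursiveTransformSketch generate(List<MiniTransformation> transformations) {
        RecursiveTransformSketch graph = new RecursiveTransformSketch();
        for (MiniTransformation t : transformations) {
            graph.transform(t);
        }
        return graph;
    }

    public static void main(String[] args) {
        MiniTransformation source = new MiniTransformation(1, "Source");
        MiniTransformation mapA = new MiniTransformation(2, "MapA", source);
        MiniTransformation mapB = new MiniTransformation(3, "MapB", source);
        MiniTransformation coMap = new MiniTransformation(4, "CoMap", mapA, mapB);
        // The source is not in the list; it is reached through the inputs.
        RecursiveTransformSketch graph = generate(Arrays.asList(mapA, mapB, coMap));
        System.out.println(graph.nodes); // [Source, MapA, MapB, CoMap]
        System.out.println(graph.edges); // [1->2, 1->3, 2->4, 3->4]
    }
}
```

Source is reached twice (through MapA and through MapB) but is created only once, because the second visit hits the alreadyTransformed map.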
Each kind of transform is handled by its own method, but most of the handlers follow the same pattern. Here we look at transformOneInputTransform and transformSource.
transformOneInputTransform
This method recursively transforms the input, creates a new StreamNode in the graph, and connects the input (the upstream StreamNode) to the new node.
The main steps in this method:
1. Recursively walk the upstream input to make sure it is fully processed; if this transform was already handled during the recursion, return the cached result.
2. Call addOperator, which converts the transform into a StreamNode and adds it to the streamNodes collection.
3. If the transform is keyed, set the KeySelector and the key serializer.
4. Call addEdge to connect the two StreamNodes.
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
// first recursively transform the upstream input so that it is fully processed; the edge is added afterwards
Collection<Integer> inputIds = transform(transform.getInput());
// the recursive call might have already transformed this
// already converted during the recursion: return directly to avoid processing it twice
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
// determine the slot sharing group: subtasks of operators in the same group may share a slot, which improves slot utilization
// it is used later when tasks are scheduled
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
// add the operator to the streamGraph:
// this converts the operator (transform) into a StreamNode and adds it to the streamNodes collection
streamGraph.addOperator(transform.getId(), // unique id of the transform
slotSharingGroup, // slot sharing group
transform.getCoLocationGroupKey(),
transform.getOperatorFactory(), // the StreamOperatorFactory of the transform
transform.getInputType(), // input type
transform.getOutputType(), // output type
transform.getName()); // name of this transformation
// if the transformation is keyed, create the key serializer and set the KeySelector
if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
// resolve the parallelism of the transform
int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
transform.getParallelism() : executionConfig.getParallelism();
// set the parallelism for this transform id
streamGraph.setParallelism(transform.getId(), parallelism);
// set the max parallelism
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
// iterate over the ids of the upstream inputs, which have all been processed by now
// this is the key step: add an edge that connects the vertex created for this OneInputTransformation with the vertex created for its input transformation
for (Integer inputId: inputIds) {
// add an edge from the input
// two StreamNodes are connected by a StreamEdge; each physical transform produces one StreamNode
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
Now look at the addOperator and addEdge methods.
First, addOperator.
The most important thing this method does is call addNode.
private <IN, OUT> void addOperator(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName,
Class<? extends AbstractInvokable> invokableClass) {
// convert the transform into a StreamNode and add it to the streamNodes collection
addNode(vertexID, slotSharingGroup, coLocationGroup, invokableClass, operatorFactory, operatorName);
// set the serializers
setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));
// set the output type on the operatorFactory
if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
// sets the output type which must be known at StreamGraph creation time
operatorFactory.setOutputType(outTypeInfo, executionConfig);
}
// set the input type on the operatorFactory
if (operatorFactory.isInputTypeConfigurable()) {
operatorFactory.setInputType(inTypeInfo, executionConfig);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexID);
}
}
Step into addNode, which converts the transform into a StreamNode and adds it to the streamNodes collection.
protected StreamNode addNode(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
// throw an exception if streamNodes already contains this node
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
// create a StreamNode: the transform becomes a StreamNode, which is later converted into a JobVertex
StreamNode vertex = new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
new ArrayList<OutputSelector<?>>(),
vertexClass);
// add the StreamNode to the map
streamNodes.put(vertexID, vertex);
return vertex;
}
Next, the addEdge method.
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(upStreamVertexID,
downStreamVertexID,
typeNumber,
null,
new ArrayList<String>(),
null,
null);
}
Step into addEdgeInternal, which holds the logic for adding an edge.
The main logic: check whether the upstream id belongs to one of the virtual node maps virtualSideOutputNodes, virtualSelectNodes, or virtualPartitionNodes. If it does, resolve the virtual node to its own upstream and recurse, carrying the virtual node's information along, so that the StreamEdge finally created between the two StreamNodes records that information. Otherwise, connect the two StreamNodes directly with a StreamEdge.
Virtual nodes
Side output, select, and partition transforms are handled as virtual nodes and recorded in virtualSideOutputNodes, virtualSelectNodes, and virtualPartitionNodes. The virtual nodes are created in the handler methods for these transform types. Strictly speaking a virtual node is not a StreamNode: it contains no processing logic and does not appear in the StreamGraph. When an edge is added, the upstream id is checked against the virtual node maps, and the lookup recurses upstream until it reaches a non-virtual node, that is, a StreamNode. The edge is then created, and the information from the virtual nodes is recorded in the StreamEdge.
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {
/**
 * virtualSideOutputNodes, virtualSelectNodes, virtualPartitionNodes:
 * these transforms are handled as virtual nodes. When the downstream StreamNode has been
 * created and its upstream turns out to be a virtual node, the upstream of the virtual node
 * is looked up and the StreamEdge is created against it, and the information of the
 * virtual node is written into the StreamEdge.
 */
// if the upstream is a side output node, recurse and pass the side output information along; the branches below work the same way
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
} else {
// get the upstream and downstream StreamNodes
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
// no partitioner given and upstream parallelism == downstream parallelism
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
// use the ForwardPartitioner
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) { // otherwise use the RebalancePartitioner
partitioner = new RebalancePartitioner<Object>();
}
// if the ForwardPartitioner is used but upstream and downstream parallelism differ, the strategy cannot be applied and an exception is thrown
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
// if no shuffle mode was specified, leave the decision to the framework
if (shuffleMode == null) {
shuffleMode = ShuffleMode.UNDEFINED;
}
// create a new StreamEdge
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
// attach the new edge to the upstream and downstream nodes
// getStreamNode returns the StreamNode with the given id
// addOutEdge and addInEdge add the edge to that StreamNode
// StreamNode keeps two lists for collecting these edges (in and out)
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
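The way a virtual node is resolved while an edge is added, together with the default choice between forward and rebalance, can be sketched as follows. This is a simplified, hypothetical model: only partition virtual nodes are covered, partitioners are plain strings, and names such as VirtualPartition and the returned edge description are illustrative rather than part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

class VirtualNodeEdgeSketch {

    /** Hypothetical record of a virtual partition node: the real upstream id plus a partitioner name. */
    static final class VirtualPartition {
        final int originalId;
        final String partitioner;

        VirtualPartition(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, Integer> parallelism = new HashMap<>(); // physical nodes: id -> parallelism
    final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();

    String addEdge(int upstreamId, int downstreamId) {
        return addEdgeInternal(upstreamId, downstreamId, null);
    }

    private String addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Upstream is virtual: keep a partitioner that was already picked up closer to
            // the downstream node, otherwise take this one, then retry with the real upstream.
            if (partitioner == null) {
                partitioner = virtual.partitioner;
            }
            return addEdgeInternal(virtual.originalId, downstreamId, partitioner);
        }
        int up = parallelism.get(upstreamId);
        int down = parallelism.get(downstreamId);
        // No explicit partitioner: forward if the parallelism matches, rebalance otherwise.
        if (partitioner == null) {
            partitioner = (up == down) ? "FORWARD" : "REBALANCE";
        }
        if (partitioner.equals("FORWARD") && up != down) {
            throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism.");
        }
        // The edge connects the two physical nodes and carries the partitioner.
        return upstreamId + " -[" + partitioner + "]-> " + downstreamId;
    }

    public static void main(String[] args) {
        VirtualNodeEdgeSketch graph = new VirtualNodeEdgeSketch();
        graph.parallelism.put(1, 4); // map
        graph.parallelism.put(3, 4); // window
        graph.parallelism.put(4, 1); // sink
        graph.parallelism.put(5, 4); // another map
        graph.virtualPartitionNodes.put(2, new VirtualPartition(1, "HASH")); // keyBy on top of node 1
        System.out.println(graph.addEdge(2, 3)); // 1 -[HASH]-> 3
        System.out.println(graph.addEdge(3, 4)); // 3 -[REBALANCE]-> 4
        System.out.println(graph.addEdge(1, 5)); // 1 -[FORWARD]-> 5
    }
}
```

The keyBy node (id 2) never appears in an edge: the edge goes straight from node 1 to node 3 and only remembers the partitioner.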
The transformSource method
private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
// if the user specified a group name, it is used as is; if nothing was specified
// and all input operations share the same group name, that name is used; otherwise the default group is chosen
String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
// add the source to the StreamGraph
streamGraph.addSource(source.getId(), // every transform gets an incrementing id, which later becomes the vertexID
slotSharingGroup,
source.getCoLocationGroupKey(),
source.getOperatorFactory(),
null,
source.getOutputType(),
"Source: " + source.getName());
// resolve and set the parallelism
int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
source.getParallelism() : executionConfig.getParallelism();
streamGraph.setParallelism(source.getId(), parallelism);
streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
return Collections.singleton(source.getId());
}
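Step into addSource. This method does two things:
first, it creates a StreamNode for the source and adds it to the streamNodes collection;
second, it adds the source id to the StreamGraph's sources collection (a Set).
Note that addSource creates no edge: a source is the head of the graph and has no input, so it is only connected when a downstream node adds an edge to it.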
public <IN, OUT> void addSource(
Integer vertexID, // id of the transform
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
SourceOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
// converts the operator (transform) into a StreamNode and adds it to the streamNodes collection
addOperator(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
inTypeInfo,
outTypeInfo,
operatorName,
SourceOperatorStreamTask.class);
// add the id of the source (operator -> transform) to the sources set
// there may be several sources; the set guarantees no duplicates
sources.add(vertexID);
}
Let's follow the conversion process with an example job.
Setting a breakpoint where the StreamGraph is built shows that the transformations list holds four transforms: map, window, process, and sink. The input of each transform points to its upstream transform. The transforms for source and keyBy are not in the transformations list.
The reason some transforms are missing from the list is as follows.
source: creating a source creates a source operator, from which a DataStreamSource is built. Inside DataStreamSource the source operator is wrapped in a SourceTransformation, which is assigned to the transformation field of the DataStream. A downstream operator builds its own transform from that field, so the input of the new transform is the source, and it is the new transform that is added to the env's transformations list.
keyBy: keyBy creates a KeyedStream and stores the KeySelector in a field of the KeyedStream. The KeySelector is not added to the env's transformations; the information is recorded in the downstream operator.
In more detail, keyBy creates a KeyedStream and converts the argument passed to keyBy into a KeySelector, which is assigned to a field of the KeyedStream. Creating the KeyedStream also creates a PartitionTransformation, which is assigned to the transformation field of the DataStream. Calling timeWindow passes the WindowAssigner and this (the KeyedStream) into a new WindowedStream. In the aggregate operator a WindowOperator is created and, through input.transform, converted into a transform that is added to the env's transformations.
So keyBy does produce a PartitionTransformation, but it is not added to the env's transformations list. Instead it is referenced by the downstream transform, here the window, and it is the window's transform that is added to the list.
The window is built as follows: a WindowOperator is constructed from the WindowAssigner, KeySelector, trigger, evictor, and other parameters, then converted by the transform method into a transform and added to the env's transformations list.
The window transform contains its upstream input, namely the PartitionTransformation created by keyBy.
Once all transforms have been traversed, the StreamGraph is complete. Its StreamNodes cover every operator except keyBy, which exists only as a virtual node whose information is recorded on the edge. At this point the StreamGraph has been built.