Example
Using the SocketWindowWordCount example, this section analyzes how the graph is built:
public static void main(String[] args) throws Exception {
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts =
text.flatMap(
new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(
String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy(value -> value.word)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
/**
* han_pf
* keyBy and window do not add a transformation to the transformations list; only reduce does.
*/
.reduce(
new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
For the code above, the execution flow of the whole operator chain is shown in the figure below:
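While the operators are being assembled, each new transformation is created with its upstream operator as its input. Operators that only change the type of the data stream, without any business logic of their own, have transformations that are not added to the transformations collection. When StreamNodes are generated, such operators become virtualPartitionNodes instead. As the figure below shows, only the transformations for flatMap, reduce, and sink are added to the transformations collection.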
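The registration rule described above can be illustrated with a toy model. This is a minimal sketch in plain Java, not Flink code: `Node`, `Env`, `Stream`, `operator`, and `reshape` are hypothetical names made up for illustration. It only shows that a keyBy-like call stays reachable through its downstream operator's input even though it is never registered.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code): which API calls register a transformation. */
final class TransformationListSketch {

    /** Hypothetical stand-in for a transformation: a name plus its upstream input. */
    static final class Node {
        final String name;
        final Node input; // null for the source

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Hypothetical stand-in for the environment that owns the transformations list. */
    static final class Env {
        final List<Node> transformations = new ArrayList<>();
    }

    /** Hypothetical stand-in for a data stream. */
    static final class Stream {
        final Env env;
        final Node node;

        Stream(Env env, Node node) {
            this.env = env;
            this.node = node;
        }

        /** Operators with user logic are linked to their input and registered. */
        Stream operator(String name) {
            Node n = new Node(name, node);
            env.transformations.add(n);
            return new Stream(env, n);
        }

        /** keyBy-like calls are linked to their input but NOT registered. */
        Stream reshape(String name) {
            return new Stream(env, new Node(name, node));
        }
    }

    /** Builds source -> flatMap -> keyBy -> reduce -> sink and returns the last stream. */
    static Stream buildPipeline() {
        Env env = new Env();
        return new Stream(env, new Node("source", null))
                .operator("flatMap")
                .reshape("keyBy")
                .operator("reduce")
                .operator("sink");
    }

    /** Names of the registered transformations, in registration order. */
    static List<String> registeredNames() {
        List<String> names = new ArrayList<>();
        for (Node n : buildPipeline().env.transformations) {
            names.add(n.name);
        }
        return names;
    }

    /** Follows the input links from the sink back to the source. */
    static List<String> upstreamChainFromSink() {
        List<String> chain = new ArrayList<>();
        for (Node n = buildPipeline().node; n != null; n = n.input) {
            chain.add(n.name);
        }
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(registeredNames());       // [flatMap, reduce, sink]
        System.out.println(upstreamChainFromSink()); // [sink, reduce, keyBy, flatMap, source]
    }
}
```

Because every registered node keeps a reference to its input, nothing is lost by leaving keyBy out of the list: a later traversal can still reach it from reduce.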
StreamGraph generation
Calling execute() as shown above invokes StreamExecutionEnvironment.getStreamGraph(), which in turn calls StreamGraphGenerator.generate() to run the generation process.
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
/**
* han_pf
* Key part: look here
*/
final StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
return execute(streamGraph);
}
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
transformations.clear();
}
return streamGraph;
}
StreamGraphGenerator
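First, the generator iterates over the transformations that were previously added to the transformations collection and translates each one. As shown below, it looks up the translator for the transformation's type in the statically initialized translatorMap and uses that translator to convert the operator.
Overall execution flow: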
translatorMap
static {
@SuppressWarnings("rawtypes")
Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
tmp = new HashMap<>();
tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
tmp.put(
TimestampsAndWatermarksTransformation.class,
new TimestampsAndWatermarksTransformationTranslator<>());
tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
tmp.put(
KeyedBroadcastStateTransformation.class,
new KeyedBroadcastStateTransformationTranslator<>());
translatorMap = Collections.unmodifiableMap(tmp);
}
public StreamGraph generate() {
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
streamGraph.setEnableCheckpointsAfterTasksFinish(
configuration.get(
ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
shouldExecuteInBatchMode = shouldExecuteInBatchMode();
configureStreamGraph(streamGraph);
alreadyTransformed = new HashMap<>();
/**
* han_pf
* Iterate over the transformations that were registered earlier
*/
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
setFineGrainedGlobalStreamExchangeMode(streamGraph);
for (StreamNode node : streamGraph.getStreamNodes()) {
if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
for (StreamEdge edge : node.getInEdges()) {
edge.setSupportsUnalignedCheckpoints(false);
}
}
}
final StreamGraph builtStreamGraph = streamGraph;
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
private Collection<Integer> transform(Transformation<?> transform) {
// some code omitted
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
@SuppressWarnings("unchecked")
final TransformationTranslator<?, Transformation<?>> translator =
(TransformationTranslator<?, Transformation<?>>)
translatorMap.get(transform.getClass());
Collection<Integer> transformedIds;
if (translator != null) {
/**
* han_pf
* Invoke a different translator depending on the operator type
*/
transformedIds = translate(translator, transform);
} else {
transformedIds = legacyTransform(transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
return transformedIds;
}
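The lookup-then-fallback step in transform() can be reduced to a few lines. The following is a simplified sketch in plain Java, not Flink's implementation: the transformation classes and the string-returning "translators" are hypothetical placeholders. It shows a map keyed by the exact runtime class, with a legacy path for types that have no entry.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model (not Flink code): dispatching on the transformation's class. */
final class TranslatorDispatchSketch {

    /** Hypothetical base type for transformations. */
    static class Transformation {
        final String name;

        Transformation(String name) {
            this.name = name;
        }
    }

    static final class OneInput extends Transformation {
        OneInput(String name) {
            super(name);
        }
    }

    static final class Partition extends Transformation {
        Partition(String name) {
            super(name);
        }
    }

    /** A type that is deliberately absent from the translator map. */
    static final class Unregistered extends Transformation {
        Unregistered(String name) {
            super(name);
        }
    }

    /** Built once in a static initializer and then frozen, like translatorMap. */
    private static final Map<Class<? extends Transformation>, Function<Transformation, String>>
            TRANSLATORS;

    static {
        Map<Class<? extends Transformation>, Function<Transformation, String>> tmp =
                new HashMap<>();
        tmp.put(OneInput.class, t -> "StreamNode(" + t.name + ")");
        tmp.put(Partition.class, t -> "VirtualPartitionNode(" + t.name + ")");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    /** Looks up a translator by exact class; falls back to a legacy path if none exists. */
    static String transform(Transformation t) {
        Function<Transformation, String> translator = TRANSLATORS.get(t.getClass());
        if (translator != null) {
            return translator.apply(t);
        }
        return "legacy(" + t.name + ")";
    }

    public static void main(String[] args) {
        System.out.println(transform(new OneInput("flatMap")));   // StreamNode(flatMap)
        System.out.println(transform(new Partition("keyBy")));    // VirtualPartitionNode(keyBy)
        System.out.println(transform(new Unregistered("other"))); // legacy(other)
    }
}
```

One consequence of keying on getClass() in this sketch is that a subclass of a registered type does not match its parent's entry; it would take the fallback path unless it is registered separately.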
The core translate(translator, transform) method
First, while iterating over the transformations collection, the generator obtains the input operators of the current transformation; see getParentInputIds(transform.getInputs()). That method recursively calls transform(), so the upstream transformations are translated before the current one.
private Collection<Integer> translate(
final TransformationTranslator<?, Transformation<?>> translator,
final Transformation<?> transform) {
checkNotNull(translator);
checkNotNull(transform);
/**
* han_pf
* Recursively resolves the input operators of the current operator
*/
final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
// the recursive call might have already transformed this
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
/**
* han_pf
* Determine whether the current operator has a slotSharingGroup set. The default name is
* "default", and the group decides whether slots are shared.
* Subtasks of different tasks can share a slot.
*/
*/
final String slotSharingGroup =
determineSlotSharingGroup(
transform.getSlotSharingGroup().isPresent()
? transform.getSlotSharingGroup().get().getName()
: null,
allInputIds.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));
final TransformationTranslator.Context context =
new ContextImpl(this, streamGraph, slotSharingGroup, configuration);
return shouldExecuteInBatchMode
? translator.translateForBatch(transform, context)
: translator.translateForStreaming(transform, context);
}
private List<Collection<Integer>> getParentInputIds(
@Nullable final Collection<Transformation<?>> parentTransformations) {
final List<Collection<Integer>> allInputIds = new ArrayList<>();
if (parentTransformations == null) {
return allInputIds;
}
for (Transformation<?> transformation : parentTransformations) {
allInputIds.add(transform(transformation));
}
return allInputIds;
}
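The recursion between transform() and getParentInputIds() amounts to a memoized, inputs-first traversal. The following is a minimal sketch in plain Java under simplifying assumptions: each transformation yields a single id, the `Transformation` class is a hypothetical stand-in, and the cache is checked at the top of transform() rather than after the recursive call as in the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code): inputs are translated before the node itself. */
final class RecursiveTransformSketch {

    /** Hypothetical stand-in for a transformation with upstream inputs. */
    static final class Transformation {
        final String name;
        final List<Transformation> inputs;

        Transformation(String name, Transformation... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Cache of finished translations, so shared upstream nodes are handled once. */
    private final Map<Transformation, Integer> alreadyTransformed = new HashMap<>();

    /** Records the order in which nodes are actually translated. */
    final List<String> order = new ArrayList<>();

    private int nextId = 1;

    int transform(Transformation t) {
        Integer known = alreadyTransformed.get(t);
        if (known != null) {
            return known; // reached earlier through another path
        }
        for (Transformation input : t.inputs) {
            transform(input); // recurse into upstream nodes first
        }
        int id = nextId++;
        alreadyTransformed.put(t, id);
        order.add(t.name);
        return id;
    }

    /** Translates only the registered nodes; the others are reached via recursion. */
    static List<String> run() {
        Transformation source = new Transformation("source");
        Transformation flatMap = new Transformation("flatMap", source);
        Transformation keyBy = new Transformation("keyBy", flatMap);
        Transformation reduce = new Transformation("reduce", keyBy);
        Transformation sink = new Transformation("sink", reduce);

        RecursiveTransformSketch generator = new RecursiveTransformSketch();
        for (Transformation t : Arrays.asList(flatMap, reduce, sink)) {
            generator.transform(t);
        }
        return generator.order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [source, flatMap, keyBy, reduce, sink]
    }
}
```

Although only flatMap, reduce, and sink are in the iterated list, source and keyBy are still translated because they are reached through the input links, and each node is translated exactly once.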
SimpleTransformationTranslator
@Override
public final Collection<Integer> translateForStreaming(
final T transformation, final Context context) {
checkNotNull(transformation);
checkNotNull(context);
final Collection<Integer> transformedIds =
translateForStreamingInternal(transformation, context);
configure(transformation, context);
return transformedIds;
}
translateForStreamingInternal
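translateForStreamingInternal delegates to the translator for the specific transformation type. A PartitionTransformation, such as the one produced by keyBy, is handled by PartitionTransformationTranslator, which turns the operator into a VirtualPartitionNode. Other translators, such as OneInputTransformationTranslator, turn the transformation into a StreamNode. The code is as follows: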
PartitionTransformationTranslator
public class PartitionTransformationTranslator<OUT>
extends SimpleTransformationTranslator<OUT, PartitionTransformation<OUT>> {
@Override
protected Collection<Integer> translateForBatchInternal(
final PartitionTransformation<OUT> transformation, final Context context) {
return translateInternal(transformation, context);
}
@Override
protected Collection<Integer> translateForStreamingInternal(
final PartitionTransformation<OUT> transformation, final Context context) {
return translateInternal(transformation, context);
}
private Collection<Integer> translateInternal(
final PartitionTransformation<OUT> transformation, final Context context) {
checkNotNull(transformation);
checkNotNull(context);
final StreamGraph streamGraph = context.getStreamGraph();
final List<Transformation<?>> parentTransformations = transformation.getInputs();
checkState(
parentTransformations.size() == 1,
"Expected exactly one input transformation but found "
+ parentTransformations.size());
final Transformation<?> input = parentTransformations.get(0);
List<Integer> resultIds = new ArrayList<>();
for (Integer inputId : context.getStreamNodeIds(input)) {
final int virtualId = Transformation.getNewNodeId();
streamGraph.addVirtualPartitionNode(
inputId,
virtualId,
transformation.getPartitioner(),
transformation.getExchangeMode());
resultIds.add(virtualId);
}
return resultIds;
}
}
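To make the idea of a virtual node more concrete, here is a toy graph in plain Java. It is a sketch under stated assumptions, not Flink's StreamGraph: the class, its fields, and the string partitioner are hypothetical, and the edge-resolution rule in `addEdge` (walk back through virtual ids to the real upstream node and carry the partitioner onto the edge) is an assumption made for illustration, since the listing above only shows the registration side.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Toy model (not Flink code): a virtual id that stands for "upstream node + partitioner". */
final class VirtualPartitionSketch {

    /** Hypothetical record of a virtual node: no operator, only routing information. */
    static final class VirtualNode {
        final int inputId;
        final String partitioner;

        VirtualNode(int inputId, String partitioner) {
            this.inputId = inputId;
            this.partitioner = partitioner;
        }
    }

    /** Hypothetical edge between two real nodes. */
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;

        Edge(int sourceId, int targetId, String partitioner) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, String> streamNodes = new HashMap<>();
    final Map<Integer, VirtualNode> virtualPartitionNodes = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();

    void addNode(int id, String name) {
        streamNodes.put(id, name);
    }

    /** Registers routing information only; no real node is created. */
    void addVirtualPartitionNode(int inputId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualNode(inputId, partitioner));
    }

    /** Assumed rule: resolve virtual ids to the real upstream node, keeping the partitioner. */
    void addEdge(int upstreamId, int downstreamId) {
        String partitioner = null;
        int sourceId = upstreamId;
        while (virtualPartitionNodes.containsKey(sourceId)) {
            VirtualNode v = virtualPartitionNodes.get(sourceId);
            if (partitioner == null) {
                partitioner = v.partitioner; // the virtual node nearest the target wins
            }
            sourceId = v.inputId;
        }
        edges.add(new Edge(sourceId, downstreamId, partitioner == null ? "forward" : partitioner));
    }

    static String demo() {
        VirtualPartitionSketch graph = new VirtualPartitionSketch();
        graph.addNode(1, "flatMap");
        graph.addVirtualPartitionNode(1, 2, "hash"); // keyBy: virtual id 2 points at node 1
        graph.addNode(3, "reduce");
        graph.addEdge(2, 3); // reduce names the virtual id as its input
        Edge e = graph.edges.get(0);
        return e.sourceId + "->" + e.targetId + " via " + e.partitioner
                + ", real nodes=" + new TreeSet<>(graph.streamNodes.keySet());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1->3 via hash, real nodes=[1, 3]
    }
}
```

In this sketch the keyBy step never appears as a node of its own; it survives only as the partitioner attached to the edge from flatMap to reduce.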
The above covers the overall StreamGraph generation flow. Details of the process, such as how StreamNodes and edges are generated, will be covered in a later post.