In the previous article, we walked through the end-to-end process of submitting a Job to the TaskManagers from the command line, but we glossed over some details along the way: which cluster nodes the Job is submitted to, what a JobGraph is and how it is generated, how the JobClient actually submits the Job to the cluster, and so on. This article addresses these questions one by one.
Flink運(yùn)行時(shí)環(huán)境
Flink運(yùn)行時(shí)主要包含兩種類型的處理器:
- JobManager: 主要負(fù)責(zé)協(xié)調(diào)分布式執(zhí)行怖糊。調(diào)度任務(wù),協(xié)調(diào)Checkpoint颇象,協(xié)調(diào)故障時(shí)容錯(cuò)功能等伍伤。
- TaskManager: 執(zhí)行數(shù)據(jù)流的Task(或更具體地說,子任務(wù))遣钳,并緩沖和交換數(shù)據(jù)流扰魂。
根據(jù)JobManager和TaskManager的分工和名稱,應(yīng)該可以很顯然的看出JobClient提交Job到JobManager節(jié)點(diǎn)上蕴茴,并通過它將子任務(wù)分配到TaskManager上執(zhí)行劝评。
Submitting a Job in Interactive Mode
When a Job is submitted from the command line, the run method of ClusterClient is called to carry out the submission logic. There are two modes, interactive and non-interactive:
public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException {
    Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
    if (prog.isUsingProgramEntryPoint()) { // the program has an entry point class (non-interactive submission)
        // JobWithJars is a Flink dataflow plan together with all the classes in the jar
        // and the ClassLoader used to load the user code
        final JobWithJars jobWithJars;
        if (hasUserJarsInClassPath(prog.getAllLibraries())) {
            jobWithJars = prog.getPlanWithoutJars();
        } else {
            jobWithJars = prog.getPlanWithJars();
        }
        return run(jobWithJars, parallelism, prog.getSavepointSettings());
    } else if (prog.isUsingInteractiveMode()) { // interactive submission
        log.info("Starting program in interactive mode");
        final List<URL> libraries;
        if (hasUserJarsInClassPath(prog.getAllLibraries())) {
            libraries = Collections.emptyList();
        } else {
            libraries = prog.getAllLibraries();
        }
        ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
            prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
            prog.getSavepointSettings());
        ContextEnvironment.setAsContext(factory);
        try {
            // invoke the main method
            prog.invokeInteractiveModeForExecution();
            if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
                throw new ProgramMissingJobException("The program didn't contain a Flink job.");
            }
            if (isDetached()) {
                // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
                return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
            } else {
                // in blocking mode, we execute all Flink jobs contained in the user code and then return here
                return this.lastJobExecutionResult;
            }
        } finally {
            ContextEnvironment.unsetContext();
        }
    } else {
        throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
    }
}
而實(shí)際中百侧,大家可能都是采用交互模式提交作業(yè),在提交的jar包中包含mainClass能扒。以Flink的流處理示例WordCount為例:
public static void main(String[] args) throws Exception {

    // Checking input parameters
    final ParameterTool params = ParameterTool.fromArgs(args);

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    // get input data
    DataStream<String> text;
    if (params.has("input")) {
        // read the text file from given input path
        text = env.readTextFile(params.get("input"));
    } else {
        System.out.println("Executing WordCount example with default input data set.");
        System.out.println("Use --input to specify file input.");
        // get default test text data
        text = env.fromElements(WordCountData.WORDS);
    }

    DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .keyBy(0).sum(1);

    // emit result
    if (params.has("output")) {
        counts.writeAsText(params.get("output"));
    } else {
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    }

    // execute program
    env.execute("Streaming WordCount");
}
The call prog.invokeInteractiveModeForExecution(); in ClusterClient simply invokes WordCount's main method. The logic of main is straightforward and falls into two parts: building the dataflow and executing it. This section focuses on executing the dataflow, i.e. the last line, env.execute("Streaming WordCount");.
Taking the local stream execution environment (LocalStreamEnvironment) as an example, let's look at what the execute method does:
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    // generate the stream graph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    // turn the stream graph into a job graph
    JobGraph jobGraph = streamGraph.getJobGraph();

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());

    configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
    configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    // add (and override) the settings with what the user defined
    configuration.addAll(this.conf);

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
    try {
        exec.start();
        // submit the job graph
        return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
    }
    finally {
        transformations.clear();
        exec.stop();
    }
}
As you can see, there are three main steps: generating the stream graph, generating the job graph, and submitting the Job. Let's look at the submission logic first:
@throws(classOf[JobExecutionException])
def submitJobAndWait(
    jobGraph: JobGraph,
    printUpdates: Boolean)
  : JobExecutionResult = {
  submitJobAndWait(jobGraph, printUpdates, timeout)
}

@throws(classOf[JobExecutionException])
def submitJobAndWait(
    jobGraph: JobGraph,
    printUpdates: Boolean,
    timeout: FiniteDuration)
  : JobExecutionResult = {

  val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID)

  val userCodeClassLoader =
    try {
      createUserCodeClassLoader(
        jobGraph.getUserJars,
        jobGraph.getClasspaths,
        Thread.currentThread().getContextClassLoader)
    } catch {
      case e: Exception => throw new JobExecutionException(
        jobGraph.getJobID,
        "Could not create the user code class loader.",
        e)
    }

  try {
    JobClient.submitJobAndWait(
      clientActorSystem,
      configuration,
      highAvailabilityServices,
      jobGraph,
      timeout,
      printUpdates,
      userCodeClassLoader)
  } finally {
    if (!useSingleActorSystem) {
      // we have to shutdown the just created actor system
      shutdownJobClientActorSystem(clientActorSystem)
    }
  }
}
通過執(zhí)行鏈,可以看出最終還是會(huì)通過上文描述過的JobClient.submitJobAndWait(...)
方法提交作業(yè)鹃答,這里不再贅述乎澄。JobClient會(huì)啟動(dòng)一個(gè)Actor System,雖然它不是Flink運(yùn)行時(shí)的一部分测摔,但是它可以斷開連接置济,或者保持連接以接收進(jìn)度報(bào)告。一個(gè)整體的Job提交圖如下所示:
上面講了提交作業(yè)的三步锋八,第一和第二步分別是生成流圖和作業(yè)圖浙于,下面我們分別看下流圖和作業(yè)圖 。
Stream Graph
A StreamGraph is the data structure that represents the topology of a stream; it contains all the information needed to generate a JobGraph.
A stream graph consists of nodes and edges, which correspond to the data structures StreamNode and StreamEdge. A StreamGraph might look like the following:
Now let's see how a StreamGraph is created, i.e. the logic of the getStreamGraph() method:
@Internal
public StreamGraph getStreamGraph() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return StreamGraphGenerator.generate(this, transformations);
}

public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
    return new StreamGraphGenerator(env).generateInternal(transformations);
}

/**
 * This starts the actual transformation, beginning from the sinks.
 */
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
    for (StreamTransformation<?> transformation: transformations) {
        transform(transformation);
    }
    return streamGraph;
}

/**
 * Transforms one {@code StreamTransformation}.
 *
 * <p>This checks whether we already transformed it and exits early in that case. If not it
 * delegates to one of the transformation specific methods.
 */
private Collection<Integer> transform(StreamTransformation<?> transform) {

    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();

    Collection<Integer> transformedIds;
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    if (transform.getBufferTimeout() > 0) {
        streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
    }
    if (transform.getUid() != null) {
        streamGraph.setTransformationUID(transform.getId(), transform.getUid());
    }
    if (transform.getUserProvidedNodeHash() != null) {
        streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
    }

    if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
        streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
    }

    return transformedIds;
}
As you can see, the core logic lives in the transform(StreamTransformation<?> transform) method. You may be wondering what a StreamTransformation is: it is the description of the operation that created a DataStream. Every DataStream is backed by a StreamTransformation holding its original information, and from these transformations the entire StreamGraph can be built. Taking OneInputTransformation as an example, let's see how the transform operation works:
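Conceptually, a StreamTransformation records "which operation, applied to which input": each DataStream operation appends one link, and the generator later walks these links from the sinks back to the sources. A simplified sketch of that chain (hypothetical class, not Flink's API):

```java
import java.util.*;

// Simplified sketch: each transformation remembers the transformation of the
// stream it was applied to (null for a source), so a program like
// source.flatMap(...).sum(...) leaves behind a linked chain.
class ChainedTransformation {
    final int id;
    final String name;
    final ChainedTransformation input; // null for a source transformation

    ChainedTransformation(int id, String name, ChainedTransformation input) {
        this.id = id;
        this.name = name;
        this.input = input;
    }

    // Walk from a sink-side transformation back to the source and return
    // the operation names in source-to-sink order.
    List<String> namesFromSource() {
        Deque<String> stack = new ArrayDeque<>();
        for (ChainedTransformation t = this; t != null; t = t.input) {
            stack.push(t.name);
        }
        return new ArrayList<>(stack);
    }
}
```

Starting the walk from the last transformation recovers the whole pipeline, which is exactly why generateInternal can begin from the sinks.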
/**
 * Transforms a {@code OneInputTransformation}.
 *
 * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
 * wires the inputs to this new node.
 */
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    // transform the input StreamTransformation of this OneInputTransformation
    Collection<Integer> inputIds = transform(transform.getInput());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    // add a StreamGraph node
    streamGraph.addOperator(transform.getId(),
        slotSharingGroup,
        transform.getOperator(),
        transform.getInputType(),
        transform.getOutputType(),
        transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    streamGraph.setParallelism(transform.getId(), transform.getParallelism());
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    // add StreamGraph edges
    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    return Collections.singleton(transform.getId());
}
The logic is clear: transform the input StreamTransformation of the current OneInputTransformation, build a StreamNode from the OneInputTransformation's operator and related information, and then build a StreamEdge for each resolved input id.
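The recursion plus the alreadyTransformed check can be illustrated with a stripped-down generator (hypothetical names, not Flink code): transform the inputs first, add one node for the current transformation, add one edge per resolved input id, and memoize results so shared inputs are processed only once:

```java
import java.util.*;

// Stripped-down sketch of the transform(...) recursion: depth-first over
// the inputs, memoized by transformation id.
class MiniGraphGenerator {
    static class Transform {
        final int id;
        final List<Transform> inputs;
        Transform(int id, Transform... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    final Map<Integer, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final Set<Integer> nodes = new LinkedHashSet<>();
    final List<int[]> edges = new ArrayList<>(); // pairs of {sourceId, targetId}

    Collection<Integer> transform(Transform t) {
        // exit early if this transformation was already handled
        if (alreadyTransformed.containsKey(t.id)) {
            return alreadyTransformed.get(t.id);
        }
        // recursively transform the inputs first
        List<Integer> inputIds = new ArrayList<>();
        for (Transform input : t.inputs) {
            inputIds.addAll(transform(input));
        }
        nodes.add(t.id);                         // ~ streamGraph.addOperator(...)
        for (int inputId : inputIds) {
            edges.add(new int[]{inputId, t.id}); // ~ streamGraph.addEdge(...)
        }
        Collection<Integer> result = Collections.singleton(t.id);
        alreadyTransformed.put(t.id, result);
        return result;
    }
}
```

For a diamond-shaped program (one source feeding two branches that merge), the shared source is visited once even though it is reachable twice, thanks to the memoization.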
在創(chuàng)建Stream,以及生成StreamGraph的過程中怎顾,涉及到較多的數(shù)據(jù)結(jié)構(gòu)以及層次關(guān)系读慎,以上述的WordCount示例中,通過text.flatMap(new Tokenizer())
創(chuàng)建的流為例槐雾,具體的數(shù)據(jù)結(jié)構(gòu)和層次如下圖所示:
作業(yè)圖
作業(yè)圖(JobGraph)是唯一被Flink的數(shù)據(jù)流引擎所識(shí)別的表述作業(yè)的數(shù)據(jù)結(jié)構(gòu)夭委,也正是這一共同的抽象體現(xiàn)了流處理和批處理在運(yùn)行時(shí)的統(tǒng)一。
相比流圖(StreamGraph)以及批處理優(yōu)化計(jì)劃(OptimizedPlan)募强,JobGraph發(fā)生了一些變化株灸,已經(jīng)不完全是“靜態(tài)”的數(shù)據(jù)結(jié)構(gòu)了,因?yàn)樗尤肓酥虚g數(shù)據(jù)集(IntermediateDataSet)這一“動(dòng)態(tài)”概念擎值。
作業(yè)頂點(diǎn)(JobVertex)慌烧、中間數(shù)據(jù)集(IntermediateDataSet)、作業(yè)邊(JobEdge)是組成JobGraph的基本元素鸠儿。這三個(gè)對(duì)象彼此之間互為依賴:
- 一個(gè)JobVertex關(guān)聯(lián)著若干個(gè)JobEdge作為輸入端以及若干個(gè)IntermediateDataSet作為其生產(chǎn)的結(jié)果集屹蚊;
- 一個(gè)IntermediateDataSet關(guān)聯(lián)著一個(gè)JobVertex作為生產(chǎn)者以及若干個(gè)JobEdge作為消費(fèi)者;
- 一個(gè)JobEdge關(guān)聯(lián)著一個(gè)IntermediateDataSet可認(rèn)為是源以及一個(gè)JobVertex可認(rèn)為是目標(biāo)消費(fèi)者捆交;
因此一個(gè)JobGraph可能的如下圖所示: