1夭问、 上次回顧 0:10 ~0:18
2屯烦、 本次大綱 0:18 ~0:22
4.1 Flink 編程套路 0:23 ~ 0:38
4.2 Clifrontend 提交分析 0:38
src/main/flink-bin/bin/flink
最后一行
exec JVM_ARGS {log_setting[@]}" -classpath "manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"
" org.apache.flink.client.cli.CliFrontend "$@"
示例程序:src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
啟動類: org.apache.flink.client.cli.CliFrontend
啟動 main 方法
CliFrontend#main()
▼
* 注釋: 運行
* 解析命令行并并開始請求操作
* flink run calss1
* args【0】 = run
*/
int retCode = SecurityUtils.getInstalledContext()
.runSecured(() -> cli.parseParameters(args));
——》 CliFrontend#parseParameters()
▼
switch (action) {
case ACTION_RUN:
run(params);
——》 CliFrontend#run()
▼
final PackagedProgram program =
getPackagedProgram(programOptions);
final List<URL> jobJars = program.getJobJarAndDependencies();
final Configuration effectiveConfiguration = getEffectiveConfiguration(
........
executeProgram(effectiveConfiguration, program);
——》 CliFrontend# executeProgram()
——》ClientUtils#executeProgram()
——》 PackagedProgram#invokeInteractiveModeForExecution
——》PackagedProgram#callMainMethod // 反射調用用戶程序的main方法
▼
//這時就去到我們自己編寫的wordcount應用程序的main方法
mainMethod.invoke(null, (Object) args);
---------------- 到此為止提交任務完成,轉入用戶編寫的 任務解析階段-------
4 ExecutionEnvironment 源碼解析 0:41~ 0:50
5碾局、 Job 提交流程源碼分析 0:50 ~1:50
示例:
src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
main()
▼
1、source 部分
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
——》
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
——》 StreamExecutionEnvironment#addSource
▼
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
..................
return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator,
▲
2、 回到main方法看flatmap
text .flatMap(new FlatMapFunction<String, WordWithCount>()
——》DataStream#flatMap()
return transform("Flat Map", outputType, new StreamFlatMap<>
——》DataStream#doTransform
getExecutionEnvironment().addOperator(resultTransform);
——》DataStream#addOperator
this.transformations.add(transformation);//增加算子
其他的keyby 算子類似
3洒放、回到main方法看 executor 提交代碼
env.execute("Socket Window WordCount");
——》 StreamExecutionEnvironment#execute
return 2 execute(1 getStreamGraph(jobName) );
第1步:getStreamGraph(jobName) 生成 StreamGraph 解析
——》 StreamExecutionEnvironment#getStreamGraph
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
第2步:execute(StreamGraph) 解析
4.6. Flink Graph 演變 0:58 ~ 1:20
Flink Graph介紹 :
Flink 的一個 Job,最終滨砍,歸根結底往湿,還是構建一個高效率的能用于分布式并行執(zhí)行的 DAG 執(zhí)行圖。
1惋戏、幫我們把上下游兩個相鄰算子如果能chain到一起领追,則chain到一起做優(yōu)化
2、chain到一起的多個Operator就會組成一個OperatorChain响逢,當peratorChain執(zhí)行的時候绒窑,到底要執(zhí)行多少個 Task,則就需要把 DAG 進行并行化變成實實在在的Task來調度執(zhí)行
最開始:
dataStream.xx1().xxx2().xxx3().....xxxn();
evn.execute();
到最后:
List<StreamTask> 執(zhí)行(不同的StreamTask(StreamTask內部邏輯計算操作不一樣))
總結要點
相鄰兩個階段之間的StreamTask是有關系的舔亭。(到底哪些上游StreamTask生產數(shù)據(jù)給下游消費StreamTask)Shuffle關系些膨!
一個 Flink 流式作業(yè)蟀俊,從 Client 提交到 Flink 集群,到最后執(zhí)行订雾,總共會經(jīng)歷四種不同的狀態(tài)肢预。總的來說:
1葬燎、Client 首先根據(jù)用戶編寫的代碼生成 StreamGraph误甚,然后把 StreamGraph 構建成 JobGraph 提交給 Flink 集群主節(jié)點
2、然后啟動的 JobMaster 在接收到 JobGraph 后谱净,會對其進行并行化生成 ExecutionGraph 后調度啟動 StreamTask 執(zhí)行窑邦。
3、StreamTask 并行化的運行在 Flink 集群中的壕探,就是最終的物理執(zhí)行圖狀態(tài)結構冈钦。
Flink 中的執(zhí)行圖可以分成四層:
StreamGraph ==> JobGraph ==> ExecutionGraph ==> 物理執(zhí)行圖。
StreamGraph:是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖李请。用來表示程序的拓撲結構瞧筛。
JobGraph:StreamGraph 經(jīng)過優(yōu)化后生成了 JobGraph,提交給 jobManager 的數(shù)據(jù)結構导盅。主要的優(yōu)化為较幌,將多個符合條件的節(jié)點 chain 在一起作為一個節(jié)點,這樣可以減少數(shù)據(jù)在節(jié)點之間流動所需要的序列化反序列化傳輸消耗白翻。
ExecutionGraph:JobManager 根據(jù) JobGraph 生成 ExecutionGraph乍炉。ExecutionGraph 是JobGraph 的并行化版本,是調度層最核心的數(shù)據(jù)結構滤馍。
物理執(zhí)行圖:
JobManager 根據(jù) ExecutionGraph 對 Job 進行調度后岛琼,在各個 TaskManager 上部署Task 后形成的圖,并不是一個具體的數(shù)據(jù)結構巢株。
關于這四層之間的演變槐瑞,請看下圖:
![image.png](https://upload-images.jianshu.io/upload_images/11332520-04e493470a374efb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
StreamGraph 生成 和 execute 執(zhí)行 解析 1:20 ~ 1:50
第1步:getStreamGraph(jobName) 生成 StreamGraph 解析
4、再回到main方法看 executor 提交代碼
env.execute("Socket Window WordCount");
——0 》 StreamExecutionEnvironment#execute
return 2 execute(1 getStreamGraph(jobName) );
------------------▼▼▼▼第1步: 生成 StreamGraph 開始▼▼▼▼ ------------
——》 StreamExecutionEnvironment#getStreamGraph
StreamGraph streamGraph =
getStreamGraphGenerator().setJobName(jobName).generate();
—— 》 StreamExecutionEnvironment#generate
—— 》 StreamGraphGenerator#transform()
——1 》 StreamGraphGenerator#transformOneInputTransform()
▼
streamGraph.addOperator(transform.getId(),
——》StreamGraph#addOperator
addNode(vertexID, slotSharingGroup, coLocationGroup
——》StreamGraph#addNode
StreamNode vertex = new StreamNode(
——》StreamNode #構造函數(shù)
回到 ——1 》 StreamGraphGenerator#transformOneInputTransform
▼
streamGraph.addEdge(inputId, transform.getId(), 0);
——》StreamGraph#addEdge
——》StreamGraph#addEdgeInternal
------------------ ▲▲▲▲第1步: 生成 StreamGraph 結束 ▲▲▲------------
第2步:execute(StreamGraph) 解析
回到 ——0 》 StreamExecutionEnvironment#execute()
return 2 execute(1 getStreamGraph(jobName) );
------------------▼▼▼▼第2步:execute (StreamGraph) 開始▼▼▼▼ ------------
——》 StreamExecutionEnvironment# execute(StreamGraph streamGraph)
final JobClient jobClient = executeAsync(streamGraph);
——》 StreamExecutionEnvironment# executeAsync
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);
↓
——2 》AbstractSessionClusterExecutor#execute()
▼
//***** 8蟀困檩!從 StreamGraph(pipeline) 得到jobGraph (在下面第2.5 節(jié) 詳細說明)
2:》 final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline,
.......
return clusterClient .submitJob(jobGraph) 1:38
↓
RestClusterClient#submitJob
▼
* 注釋: 發(fā)送請求
* requestFuture.thenCompos 的參數(shù)函數(shù)的參數(shù),是 requestFuture 的返回結果那槽,就是 Tuple2 * 補充:thenCompose 的參數(shù)為一個返回 CompletableFuture 實例的函數(shù)窗看,該函數(shù)的參數(shù)是先前計算步驟的結果。 */
final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
requestAndFileUploads -> sendRetriableRequest(JobSubmitHeaders.getInstance()
——》RestClusterClient#sendRetriableRequest
* 注釋: restClient = RestClient
return restClient.sendRequest(webMonitorBaseUrl.getHost(),
——》RestClusterClient#sendRequest()
return submitRequest(targetAddress, targetPort, httpRequest, responseType);
——》RestClusterClient#submitRequest()
* 注釋: 通過 Netty 客戶端發(fā)送請求給 Netty 服務端
final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
* 注釋: 發(fā)送請求 到 JobManager 的 Netty 服務端
httpRequest.writeTo(channel);
------------------ ▲▲▲▲第2步: execute (StreamGraph) 結束 ▲▲▲------------
最終通過 channel 把請求數(shù)據(jù)倦炒,發(fā)給 WebMonitorEndpoint 中的 JobSubmitHandler 來執(zhí)行處理显沈。
第2.5步 由 StreamGraph 生成 jobgraph 分析 2:01~
回到 ——2 》AbstractSessionClusterExecutor#execute()
2:》 final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline,
——》PipelineExecutorUtils#getJobGraph()
final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline,
——》FlinkPipelineTranslationUtil#getJobGraph()
↓
StreamGraphTranslator#getJobGraph()
——》streamGraph#getJobGraph
——》 StreamingJobGraphGenerator#createJobGraph
——》 StreamingJobGraphGenerator# setChaining
——》 StreamingJobGraphGenerator# createChain //創(chuàng)建job圖的遞歸方法
2:20
//判斷可否優(yōu)化job isChainable
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
——》StreamingJobGraphGenerator# isChainable()
//九大條件判斷
return downStreamVertex.getInEdges().size() == 1
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
// 1、下游節(jié)點的入度為1 (也就是說下游節(jié)點沒有來自其他節(jié)點的輸入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游節(jié)點都在同一個 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3拉讯、前后算子不為空
!(downStreamOperator == null || upStreamOperator == null);
// 4涤浇、上游節(jié)點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接魔慷,Source 默認
是 HEAD)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5只锭、下游節(jié)點的 chain 策略為 ALWAYS(可以與上下游鏈接,map院尔、flatmap蜻展、filter 等默認是
ALWAYS)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、兩個節(jié)點間物理分區(qū)邏輯是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7邀摆、兩個算子間的shuffle方式不等于批處理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8纵顾、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用戶沒有禁用 chain
streamGraph.isChainingEnabled();
4.7. WebMonitorEndpoint 處理 RestClient 的JobSubmit 請求 2:35~ 2:52
最終處理這個請求: JobSubmitHandler 來處理栋盹!
核心入口
——》 JobSubmitHandler.handleRequest();
▼
// TODO_MA 注釋: 提交任務
// TODO_MA 注釋: gateway = Dispatcher
jobGraph -> gateway.submitJob(jobGraph, timeout)
↓
Dispatcher#submitJob
——》 Dispatcher#internalSubmitJob
▼
- 注釋: 提交執(zhí)行
final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager( // TODO_MA 注釋: 持久和提交
jobGraph.getJobID(), jobGraph, this::persistAndRunJob).thenApply(ignored -> Acknowledge.get()
——》 Dispatcher#persistAndRunJob
* 注釋: 運行 job
final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
——3 》 Dispatcher# runJob
▼
注釋: 客戶端正常提交一個 job 的時候施逾,
最終由 集群主節(jié)點中的 Dispatcher 接收到來繼續(xù)提交執(zhí)行
// * 第一 1 注釋: 創(chuàng)建 JobManagerRunner
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
// *第二2 提交任務 == start JobManagerRunner
FunctionUtils.uncheckedFunction(this::startJobManagerRunner)
——1 》 Dispatcher#createJobManagerRunner
↓
DefaultJobManagerRunnerFactory#createJobManagerRunner
▼
* 注釋: 返回 JobManagerRunnerImpl
* 負責啟動 JobMaster
*/
return new JobManagerRunnerImpl(
↓
JobManagerRunnerImpl#構造函數(shù)()
* 注釋: 啟動 JobMaster
* jobMasterService = JobMaster 實例 */
this.jobMasterService = jobMasterFactory.createJobMasterService
↓
DefaultJobManagerRunnerFactory#createJobMasterService
* 注釋: 生成和啟動一個 JobMaster
*/
return new JobMaster(
——2 》 JobMaster#構造函數(shù)
// TODO_MA 注釋: defaultScheduler
this.schedulerNG = createScheduler(jobManagerJobMetricGroup); 2:49
↓
DefaultJobManagerRunnerFactory#createInstance()
注釋: 返回一個 DefaultScheduler
*/
return new DefaultScheduler(log, jobGraph,
—— 》 DefaultScheduler#構造函數(shù)
super#構造函數(shù)
—— 》 SchedulerBase#構造函數(shù)
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup,
————******** 此階段END ********————--
4.8. ExecutionGraph 構建和提交源碼解析 2:52 ~ 3:31
入口
—— 》 SchedulerBase#createAndRestoreExecutionGraph()
—— 》 SchedulerBase#createExecutionGraph()
—— 》 ExecutionGraphBuilder#buildGraph()
▼
//先構建一個空的ExecutionGraph
try { executionGraph = (prior != null) ? prior :
new ExecutionGraph(
//1、先 根據(jù)jobGraph 生成JsonPlan
//2例获、 JsonPlan 設置到 executionGraph
try {executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph
......
executionGraph.attachJobGraph(sortedTopology);
——》ExecutionGraph #attachJobGraph()
▼
ejv.connectToPredecessors(this.intermediateResults);
——》ExecutionJobVertex#connectToPredecessors
▼
/* ExecutionVertex 一個電影真正執(zhí)行的StremTask
* 一個 StremTask 得到1個slot
for (int i = 0; i < parallelism; i++) {
ExecutionVertex ev = taskVertices[i]; 3:09
回到 ——3 》 Dispatcher# runJob():
// TODO_MA 注釋: 提交任務 == start JobManagerRunner FunctionUtils.uncheckedFunction(this::startJobManagerRunner)
——》 Dispatcher# startJobManagerRunner()
* 注釋: 啟動 jobManagerRunner
*/
jobManagerRunner.start();
↓
JobManagerRunnerImpl#.start();
▼
- 注釋:ZooKeeperLeaderElectionService = leaderElectionService
leaderElectionService.start(this);
↓//開始選舉啟動同上一章
ZooKeeperLeaderElectionService.start
↓
ZooKeeperLeaderElectionService#isLeader()
▼
leaderContender.grantLeadership(issuedLeaderSessionID);
↓
JobManagerRunnerImpl.grantLeadership
▼-
注釋: 調度 并啟動 JobManager
return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
——》JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobMan
▼
* 注釋: 啟動 JobMaster
return startJobMaster(leaderSessionId);
——》JobManagerRunnerImpl#startJobMaster
* 注釋: 啟動 JobMaster
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
↓
JobMaster#start()
▼
//1 注釋: 確保 RPC 工作正常
start();
//2 注釋: 執(zhí)行 JobGragh
return callAsyncWithoutFencing(() -> startJobExecution
——》JobMaster#startJobExecution()
▼
/ *1 注釋: 初始化一些必要的服務組件 jobmaster的注冊和心跳 */
startJobMasterServices();/* 2 注釋: 開始調度執(zhí)行 接下來進入 Slot 管理(申請和釋放)
resetAndStartScheduler(); 3:31
-