Flink是如何調(diào)度Job的,以及如何在JobManager上表現(xiàn)并跟蹤Job狀態(tài)
調(diào)度
Flink 通過任務(wù)槽(Task slot)定義執(zhí)行資源实苞,每個 TaskManager 都有一或多個任務(wù)槽,每個任務(wù)槽都可以運行一個并行任務(wù)流州藕,一個 pipeline 包括多個連續(xù)的任務(wù),例如一個 MapFunction 的第n個并行實例與一個 ReduceFunction 的第n個并行實例的連續(xù)任務(wù)。Flink 通常會并發(fā)執(zhí)行連續(xù)的任務(wù)涛癌,對于流式程序來說,任何情況都如此執(zhí)行送火;而對于批處理程序拳话,多數(shù)情況也如此執(zhí)行。
參照下圖說明种吸,由一個 Data source弃衍、一個 MapFunction 和一個 ReduceFunction 組成的程序,Data source 和 MapFunction 的并發(fā)度都為4骨稿,而 ReduceFunction 的并發(fā)度為3笨鸡。一個 pipeline 由 Source-Map-Reduce 組成姜钳,在具有2個 TaskManager坦冠,每個 TaskManager 有3個 Task slot 的集群上運行,程序執(zhí)行情況下:
說明如下:
- TaskManager 1上哥桥,有2個并行的 ExecutionVertex 組成的DAG圖辙浑,各占用一個 Task slot
- TaskManager 2上,有2個并行的 ExecutionVertex 組成的 DAG 圖拟糕,各占用一個Task Slot
- 在2個 TaskManager 上運行的4個 Execution 是并行執(zhí)行的
Flink 內(nèi)通過 SlotSharingGroup 和 CoLocationGroup 來定義哪些任務(wù)共享一個 Task slot判呕,哪些任務(wù)必須嚴格的放到一個 Task slot中倦踢。
JobManager數(shù)據(jù)結(jié)構(gòu)
作業(yè)執(zhí)行期間,JobManager 會持續(xù)跟蹤分布式任務(wù)侠草,決定什么時候調(diào)度下一個 Task(或者一組任務(wù))辱挥,并且對已完成的或執(zhí)行失敗的任務(wù)進行響應(yīng)。
JobManager 接收 JobGraph边涕,JobGraph 是數(shù)據(jù)流的表現(xiàn)形式晤碘,包括算子(JobVertex)和中間結(jié)果(IntermediateDataSet)。每個算子都有諸如并行度和執(zhí)行代碼等屬性功蜓。此外园爷,JobGraph 還擁有一些在算子執(zhí)行代碼時所需要的附加庫。
JobManager 將 JobGraph 轉(zhuǎn)換為 ExecutionGraph式撼,ExecutionGraph 是 JobGraph 的并行版本:對每個 JobVertex童社,包含并行子任務(wù)的 ExecutionVertex。一個并行度為100的算子將擁有一個 JobVertex 和100個 ExecutionVertex著隆。ExecutionVertex 會跟蹤特定子任務(wù)的執(zhí)行狀態(tài)扰楼。來自一個 JobVertex 的所有 ExecutionVertex 都由一個 ExecutionJobVertex 管理保存,ExecutionJobVertex 跟蹤算子整體狀態(tài)旅东。除了各個節(jié)點之外灭抑,ExecutionGraph 同樣包括了 IntermediateResult 和 IntermediateResultPartition,前者跟蹤 IntermediateDataSet 的狀態(tài)抵代,后者跟蹤每個分區(qū)的狀態(tài)腾节。
每個 ExecutionGraph 都有一個與其相關(guān)聯(lián)的作業(yè)狀態(tài)。此作業(yè)狀態(tài)指示作業(yè)執(zhí)行的當前狀態(tài)荤牍。
作業(yè)首先處于創(chuàng)建狀態(tài)(created)案腺,然后切換到運行狀態(tài)(running),并且在完成所有工作后康吵,它將切換到完成狀態(tài)(finished)劈榨。在失敗的情況下,作業(yè)首先切換到失敗狀態(tài)(failing)晦嵌,取消所有正在運行任務(wù)同辣。如果所有節(jié)點都已達到最終狀態(tài),并且作業(yè)不可重新啟動惭载,則狀態(tài)將轉(zhuǎn)換為失敽岛(failed)。如果作業(yè)可以重新啟動描滔,那么它將進入重新啟動狀態(tài)(restarting)棒妨。一旦完成重新啟動,它將變成創(chuàng)建狀態(tài)(created)含长。
在用戶取消作業(yè)的情況下券腔,將進入取消狀態(tài)(cancelling)伏穆,會取消所有當前正在運行的任務(wù)。一旦所有運行的任務(wù)已經(jīng)達到最終狀態(tài)纷纫,該作業(yè)將轉(zhuǎn)換到已取消狀態(tài)(canceled)枕扫。
完成狀態(tài)(finished),取消狀態(tài)(canceled)和失敗狀態(tài)(failed)表示一個全局的終結(jié)狀態(tài)辱魁,并且觸發(fā)清理工作铡原,而暫停狀態(tài)(suspended)僅處于本地終止狀態(tài)。意味著作業(yè)的執(zhí)行在相應(yīng)的 JobManager 上終止商叹,但集群的另一個 JobManager 可以從持久的HA存儲中恢復(fù)這個作業(yè)并重新啟動燕刻。因此,處于暫停狀態(tài)的作業(yè)將不會被完全清理剖笙。
在執(zhí)行 ExecutionGraph 期間卵洗,每個并行任務(wù)經(jīng)過多個階段,從創(chuàng)建(created)到完成(finished)或失斆诌洹(failed) 过蹂,下圖說明了它們之間的狀態(tài)和可能的轉(zhuǎn)換。任務(wù)可以執(zhí)行多次(例如故障恢復(fù))聚至。每個 Execution 跟蹤一個 ExecutionVertex 的執(zhí)行酷勺,每個 ExecutionVertex 都有一個當前 Execution(current execution)和一個前驅(qū) Execution(prior execution)。
補充:Flink 執(zhí)行圖
在 Flink 中的執(zhí)行圖可以分為四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖
- StreamGraph:Stream API 編寫的代碼生成的最初的圖扳躬。用來表示程序的拓撲結(jié)構(gòu)脆诉。可以調(diào)用 env.getExecutionPlan() 輸出json串贷币,將該 JSON 串粘貼到 http://flink.apache.org/visualizer/ 可視化該執(zhí)行圖击胜。
- JobGraph:StreamGraph 經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)役纹。主要的優(yōu)化為偶摔,將多個符合條件的節(jié)點 chain 在一起作為一個節(jié)點,這樣可以減少數(shù)據(jù)在節(jié)點之間流動所需要的序列化/反序列化/傳輸消耗促脉。
- ExecutionGraph:JobManager 根據(jù) JobGraph 生成 ExecutionGraph辰斋。ExecutionGraph 是 JobGraph 的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)瘸味。
- 物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對 Job 進行調(diào)度后宫仗,在各個TaskManager 上部署 Task 后形成的“圖”,并不是一個具體的數(shù)據(jù)結(jié)構(gòu)硫戈。
四層執(zhí)行圖的演變過程如下圖所示(來源:Flink 原理與實現(xiàn):架構(gòu)和拓撲概覽):
Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/job_scheduling.html
http://wuchong.me/blog/2016/05/03/flink-internals-overview/