Flink運(yùn)行時組件
作業(yè)管理器(JobManager)、資源管理器(ResourceManager)尚洽、任務(wù)管理器(TaskManager),
以及分發(fā)器(Dispatcher)波势。
JobManager
控制一個應(yīng)用程序執(zhí)行的主進(jìn)程翎朱,也就是說橄维,每個應(yīng)用程序都會被一個不同的
JobManager 所控制執(zhí)行尺铣。JobManager 會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括:
作業(yè)圖(JobGraph)争舞、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類凛忿、庫和其它
資源的 JAR 包。JobManager 會把 JobGraph 轉(zhuǎn)換成一個物理層面的數(shù)據(jù)流圖竞川,這個圖被叫做
“執(zhí)行圖”(ExecutionGraph)店溢,包含了所有可以并發(fā)執(zhí)行的任務(wù)叁熔。JobManager 會向資源管
理器(ResourceManager)請求執(zhí)行任務(wù)必要的資源,也就是任務(wù)管理器(TaskManager)上
的插槽(slot)床牧。一旦它獲取到了足夠的資源荣回,就會將執(zhí)行圖分發(fā)到真正運(yùn)行它們的
TaskManager 上。而在運(yùn)行過程中戈咳,JobManager 會負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作心软,比如說檢
查點(diǎn)(checkpoints)的協(xié)調(diào)。
ResourceManager
主要負(fù)責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot)著蛙,TaskManger 插槽是 Flink 中
定義的處理資源單元删铃。Flink 為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如
YARN踏堡、Mesos猎唁、K8s,以及 standalone 部署顷蟆。當(dāng) JobManager 申請插槽資源時诫隅,ResourceManager
會將有空閑插槽的 TaskManager 分配給 JobManager。如果 ResourceManager 沒有足夠的插槽
來滿足 JobManager 的請求慕的,它還可以向資源提供平臺發(fā)起會話阎肝,以提供啟動 TaskManager
進(jìn)程的容器。另外肮街,ResourceManager 還負(fù)責(zé)終止空閑的 TaskManager风题,釋放計算資源。
TaskManager
Flink 中的工作進(jìn)程嫉父。通常在 Flink 中會有多個 TaskManager 運(yùn)行沛硅,每一個 TaskManager
都包含了一定數(shù)量的插槽(slots)。插槽的數(shù)量限制了 TaskManager 能夠執(zhí)行的任務(wù)數(shù)量绕辖。
啟動之后摇肌,TaskManager 會向資源管理器注冊它的插槽;收到資源管理器的指令后仪际,
TaskManager 就會將一個或者多個插槽提供給 JobManager 調(diào)用围小。JobManager 就可以向插槽
分配任務(wù)(tasks)來執(zhí)行了。在執(zhí)行過程中树碱,一個 TaskManager 可以跟其它運(yùn)行同一應(yīng)用程
序的 TaskManager 交換數(shù)據(jù)肯适。
Dispatcher
可以跨作業(yè)運(yùn)行,它為應(yīng)用提交提供了 REST 接口成榜。當(dāng)一個應(yīng)用被提交執(zhí)行時框舔,分發(fā)器
就會啟動并將應(yīng)用移交給一個 JobManager。由于是 REST 接口,所以 Dispatcher 可以作為集
群的一個 HTTP 接入點(diǎn)刘绣,這樣就能夠不受防火墻阻擋樱溉。Dispatcher 也會啟動一個 Web UI,用
來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息纬凤。Dispatcher 在架構(gòu)中可能并不是必需的福贞,這取決于應(yīng)
用提交運(yùn)行的方式
Flink任務(wù)提交流程
Yarn運(yùn)行模式
Flink 任 務(wù) 提 交 后 , Client 向 HDFS 上 傳 Flink 的 Jar 包 和 配 置 停士, 之 后 向 Yarn ResourceManager 提 交 任 務(wù) 肚医, ResourceManager 分 配 Container 資 源 并 通 知 對 應(yīng) 的 NodeManager 啟動 ApplicationMaster,ApplicationMaster 啟動后加載 Flink 的 Jar 包 和配置構(gòu)建環(huán)境向瓷,然后啟動 JobManager肠套,之后 ApplicationMaster 向 ResourceManager 申 請 資 源 啟 動 TaskManager , ResourceManager 分 配 Container 資 源 后 猖任, 由 ApplicationMaster 通 知 資 源 所 在 節(jié) 點(diǎn) 的 NodeManager 啟 動 TaskManager 你稚, NodeManager 加載 Flink 的 Jar 包和配置構(gòu)建環(huán)境并啟動 TaskManager,TaskManager 啟動后向 JobManager 發(fā)送心跳包朱躺,并等待 JobManager 向其分配任務(wù)刁赖。
任務(wù)調(diào)度原理
客 戶 端 不 是 運(yùn) 行 時 和 程 序 執(zhí) 行 的 一 部 分 , 但 它 用 于 準(zhǔn) 備 并 發(fā) 送 dataflow(JobGraph)給 Master(JobManager)长搀,然后宇弛,客戶端斷開連接或者維持連接以 等待接收計算結(jié)果。
當(dāng) Flink 集 群 啟 動 后 源请, 首 先 會 啟 動 一 個 JobManger 和 一 個 或 多 個 的 TaskManager枪芒。由 Client 提交任務(wù)給 JobManager,JobManager 再調(diào)度任務(wù)到各個 TaskManager 去執(zhí)行谁尸,然后 TaskManager 將心跳和統(tǒng)計信息匯報給 JobManager舅踪。 TaskManager 之間以流的形式進(jìn)行數(shù)據(jù)的傳輸。上述三者均為獨(dú)立的 JVM 進(jìn)程良蛮。
Client 為提交 Job 的客戶端抽碌,可以是運(yùn)行在任何機(jī)器上(與 JobManager 環(huán)境 連通即可)。提交 Job 后决瞳,Client 可以結(jié)束進(jìn)程(Streaming 的任務(wù))货徙,也可以不 結(jié)束并等待結(jié)果返回。
JobManager 主 要 負(fù) 責(zé) 調(diào) 度 Job 并 協(xié) 調(diào) Task 做 checkpoint 皮胡。從 Client 處接收到 Job 和 JAR 包等資源后痴颊,會生成優(yōu)化后的 執(zhí)行計劃,并以 Task 的單元調(diào)度到各個 TaskManager 去執(zhí)行胸囱。
TaskManager 在啟動的時候就設(shè)置好了槽位數(shù)(Slot)祷舀,每個 slot 能啟動一個 Task瀑梗,Task 為線程烹笔。從 JobManager 處接收需要部署的 Task裳扯,部署啟動后,與自 己的上游建立 Netty 連接谤职,接收數(shù)據(jù)并處理饰豺。
TaskManger 與 Slots
Flink 中每一個 worker(TaskManager)都是一個 JVM 進(jìn)程,它可能會在獨(dú)立的線
程上執(zhí)行一個或多個 subtask允蜈。為了控制一個 worker 能接收多少個 task冤吨,worker 通
過 task slot 來進(jìn)行控制(一個 worker 至少有一個 task slot)。
每個 task slot 表示 TaskManager 擁有資源的一個固 定大小的子集 饶套。假如一個
TaskManager 有三個 slot漩蟆,那么它會將其管理的內(nèi)存分成三份給各個 slot。資源 slot
化意味著一個 subtask 將不需要跟來自其他 job 的 subtask 競爭被管理的內(nèi)存妓蛮,取而
代之的是它將擁有一定數(shù)量的內(nèi)存儲備怠李。需要注意的是,這里不會涉及到 CPU 的隔
離蛤克,slot 目前僅僅用來隔離 task 的受管理的內(nèi)存捺癞。
通過調(diào)整 task slot 的數(shù)量,允許用戶定義 subtask 之間如何互相隔離构挤。如果一個
TaskManager 一個 slot髓介,那將意味著每個 task group 運(yùn)行在獨(dú)立的 JVM 中(該 JVM
可能是通過一個特定的容器啟動的),而一個 TaskManager 多個 slot 意味著更多的
subtask 可以共享同一個 JVM筋现。而在同一個 JVM 進(jìn)程中的 task 將共享 TCP 連接(基
于多路復(fù)用)和心跳消息唐础。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),因此這減少了每個
task 的負(fù)載矾飞。
默認(rèn)情況下彻犁,F(xiàn)link 允許子任務(wù)共享 slot,即使它們是不同任務(wù)的子任務(wù)(前提 是它們來自同一個 job)凰慈。 這樣的結(jié)果是汞幢,一個 slot 可以保存作業(yè)的整個管道。
Task Slot 是靜態(tài)的概念微谓,是指 TaskManager 具有的并發(fā)執(zhí)行能力森篷,可以通過 參數(shù) taskmanager.numberOfTaskSlots 進(jìn)行配置;而并行度 parallelism 是動態(tài)概念豺型, 即 TaskManager 運(yùn)行程序時實(shí)際使用的并發(fā)能力仲智,可以通過參數(shù) parallelism.default 進(jìn)行配置。
程序與數(shù)據(jù)流(DataFlow)
所有的 Flink 程序都是由三部分組成的: Source 姻氨、Transformation 和 Sink钓辆。
Source 負(fù)責(zé)讀取數(shù)據(jù)源,Transformation 利用各種算子進(jìn)行處理加工,Sink 負(fù)責(zé)輸出前联。
在運(yùn)行時功戚,F(xiàn)link 上運(yùn)行的程序會被映射成“邏輯數(shù)據(jù)流”(dataflows),它包 含了這三部分似嗤。每一個 dataflow 以一個或多個 sources 開始以一個或多個 sinks 結(jié) 束啸臀。dataflow 類似于任意的有向無環(huán)圖(DAG)。在大部分情況下烁落,程序中的轉(zhuǎn)換運(yùn)算(transformations)跟 dataflow 中的算子(operator)是一一對應(yīng)的關(guān)系乘粒,但有時候,一個 transformation 可能對應(yīng)多個 operator伤塌。
執(zhí)行圖(ExecutionGraph)
由 Flink 程序直接映射成的數(shù)據(jù)流圖是 StreamGraph灯萍,也被稱為邏輯流圖,因?yàn)?它們表示的是計算邏輯的高級視圖每聪。為了執(zhí)行一個流處理程序竟稳,F(xiàn)link 需要將邏輯流圖轉(zhuǎn)換為物理數(shù)據(jù)流圖(也叫執(zhí)行圖),詳細(xì)說明程序的執(zhí)行方式熊痴。
Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖他爸。
StreamGraph:是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用 來表示程序的拓?fù)浣Y(jié)構(gòu)果善。
JobGraph:StreamGraph 經(jīng)過優(yōu)化后生成了 JobGraph诊笤,提交給 JobManager 的 數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為巾陕,將多個符合條件的節(jié)點(diǎn) chain 在一起作為一個節(jié)點(diǎn)讨跟,這 樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動所需要的序列化/反序列化/傳輸消耗。
ExecutionGraph : JobManager 根 據(jù) JobGraph 生 成 ExecutionGraph 鄙煤。 ExecutionGraph 是 JobGraph 的并行化版本晾匠,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對 Job 進(jìn)行調(diào)度后梯刚,在各個 TaskManager 上部署 Task 后形成的“圖”凉馆,并不是一個具體的數(shù)據(jù)結(jié)構(gòu)
Flink 程序的執(zhí)行具有并行、分布式的特性亡资。
在執(zhí)行過程中澜共, 一個流( stream) 包含一個或多個分區(qū)( stream partition), 而每一個算子( operator) 可以包含一個或多個子任務(wù)( operator subtask)锥腻, 這些子任務(wù)在不同的線程嗦董、不同的物理機(jī)或不同的容器中彼此互不依賴地執(zhí)行。
一個特定算子的子任務(wù)(subtask) 的個數(shù)被稱之為其并行度( parallelism)瘦黑。一般情況下京革, 一個流程序的并行度奇唤, 可以認(rèn)為就是其所有算子中最大的并行度。一個程序中匹摇, 不同的算子可能具有不同的并行度咬扇。
圖 并行數(shù)據(jù)流
Stream 在算子之間傳輸數(shù)據(jù)的形式可以是 one-to-one(forwarding)的模式也可以是 redistributing 的模式, 具體是哪一種形式来惧, 取決于算子的種類。
-
One-to-one:stream(比如在 source 和 map operator 之間)維護(hù)著分區(qū)以及元素的順序演顾。那意味著 map 算子的子任務(wù)看到的元素的個數(shù)以及順序跟 source 算子的子任務(wù)生產(chǎn)的元素的個數(shù)供搀、順序相同, map钠至、fliter葛虐、flatMap 等算子都是 one-to-one 的對應(yīng)關(guān)系。
? 類似于 spark 中的窄依賴
-
Redistributing: stream(map()跟 keyBy/window 之間或者 keyBy/window 跟 sink 之間)的分區(qū)會發(fā)生改變棉钧。每一個算子的子任務(wù)依據(jù)所選擇的 transformation 發(fā)送數(shù)據(jù)到不同的目標(biāo)任務(wù)屿脐。例如,keyBy() 基于 hashCode 重分區(qū)宪卿、broadcast 和 rebalance 會隨機(jī)重新分區(qū)的诵,這些算子都會引起 redistribute 過程,而 redistribute 過程就類似于Spark 中的 shuffle 過程佑钾。
? 類似于 spark 中的寬依賴
任務(wù)鏈
相同并行度的 one To one操作西疤,F(xiàn)link將這樣相連的算子鏈接在一起形成一個 task,原來的算子成為里面的一部分休溶。將算子鏈接成 task 是非常有效的優(yōu)化: 它能減少線程之間的切換和基于緩存區(qū)的數(shù)據(jù)交換代赁, 在減少時延的同時提升吞吐量。鏈接的行為可以在編程 API 中進(jìn)行指定兽掰。