一.Flink運(yùn)行的四大組件
如下圖所示,F(xiàn)link運(yùn)行的四大組件:
1,1 作業(yè)管理器(JobManager)
控制一個(gè)應(yīng)用程序執(zhí)行的主進(jìn)程盖淡,也就是說(shuō)年柠,每個(gè)應(yīng)用程序都會(huì)被一個(gè)不同的JobManager 所控制執(zhí)行。
JobManager 會(huì)先接收到要執(zhí)行的應(yīng)用程序禁舷,這個(gè)應(yīng)用程序會(huì)包括:作業(yè)圖(JobGraph)彪杉、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類毅往、庫(kù)和其它資源的JAR包牵咙。
JobManager 會(huì)把JobGraph轉(zhuǎn)換成一個(gè)物理層面的數(shù)據(jù)流圖,這個(gè)圖被叫做“執(zhí)行圖”(ExecutionGraph)攀唯,包含了所有可以并發(fā)執(zhí)行的任務(wù)洁桌。
JobManager 會(huì)向資源管理器(ResourceManager)請(qǐng)求執(zhí)行任務(wù)必要的資源,也就是任務(wù)管理器(TaskManager)上的插槽(slot)侯嘀。一旦它獲取到了足夠的資源另凌,就會(huì)將執(zhí)行圖分發(fā)到真正運(yùn)行它們的TaskManager上。而在運(yùn)行過(guò)程中戒幔,JobManager會(huì)負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作吠谢,比如說(shuō)檢查點(diǎn)(checkpoints)的協(xié)調(diào)。
1.2 任務(wù)管理器(TaskManager)
Flink中的工作進(jìn)程诗茎。通常在Flink中會(huì)有多個(gè)TaskManager運(yùn)行工坊,每一個(gè)TaskManager都包含了一定數(shù)量的插槽(slots)。插槽的數(shù)量限制了TaskManager能夠執(zhí)行的任務(wù)數(shù)量敢订。
啟動(dòng)之后王污,TaskManager會(huì)向資源管理器注冊(cè)它的插槽;收到資源管理器的指令后楚午,TaskManager就會(huì)將一個(gè)或者多個(gè)插槽提供給JobManager調(diào)用昭齐。JobManager就可以向插槽分配任務(wù)(tasks)來(lái)執(zhí)行了。
在執(zhí)行過(guò)程中矾柜,一個(gè)TaskManager可以跟其它運(yùn)行同一應(yīng)用程序的TaskManager交換數(shù)據(jù)判族。
1.3 資源管理器(ResourceManager)
主要負(fù)責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot),TaskManger 插槽是Flink中定義的處理資源單元叁扫。
Flink為不同的環(huán)境和資源管理工具提供了不同資源管理器秩伞,比如YARN、Mesos饮睬、K8s租谈,以及standalone部署。
當(dāng)JobManager申請(qǐng)插槽資源時(shí),ResourceManager會(huì)將有空閑插槽的TaskManager分配給JobManager割去。如果ResourceManager沒(méi)有足夠的插槽來(lái)滿足JobManager的請(qǐng)求窟却,它還可以向資源提供平臺(tái)發(fā)起會(huì)話,以提供啟動(dòng)TaskManager進(jìn)程的容器呻逆。
1.4 分發(fā)器(Dispatcher)
可以跨作業(yè)運(yùn)行夸赫,它為應(yīng)用提交提供了REST接口。
當(dāng)一個(gè)應(yīng)用被提交執(zhí)行時(shí)咖城,分發(fā)器就會(huì)啟動(dòng)并將應(yīng)用移交給一個(gè)JobManager茬腿。
Dispatcher也會(huì)啟動(dòng)一個(gè)Web UI,用來(lái)方便地展示和監(jiān)控作業(yè)執(zhí)行的信息宜雀。
Dispatcher在架構(gòu)中可能并不是必需的切平,這取決于應(yīng)用提交運(yùn)行的方式。
二.任務(wù)提交流程
2.1 非yarn模式的任務(wù)提交流程
2.2 任務(wù)提交流程(YARN)
三. 任務(wù)調(diào)度原理
客戶端不是運(yùn)行時(shí)和程序執(zhí)行的一部分辐董,但它用于準(zhǔn)備并發(fā)送dataflow(JobGraph)給Master(JobManager)悴品,然后,客戶端斷開(kāi)連接或者維持連接以等待接收計(jì)算結(jié)果简烘。而Job Manager會(huì)產(chǎn)生一個(gè)執(zhí)行圖(Dataflow Graph)
當(dāng) Flink 集群?jiǎn)?dòng)后苔严,首先會(huì)啟動(dòng)一個(gè) JobManger 和一個(gè)或多個(gè)的 TaskManager。由 Client 提交任務(wù)給 JobManager孤澎,JobManager 再調(diào)度任務(wù)到各個(gè) TaskManager 去執(zhí)行届氢,然后 TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager。TaskManager 之間以流的形式進(jìn)行數(shù)據(jù)的傳輸覆旭。上述三者均為獨(dú)立的 JVM 進(jìn)程退子。
Client 為提交 Job 的客戶端,可以是運(yùn)行在任何機(jī)器上(與 JobManager 環(huán)境連通即可)姐扮。提交 Job 后絮供,Client 可以結(jié)束進(jìn)程(Streaming的任務(wù)),也可以不結(jié)束并等待結(jié)果返回茶敏。
JobManager 主要負(fù)責(zé)調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint壤靶,職責(zé)上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后惊搏,會(huì)生成優(yōu)化后的執(zhí)行計(jì)劃贮乳,并以 Task 的單元調(diào)度到各個(gè) TaskManager 去執(zhí)行。
TaskManager 在啟動(dòng)的時(shí)候就設(shè)置好了槽位數(shù)(Slot)恬惯,每個(gè) slot 能啟動(dòng)一個(gè) Task向拆,Task 為線程。從 JobManager 處接收需要部署的 Task酪耳,部署啟動(dòng)后浓恳,與自己的上游建立 Netty 連接刹缝,接收數(shù)據(jù)并處理。
四. TaskManger與Slots與parallelism
Flink 中每一個(gè) TaskManager 都是一個(gè)JVM進(jìn)程颈将,它可能會(huì)在獨(dú)立的線程上執(zhí)行一個(gè)或多個(gè)子任務(wù)
為了控制一個(gè) TaskManager 能接收多少個(gè) task梢夯, TaskManager 通過(guò) task slot 來(lái)進(jìn)行控制(一個(gè) TaskManager 至少有一個(gè) slot)
圖中每個(gè)Task Manager中的Slot為3個(gè),那么兩個(gè)Task Manager一共有六個(gè)Slot, 而這6個(gè)Slot代表著Task Manager最大的并發(fā)執(zhí)行能力晴圾,一共能可以執(zhí)行6個(gè)task進(jìn)行同時(shí)執(zhí)行
Slot是靜態(tài)概念颂砸,代表著Task Manager具有的并發(fā)執(zhí)行能力,可以通過(guò)參數(shù)taskmanager.numberOfTaskSlots進(jìn)行配置
為了控制一個(gè) TaskManager 能接收多少個(gè) task死姚, TaskManager 通過(guò) task slot 來(lái)進(jìn)行控制(一個(gè) TaskManager 至少有一個(gè) slot)
-
圖中Source和Map是一個(gè)Task人乓,且并行度(我們?cè)O(shè)置的setParallelism())都為1,指這個(gè)task任務(wù)的并行能力為1都毒,只占用一個(gè)Slot資源
image.png 在第二張圖中為Flink的共享子任務(wù)色罚,如果一個(gè)TaskManager一個(gè)slot,那將意味著每個(gè)task group運(yùn)行在獨(dú)立的JVM中(該JVM可能是通過(guò)一個(gè)特定的容器啟動(dòng)的)温鸽,而一個(gè)TaskManager多個(gè)slot意味著更多的subtask可以共享同一個(gè)JVM保屯。而在同一個(gè)JVM進(jìn)程中的task將共享TCP連接(基于多路復(fù)用)和心跳消息。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu)涤垫,因此這減少了每個(gè)task的負(fù)載。
并行度parallelism是動(dòng)態(tài)概念竟终,即TaskManager運(yùn)行程序時(shí)實(shí)際使用的并發(fā)能力蝠猬,可以通過(guò)參數(shù)parallelism.default進(jìn)行配置。
- 也就是說(shuō)统捶,假設(shè)一共有3個(gè)TaskManager榆芦,每一個(gè)TaskManager中的分配3個(gè)TaskSlot,也就是每個(gè)TaskManager可以接收3個(gè)task喘鸟,一共9個(gè)TaskSlot匆绣,如果我們?cè)O(shè)置parallelism.default=1,即運(yùn)行程序默認(rèn)的并行度為1什黑,9個(gè)TaskSlot只用了1個(gè)崎淳,有8個(gè)空閑,因此愕把,設(shè)置合適的并行度才能提高效率拣凹。
一個(gè)特定算子的 子任務(wù)(subtask)的個(gè)數(shù)被稱之為其并行度(parallelism),我們可以對(duì)單獨(dú)的每個(gè)算子進(jìn)行設(shè)置并行度恨豁,也可以直接用env設(shè)置全局的并行度嚣镜,更可以在頁(yè)面中去指定并行度。
最后橘蜜,由于并行度是實(shí)際Task Manager處理task 的能力菊匿,而一般情況下,一個(gè) stream 的并行度,可以認(rèn)為就是其所有算子中最大的并行度跌捆,則可以得出在設(shè)置Slot時(shí)凡涩,在所有設(shè)置中的最大設(shè)置的并行度大小則就是所需要設(shè)置的Slot的數(shù)量。
五. 程序與數(shù)據(jù)流
所有的Flink程序都是由三部分組成的: Source 疹蛉、Transformation 和 Sink活箕。
-
Source 負(fù)責(zé)讀取數(shù)據(jù)源,Transformation 利用各種算子進(jìn)行處理加工可款,Sink 負(fù)責(zé)輸出
image.png 在運(yùn)行時(shí)育韩,F(xiàn)link上運(yùn)行的程序會(huì)被映射成“邏輯數(shù)據(jù)流”(dataflows),它包含了這三部分
每一個(gè)dataflow以一個(gè)或多個(gè)sources開(kāi)始以一個(gè)或多個(gè)sinks結(jié)束闺鲸。dataflow類似于任意的有向無(wú)環(huán)圖(DAG)
-
在大部分情況下筋讨,程序中的轉(zhuǎn)換運(yùn)算(transformations)跟dataflow中的算子(operator)是一一對(duì)應(yīng)的關(guān)系
image.png
六. 執(zhí)行圖(ExecutionGraph)
Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖
StreamGraph:是根據(jù)用戶通過(guò) Stream API 編寫(xiě)的代碼生成的最初的圖。用來(lái)表示程序的拓?fù)浣Y(jié)構(gòu)摸恍。
JobGraph:StreamGraph經(jīng)過(guò)優(yōu)化后生成了 JobGraph悉罕,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為立镶,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn)
ExecutionGraph:JobManager 根據(jù) JobGraph 生成ExecutionGraph壁袄。ExecutionGraph是JobGraph的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)媚媒。
-
物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度后嗜逻,在各個(gè)TaskManager 上部署 Task 后形成的“圖”,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)缭召。
image.png
七. 數(shù)據(jù)傳輸形式
Flink 采用了一種稱為任務(wù)鏈的優(yōu)化技術(shù)栈顷,可以在特定條件下減少本地通信的開(kāi)銷。為了滿足任務(wù)鏈的要求嵌巷,必須將兩個(gè)或多個(gè)算子設(shè)為相同的并行度萄凤,并通過(guò)本地轉(zhuǎn)發(fā)(local forward)的方式進(jìn)行連接
相同并行度的 one-to-one 操作,F(xiàn)link 這樣相連的算子鏈接在一起形成一個(gè) task搪哪,原來(lái)的算子成為里面的 subtask
并行度相同靡努、并且是 one-to-one 操作,兩個(gè)條件缺一不可
-
而為什么需要并行度相同噩死,因?yàn)槿鬴latMap并行度為1颤难,到了之后的map并行度為2,從flatMap到map的數(shù)據(jù)涉及到數(shù)據(jù)由于并行度map為2會(huì)往兩個(gè)slot處理已维,數(shù)據(jù)會(huì)分散行嗤,所產(chǎn)生的元素個(gè)數(shù)和順序發(fā)生的改變所以有2個(gè)單獨(dú)的task,不能成為任務(wù)鏈
image.png