簡(jiǎn)介: Flink 的整體架構(gòu)如圖 1 所示牌柄。Flink 是可以運(yùn)行在多種不同的環(huán)境中的畸悬,例如,它可以通過(guò)單進(jìn)程多線程的方式直接運(yùn)行珊佣,從而提供調(diào)試的能力蹋宦。它也可以運(yùn)行在 Yarn 或者 K8S 這種資源管理系統(tǒng)上面,也可以在各種云環(huán)境中執(zhí)行咒锻。
參考老師:高赟(云騫)
- 綜述
本文主要介紹 Flink Runtime 的作業(yè)執(zhí)行的核心機(jī)制冷冗。
1.首先介紹 Flink Runtime 的整體架構(gòu) - Job 的基本執(zhí)行流程,
3.然后介紹在這個(gè)過(guò)程惑艇,F(xiàn)link 是怎么進(jìn)行資源管理蒿辙、作業(yè)調(diào)度以及錯(cuò)誤恢復(fù)的拇泛。
最后,本文還將簡(jiǎn)要介紹 Flink Runtime 層當(dāng)前正在進(jìn)行的一些工作思灌。 -
Flink Runtime 整體架構(gòu)
圖1. Flink 的整體架構(gòu)俺叭,其中 Runtime 層對(duì)不同的執(zhí)行環(huán)境提供了一套統(tǒng)一的分布式執(zhí)行引擎。.png
Flink 在 Runtime 層之上提供了 DataStream 和 DataSet 兩套 API泰偿,分別用來(lái)編寫(xiě)流作業(yè)與批作業(yè)熄守,以及一組更高級(jí)的 API 來(lái)簡(jiǎn)化特定作業(yè)的編寫(xiě)。
本文主要介紹 Flink Runtime 層的整體架構(gòu)甜奄。
Flink Runtime 層的主要架構(gòu)如圖 2 所示柠横,它展示了一個(gè) Flink 集群的基本結(jié)構(gòu)。
Flink Runtime 層的整個(gè)架構(gòu)主要是在 FLIP-6 中實(shí)現(xiàn)的,整體來(lái)說(shuō)烟阐,它采用了標(biāo)準(zhǔn) master-slave 的結(jié)構(gòu)搬俊,其中左側(cè)白色圈中的部分即是 master,它負(fù)責(zé)管理整個(gè)集群中的資源和作業(yè)蜒茄;而右側(cè)的兩個(gè) TaskExecutor 則是 Slave唉擂,負(fù)責(zé)提供具體的資源并實(shí)際執(zhí)行作業(yè)。
其中檀葛,Master 部分又包含了三個(gè)組件玩祟,即 Dispatcher、ResourceManager 和 JobManager屿聋。其中空扎,Dispatcher 負(fù)責(zé)接收用戶提供的作業(yè),并且負(fù)責(zé)為這個(gè)新提交的作業(yè)拉起一個(gè)新的 JobManager 組件润讥。ResourceManager 負(fù)責(zé)資源的管理转锈,在整個(gè) Flink 集群中只有一個(gè) ResourceManager。JobManager 負(fù)責(zé)管理作業(yè)的執(zhí)行楚殿,在一個(gè) Flink 集群中可能有多個(gè)作業(yè)同時(shí)執(zhí)行撮慨,每個(gè)作業(yè)都有自己的 JobManager 組件。這三個(gè)組件都包含在 AppMaster 進(jìn)程中脆粥。
基于上述結(jié)構(gòu)砌溺,當(dāng)用戶提交作業(yè)的時(shí)候,提交腳本會(huì)首先啟動(dòng)一個(gè) Client進(jìn)程負(fù)責(zé)作業(yè)的編譯與提交变隔。它首先將用戶編寫(xiě)的代碼編譯為一個(gè) JobGraph抚吠,在這個(gè)過(guò)程,它還會(huì)進(jìn)行一些檢查或優(yōu)化等工作弟胀,例如判斷哪些 Operator 可以 Chain 到同一個(gè) Task 中。然后,Client 將產(chǎn)生的 JobGraph 提交到集群中執(zhí)行孵户。此時(shí)有兩種情況萧朝,一種是類似于 Standalone 這種 Session 模式,AM 會(huì)預(yù)先啟動(dòng)夏哭,此時(shí) Client 直接與 Dispatcher 建立連接并提交作業(yè)即可检柬。另一種是 Per-Job 模式,AM 不會(huì)預(yù)先啟動(dòng)竖配,此時(shí) Client 將首先向資源管理系統(tǒng) (如Yarn何址、K8S)申請(qǐng)資源來(lái)啟動(dòng) AM,然后再向 AM 中的 Dispatcher 提交作業(yè)进胯。
當(dāng)作業(yè)到 Dispatcher 后用爪,Dispatcher 會(huì)首先啟動(dòng)一個(gè) JobManager 組件,然后 JobManager 會(huì)向 ResourceManager 申請(qǐng)資源來(lái)啟動(dòng)作業(yè)中具體的任務(wù)胁镐。這時(shí)根據(jù) Session 和 Per-Job 模式的區(qū)別偎血, TaskExecutor 可能已經(jīng)啟動(dòng)或者尚未啟動(dòng)。如果是前者盯漂,此時(shí) ResourceManager 中已有記錄了 TaskExecutor 注冊(cè)的資源颇玷,可以直接選取空閑資源進(jìn)行分配。否則就缆,ResourceManager 也需要首先向外部資源管理系統(tǒng)申請(qǐng)資源來(lái)啟動(dòng) TaskExecutor帖渠,然后等待 TaskExecutor 注冊(cè)相應(yīng)資源后再繼續(xù)選擇空閑資源進(jìn)程分配。目前 Flink 中 TaskExecutor 的資源是通過(guò) Slot 來(lái)描述的竭宰,一個(gè) Slot 一般可以執(zhí)行一個(gè)具體的 Task空郊,但在一些情況下也可以執(zhí)行多個(gè)相關(guān)聯(lián)的 Task,這部分內(nèi)容將在下文進(jìn)行詳述羞延。ResourceManager 選擇到空閑的 Slot 之后渣淳,就會(huì)通知相應(yīng)的 TM “將該 Slot 分配分 JobManager XX ”,然后 TaskExecutor 進(jìn)行相應(yīng)的記錄后伴箩,會(huì)向 JobManager 進(jìn)行注冊(cè)入愧。JobManager 收到 TaskExecutor 注冊(cè)上來(lái)的 Slot 后,就可以實(shí)際提交 Task 了嗤谚。
TaskExecutor 收到 JobManager 提交的 Task 之后棺蛛,會(huì)啟動(dòng)一個(gè)新的線程來(lái)執(zhí)行該 Task。Task 啟動(dòng)后就會(huì)開(kāi)始進(jìn)行預(yù)先指定的計(jì)算巩步,并通過(guò)數(shù)據(jù) Shuffle 模塊互相交換數(shù)據(jù)旁赊。
以上就是 Flink Runtime 層執(zhí)行作業(yè)的基本流程∫我埃可以看出终畅,F(xiàn)link 支持兩種不同的模式籍胯,即 Per-job 模式與 Session 模式。如圖 3 所示离福,Per-job 模式下整個(gè) Flink 集群只執(zhí)行單個(gè)作業(yè)杖狼,即每個(gè)作業(yè)會(huì)獨(dú)享 Dispatcher 和 ResourceManager 組件。此外妖爷,Per-job 模式下 AppMaster 和 TaskExecutor 都是按需申請(qǐng)的蝶涩。因此,Per-job 模式更適合運(yùn)行執(zhí)行時(shí)間較長(zhǎng)的大作業(yè)絮识,這些作業(yè)對(duì)穩(wěn)定性要求較高绿聘,并且對(duì)申請(qǐng)資源的時(shí)間不敏感。與之對(duì)應(yīng)次舌,在 Session 模式下熄攘,F(xiàn)link 預(yù)先啟動(dòng) AppMaster 以及一組 TaskExecutor,然后在整個(gè)集群的生命周期中會(huì)執(zhí)行多個(gè)作業(yè)垃它∠势粒可以看出,Session 模式更適合規(guī)模小国拇,執(zhí)行時(shí)間短的作業(yè)洛史。
3. 資源管理與作業(yè)調(diào)度
本節(jié)對(duì) Flink 中資源管理與作業(yè)調(diào)度的功能進(jìn)行更深入的說(shuō)明酱吝。實(shí)際上也殖,作業(yè)調(diào)度可以看做是對(duì)資源和任務(wù)進(jìn)行匹配的過(guò)程。如上節(jié)所述务热,在 Flink 中忆嗜,資源是通過(guò) Slot 來(lái)表示的,每個(gè) Slot 可以用來(lái)執(zhí)行不同的 Task崎岂。而在另一端捆毫,任務(wù)即 Job 中實(shí)際的 Task,它包含了待執(zhí)行的用戶邏輯冲甘。調(diào)度的主要目的就是為了給 Task 找到匹配的 Slot绩卤。邏輯上來(lái)說(shuō),每個(gè) Slot 都應(yīng)該有一個(gè)向量來(lái)描述它所能提供的各種資源的量江醇,每個(gè) Task 也需要相應(yīng)的說(shuō)明它所需要的各種資源的量濒憋。但是實(shí)際上在 1.9 之前,F(xiàn)link 是不支持細(xì)粒度的資源描述的陶夜,而是統(tǒng)一的認(rèn)為每個(gè) Slot 提供的資源和 Task 需要的資源都是相同的凛驮。從 1.9 開(kāi)始,F(xiàn)link 開(kāi)始增加對(duì)細(xì)粒度的資源匹配的支持的實(shí)現(xiàn)条辟,但這部分功能目前仍在完善中黔夭。
作業(yè)調(diào)度的基礎(chǔ)是首先提供對(duì)資源的管理宏胯,因此我們首先來(lái)看下 Flink 中資源管理的實(shí)現(xiàn)。如上文所述本姥,Flink 中的資源是由 TaskExecutor 上的 Slot 來(lái)表示的胳嘲。如圖 4 所示,在 ResourceManager 中扣草,有一個(gè)子組件叫做 SlotManager,它維護(hù)了當(dāng)前集群中所有 TaskExecutor 上的 Slot 的信息與狀態(tài)颜屠,如該 Slot 在哪個(gè) TaskExecutor 中辰妙,該 Slot 當(dāng)前是否空閑等。當(dāng) JobManger 來(lái)為特定 Task 申請(qǐng)資源的時(shí)候甫窟,根據(jù)當(dāng)前是 Per-job 還是 Session 模式密浑,ResourceManager 可能會(huì)去申請(qǐng)資源來(lái)啟動(dòng)新的 TaskExecutor。當(dāng) TaskExecutor 啟動(dòng)之后粗井,它會(huì)通過(guò)服務(wù)發(fā)現(xiàn)找到當(dāng)前活躍的 ResourceManager 并進(jìn)行注冊(cè)尔破。在注冊(cè)信息中,會(huì)包含該 TaskExecutor中所有 Slot 的信息浇衬。 ResourceManager 收到注冊(cè)信息后懒构,其中的 SlotManager 就會(huì)記錄下相應(yīng)的 Slot 信息。當(dāng) JobManager 為某個(gè) Task 來(lái)申請(qǐng)資源時(shí)耘擂, SlotManager 就會(huì)從當(dāng)前空閑的 Slot 中按一定規(guī)則選擇一個(gè)空閑的 Slot 進(jìn)行分配胆剧。當(dāng)分配完成后,如第 2 節(jié)所述醉冤,RM 會(huì)首先向 TaskManager 發(fā)送 RPC 要求將選定的 Slot 分配給特定的 JobManager秩霍。TaskManager 如果還沒(méi)有執(zhí)行過(guò)該 JobManager 的 Task 的話,它需要首先向相應(yīng)的 JobManager 建立連接蚁阳,然后發(fā)送提供 Slot 的 RPC 請(qǐng)求铃绒。在 JobManager 中,所有 Task 的請(qǐng)求會(huì)緩存到 SlotPool 中螺捐。當(dāng)有 Slot 被提供之后颠悬,SlotPool 會(huì)從緩存的請(qǐng)求中選擇相應(yīng)的請(qǐng)求并結(jié)束相應(yīng)的請(qǐng)求過(guò)程。
當(dāng) Task 結(jié)束之后椿疗,無(wú)論是正常結(jié)束還是異常結(jié)束,都會(huì)通知 JobManager 相應(yīng)的結(jié)束狀態(tài)糠悼,然后在 TaskManager 端將 Slot 標(biāo)記為已占用但未執(zhí)行任務(wù)的狀態(tài)届榄。JobManager 會(huì)首先將相應(yīng)的 Slot 緩存到 SlotPool 中,但不會(huì)立即釋放倔喂。這種方式避免了如果將 Slot 直接還給 ResourceManager铝条,在任務(wù)異常結(jié)束之后需要重啟時(shí)靖苇,需要立刻重新申請(qǐng) Slot 的問(wèn)題。通過(guò)延時(shí)釋放班缰,F(xiàn)ailover 的 Task 可以盡快調(diào)度回原來(lái)的 TaskManager贤壁,從而加快 Failover 的速度。當(dāng) SlotPool 中緩存的 Slot 超過(guò)指定的時(shí)間仍未使用時(shí)埠忘,SlotPool 就會(huì)發(fā)起釋放該 Slot 的過(guò)程脾拆。與申請(qǐng) Slot 的過(guò)程對(duì)應(yīng),SlotPool 會(huì)首先通知 TaskManager 來(lái)釋放該 Slot莹妒,然后 TaskExecutor 通知 ResourceManager 該 Slot 已經(jīng)被釋放名船,從而最終完成釋放的邏輯。
除了正常的通信邏輯外旨怠,在 ResourceManager 和 TaskExecutor 之間還存在定時(shí)的心跳消息來(lái)同步 Slot 的狀態(tài)渠驼。在分布式系統(tǒng)中,消息的丟失鉴腻、錯(cuò)亂不可避免迷扇,這些問(wèn)題會(huì)在分布式系統(tǒng)的組件中引入不一致?tīng)顟B(tài),如果沒(méi)有定時(shí)消息爽哎,那么組件無(wú)法從這些不一致?tīng)顟B(tài)中恢復(fù)蜓席。此外,當(dāng)組件之間長(zhǎng)時(shí)間未收到對(duì)方的心跳時(shí)倦青,就會(huì)認(rèn)為對(duì)應(yīng)的組件已經(jīng)失效瓮床,并進(jìn)入到 Failover 的流程。
在 Slot 管理基礎(chǔ)上产镐,F(xiàn)link 可以將 Task 調(diào)度到相應(yīng)的 Slot 當(dāng)中隘庄。如上文所述,F(xiàn)link 尚未完全引入細(xì)粒度的資源匹配癣亚,默認(rèn)情況下丑掺,每個(gè) Slot 可以分配給一個(gè) Task。但是述雾,這種方式在某些情況下會(huì)導(dǎo)致資源利用率不高街州。如圖 5 所示,假如 A玻孟、B唆缴、C 依次執(zhí)行計(jì)算邏輯,那么給 A黍翎、B面徽、C 分配分配單獨(dú)的 Slot 就會(huì)導(dǎo)致資源利用率不高。為了解決這一問(wèn)題,F(xiàn)link 提供了 Share Slot 的機(jī)制趟紊。如圖 5 所示氮双,基于 Share Slot,每個(gè) Slot 中可以部署來(lái)自不同 JobVertex 的多個(gè)任務(wù)霎匈,但是不能部署來(lái)自同一個(gè) JobVertex 的 Task戴差。如圖5所示,每個(gè) Slot 中最多可以部署同一個(gè) A铛嘱、B 或 C 的 Task暖释,但是可以同時(shí)部署 A、B 和 C 的各一個(gè) Task墨吓。當(dāng)單個(gè) Task 占用資源較少時(shí)饭入,Share Slot 可以提高資源利用率。 此外肛真,Share Slot 也提供了一種簡(jiǎn)單的保持負(fù)載均衡的方式。
基于上述 Slot 管理和分配的邏輯,JobManager 負(fù)責(zé)維護(hù)作業(yè)中 Task執(zhí)行的狀態(tài)讥珍。如上文所述历极,Client 端會(huì)向 JobManager 提交一個(gè) JobGraph,它代表了作業(yè)的邏輯結(jié)構(gòu)衷佃。JobManager 會(huì)根據(jù) JobGraph 按并發(fā)展開(kāi)趟卸,從而得到 JobManager 中關(guān)鍵的 ExecutionGraph。ExecutionGraph 的結(jié)構(gòu)如圖 5 所示氏义,與 JobGraph 相比锄列,ExecutionGraph 中對(duì)于每個(gè) Task 與中間結(jié)果等均創(chuàng)建了對(duì)應(yīng)的對(duì)象,從而可以維護(hù)這些實(shí)體的信息與狀態(tài)惯悠。
在一個(gè) Flink Job 中是包含多個(gè) Task 的筒严,因此另一個(gè)關(guān)鍵的問(wèn)題是在 Flink 中按什么順序來(lái)調(diào)度 Task。如圖 7 所示情萤,目前 Flink 提供了兩種基本的調(diào)度邏輯鸭蛙,即 Eager 調(diào)度與 Lazy From Source。Eager 調(diào)度如其名子所示筋岛,它會(huì)在作業(yè)啟動(dòng)時(shí)申請(qǐng)資源將所有的 Task 調(diào)度起來(lái)娶视。這種調(diào)度算法主要用來(lái)調(diào)度可能沒(méi)有終止的流作業(yè)。與之對(duì)應(yīng)泉蝌,Lazy From Source 則是從 Source 開(kāi)始歇万,按拓?fù)漤樞騺?lái)進(jìn)行調(diào)度揩晴。簡(jiǎn)單來(lái)說(shuō),Lazy From Source 會(huì)先調(diào)度沒(méi)有上游任務(wù)的 Source 任務(wù)贪磺,當(dāng)這些任務(wù)執(zhí)行完成時(shí)硫兰,它會(huì)將輸出數(shù)據(jù)緩存到內(nèi)存或者寫(xiě)入到磁盤(pán)中。然后寒锚,對(duì)于后續(xù)的任務(wù)劫映,當(dāng)它的前驅(qū)任務(wù)全部執(zhí)行完成后,F(xiàn)link 就會(huì)將這些任務(wù)調(diào)度起來(lái)刹前。這些任務(wù)會(huì)從讀取上游緩存的輸出數(shù)據(jù)進(jìn)行自己的計(jì)算泳赋。這一過(guò)程繼續(xù)進(jìn)行直到所有的任務(wù)完成計(jì)算。
- 錯(cuò)誤恢復(fù)
在 Flink 作業(yè)的執(zhí)行過(guò)程中拣技,除正常執(zhí)行的流程外千诬,還有可能由于環(huán)境等原因?qū)е赂鞣N類型的錯(cuò)誤。整體上來(lái)說(shuō)膏斤,錯(cuò)誤可能分為兩大類:Task 執(zhí)行出現(xiàn)錯(cuò)誤或 Flink 集群的 Master 出現(xiàn)錯(cuò)誤徐绑。由于錯(cuò)誤不可避免,為了提高可用性莫辨,F(xiàn)link 需要提供自動(dòng)錯(cuò)誤恢復(fù)機(jī)制來(lái)進(jìn)行重試傲茄。
對(duì)于第一類 Task 執(zhí)行錯(cuò)誤,F(xiàn)link 提供了多種不同的錯(cuò)誤恢復(fù)策略沮榜。如圖 8 所示盘榨,第一種策略是 Restart-all,即直接重啟所有的 Task蟆融。對(duì)于 Flink 的流任務(wù)较曼,由于 Flink 提供了 Checkpoint 機(jī)制,因此當(dāng)任務(wù)重啟后可以直接從上次的 Checkpoint 開(kāi)始繼續(xù)執(zhí)行振愿。因此這種方式更適合于流作業(yè)捷犹。第二類錯(cuò)誤恢復(fù)策略是 Restart-individual,它只適用于 Task 之間沒(méi)有數(shù)據(jù)傳輸?shù)那闆r冕末。這種情況下萍歉,我們可以直接重啟出錯(cuò)的任務(wù)。
由于 Flink 的批作業(yè)沒(méi)有 Checkpoint 機(jī)制拒担,因此對(duì)于需要數(shù)據(jù)傳輸?shù)淖鳂I(yè)个盆,直接重啟所有 Task 會(huì)導(dǎo)致作業(yè)從頭計(jì)算蒙幻,從而導(dǎo)致一定的性能問(wèn)題。為了增強(qiáng)對(duì) Batch 作業(yè)仍律,F(xiàn)link 在1.9中引入了一種新的Region-Based的Failover策略钧栖。在一個(gè) Flink 的 Batch 作業(yè)中 Task 之間存在兩種數(shù)據(jù)傳輸方式低零,一種是 Pipeline 類型的方式,這種方式上下游 Task 之間直接通過(guò)網(wǎng)絡(luò)傳輸數(shù)據(jù)拯杠,因此需要上下游同時(shí)運(yùn)行掏婶;另外一種是 Blocking 類型的試,如上節(jié)所述潭陪,這種方式下雄妥,上游的 Task 會(huì)首先將數(shù)據(jù)進(jìn)行緩存,因此上下游的 Task 可以單獨(dú)執(zhí)行依溯【グ牛基于這兩種類型的傳輸,F(xiàn)link 將 ExecutionGraph 中使用 Pipeline 方式傳輸數(shù)據(jù)的 Task 的子圖叫做 Region誓沸,從而將整個(gè) ExecutionGraph 劃分為多個(gè)子圖∫妓冢可以看出拜隧,Region 內(nèi)的 Task 必須同時(shí)重啟,而不同 Region 的 Task 由于在 Region 邊界存在 Blocking 的邊趁仙,因此洪添,可以單獨(dú)重啟下游 Region 中的 Task。
基于這一思路,如果某個(gè) Region 中的某個(gè) Task 執(zhí)行出現(xiàn)錯(cuò)誤雀费,可以分兩種情況進(jìn)行考慮干奢。如圖 8 所示,如果是由于 Task 本身的問(wèn)題發(fā)生錯(cuò)誤盏袄,那么可以只重啟該 Task 所屬的 Region 中的 Task忿峻,這些 Task 重啟之后,可以直接拉取上游 Region 緩存的輸出結(jié)果繼續(xù)進(jìn)行計(jì)算辕羽。
另一方面逛尚,如圖如果錯(cuò)誤是由于讀取上游結(jié)果出現(xiàn)問(wèn)題,如網(wǎng)絡(luò)連接中斷刁愿、緩存上游輸出數(shù)據(jù)的 TaskExecutor 異常退出等绰寞,那么還需要重啟上游 Region 來(lái)重新產(chǎn)生相應(yīng)的數(shù)據(jù)。在這種情況下,如果上游 Region 輸出的數(shù)據(jù)分發(fā)方式不是確定性的(如 KeyBy滤钱、Broadcast 是確定性的分發(fā)方式觉壶,而 Rebalance、Random 則不是件缸,因?yàn)槊看螆?zhí)行會(huì)產(chǎn)生不同的分發(fā)結(jié)果)铜靶,為保證結(jié)果正確性,還需要同時(shí)重啟上游 Region 所有的下游 Region停团。
除了 Task 本身執(zhí)行的異常外,另一類異常是 Flink 集群的 Master 進(jìn)行發(fā)生異常履恩。目前 Flink 支持啟動(dòng)多個(gè) Master 作為備份锰茉,這些 Master 可以通過(guò) ZK 來(lái)進(jìn)行選主,從而保證某一時(shí)刻只有一個(gè) Master 在運(yùn)行切心。當(dāng)前活路的 Master 發(fā)生異常時(shí),某個(gè)備份的 Master 可以接管協(xié)調(diào)的工作飒筑。為了保證 Master 可以準(zhǔn)確維護(hù)作業(yè)的狀態(tài),F(xiàn)link 目前采用了一種最簡(jiǎn)單的實(shí)現(xiàn)方式绽昏,即直接重啟整個(gè)作業(yè)协屡。實(shí)際上,由于作業(yè)本身可能仍在正常運(yùn)行全谤,因此這種方式存在肤晓。
參考:https://developer.aliyun.com/article/718131?spm=a2c6h.12873639.0.0.609c11b80nD34r