flink 運(yùn)行時(shí)的組件
JobManager
控制一個(gè)應(yīng)用程序執(zhí)行的主進(jìn)程望拖,也就是說跨算,每個(gè)應(yīng)用程序都會(huì)被一個(gè)不同的JobManager 所控制執(zhí)行搀擂。
JobManager 會(huì)先接收到要執(zhí)行的應(yīng)用程序违霞,這個(gè)應(yīng)用程序回包括:作業(yè)圖(JobGraph)堵幽、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有類堡赔、庫(kù)和其他資源的JAR包殉疼。JobManager會(huì)把 JobGraph 轉(zhuǎn)換成一個(gè)物理層面的數(shù)據(jù)流圖暂衡,這個(gè)圖被叫做"執(zhí)行圖"芽隆,包含了所有可以并發(fā)執(zhí)行的任務(wù)浊服。
JobManager 會(huì)向資源管理器(ResourceManager)請(qǐng)求執(zhí)行任務(wù)必要的資源,也就是任務(wù)管理器(TaskManager)上的插槽(Slot)胚吁。一旦它獲取到了足夠的資源牙躺,就會(huì)將執(zhí)行圖分發(fā)到真正運(yùn)行他們的TaskManager上。而在運(yùn)行過程中腕扶,JobManager 會(huì)負(fù)載所有需要中央?yún)f(xié)調(diào)的操作孽拷,比如說檢查點(diǎn)(checkpoints)的協(xié)調(diào)。
TaskManager
Flink 中的工作進(jìn)程半抱。通常在Flink中會(huì)有多個(gè)TaskManager運(yùn)行脓恕,每一個(gè)TaskManager都包含了一定數(shù)量的插槽(slots)。插槽的數(shù)量限制了TaskManager 能夠執(zhí)行的任務(wù)數(shù)量窿侈。啟動(dòng)之后炼幔,TaskManager會(huì)向資源管理器注冊(cè)它的插槽;收到資源管理器的指令后史简,TaskManager就會(huì)將一個(gè)或者多個(gè)插槽提供給JobManager調(diào)用乃秀。JobManager就可以向插槽分配任務(wù)(tasks)來執(zhí)行了。
在執(zhí)行過程中,一個(gè)TaskManager可以跟其他運(yùn)行同一應(yīng)用程序的TaskManager交換數(shù)據(jù)跺讯。
ResourceManager
主要負(fù)責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot)枢贿,TaskManager 插槽是Flink中定義的處理資源單元。
Flink為不同的環(huán)境和資源管理工具提供了不同資源管理器刀脏,比如YARN局荚、MEsos、K8s愈污,以及standalone部署耀态。
當(dāng)JobManager 申請(qǐng)插槽資源時(shí),ResourceManager 會(huì)將有空閑插槽的TaskManager分配給JobManager钙畔。如果ResourceManager沒有足夠的插槽來滿足JobManager的請(qǐng)求茫陆,它還可以向資源提供平臺(tái)發(fā)起會(huì)話,以提供啟動(dòng)TaskManager進(jìn)程的容器擎析。
Dispatcher
可以跨作業(yè)運(yùn)行簿盅,它為 應(yīng)用提交提供了REST接口。當(dāng)一個(gè)應(yīng)用被提交執(zhí)行時(shí)揍魂,分發(fā)器就會(huì)啟動(dòng)并將移交給一個(gè)JobManager桨醋。Dispatcher也會(huì)啟動(dòng)一個(gè)Web UI,用來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息现斋。
Dispatcher 在架構(gòu)中可能并不是必須的喜最,這取決于應(yīng)用提交運(yùn)行的方式。
任務(wù)提交流程
任務(wù)提交過程
YARN 模式
詳細(xì)提交流程
- Flink任務(wù)提交后庄蹋,client向HDFS上傳Flink的jar包和配置
- Client向Yarn ResourceManager 提交任務(wù)
- ResourceManager 分配Container 資源并通知對(duì)應(yīng)的NodeManager 啟動(dòng) ApplicationMaster
- ApplicationMaster 啟動(dòng)后加載Flink的jar包和配置構(gòu)建環(huán)境
- ApplicationMaster啟動(dòng)JobManager
- ApplicationMaster向ResourceManager申請(qǐng)資源啟動(dòng) TaskManager
- ResourceManager 分配Container 資源后
- 由ApplicationMaster 通知資源所在節(jié)點(diǎn)的NodeManager 啟動(dòng) TaskManager
- NodeManager 加載Flink的jar包和配置構(gòu)建環(huán)境并啟動(dòng)TaskManager
- TaskManager啟動(dòng)后向JobManager發(fā)送心跳包
- 等待JobManager 向其分配任務(wù)
優(yōu)缺點(diǎn)
YARN 資源的統(tǒng)一管理和調(diào)度瞬内。Yarn 集群中所有節(jié)點(diǎn)的資源(內(nèi)存、CPU限书、磁盤虫蝶、網(wǎng)絡(luò)等)被抽象為 Container。計(jì)算框架需要資源進(jìn)行運(yùn)算任務(wù)時(shí)需要向 Resource Manager 申請(qǐng) Container倦西,Yarn 按照特定的策略對(duì)資源進(jìn)行調(diào)度和進(jìn)行 Container 的分配能真。Yarn 模式能通過多種任務(wù)調(diào)度策略來利用提高集群資源利用率。例如 FIFO Scheduler扰柠、Capacity Scheduler粉铐、Fair Scheduler,并能設(shè)置任務(wù)優(yōu)先級(jí)卤档。
資源隔離蝙泼。Yarn 使用了輕量級(jí)資源隔離機(jī)制 Cgroups 進(jìn)行資源隔離以避免相互干擾,一旦 Container 使用的資源量超過事先定義的上限值劝枣,就將其殺死踱承。
自動(dòng) failover 處理倡缠。例如 Yarn NodeManager 監(jiān)控、Yarn ApplicationManager 異尘セ睿恢復(fù)。
TaskManager 與 Slots
Flink中每一個(gè)worker(TaskManager)都是一個(gè)JVM進(jìn)程琢唾,它可能會(huì)在獨(dú)立的線程上執(zhí)行一個(gè)或多個(gè)subtask载荔。為了控制一個(gè)worker能接收多少個(gè)task,worker通過task slot來進(jìn)行控制(一個(gè)worker至少有一個(gè)taskslot)
每個(gè)task slot表示TaskManager擁有資源的一個(gè)固定大小的子集采桃。 假如一個(gè)TaskManager有三個(gè)slot懒熙,那么它會(huì)將其管理的內(nèi)存分成三份給各個(gè)slot。資源slot化意味著一個(gè)subtask將不需要跟來自其他job的subtask競(jìng)爭(zhēng)被管理的內(nèi)存普办,取而代之的是它將擁有一定數(shù)量的內(nèi)存儲(chǔ)備工扎。 需要注意的是,這里不會(huì)涉及到CPU的隔離(CPU是靠搶的)衔蹲,slot目前僅僅用來隔離task的受管理的內(nèi)存肢娘。
通過調(diào)整task slot的數(shù)量,允許用戶定義subtask之間如何互相隔離舆驶。如果一個(gè)TaskManager一個(gè)slot橱健,那將意味著每個(gè)task group運(yùn)行在獨(dú)立的JVM中(該JVM可能是通過一個(gè)特定的容器啟動(dòng)的),而一個(gè)TaskManager多個(gè)slot意味著更多的subtask可以共享同一個(gè)JVM沙廉。而在同一個(gè)JVM進(jìn)程中的task將共享TCP連接(基于多路復(fù)用)和心跳消息拘荡。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),因此這減少了每個(gè)task的負(fù)載撬陵。
默認(rèn)情況下珊皿,F(xiàn)link允許子任務(wù)共享slot,即使他們是不同任務(wù)的子任務(wù)(前提是他們來自同一個(gè)job)巨税。 這樣的結(jié)果是蟋定,一個(gè)slot可以保存作業(yè)的整個(gè)管道。
Task Slot是靜態(tài)的概念垢夹,是指TaskManager具有的并發(fā)執(zhí)行能力溢吻,可以通過參數(shù)taskmanager.numberOfTaskSlots進(jìn)行配置
并行度parallelism是動(dòng)態(tài)概念,即TaskManager運(yùn)行程序時(shí)實(shí)際使用的并發(fā)能力果元,可以通過參數(shù)parallelism.default進(jìn)行配置促王。
設(shè)一共有3個(gè)TaskManager,每一個(gè)TaskManager中的分配3個(gè)TaskSlot而晒,也就是每個(gè)TaskManager可以接收3個(gè)task蝇狼,一共9個(gè)TaskSlot,如果我們?cè)O(shè)置parallelism.default=1倡怎,即運(yùn)行程序默認(rèn)的并行度為1迅耘,9個(gè)TaskSlot只用了1個(gè)贱枣,有8個(gè)空閑,因此颤专,設(shè)置合適的并行度才能提高效率纽哥。
并行度(Parallelism)
Flink程序的執(zhí)行具有并行、分布式的特性
在執(zhí)行過程中栖秕,一個(gè)流(stream)包含一個(gè)或多個(gè)分區(qū)(stream partition)春塌,而每一個(gè)算子(operator)可以包含一個(gè)或多個(gè)子任務(wù)(operator subtask),這些子任務(wù)在不同的線程簇捍、不同的物理機(jī)或不同的容器中彼此互不依賴地執(zhí)行只壳。
一個(gè)特定算子的子任務(wù)(subtask)的個(gè)數(shù)被稱之為其并行度(parallelism)
一般情況下,一個(gè)流程序的并行度暑塑,可以認(rèn)為就是其所有算子中最大的并行度吼句。一個(gè)程序中,不同的算子可能具有不同的并行度事格。
Stream在算子之間傳輸數(shù)據(jù)的形式可以是 one-to-one(forwarding) 的模式也可以是 redistributing 的模式惕艳,具體是哪一種形式,取決于算子的種類分蓖。
one-to-one:stream(比如在source和map operator之間)維護(hù)著分區(qū)以及元素的順序尔艇。那意味著map 算子的子任務(wù)看到的元素的個(gè)數(shù)以及順序跟source 算子的子任務(wù)生產(chǎn)的元素的個(gè)數(shù)、順序相同么鹤,map终娃、fliter、flatMap等算子都是one-to-one的對(duì)應(yīng)關(guān)系蒸甜。類似于spark中的窄依賴
redistributing(重新分配):stream(map()跟keyBy/window之間或者keyBy/window跟sink之間)的分區(qū)會(huì)發(fā)生改變棠耕。每一個(gè)算子的子任務(wù)依據(jù)所選擇的transformation發(fā)送數(shù)據(jù)到不同的目標(biāo)任務(wù)。例如柠新,keyBy() 基于hashCode重分區(qū)窍荧、broadcast和rebalance會(huì)隨機(jī)重新分區(qū),這些算子都會(huì)引起redistribute過程恨憎,而redistribute過程就類似于Spark中的shuffle過程蕊退。類似于spark中的寬依賴
任務(wù)鏈(Operator Chains)
相同并行度的one to one操作,F(xiàn)link這樣相連的算子鏈接在一起形成一個(gè)task憔恳,原來的算子成為里面的一部分
將算子鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換和基于緩存區(qū)的數(shù)據(jù)交換瓤荔,在減少時(shí)延的同時(shí)提升吞吐量。
鏈接的行為可以在編程API中進(jìn)行指定钥组;任務(wù)鏈必須滿足兩個(gè)條件:one-to-one的數(shù)據(jù)傳輸并且并行度相同