1诚隙、背景
降本增效大背景下讶隐,統(tǒng)計分析實時任務(wù)資源使用情況,核心關(guān)注任務(wù)配置的CPU久又、內(nèi)存巫延、 實際使用的CPU、內(nèi)存地消,首先對Flink任務(wù)涉及的基礎(chǔ)概念做個梳理理解
2炉峰、Flink基礎(chǔ)概念
介紹的主要是與 Flink 資源管理相關(guān)的組件,我們知道一個 Flink Cluster 是由一個 Flink Master 和多個 Task Manager 組成的脉执,F(xiàn)link Master 和 Task Manager 是進程級組件疼阔,其他的組件都是進程內(nèi)的組件。
2.1、jobManager
2.1.1婆廊、概述
JobManager :協(xié)調(diào) Flink 應(yīng)用程序的分布式執(zhí)行迅细。它決定何時調(diào)度下一個 task(或一組 task)、對完成的 task 或執(zhí)行失敗做出反應(yīng)否彩、協(xié)調(diào) checkpoint疯攒、并且協(xié)調(diào)從失敗中恢復(fù)等等嗦随。這個進程由三個不同的組件組成:
- ResourceManager: 負責(zé) Flink 集群中的資源提供列荔、回收、分配 枚尼。 它管理 taskslots贴浙,這是 Flink 集群中資源調(diào)度的單位。Flink 為不同的環(huán)境和資源提供者(例如YARN署恍、Kubernetes 和 standalone 部署)實現(xiàn)了對應(yīng)的 ResourceManager崎溃。
- Dispatcher:提供了一個 REST 接口,用來提交 Flink 應(yīng)用程序執(zhí)行盯质,并為每個提交的作業(yè)啟動一個新的 JobMaster袁串。它還運行 Flink WebUI 用來提供作業(yè)執(zhí)行信息。
- JobMaster:負責(zé)管理單個JobGraph的執(zhí)行呼巷。Flink 集群中可以同時運行多個作業(yè)囱修,每個作業(yè)都有自己的 唯一一個JobMaster。
2.1.2王悍、設(shè)置內(nèi)存
- Per job模式:-yjm參數(shù)設(shè)置內(nèi)存如 -yjm 1024 表示jobManager內(nèi)存1G
- session模式:-jm 參數(shù)設(shè)置內(nèi)存破镰,如 -jm 1024 表示jobManager內(nèi)存1G
參數(shù)jobmanager.memory.process.size設(shè)置
2.1.3、設(shè)置CPU
- K8s:kubernetes.jobmanager.cpu 默認1C
- yarn: yarn.appmaster.vcores 默認1C
2.2压储、taskManager
2.2.1鲜漩、概述
TaskManager:執(zhí)行作業(yè)流的 task,并且緩存和交換數(shù)據(jù)流集惋。在 TaskManager 中資源調(diào)度的最小單位是 task slot孕似。TaskManager 中 task slot 的數(shù)量表示并發(fā)處理 task 的數(shù)量。
2.2.3刮刑、數(shù)量如何設(shè)置
- Per job模式:TaskManager的數(shù)量是在提交作業(yè)時根據(jù)并發(fā)度動態(tài)計算(-yn參數(shù)指定已失效)喉祭,根據(jù)設(shè)定的operator的最大并發(fā)度計算,例如为朋,如果作業(yè)中operator的最大并發(fā)度為10臂拓,則 Parallelism/numberOfTaskSlots為向YARN申請的TaskManager數(shù),如:Parallelism為10习寸,numberOfTaskSlots為1胶惰,則TaskManager為10
- session模式:“-n NUM”參數(shù)設(shè)置TaskManager個數(shù)
2.2.4、設(shè)置內(nèi)存
- Per job模式:-ytm參數(shù)設(shè)置內(nèi)存霞溪,如-ytm 1024 表示每個taskmanager內(nèi)存1G
- session模式:-tm參數(shù)設(shè)置內(nèi)存孵滞,如 -tm 1024 表示每個taskmanager內(nèi)存1G
參數(shù)taskmanager.memory.process.size設(shè)置
2.2.5中捆、設(shè)置CPU
- k8s:kubernetes.taskmanager.cpu
- yarn: the cpu is set to the number of slots per TaskManager
如果你想為每個 TaskManager 分配多個 vcores,而不是 slot number坊饶,你可以在flink/conf/flink-conf.yaml中額外提供yarn.containers.vcores設(shè)置 - 每個 YARN 的虛擬核心數(shù)
2.3泄伪、并發(fā)度(parallelism)
2.3.1、概述
特定算子的子任務(wù)(subtask)的個數(shù)稱之為并行度(parallel)匿级,一般情況下蟋滴,一個數(shù)據(jù)流的并行度可以認為是其所有算子中最大的并行度。Flink中每個算子都可以在代碼中通過.setParallelism(n)來重新設(shè)置并行度痘绎。而并行執(zhí)行的subtask要發(fā)布到不同的slot中去執(zhí)行
2.3.2津函、設(shè)置parallelism
- 在flink的配置文件中flink-conf.yaml,默認的并行度為1孤页;
- 在以shell的方式提交flink job的時候尔苦,可以使用-p指定程序的并行度,如:flink run -p 10
- 在flink job程序內(nèi)設(shè)置并行度,如: env.setParallelism(10);
- 每個算子指定并行度行施;如:.map(new XxxMapFunction).setParallelism(5)
- 并行度設(shè)置優(yōu)先級是:算子設(shè)置并行度 > env 設(shè)置并行度 > 配置文件默認并行度
- 如果并發(fā)度值超過slot數(shù)量允坚,flink任務(wù)會等待申請任務(wù)資源超時,而拋出異常
2.4蛾号、slot
2.4.1稠项、概述
TaskManager 執(zhí)行具體的 Task,TaskManager 為了對資源進行隔離和增加允許的task數(shù)须教,引入了 slot 的概念皿渗,這個 slot 對資源的隔離僅僅是對內(nèi)存進行隔離,策略是均分轻腺,比如 taskmanager 的管理內(nèi)存是 3 GB乐疆,假如有兩個 slot,那么每個 slot 就僅僅有 1.5 GB 內(nèi)存可用
2.4.2贬养、數(shù)量設(shè)置
配置文件taskmanager.numberOfTaskSlots 參數(shù)設(shè)置
- Per job模式:-ys參數(shù)設(shè)置slot的數(shù)量
- session模式:-s參數(shù)設(shè)置slot的數(shù)量
2.4.3挤土、slot和taskManager之間的關(guān)系
每個 task slot 代表 TaskManager 中資源的固定子集。例如误算,具有 3 個 slot 的 TaskManager仰美,會將其托管內(nèi)存 1/3 用于每個 slot。分配資源意味著 subtask 不會與其他作業(yè)的 subtask 競爭托管內(nèi)存儿礼,而是具有一定數(shù)量的保留托管內(nèi)存咖杂。注意此處沒有 CPU 隔離;當前 slot 僅分離 task 的托管內(nèi)存蚊夫。
通過調(diào)整 task slot 的數(shù)量诉字,用戶可以定義 subtask 如何互相隔離。每個 TaskManager 有一個 slot,這意味著每個 task 組都在單獨的 JVM 中運行壤圃。具有多個 slot 意味著更多 subtask 共享同一 JVM陵霉。同一 JVM 中的 task 共享 TCP 連接(通過多路復(fù)用)和心跳信息。它們還可以共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu)伍绳,從而減少了每個 task 的開銷
2.4.4踊挠、TaskManager,Slot冲杀,Task和并行度parallelism的關(guān)系
slot 是指 TaskManager 的最大并發(fā)能力
如上圖效床,3 個 TaskManager,每個 TaskManager 3 個 slot漠趁,此時一共有 9 個 slot
如上圖扁凛,所有的算子并行度為1,只需要 1 個 slot 就能解決問題闯传,有 8 個處于空閑。
如上圖的上半部分卤妒,并行度為2甥绿,使用了 2 個 slot。上圖的下半部分则披,設(shè)置并行度為9共缕,所有的 slot 都用到了。
2.5士复、 task 和 subtask
2.5.1图谷、概述
task: 是沒有產(chǎn)生 shuffle,One-to-one 模式下算子的集合阱洪,里面封裝了數(shù)個 subTask便贵,類似 spark 中的 TaskSet。
subTask:flink 最小的執(zhí)行單元冗荸,task 每一個分區(qū)會形成一個 subTask 承璃,類似 spark 中的 task。
2.5.2蚌本、詳細說明
下面我們用flink官方案例說明一下盔粹,案例代碼如下
案例執(zhí)行DAG圖
說明:圖中假設(shè)是 source/map 的并行度都是 2,keyby/window/apply 的并行度也都是 2程癌,sink 的是 1舷嗡,那么有幾個 task,幾個subTask 呢嵌莉?
答案:共 task 有 3 個进萄,subTask 是五個,最終需要五個線程。
解釋:由于 source 到 map 沒有產(chǎn)生 shuffle 垮斯,并且并行度相同郎仆,屬于 One-to-one 的模式,所有 source 和 map 劃分成一個 task兜蠕,后面的 map 到 keyBy 扰肌,和最后的 sink 都有 shuffle 產(chǎn)生,并行度發(fā)生改變熊杨,所有 keyBy曙旭,sink 都是一個單獨的 task,所有共有 3 個task晶府,其中 source桂躏,map 并行度是 2,所以有兩個 subTask川陆,以此類推共有 5 個 subtask剂习。
2.5.3、UI 界面上查看任務(wù)的 task 和 subTask
如下圖我們點擊任務(wù)的詳情頁面较沪,右上角的 4 就是 task 總數(shù)鳞绕,DAG 中的每一個矩形代表一個獨立的 task,點擊每一個 task 詳情尸曼,我們能看到 task 的 subtask 信息们何,包括 subtask 的接受數(shù)據(jù)量,狀態(tài)等信息
2.6控轿、Operator Chains
2.6.1冤竹、概述
為了更高效地分布式執(zhí)行,F(xiàn)link會盡可能地將operator的subtask鏈接(chain)在一起形成task茬射。每個task在一個線程中執(zhí)行鹦蠕。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化躲株,減少數(shù)據(jù)在緩沖區(qū)的交換片部,減少了延遲的同時提高整體的吞吐量。
2.6.2霜定、詳細說明
下面這幅圖档悠,展示了 Source 并行度為 1,F(xiàn)latMap望浩、KeyAggregation辖所、Sink并行度均為 2,最終以 5 個并行的線程來執(zhí)行的優(yōu)化過程
上圖中將 KeyAggregation 和 Sink 兩個 operator 進行了合并磨德,因為這兩個合并后并不會改變整體的拓撲結(jié)構(gòu)缘回。
但是吆视,并不是任意兩個 operator 就能 chain 一起的。其條件還是很苛刻的:
- 上下游的并行度一致
- 下游節(jié)點的入度為1 (也就是說下游節(jié)點沒有來自其他節(jié)點的輸入)
- 上下游節(jié)點都在同一個 slot group 中(下面會解釋 slot group)
- 下游節(jié)點的 chain 策略為 ALWAYS(可以與上下游鏈接酥宴,map啦吧、flatmap、filter等默認是ALWAYS)
- 上游節(jié)點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接拙寡,不能與上游鏈接授滓,Source默認是HEAD)
- 兩個節(jié)點間數(shù)據(jù)分區(qū)方式是 forward
- 用戶沒有禁用 chain
2.7、slotgroup
為了防止同一個 slot 包含太多的 task肆糕,或者我們希望把計算邏輯復(fù)雜的算子單獨使用 slot 般堆,提高計算速度,F(xiàn)link 提供了資源組(group) 的概念诚啃。group 就是對 operator 進行分組淮摔,同一 group 的不同 operator task 可以共享同一個 slot。默認所有 operator 屬于同一個組"default"始赎,也就是所有 operator task 可以共享一個 slot和橙。我們可以通過 slotSharingGroup() 為不同的 operator 設(shè)置不同的group
dataStream.filter(...).slotSharingGroup("groupName");