答案寫在最前面:Job的最大并行度除以每個TaskManager分配的任務槽數咧栗。
問題
在Flink 1.5 Release Notes中逆甜,有這樣一段話,直接上截圖致板。
這說明從1.5版本開始交煞,Flink on YARN時的容器數量——亦即TaskManager數量——將由程序的并行度自動推算,也就是說flink run腳本的-yn/--yarncontainer參數不起作用了斟或。那么自動推算的規(guī)則是什么呢素征?要弄清楚它,先來復習Flink的并行度(Parallelism)和任務槽(Task Slot)萝挤。
并行度(Parallelism)
與Spark類似地御毅,一個Flink Job在生成執(zhí)行計劃時也劃分成多個Task。Task可以是Source怜珍、Sink端蛆、算子或算子鏈(算子鏈有點意思,之后會另寫文章詳細說的)绘面。Task可以由多線程并發(fā)執(zhí)行欺税,每個線程處理Task輸入數據的一個子集侈沪。而并發(fā)的數量就稱為Parallelism揭璃,即并行度。
Flink程序中設定并行度有4種級別亭罪,從低到高分別為:算子級別瘦馍、執(zhí)行環(huán)境(ExecutionEnvironment)級別、客戶端(命令行)級別应役、配置文件(flink-conf.yaml)級別情组。實際執(zhí)行時,優(yōu)先級則是反過來的箩祥,算子級別最高院崇。簡單示例如下。
- 算子級別
dataStream.flatMap(new SomeFlatMapFunction()).setParallelism(4);
- 執(zhí)行環(huán)境級別
streamExecutionEnvironment.setParallelism(4);
- 命令行級別
bin/flink -run --parallelism 4 example-0.1.jar
- flink-conf.yaml級別
parallelism.default: 4
任務槽(Task Slot)
Flink運行時由兩個組件組成:JobManager與TaskManager袍祖,與Spark Standalone模式下的Master與Worker是同等概念底瓣。從官網抄來的圖如下所示,很容易理解蕉陋。
JobManager和TaskManager本質上都是JVM進程捐凭。為了提高Flink程序的運行效率和資源利用率拨扶,Flink在TaskManager中實現了任務槽(Task Slot)。任務槽是Flink計算資源的基本單位茁肠,每個任務槽可以在同一時間執(zhí)行一個Task患民,而TaskManager可以擁有一個或者多個任務槽。
任務槽可以實現TaskManager中不同Task的資源隔離垦梆,不過是邏輯隔離匹颤,并且只隔離內存,亦即在調度層面認為每個任務槽“應該”得到taskmanager.heap.size的N分之一大小的內存托猩。CPU資源不算在內惋嚎。
TaskManager的任務槽個數在使用flink run腳本提交on YARN作業(yè)時用-ys/--yarnslots參數來指定,另外在flink-conf.yaml文件中也有默認值taskManager.numberOfTaskSlots站刑。一般來講另伍,我們設定該參數時可以將它理解成一個TaskManager可以利用的CPU核心數,因此也要根據實際情況(集群的CPU資源和作業(yè)的計算量)來確定绞旅。
確定TaskManager數
以Flink自帶示例中簡化的WordCount程序為例:
// 執(zhí)行環(huán)境并行度設為6
env.setParallelism(6);
// Source并行度為1
DataStream<String> text = env
.readTextFile(params.get("input"))
.setParallelism(1);
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
counts.print();
用--yarnslots 3參數來執(zhí)行摆尝,即每個TaskManager分配3個任務槽。TaskManager因悲、任務槽和任務的分布將如下圖所示堕汞,方括號內的數字為并行線程的編號。
由圖中可以看出晃琳,由于算子鏈機制的存在讯检,KeyAgg與Sink操作鏈接在了一起,作為一個Task來執(zhí)行卫旱。
Flink允許任務槽共享人灼,即來自同一個Job的不同Task的Sub-Task(理解為Task的子集就行)進入同一個槽位,因此在圖中也可以見到任務槽X中同時存在FlatMap[X]與KeyAgg[X]+Sink[X]顾翼。任務槽共享有兩點好處:
- 能夠讓每個Task的Sub-Task都均攤到不同的TaskManager投放,避免負載傾斜。
- 不需要再計算App一共需要起多少個Task适贸,因為作業(yè)需要的任務槽數量肯定等于Job中最大的并行度灸芳。
所以,可以得出Flink on YARN時拜姿,TaskManager的數量就是:max(parallelism) / yarnslots(向上取整)烙样。例如,一個最大并行度為20蕊肥,每個TaskManager有兩個任務槽的作業(yè)谒获,就會啟動10個TaskManager,如Web UI所示。