架構(gòu)
要了解一個(gè)系統(tǒng)寺枉,一般都是從架構(gòu)開始耙替。我們關(guān)心的問題是:系統(tǒng)部署成功后各個(gè)節(jié)點(diǎn)都啟動(dòng)了哪些服務(wù)涨享,各個(gè)服務(wù)之間又是怎么交互和協(xié)調(diào)的缸废。下方是 Flink 集群啟動(dòng)后架構(gòu)圖吟秩。
當(dāng) Flink 集群啟動(dòng)后咱扣,首先會(huì)啟動(dòng)一個(gè) JobManger 和一個(gè)或多個(gè)的 TaskManager。由 Client 提交任務(wù)給 JobManager涵防,JobManager 再調(diào)度任務(wù)到各個(gè) TaskManager 去執(zhí)行闹伪,然后 TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager。TaskManager 之間以流的形式進(jìn)行數(shù)據(jù)的傳輸壮池。上述三者均為獨(dú)立的 JVM 進(jìn)程偏瓤。
- Client 為提交 Job 的客戶端,可以是運(yùn)行在任何機(jī)器上(與 JobManager 環(huán)境連通即可)椰憋。提交 Job 后厅克,Client 可以結(jié)束進(jìn)程(Streaming的任務(wù)),也可以不結(jié)束并等待結(jié)果返回橙依。
- JobManager 主要負(fù)責(zé)調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint证舟,職責(zé)上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后窗骑,會(huì)生成優(yōu)化后的執(zhí)行計(jì)劃女责,并以 Task 的單元調(diào)度到各個(gè) TaskManager 去執(zhí)行。
- TaskManager 在啟動(dòng)的時(shí)候就設(shè)置好了槽位數(shù)(Slot)创译,每個(gè) slot 能啟動(dòng)一個(gè) Task抵知,Task 為線程。從 JobManager 處接收需要部署的 Task软族,部署啟動(dòng)后辛藻,與自己的上游建立 Netty 連接,接收數(shù)據(jù)并處理互订。
可以看到 Flink 的任務(wù)調(diào)度是多線程模型吱肌,并且不同Job/Task混合在一個(gè) TaskManager 進(jìn)程中。雖然這種方式可以有效提高 CPU 利用率仰禽,但是個(gè)人不太喜歡這種設(shè)計(jì)氮墨,因?yàn)椴粌H缺乏資源隔離機(jī)制,同時(shí)也不方便調(diào)試吐葵。類似 Storm 的進(jìn)程模型规揪,一個(gè)JVM 中只跑該 Job 的 Tasks 實(shí)際應(yīng)用中更為合理。
Job 例子
本文所示例子為 flink-1.0.x 版本
我們使用 Flink 自帶的 examples 包中的 SocketTextStreamWordCount
温峭,這是一個(gè)從 socket 流中統(tǒng)計(jì)單詞出現(xiàn)次數(shù)的例子猛铅。
-
首先,使用 netcat 啟動(dòng)本地服務(wù)器:
$ nc -l 9000
-
然后提交 Flink 程序
$ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \ --hostname 10.218.130.9 \ --port 9000
在netcat端輸入單詞并監(jiān)控 taskmanager 的輸出可以看到單詞統(tǒng)計(jì)的結(jié)果凤藏。
SocketTextStreamWordCount
的具體代碼如下:
public static void main(String[] args) throws Exception {
// 檢查輸入
final ParameterTool params = ParameterTool.fromArgs(args);
...
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data
DataStream<String> text =
env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.sum(1);
counts.print();
// execute program
env.execute("WordCount from SocketTextStream Example");
}
我們將最后一行代碼 env.execute
替換成 System.out.println(env.getExecutionPlan());
并在本地運(yùn)行該代碼(并發(fā)度設(shè)為2)奸忽,可以得到該拓?fù)涞倪壿媹?zhí)行計(jì)劃圖的 JSON 串堕伪,將該 JSON 串粘貼到 http://flink.apache.org/visualizer/ 中,能可視化該執(zhí)行圖栗菜。
但這并不是最終在 Flink 中運(yùn)行的執(zhí)行圖欠雌,只是一個(gè)表示拓?fù)涔?jié)點(diǎn)關(guān)系的計(jì)劃圖,在 Flink 中對(duì)應(yīng)了 SteramGraph疙筹。另外富俄,提交拓?fù)浜螅úl(fā)度設(shè)為2)還能在 UI 中看到另一張執(zhí)行計(jì)劃圖,如下所示而咆,該圖對(duì)應(yīng)了 Flink 中的 JobGraph霍比。
Graph
看起來有點(diǎn)亂,怎么有這么多不一樣的圖暴备。實(shí)際上桂塞,還有更多的圖。Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖馍驯。
- StreamGraph:是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖阁危。用來表示程序的拓?fù)浣Y(jié)構(gòu)。
- JobGraph:StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph汰瘫,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)狂打。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn)混弥,這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗趴乡。
- ExecutionGraph:JobManager 根據(jù) JobGraph 生成的分布式執(zhí)行圖,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)蝗拿。
- 物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度后晾捏,在各個(gè)TaskManager 上部署 Task 后形成的“圖”,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)哀托。
例如上文中的2個(gè)并發(fā)度(Source為1個(gè)并發(fā)度)的 SocketTextStreamWordCount
四層執(zhí)行圖的演變過程如下圖所示(點(diǎn)擊查看大圖):
這里對(duì)一些名詞進(jìn)行簡單的解釋惦辛。
-
StreamGraph:根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。
- StreamNode:用來代表 operator 的類仓手,并具有所有相關(guān)的屬性胖齐,如并發(fā)度、入邊和出邊等嗽冒。
- StreamEdge:表示連接兩個(gè)StreamNode的邊呀伙。
-
JobGraph:StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)添坊。
- JobVertex:經(jīng)過優(yōu)化后符合條件的多個(gè)StreamNode可能會(huì)chain在一起生成一個(gè)JobVertex剿另,即一個(gè)JobVertex包含一個(gè)或多個(gè)operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet雨女。
- IntermediateDataSet:表示JobVertex的輸出谚攒,即經(jīng)過operator處理產(chǎn)生的數(shù)據(jù)集。producer是JobVertex戚篙,consumer是JobEdge五鲫。
- JobEdge:代表了job graph中的一條數(shù)據(jù)傳輸通道溺职。source 是 IntermediateDataSet岔擂,target 是 JobVertex。即數(shù)據(jù)通過JobEdge由IntermediateDataSet傳遞給目標(biāo)JobVertex浪耘。
-
ExecutionGraph:JobManager 根據(jù) JobGraph 生成的分布式執(zhí)行圖乱灵,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
- ExecutionJobVertex:和JobGraph中的JobVertex一一對(duì)應(yīng)七冲。每一個(gè)ExecutionJobVertex都有和并發(fā)度一樣多的 ExecutionVertex痛倚。
- ExecutionVertex:表示ExecutionJobVertex的其中一個(gè)并發(fā)子任務(wù),輸入是ExecutionEdge澜躺,輸出是IntermediateResultPartition蝉稳。
- IntermediateResult:和JobGraph中的IntermediateDataSet一一對(duì)應(yīng)。每一個(gè)IntermediateResult有與下游ExecutionJobVertex相同并發(fā)數(shù)的IntermediateResultPartition掘鄙。
- IntermediateResultPartition:表示ExecutionVertex的一個(gè)輸出分區(qū)耘戚,producer是ExecutionVertex,consumer是若干個(gè)ExecutionEdge操漠。
- ExecutionEdge:表示ExecutionVertex的輸入收津,source是IntermediateResultPartition,target是ExecutionVertex浊伙。source和target都只能是一個(gè)撞秋。
- Execution:是執(zhí)行一個(gè) ExecutionVertex 的一次嘗試。當(dāng)發(fā)生故障或者數(shù)據(jù)需要重算的情況下 ExecutionVertex 可能會(huì)有多個(gè) ExecutionAttemptID嚣鄙。一個(gè) Execution 通過 ExecutionAttemptID 來唯一標(biāo)識(shí)吻贿。JM和TM之間關(guān)于 task 的部署和 task status 的更新都是通過 ExecutionAttemptID 來確定消息接受者。
-
物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度后哑子,在各個(gè)TaskManager 上部署 Task 后形成的“圖”廓八,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)。
- Task:Execution被調(diào)度后在分配的 TaskManager 中啟動(dòng)對(duì)應(yīng)的 Task赵抢。Task 包裹了具有用戶執(zhí)行邏輯的 operator剧蹂。
- ResultPartition:代表由一個(gè)Task的生成的數(shù)據(jù),和ExecutionGraph中的IntermediateResultPartition一一對(duì)應(yīng)烦却。
- ResultSubpartition:是ResultPartition的一個(gè)子分區(qū)宠叼。每個(gè)ResultPartition包含多個(gè)ResultSubpartition,其數(shù)目要由下游消費(fèi) Task 數(shù)和 DistributionPattern 來決定。
- InputGate:代表Task的輸入封裝冒冬,和JobGraph中JobEdge一一對(duì)應(yīng)伸蚯。每個(gè)InputGate消費(fèi)了一個(gè)或多個(gè)的ResultPartition。
- InputChannel:每個(gè)InputGate會(huì)包含一個(gè)以上的InputChannel简烤,和ExecutionGraph中的ExecutionEdge一一對(duì)應(yīng)剂邮,也和ResultSubpartition一對(duì)一地相連,即一個(gè)InputChannel接收一個(gè)ResultSubpartition的輸出横侦。
那么 Flink 為什么要設(shè)計(jì)這4張圖呢挥萌,其目的是什么呢?Spark 中也有多張圖枉侧,數(shù)據(jù)依賴圖以及物理執(zhí)行的DAG引瀑。其目的都是一樣的,就是解耦榨馁,每張圖各司其職憨栽,每張圖對(duì)應(yīng)了 Job 不同的階段,更方便做該階段的事情翼虫。我們給出更完整的 Flink Graph 的層次圖屑柔。
首先我們看到,JobGraph 之上除了 StreamGraph 還有 OptimizedPlan珍剑。OptimizedPlan 是由 Batch API 轉(zhuǎn)換而來的掸宛。StreamGraph 是由 Stream API 轉(zhuǎn)換而來的。為什么 API 不直接轉(zhuǎn)換成 JobGraph次慢?因?yàn)榕缘樱珺atch 和 Stream 的圖結(jié)構(gòu)和優(yōu)化方法有很大的區(qū)別,比如 Batch 有很多執(zhí)行前的預(yù)分析用來優(yōu)化圖的執(zhí)行迫像,而這種優(yōu)化并不普適于 Stream劈愚,所以通過 OptimizedPlan 來做 Batch 的優(yōu)化會(huì)更方便和清晰,也不會(huì)影響 Stream闻妓。JobGraph 的責(zé)任就是統(tǒng)一 Batch 和 Stream 的圖菌羽,用來描述清楚一個(gè)拓?fù)鋱D的結(jié)構(gòu),并且做了 chaining 的優(yōu)化由缆,chaining 是普適于 Batch 和 Stream 的注祖,所以在這一層做掉。ExecutionGraph 的責(zé)任是方便調(diào)度和各個(gè) tasks 狀態(tài)的監(jiān)控和跟蹤均唉,所以 ExecutionGraph 是并行化的 JobGraph是晨。而“物理執(zhí)行圖”就是最終分布式在各個(gè)機(jī)器上運(yùn)行著的tasks了。所以可以看到舔箭,這種解耦方式極大地方便了我們在各個(gè)層所做的工作罩缴,各個(gè)層之間是相互隔離的蚊逢。
后續(xù)的文章,將會(huì)詳細(xì)介紹 Flink 是如何生成這些執(zhí)行圖的箫章。由于我目前關(guān)注 Flink 的流處理功能烙荷,所以主要有以下內(nèi)容:
- 如何生成 StreamGraph
- 如何生成 JobGraph
- 如何生成 ExecutionGraph
- 如何進(jìn)行調(diào)度(如何生成物理執(zhí)行圖)