[轉(zhuǎn)]Flink 原理與實(shí)現(xiàn):架構(gòu)和拓?fù)涓庞[

架構(gòu)

要了解一個(gè)系統(tǒng)寺枉,一般都是從架構(gòu)開始耙替。我們關(guān)心的問題是:系統(tǒng)部署成功后各個(gè)節(jié)點(diǎn)都啟動(dòng)了哪些服務(wù)涨享,各個(gè)服務(wù)之間又是怎么交互和協(xié)調(diào)的缸废。下方是 Flink 集群啟動(dòng)后架構(gòu)圖吟秩。

image

當(dāng) Flink 集群啟動(dòng)后咱扣,首先會(huì)啟動(dòng)一個(gè) JobManger 和一個(gè)或多個(gè)的 TaskManager。由 Client 提交任務(wù)給 JobManager涵防,JobManager 再調(diào)度任務(wù)到各個(gè) TaskManager 去執(zhí)行闹伪,然后 TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager。TaskManager 之間以流的形式進(jìn)行數(shù)據(jù)的傳輸壮池。上述三者均為獨(dú)立的 JVM 進(jìn)程偏瓤。

  • Client 為提交 Job 的客戶端,可以是運(yùn)行在任何機(jī)器上(與 JobManager 環(huán)境連通即可)椰憋。提交 Job 后厅克,Client 可以結(jié)束進(jìn)程(Streaming的任務(wù)),也可以不結(jié)束并等待結(jié)果返回橙依。
  • JobManager 主要負(fù)責(zé)調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint证舟,職責(zé)上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后窗骑,會(huì)生成優(yōu)化后的執(zhí)行計(jì)劃女责,并以 Task 的單元調(diào)度到各個(gè) TaskManager 去執(zhí)行。
  • TaskManager 在啟動(dòng)的時(shí)候就設(shè)置好了槽位數(shù)(Slot)创译,每個(gè) slot 能啟動(dòng)一個(gè) Task抵知,Task 為線程。從 JobManager 處接收需要部署的 Task软族,部署啟動(dòng)后辛藻,與自己的上游建立 Netty 連接,接收數(shù)據(jù)并處理互订。

可以看到 Flink 的任務(wù)調(diào)度是多線程模型吱肌,并且不同Job/Task混合在一個(gè) TaskManager 進(jìn)程中。雖然這種方式可以有效提高 CPU 利用率仰禽,但是個(gè)人不太喜歡這種設(shè)計(jì)氮墨,因?yàn)椴粌H缺乏資源隔離機(jī)制,同時(shí)也不方便調(diào)試吐葵。類似 Storm 的進(jìn)程模型规揪,一個(gè)JVM 中只跑該 Job 的 Tasks 實(shí)際應(yīng)用中更為合理。

Job 例子

本文所示例子為 flink-1.0.x 版本

我們使用 Flink 自帶的 examples 包中的 SocketTextStreamWordCount温峭,這是一個(gè)從 socket 流中統(tǒng)計(jì)單詞出現(xiàn)次數(shù)的例子猛铅。

  • 首先,使用 netcat 啟動(dòng)本地服務(wù)器:

    $ nc -l 9000
    
  • 然后提交 Flink 程序

    $ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \
      --hostname 10.218.130.9 \
      --port 9000
    

在netcat端輸入單詞并監(jiān)控 taskmanager 的輸出可以看到單詞統(tǒng)計(jì)的結(jié)果凤藏。

SocketTextStreamWordCount 的具體代碼如下:

public static void main(String[] args) throws Exception {
    // 檢查輸入
    final ParameterTool params = ParameterTool.fromArgs(args);
    ...

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataStream<String> text =
            env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);

    DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .keyBy(0)
                    .sum(1);
    counts.print();

    // execute program
    env.execute("WordCount from SocketTextStream Example");
}

我們將最后一行代碼 env.execute 替換成 System.out.println(env.getExecutionPlan()); 并在本地運(yùn)行該代碼(并發(fā)度設(shè)為2)奸忽,可以得到該拓?fù)涞倪壿媹?zhí)行計(jì)劃圖的 JSON 串堕伪,將該 JSON 串粘貼到 http://flink.apache.org/visualizer/ 中,能可視化該執(zhí)行圖栗菜。

image

但這并不是最終在 Flink 中運(yùn)行的執(zhí)行圖欠雌,只是一個(gè)表示拓?fù)涔?jié)點(diǎn)關(guān)系的計(jì)劃圖,在 Flink 中對(duì)應(yīng)了 SteramGraph疙筹。另外富俄,提交拓?fù)浜螅úl(fā)度設(shè)為2)還能在 UI 中看到另一張執(zhí)行計(jì)劃圖,如下所示而咆,該圖對(duì)應(yīng)了 Flink 中的 JobGraph霍比。

image

Graph

看起來有點(diǎn)亂,怎么有這么多不一樣的圖暴备。實(shí)際上桂塞,還有更多的圖。Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖馍驯。

  • StreamGraph:是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖阁危。用來表示程序的拓?fù)浣Y(jié)構(gòu)。
  • JobGraph:StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph汰瘫,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)狂打。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn)混弥,這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗趴乡。
  • ExecutionGraph:JobManager 根據(jù) JobGraph 生成的分布式執(zhí)行圖,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)蝗拿。
  • 物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度后晾捏,在各個(gè)TaskManager 上部署 Task 后形成的“圖”,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)哀托。

例如上文中的2個(gè)并發(fā)度(Source為1個(gè)并發(fā)度)的 SocketTextStreamWordCount 四層執(zhí)行圖的演變過程如下圖所示(點(diǎn)擊查看大圖):

image

這里對(duì)一些名詞進(jìn)行簡單的解釋惦辛。

  • StreamGraph:根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。

    • StreamNode:用來代表 operator 的類仓手,并具有所有相關(guān)的屬性胖齐,如并發(fā)度、入邊和出邊等嗽冒。
    • StreamEdge:表示連接兩個(gè)StreamNode的邊呀伙。
  • JobGraph:StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)添坊。

    • JobVertex:經(jīng)過優(yōu)化后符合條件的多個(gè)StreamNode可能會(huì)chain在一起生成一個(gè)JobVertex剿另,即一個(gè)JobVertex包含一個(gè)或多個(gè)operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet雨女。
    • IntermediateDataSet:表示JobVertex的輸出谚攒,即經(jīng)過operator處理產(chǎn)生的數(shù)據(jù)集。producer是JobVertex戚篙,consumer是JobEdge五鲫。
    • JobEdge:代表了job graph中的一條數(shù)據(jù)傳輸通道溺职。source 是 IntermediateDataSet岔擂,target 是 JobVertex。即數(shù)據(jù)通過JobEdge由IntermediateDataSet傳遞給目標(biāo)JobVertex浪耘。
  • ExecutionGraph:JobManager 根據(jù) JobGraph 生成的分布式執(zhí)行圖乱灵,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。

    • ExecutionJobVertex:和JobGraph中的JobVertex一一對(duì)應(yīng)七冲。每一個(gè)ExecutionJobVertex都有和并發(fā)度一樣多的 ExecutionVertex痛倚。
    • ExecutionVertex:表示ExecutionJobVertex的其中一個(gè)并發(fā)子任務(wù),輸入是ExecutionEdge澜躺,輸出是IntermediateResultPartition蝉稳。
    • IntermediateResult:和JobGraph中的IntermediateDataSet一一對(duì)應(yīng)。每一個(gè)IntermediateResult有與下游ExecutionJobVertex相同并發(fā)數(shù)的IntermediateResultPartition掘鄙。
    • IntermediateResultPartition:表示ExecutionVertex的一個(gè)輸出分區(qū)耘戚,producer是ExecutionVertex,consumer是若干個(gè)ExecutionEdge操漠。
    • ExecutionEdge:表示ExecutionVertex的輸入收津,source是IntermediateResultPartition,target是ExecutionVertex浊伙。source和target都只能是一個(gè)撞秋。
    • Execution:是執(zhí)行一個(gè) ExecutionVertex 的一次嘗試。當(dāng)發(fā)生故障或者數(shù)據(jù)需要重算的情況下 ExecutionVertex 可能會(huì)有多個(gè) ExecutionAttemptID嚣鄙。一個(gè) Execution 通過 ExecutionAttemptID 來唯一標(biāo)識(shí)吻贿。JM和TM之間關(guān)于 task 的部署和 task status 的更新都是通過 ExecutionAttemptID 來確定消息接受者。
  • 物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度后哑子,在各個(gè)TaskManager 上部署 Task 后形成的“圖”廓八,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)。

    • Task:Execution被調(diào)度后在分配的 TaskManager 中啟動(dòng)對(duì)應(yīng)的 Task赵抢。Task 包裹了具有用戶執(zhí)行邏輯的 operator剧蹂。
    • ResultPartition:代表由一個(gè)Task的生成的數(shù)據(jù),和ExecutionGraph中的IntermediateResultPartition一一對(duì)應(yīng)烦却。
    • ResultSubpartition:是ResultPartition的一個(gè)子分區(qū)宠叼。每個(gè)ResultPartition包含多個(gè)ResultSubpartition,其數(shù)目要由下游消費(fèi) Task 數(shù)和 DistributionPattern 來決定。
    • InputGate:代表Task的輸入封裝冒冬,和JobGraph中JobEdge一一對(duì)應(yīng)伸蚯。每個(gè)InputGate消費(fèi)了一個(gè)或多個(gè)的ResultPartition。
    • InputChannel:每個(gè)InputGate會(huì)包含一個(gè)以上的InputChannel简烤,和ExecutionGraph中的ExecutionEdge一一對(duì)應(yīng)剂邮,也和ResultSubpartition一對(duì)一地相連,即一個(gè)InputChannel接收一個(gè)ResultSubpartition的輸出横侦。

那么 Flink 為什么要設(shè)計(jì)這4張圖呢挥萌,其目的是什么呢?Spark 中也有多張圖枉侧,數(shù)據(jù)依賴圖以及物理執(zhí)行的DAG引瀑。其目的都是一樣的,就是解耦榨馁,每張圖各司其職憨栽,每張圖對(duì)應(yīng)了 Job 不同的階段,更方便做該階段的事情翼虫。我們給出更完整的 Flink Graph 的層次圖屑柔。

image

首先我們看到,JobGraph 之上除了 StreamGraph 還有 OptimizedPlan珍剑。OptimizedPlan 是由 Batch API 轉(zhuǎn)換而來的掸宛。StreamGraph 是由 Stream API 轉(zhuǎn)換而來的。為什么 API 不直接轉(zhuǎn)換成 JobGraph次慢?因?yàn)榕缘樱珺atch 和 Stream 的圖結(jié)構(gòu)和優(yōu)化方法有很大的區(qū)別,比如 Batch 有很多執(zhí)行前的預(yù)分析用來優(yōu)化圖的執(zhí)行迫像,而這種優(yōu)化并不普適于 Stream劈愚,所以通過 OptimizedPlan 來做 Batch 的優(yōu)化會(huì)更方便和清晰,也不會(huì)影響 Stream闻妓。JobGraph 的責(zé)任就是統(tǒng)一 Batch 和 Stream 的圖菌羽,用來描述清楚一個(gè)拓?fù)鋱D的結(jié)構(gòu),并且做了 chaining 的優(yōu)化由缆,chaining 是普適于 Batch 和 Stream 的注祖,所以在這一層做掉。ExecutionGraph 的責(zé)任是方便調(diào)度和各個(gè) tasks 狀態(tài)的監(jiān)控和跟蹤均唉,所以 ExecutionGraph 是并行化的 JobGraph是晨。而“物理執(zhí)行圖”就是最終分布式在各個(gè)機(jī)器上運(yùn)行著的tasks了。所以可以看到舔箭,這種解耦方式極大地方便了我們在各個(gè)層所做的工作罩缴,各個(gè)層之間是相互隔離的蚊逢。

后續(xù)的文章,將會(huì)詳細(xì)介紹 Flink 是如何生成這些執(zhí)行圖的箫章。由于我目前關(guān)注 Flink 的流處理功能烙荷,所以主要有以下內(nèi)容:

  1. 如何生成 StreamGraph
  2. 如何生成 JobGraph
  3. 如何生成 ExecutionGraph
  4. 如何進(jìn)行調(diào)度(如何生成物理執(zhí)行圖)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市檬寂,隨后出現(xiàn)的幾起案子终抽,更是在濱河造成了極大的恐慌,老刑警劉巖桶至,帶你破解...
    沈念sama閱讀 217,826評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件昼伴,死亡現(xiàn)場離奇詭異,居然都是意外死亡塞茅,警方通過查閱死者的電腦和手機(jī)亩码,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門季率,熙熙樓的掌柜王于貴愁眉苦臉地迎上來野瘦,“玉大人,你說我怎么就攤上這事飒泻”薰猓” “怎么了?”我有些...
    開封第一講書人閱讀 164,234評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵泞遗,是天一觀的道長惰许。 經(jīng)常有香客問我,道長史辙,這世上最難降的妖魔是什么汹买? 我笑而不...
    開封第一講書人閱讀 58,562評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮聊倔,結(jié)果婚禮上晦毙,老公的妹妹穿的比我還像新娘。我一直安慰自己耙蔑,他們只是感情好见妒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著甸陌,像睡著了一般须揣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上钱豁,一...
    開封第一講書人閱讀 51,482評(píng)論 1 302
  • 那天耻卡,我揣著相機(jī)與錄音,去河邊找鬼牲尺。 笑死卵酪,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播凛澎,決...
    沈念sama閱讀 40,271評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼霹肝,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了塑煎?” 一聲冷哼從身側(cè)響起沫换,我...
    開封第一講書人閱讀 39,166評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎最铁,沒想到半個(gè)月后讯赏,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡冷尉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評(píng)論 3 336
  • 正文 我和宋清朗相戀三年漱挎,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雀哨。...
    茶點(diǎn)故事閱讀 39,926評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡磕谅,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出雾棺,到底是詐尸還是另有隱情膊夹,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評(píng)論 5 346
  • 正文 年R本政府宣布捌浩,位于F島的核電站放刨,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏尸饺。R本人自食惡果不足惜进统,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望浪听。 院中可真熱鬧螟碎,春花似錦、人聲如沸馋辈。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽迈螟。三九已至叉抡,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間答毫,已是汗流浹背褥民。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留洗搂,地道東北人消返。 一個(gè)月前我還...
    沈念sama閱讀 48,063評(píng)論 3 370
  • 正文 我出身青樓载弄,卻偏偏與公主長得像,于是被迫代替她去往敵國和親撵颊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子宇攻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 架構(gòu) 要了解一個(gè)系統(tǒng),一般都是從架構(gòu)開始倡勇。我們關(guān)心的問題是:系統(tǒng)部署成功后各個(gè)節(jié)點(diǎn)都啟動(dòng)了哪些服務(wù)逞刷,各個(gè)服務(wù)之間又...
    尼小摩閱讀 4,928評(píng)論 0 7
  • apache Flink是一個(gè)面向分布式數(shù)據(jù)流處理和批量數(shù)據(jù)處理的開源計(jì)算平臺(tái),它能夠基于同一個(gè)Flink運(yùn)行時(shí)(...
    生活的探路者閱讀 1,475評(píng)論 3 8
  • 事件:今天我跟誰是一組,她只抽了她自己的推針扔役,但是我沒有看到帆喇,我就幫忙一起弄鹽水,他跟我說“要不你先抽你自己的推針...
    金龜子的小天地閱讀 215評(píng)論 0 3
  • 我總喜歡自嘲自己是狡兔三窟亿胸,故鄉(xiāng)有個(gè)家坯钦,工作的地方有個(gè)家,孩子上學(xué)的地方還有個(gè)家损敷,我常常在想葫笼,究竟那個(gè)家才是自己的...
    來日緣何方長閱讀 412評(píng)論 0 0
  • 今天為大家介紹一部馮導(dǎo)的執(zhí)念之作《一九四二》。電影改編自劉震云的《溫故1942》溯街。馮小剛經(jīng)過十年的醞釀才動(dòng)手開拍的...
    生如逆旅66閱讀 1,750評(píng)論 6 13