apache Flink是一個(gè)面向分布式數(shù)據(jù)流處理和批量數(shù)據(jù)處理的開源計(jì)算平臺(tái)驴剔,它能夠基于同一個(gè)Flink運(yùn)行時(shí)(Flink Runtime)园匹,提供支持流處理和批處理兩種類型應(yīng)用的功能∶粮郏現(xiàn)有的開源計(jì)算方案,會(huì)把流處理和批處理作為兩種不同的應(yīng)用類型搞监,因?yàn)樗麄兯鼈兯峁┑腟LA是完全不相同的:流處理一般需要支持低延遲仁连、Exactly-once保證,而批處理需要支持高吞吐昌罩、高效處理哭懈,所以在實(shí)現(xiàn)的時(shí)候通常是分別給出兩套實(shí)現(xiàn)方法,或者通過一個(gè)獨(dú)立的開源框架來實(shí)現(xiàn)其中每一種處理方案峡迷。例如银伟,實(shí)現(xiàn)批處理的開源方案有MapReduce、Tez绘搞、Crunch彤避、Spark,實(shí)現(xiàn)流處理的開源方案有Samza夯辖、Storm琉预。
Flink在實(shí)現(xiàn)流處理和批處理時(shí),與傳統(tǒng)的一些方案完全不同蒿褂,它從另一個(gè)視角看待流處理和批處理圆米,將二者統(tǒng)一起來:Flink是完全支持流處理,也就是說作為流處理看待時(shí)輸入數(shù)據(jù)流是無界的啄栓;批處理被作為一種特殊的流處理娄帖,只是它的輸入數(shù)據(jù)流被定義為有界的£汲基于同一個(gè)Flink運(yùn)行時(shí)(Flink Runtime)近速,分別提供了流處理和批處理API,而這兩種API也是實(shí)現(xiàn)上層面向流處理堪旧、批處理類型應(yīng)用框架的基礎(chǔ)削葱。
基本特性
關(guān)于Flink所支持的特性,我這里只是通過分類的方式簡(jiǎn)單做一下梳理淳梦,涉及到具體的一些概念及其原理會(huì)在后面的部分做詳細(xì)說明析砸。
流處理特性
支持高吞吐、低延遲爆袍、高性能的流處理
支持帶有事件時(shí)間的窗口(Window)操作
支持有狀態(tài)計(jì)算的Exactly-once語義
支持高度靈活的窗口(Window)操作首繁,支持基于time作郭、count、session蛮瞄,以及data-driven的窗口操作
支持具有Backpressure功能的持續(xù)流模型
支持基于輕量級(jí)分布式快照(Snapshot)實(shí)現(xiàn)的容錯(cuò)
一個(gè)運(yùn)行時(shí)同時(shí)支持Batch on Streaming處理和Streaming處理
Flink在JVM內(nèi)部實(shí)現(xiàn)了自己的內(nèi)存管理
支持迭代計(jì)算
支持程序自動(dòng)優(yōu)化:避免特定情況下Shuffle所坯、排序等昂貴操作,中間結(jié)果有必要進(jìn)行緩存
API支持
對(duì)Streaming數(shù)據(jù)類應(yīng)用挂捅,提供DataStream API
對(duì)批處理類應(yīng)用芹助,提供DataSet API(支持Java/Scala)
Libraries支持
支持機(jī)器學(xué)習(xí)(FlinkML)
支持圖分析(Gelly)
支持關(guān)系數(shù)據(jù)處理(Table)
支持復(fù)雜事件處理(CEP)
整合支持
支持Flink on YARN
支持HDFS
支持來自Kafka的輸入數(shù)據(jù)
支持Apache HBase
支持Hadoop程序
支持Tachyon
支持ElasticSearch
支持RabbitMQ
支持Apache Storm
支持S3
支持XtreemFS
基本概念
Stream & Transformation & Operator
用戶實(shí)現(xiàn)的Flink程序是由Stream和Transformation這兩個(gè)基本構(gòu)建塊組成,其中Stream是一個(gè)中間結(jié)果數(shù)據(jù)闲先,而Transformation是一個(gè)操作状土,它對(duì)一個(gè)或多個(gè)輸入Stream進(jìn)行計(jì)算處理,輸出一個(gè)或多個(gè)結(jié)果Stream伺糠。當(dāng)一個(gè)Flink程序被執(zhí)行的時(shí)候蒙谓,它會(huì)被映射為Streaming Dataflow。一個(gè)Streaming Dataflow是由一組Stream和Transformation Operator組成训桶,它類似于一個(gè)DAG圖累驮,在啟動(dòng)的時(shí)候從一個(gè)或多個(gè)Source Operator開始,結(jié)束于一個(gè)或多個(gè)Sink Operator舵揭。
下面是一個(gè)由Flink程序映射為Streaming Dataflow的示意圖谤专,如下所示:
上圖中,F(xiàn)linkKafkaConsumer是一個(gè)Source Operator午绳,map置侍、keyBy、timeWindow拦焚、apply是Transformation Operator蜡坊,RollingSink是一個(gè)Sink Operator。
Parallel Dataflow
在Flink中赎败,程序天生是并行和分布式的:一個(gè)Stream可以被分成多個(gè)Stream分區(qū)(Stream Partitions)秕衙,一個(gè)Operator可以被分成多個(gè)Operator Subtask,每一個(gè)Operator Subtask是在不同的線程中獨(dú)立執(zhí)行的僵刮。一個(gè)Operator的并行度灾梦,等于Operator Subtask的個(gè)數(shù),一個(gè)Stream的并行度總是等于生成它的Operator的并行度妓笙。
有關(guān)Parallel Dataflow的實(shí)例,如下圖所示:
上圖Streaming Dataflow的并行視圖中能岩,展現(xiàn)了在兩個(gè)Operator之間的Stream的兩種模式:
One-to-one模式
比如從Source[1]到map()[1]寞宫,它保持了Source的分區(qū)特性(Partitioning)和分區(qū)內(nèi)元素處理的有序性,也就是說map()[1]的Subtask看到數(shù)據(jù)流中記錄的順序拉鹃,與Source[1]中看到的記錄順序是一致的辈赋。
Redistribution模式
這種模式改變了輸入數(shù)據(jù)流的分區(qū)鲫忍,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]钥屈、keyBy()/window()/apply()[2]悟民,上游的Subtask向下游的多個(gè)不同的Subtask發(fā)送數(shù)據(jù),改變了數(shù)據(jù)流的分區(qū)篷就,這與實(shí)際應(yīng)用所選擇的Operator有關(guān)系射亏。
另外,Source Operator對(duì)應(yīng)2個(gè)Subtask竭业,所以并行度為2智润,而Sink Operator的Subtask只有1個(gè),故而并行度為1未辆。
Task & Operator Chain
在Flink分布式執(zhí)行環(huán)境中窟绷,會(huì)將多個(gè)Operator Subtask串起來組成一個(gè)Operator Chain,實(shí)際上就是一個(gè)執(zhí)行鏈咐柜,每個(gè)執(zhí)行鏈會(huì)在TaskManager上一個(gè)獨(dú)立的線程中執(zhí)行兼蜈,如下圖所示:
上圖中上半部分表示的是一個(gè)Operator Chain,多個(gè)Operator通過Stream連接拙友,而每個(gè)Operator在運(yùn)行時(shí)對(duì)應(yīng)一個(gè)Task为狸;圖中下半部分是上半部分的一個(gè)并行版本,也就是對(duì)每一個(gè)Task都并行化為多個(gè)Subtask献宫。
Time & Window
Flink支持基于時(shí)間窗口操作钥平,也支持基于數(shù)據(jù)的窗口操作,如下圖所示:
上圖中姊途,基于時(shí)間的窗口操作涉瘾,在每個(gè)相同的時(shí)間間隔對(duì)Stream中的記錄進(jìn)行處理,通常各個(gè)時(shí)間間隔內(nèi)的窗口操作處理的記錄數(shù)不固定捷兰;而基于數(shù)據(jù)驅(qū)動(dòng)的窗口操作立叛,可以在Stream中選擇固定數(shù)量的記錄作為一個(gè)窗口,對(duì)該窗口中的記錄進(jìn)行處理贡茅。
有關(guān)窗口操作的不同類型秘蛇,可以分為如下幾種:傾斜窗口(Tumbling Windows,記錄沒有重疊)顶考、滑動(dòng)窗口(Slide Windows赁还,記錄有重疊)、會(huì)話窗口(Session Windows)驹沿,具體可以查閱相關(guān)資料艘策。
在處理Stream中的記錄時(shí),記錄中通常會(huì)包含各種典型的時(shí)間字段渊季,F(xiàn)link支持多種時(shí)間的處理朋蔫,如下圖所示:
上圖描述了在基于Flink的流處理系統(tǒng)中罚渐,各種不同的時(shí)間所處的位置和含義,其中驯妄,Event Time表示事件創(chuàng)建時(shí)間荷并,Ingestion Time表示事件進(jìn)入到Flink Dataflow的時(shí)間 ,Processing Time表示某個(gè)Operator對(duì)事件進(jìn)行處理事的本地系統(tǒng)時(shí)間(是在TaskManager節(jié)點(diǎn)上)青扔。這里源织,談一下基于Event Time進(jìn)行處理的問題,通常根據(jù)Event Time會(huì)給整個(gè)Streaming應(yīng)用帶來一定的延遲性赎懦,因?yàn)樵谝粋€(gè)基于事件的處理系統(tǒng)中雀鹃,進(jìn)入系統(tǒng)的事件可能會(huì)基于Event Time而發(fā)生亂序現(xiàn)象,比如事件來源于外部的多個(gè)系統(tǒng)励两,為了增強(qiáng)事件處理吞吐量會(huì)將輸入的多個(gè)Stream進(jìn)行自然分區(qū)黎茎,每個(gè)Stream分區(qū)內(nèi)部有序,但是要保證全局有序必須同時(shí)兼顧多個(gè)Stream分區(qū)的處理当悔,設(shè)置一定的時(shí)間窗口進(jìn)行暫存數(shù)據(jù)傅瞻,當(dāng)多個(gè)Stream分區(qū)基于Event Time排列對(duì)齊后才能進(jìn)行延遲處理。所以盲憎,設(shè)置的暫存數(shù)據(jù)記錄的時(shí)間窗口越長(zhǎng)嗅骄,處理性能越差,甚至嚴(yán)重影響Stream處理的實(shí)時(shí)性饼疙。
有關(guān)基于時(shí)間的Streaming處理溺森,可以參考官方文檔,在Flink中借鑒了Google使用的WaterMark實(shí)現(xiàn)方式窑眯,可以查閱相關(guān)資料屏积。
基本架構(gòu)
Flink系統(tǒng)的架構(gòu)與Spark類似,是一個(gè)基于Master-Slave風(fēng)格的架構(gòu)磅甩,如下圖所示:
Flink集群?jiǎn)?dòng)時(shí)炊林,會(huì)啟動(dòng)一個(gè)JobManager進(jìn)程、至少一個(gè)TaskManager進(jìn)程卷要。在Local模式下渣聚,會(huì)在同一個(gè)JVM內(nèi)部啟動(dòng)一個(gè)JobManager進(jìn)程和TaskManager進(jìn)程。當(dāng)Flink程序提交后僧叉,會(huì)創(chuàng)建一個(gè)Client來進(jìn)行預(yù)處理奕枝,并轉(zhuǎn)換為一個(gè)并行數(shù)據(jù)流,這是對(duì)應(yīng)著一個(gè)Flink Job瓶堕,從而可以被JobManager和TaskManager執(zhí)行倍权。在實(shí)現(xiàn)上,F(xiàn)link基于Actor實(shí)現(xiàn)了JobManager和TaskManager,所以JobManager與TaskManager之間的信息交換薄声,都是通過事件的方式來進(jìn)行處理。
如上圖所示题画,F(xiàn)link系統(tǒng)主要包含如下3個(gè)主要的進(jìn)程:
JobManager
JobManager是Flink系統(tǒng)的協(xié)調(diào)者默辨,它負(fù)責(zé)接收Flink Job,調(diào)度組成Job的多個(gè)Task的執(zhí)行苍息。同時(shí)缩幸,JobManager還負(fù)責(zé)收集Job的狀態(tài)信息,并管理Flink集群中從節(jié)點(diǎn)TaskManager竞思。JobManager所負(fù)責(zé)的各項(xiàng)管理功能表谊,它接收到并處理的事件主要包括:
RegisterTaskManager
在Flink集群?jiǎn)?dòng)的時(shí)候,TaskManager會(huì)向JobManager注冊(cè)盖喷,如果注冊(cè)成功爆办,則JobManager會(huì)向TaskManager回復(fù)消息AcknowledgeRegistration。
SubmitJob
Flink程序內(nèi)部通過Client向JobManager提交Flink Job课梳,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息距辆。
CancelJob
請(qǐng)求取消一個(gè)Flink Job的執(zhí)行,CancelJob消息中包含了Job的ID暮刃,如果成功則返回消息CancellationSuccess跨算,失敗則返回消息CancellationFailure。
UpdateTaskExecutionState
TaskManager會(huì)向JobManager請(qǐng)求更新ExecutionGraph中的ExecutionVertex的狀態(tài)信息椭懊,更新成功則返回true诸蚕。
RequestNextInputSplit
運(yùn)行在TaskManager上面的Task,請(qǐng)求獲取下一個(gè)要處理的輸入Split氧猬,成功則返回NextInputSplit背犯。
JobStatusChanged
ExecutionGraph向JobManager發(fā)送該消息,用來表示Flink Job的狀態(tài)發(fā)生的變化狂窑,例如:RUNNING媳板、CANCELING、FINISHED等泉哈。
TaskManager
TaskManager也是一個(gè)Actor蛉幸,它是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的Worker,在其上執(zhí)行Flink Job的一組Task丛晦。每個(gè)TaskManager負(fù)責(zé)管理其所在節(jié)點(diǎn)上的資源信息奕纫,如內(nèi)存、磁盤烫沙、網(wǎng)絡(luò)匹层,在啟動(dòng)的時(shí)候?qū)①Y源的狀態(tài)向JobManager匯報(bào)。TaskManager端可以分成兩個(gè)階段:
注冊(cè)階段
TaskManager會(huì)向JobManager注冊(cè),發(fā)送RegisterTaskManager消息升筏,等待JobManager返回AcknowledgeRegistration撑柔,然后TaskManager就可以進(jìn)行初始化過程。
可操作階段
該階段TaskManager可以接收并處理與Task有關(guān)的消息您访,如SubmitTask铅忿、CancelTask、FailTask灵汪。如果TaskManager無法連接到JobManager檀训,這是TaskManager就失去了與JobManager的聯(lián)系,會(huì)自動(dòng)進(jìn)入“注冊(cè)階段”享言,只有完成注冊(cè)才能繼續(xù)處理Task相關(guān)的消息峻凫。
Client
當(dāng)用戶提交一個(gè)Flink程序時(shí),會(huì)首先創(chuàng)建一個(gè)Client览露,該Client首先會(huì)對(duì)用戶提交的Flink程序進(jìn)行預(yù)處理荧琼,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址肛循,并建立到JobManager的連接铭腕,將Flink Job提交給JobManager。Client會(huì)將用戶提交的Flink程序組裝一個(gè)JobGraph多糠, 并且是以JobGraph的形式提交的累舷。一個(gè)JobGraph是一個(gè)Flink Dataflow,它由多個(gè)JobVertex組成的DAG夹孔。其中被盈,一個(gè)JobGraph包含了一個(gè)Flink程序的如下信息:JobID、Job名稱搭伤、配置信息只怎、一組JobVertex等。
組件棧
Flink是一個(gè)分層架構(gòu)的系統(tǒng)怜俐,每一層所包含的組件都提供了特定的抽象身堡,用來服務(wù)于上層組件。Flink分層的組件棧如下圖所示:
下面拍鲤,我們自下而上贴谎,分別針對(duì)每一層進(jìn)行解釋說明:
Deployment層
該層主要涉及了Flink的部署模式,F(xiàn)link支持多種部署模式:本地季稳、集群(Standalone/YARN)擅这、云(GCE/EC2)。Standalone部署模式與Spark類似景鼠,這里仲翎,我們看一下Flink on YARN的部署模式,如下圖所示:
了解YARN的話,對(duì)上圖的原理非常熟悉溯香,實(shí)際Flink也實(shí)現(xiàn)了滿足在YARN集群上運(yùn)行的各個(gè)組件:Flink YARN Client負(fù)責(zé)與YARN RM通信協(xié)商資源請(qǐng)求鲫构,F(xiàn)link JobManager和Flink TaskManager分別申請(qǐng)到Container去運(yùn)行各自的進(jìn)程。通過上圖可以看到玫坛,YARN AM與Flink JobManager在同一個(gè)Container中芬迄,這樣AM可以知道Flink JobManager的地址,從而AM可以申請(qǐng)Container去啟動(dòng)Flink TaskManager昂秃。待Flink成功運(yùn)行在YARN集群上,F(xiàn)link YARN Client就可以提交Flink Job到Flink JobManager杜窄,并進(jìn)行后續(xù)的映射肠骆、調(diào)度和計(jì)算處理。
Runtime層
Runtime層提供了支持Flink計(jì)算的全部核心實(shí)現(xiàn)塞耕,比如:支持分布式Stream處理蚀腿、JobGraph到ExecutionGraph的映射、調(diào)度等等扫外,為上層API層提供基礎(chǔ)服務(wù)莉钙。
API層
API層主要實(shí)現(xiàn)了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對(duì)應(yīng)DataStream API筛谚,面向批處理對(duì)應(yīng)DataSet API磁玉。
Libraries層
該層也可以稱為Flink應(yīng)用框架層,根據(jù)API層的劃分驾讲,在API層之上構(gòu)建的滿足特定應(yīng)用的實(shí)現(xiàn)計(jì)算框架蚊伞,也分別對(duì)應(yīng)于面向流處理和面向批處理兩類。面向流處理支持:CEP(復(fù)雜事件處理)吮铭、基于SQL-like的操作(基于Table的關(guān)系操作)时迫;面向批處理支持:FlinkML(機(jī)器學(xué)習(xí)庫)、Gelly(圖處理)谓晌。
內(nèi)部原理
容錯(cuò)機(jī)制
Flink基于Checkpoint機(jī)制實(shí)現(xiàn)容錯(cuò)掠拳,它的原理是不斷地生成分布式Streaming數(shù)據(jù)流Snapshot。在流處理失敗時(shí)纸肉,通過這些Snapshot可以恢復(fù)數(shù)據(jù)流處理溺欧。理解Flink的容錯(cuò)機(jī)制,首先需要了解一下Barrier這個(gè)概念:
Stream Barrier是Flink分布式Snapshotting中的核心元素毁靶,它會(huì)作為數(shù)據(jù)流的記錄被同等看待胧奔,被插入到數(shù)據(jù)流中,將數(shù)據(jù)流中記錄的進(jìn)行分組预吆,并沿著數(shù)據(jù)流的方向向前推進(jìn)龙填。每個(gè)Barrier會(huì)攜帶一個(gè)Snapshot ID,屬于該Snapshot的記錄會(huì)被推向該Barrier的前方。因?yàn)锽arrier非常輕量岩遗,所以并不會(huì)中斷數(shù)據(jù)流扇商。帶有Barrier的數(shù)據(jù)流,如下圖所示:
基于上圖宿礁,我們通過如下要點(diǎn)來說明:
出現(xiàn)一個(gè)Barrier案铺,在該Barrier之前出現(xiàn)的記錄都屬于該Barrier對(duì)應(yīng)的Snapshot,在該Barrier之后出現(xiàn)的記錄屬于下一個(gè)Snapshot
來自不同Snapshot多個(gè)Barrier可能同時(shí)出現(xiàn)在數(shù)據(jù)流中梆靖,也就是說同一個(gè)時(shí)刻可能并發(fā)生成多個(gè)Snapshot
當(dāng)一個(gè)中間(Intermediate)Operator接收到一個(gè)Barrier后控汉,它會(huì)發(fā)送Barrier到屬于該Barrier的Snapshot的數(shù)據(jù)流中,等到Sink Operator接收到該Barrier后會(huì)向Checkpoint Coordinator確認(rèn)該Snapshot返吻,直到所有的Sink Operator都確認(rèn)了該Snapshot姑子,才被認(rèn)為完成了該Snapshot
這里還需要強(qiáng)調(diào)的是,Snapshot并不僅僅是對(duì)數(shù)據(jù)流做了一個(gè)狀態(tài)的Checkpoint测僵,它也包含了一個(gè)Operator內(nèi)部所持有的狀態(tài)街佑,這樣才能夠在保證在流處理系統(tǒng)失敗時(shí)能夠正確地恢復(fù)數(shù)據(jù)流處理。也就是說捍靠,如果一個(gè)Operator包含任何形式的狀態(tài)沐旨,這種狀態(tài)必須是Snapshot的一部分。
Operator的狀態(tài)包含兩種:一種是系統(tǒng)狀態(tài)榨婆,一個(gè)Operator進(jìn)行計(jì)算處理的時(shí)候需要對(duì)數(shù)據(jù)進(jìn)行緩沖磁携,所以數(shù)據(jù)緩沖區(qū)的狀態(tài)是與Operator相關(guān)聯(lián)的,以窗口操作的緩沖區(qū)為例纲辽,F(xiàn)link系統(tǒng)會(huì)收集或聚合記錄數(shù)據(jù)并放到緩沖區(qū)中颜武,直到該緩沖區(qū)中的數(shù)據(jù)被處理完成;另一種是用戶自定義狀態(tài)(狀態(tài)可以通過轉(zhuǎn)換函數(shù)進(jìn)行創(chuàng)建和修改)拖吼,它可以是函數(shù)中的Java對(duì)象這樣的簡(jiǎn)單變量鳞上,也可以是與函數(shù)相關(guān)的Key/Value狀態(tài)。
對(duì)于具有輕微狀態(tài)的Streaming應(yīng)用吊档,會(huì)生成非常輕量的Snapshot而且非常頻繁篙议,但并不會(huì)影響數(shù)據(jù)流處理性能。Streaming應(yīng)用的狀態(tài)會(huì)被存儲(chǔ)到一個(gè)可配置的存儲(chǔ)系統(tǒng)中怠硼,例如HDFS鬼贱。在一個(gè)Checkpoint執(zhí)行過程中,存儲(chǔ)的狀態(tài)信息及其交互過程香璃,如下圖所示:
在Checkpoint過程中这难,還有一個(gè)比較重要的操作——Stream Aligning。當(dāng)Operator接收到多個(gè)輸入的數(shù)據(jù)流時(shí)葡秒,需要在Snapshot Barrier中對(duì)數(shù)據(jù)流進(jìn)行排列對(duì)齊姻乓,如下圖所示:
具體排列過程如下:
Operator從一個(gè)incoming Stream接收到Snapshot Barrier n嵌溢,然后暫停處理,直到其它的incoming Stream的Barrier n(否則屬于2個(gè)Snapshot的記錄就混在一起了)到達(dá)該Operator
接收到Barrier n的Stream被臨時(shí)擱置蹋岩,來自這些Stream的記錄不會(huì)被處理赖草,而是被放在一個(gè)Buffer中
一旦最后一個(gè)Stream接收到Barrier n,Operator會(huì)emit所有暫存在Buffer中的記錄剪个,然后向Checkpoint Coordinator發(fā)送Snapshot n
繼續(xù)處理來自多個(gè)Stream的記錄
基于Stream Aligning操作能夠?qū)崿F(xiàn)Exactly Once語義秧骑,但是也會(huì)給流處理應(yīng)用帶來延遲,因?yàn)闉榱伺帕袑?duì)齊Barrier扣囊,會(huì)暫時(shí)緩存一部分Stream的記錄到Buffer中乎折,尤其是在數(shù)據(jù)流并行度很高的場(chǎng)景下可能更加明顯,通常以最遲對(duì)齊Barrier的一個(gè)Stream為處理Buffer中緩存記錄的時(shí)刻點(diǎn)侵歇。在Flink中笆檀,提供了一個(gè)開關(guān),選擇是否使用Stream Aligning盒至,如果關(guān)掉則Exactly Once會(huì)變成At least once。
調(diào)度機(jī)制
在JobManager端士修,會(huì)接收到Client提交的JobGraph形式的Flink Job枷遂,JobManager會(huì)將一個(gè)JobGraph轉(zhuǎn)換映射為一個(gè)ExecutionGraph,如下圖所示:
通過上圖可以看出:
JobGraph是一個(gè)Job的用戶邏輯視圖表示棋嘲,將一個(gè)用戶要對(duì)數(shù)據(jù)流進(jìn)行的處理表示為單個(gè)DAG圖(對(duì)應(yīng)于JobGraph)酒唉,DAG圖由頂點(diǎn)(JobVertex)和中間結(jié)果集(IntermediateDataSet)組成,其中JobVertex表示了對(duì)數(shù)據(jù)流進(jìn)行的轉(zhuǎn)換操作沸移,比如map痪伦、flatMap、filter雹锣、keyBy等操作网沾,而IntermediateDataSet是由上游的JobVertex所生成,同時(shí)作為下游的JobVertex的輸入蕊爵。
而ExecutionGraph是JobGraph的并行表示辉哥,也就是實(shí)際JobManager調(diào)度一個(gè)Job在TaskManager上運(yùn)行的邏輯視圖,它也是一個(gè)DAG圖攒射,是由ExecutionJobVertex醋旦、IntermediateResult(或IntermediateResultPartition)組成,ExecutionJobVertex實(shí)際對(duì)應(yīng)于JobGraph圖中的JobVertex会放,只不過在ExecutionJobVertex內(nèi)部是一種并行表示饲齐,由多個(gè)并行的ExecutionVertex所組成。另外咧最,這里還有一個(gè)重要的概念捂人,就是Execution御雕,它是一個(gè)ExecutionVertex的一次運(yùn)行Attempt,也就是說先慷,一個(gè)ExecutionVertex可能對(duì)應(yīng)多個(gè)運(yùn)行狀態(tài)的Execution饮笛,比如,一個(gè)ExecutionVertex運(yùn)行產(chǎn)生了一個(gè)失敗的Execution论熙,然后還會(huì)創(chuàng)建一個(gè)新的Execution來運(yùn)行福青,這時(shí)就對(duì)應(yīng)這個(gè)2次運(yùn)行Attempt。每個(gè)Execution通過ExecutionAttemptID來唯一標(biāo)識(shí)脓诡,在TaskManager和JobManager之間進(jìn)行Task狀態(tài)的交換都是通過ExecutionAttemptID來實(shí)現(xiàn)的无午。
下面看一下,在物理上進(jìn)行調(diào)度祝谚,基于資源的分配與使用的一個(gè)例子宪迟,來自官網(wǎng),如下圖所示:
說明如下:
左上子圖:有2個(gè)TaskManager交惯,每個(gè)TaskManager有3個(gè)Task Slot
左下子圖:一個(gè)Flink Job次泽,邏輯上包含了1個(gè)data source、1個(gè)MapFunction席爽、1個(gè)ReduceFunction意荤,對(duì)應(yīng)一個(gè)JobGraph
左下子圖:用戶提交的Flink Job對(duì)各個(gè)Operator進(jìn)行的配置——data source的并行度設(shè)置為4,MapFunction的并行度也為4只锻,ReduceFunction的并行度為3玖像,在JobManager端對(duì)應(yīng)于ExecutionGraph
右上子圖:TaskManager 1上,有2個(gè)并行的ExecutionVertex組成的DAG圖齐饮,它們各占用一個(gè)Task Slot
右下子圖:TaskManager 2上捐寥,也有2個(gè)并行的ExecutionVertex組成的DAG圖,它們也各占用一個(gè)Task Slot
在2個(gè)TaskManager上運(yùn)行的4個(gè)Execution是并行執(zhí)行的
迭代機(jī)制
機(jī)器學(xué)習(xí)和圖計(jì)算應(yīng)用祖驱,都會(huì)使用到迭代計(jì)算握恳,F(xiàn)link通過在迭代Operator中定義Step函數(shù)來實(shí)現(xiàn)迭代算法,這種迭代算法包括Iterate和Delta Iterate兩種類型捺僻,在實(shí)現(xiàn)上它們反復(fù)地在當(dāng)前迭代狀態(tài)上調(diào)用Step函數(shù)睡互,直到滿足給定的條件才會(huì)停止迭代。下面陵像,對(duì)Iterate和Delta Iterate兩種類型的迭代算法原理進(jìn)行說明:
Iterate
Iterate Operator是一種簡(jiǎn)單的迭代形式:每一輪迭代就珠,Step函數(shù)的輸入或者是輸入的整個(gè)數(shù)據(jù)集,或者是上一輪迭代的結(jié)果醒颖,通過該輪迭代計(jì)算出下一輪計(jì)算所需要的輸入(也稱為Next Partial Solution)妻怎,滿足迭代的終止條件后,會(huì)輸出最終迭代結(jié)果泞歉,具體執(zhí)行流程如下圖所示:
Step函數(shù)在每一輪迭代中都會(huì)被執(zhí)行逼侦,它可以是由map匿辩、reduce、join等Operator組成的數(shù)據(jù)流榛丢。下面通過官網(wǎng)給出的一個(gè)例子來說明Iterate Operator铲球,非常簡(jiǎn)單直觀荚恶,如下圖所示:
上面迭代過程中箱沦,輸入數(shù)據(jù)為1到5的數(shù)字,Step函數(shù)就是一個(gè)簡(jiǎn)單的map函數(shù)良蛮,會(huì)對(duì)每個(gè)輸入的數(shù)字進(jìn)行加1處理掖鱼,而Next Partial Solution對(duì)應(yīng)于經(jīng)過map函數(shù)處理后的結(jié)果然走,比如第一輪迭代,對(duì)輸入的數(shù)字1加1后結(jié)果為2戏挡,對(duì)輸入的數(shù)字2加1后結(jié)果為3芍瑞,直到對(duì)輸入數(shù)字5加1后結(jié)果為變?yōu)?,這些新生成結(jié)果數(shù)字2~6會(huì)作為第二輪迭代的輸入褐墅。迭代終止條件為進(jìn)行10輪迭代拆檬,則最終的結(jié)果為11~15。
Delta Iterate
Delta Iterate Operator實(shí)現(xiàn)了增量迭代妥凳,它的實(shí)現(xiàn)原理如下圖所示:
基于Delta Iterate Operator實(shí)現(xiàn)增量迭代秩仆,它有2個(gè)輸入,其中一個(gè)是初始Workset猾封,表示輸入待處理的增量Stream數(shù)據(jù),另一個(gè)是初始Solution Set噪珊,它是經(jīng)過Stream方向上Operator處理過的結(jié)果晌缘。第一輪迭代會(huì)將Step函數(shù)作用在初始Workset上,得到的計(jì)算結(jié)果Workset作為下一輪迭代的輸入痢站,同時(shí)還要增量更新初始Solution Set磷箕。如果反復(fù)迭代知道滿足迭代終止條件,最后會(huì)根據(jù)Solution Set的結(jié)果阵难,輸出最終迭代結(jié)果岳枷。
比如,我們現(xiàn)在已知一個(gè)Solution集合中保存的是呜叫,已有的商品分類大類中購買量最多的商品空繁,而Workset輸入的是來自線上實(shí)時(shí)交易中最新達(dá)成購買的商品的人數(shù),經(jīng)過計(jì)算會(huì)生成新的商品分類大類中商品購買量最多的結(jié)果朱庆,如果某些大類中商品購買量突然增長(zhǎng)盛泡,它需要更新Solution Set中的結(jié)果(原來購買量最多的商品,經(jīng)過增量迭代計(jì)算娱颊,可能已經(jīng)不是最多)傲诵,最后會(huì)輸出最終商品分類大類中購買量最多的商品結(jié)果集合凯砍。更詳細(xì)的例子,可以參考官網(wǎng)給出的“Propagate Minimum in Graph”拴竹,這里不再累述悟衩。
Backpressure監(jiān)控
Backpressure在流式計(jì)算系統(tǒng)中會(huì)比較受到關(guān)注,因?yàn)樵谝粋€(gè)Stream上進(jìn)行處理的多個(gè)Operator之間栓拜,它們處理速度和方式可能非常不同座泳,所以就存在上游Operator如果處理速度過快,下游Operator處可能機(jī)會(huì)堆積Stream記錄菱属,嚴(yán)重會(huì)造成處理延遲或下游Operator負(fù)載過重而崩潰(有些系統(tǒng)可能會(huì)丟失數(shù)據(jù))钳榨。因此,對(duì)下游Operator處理速度跟不上的情況纽门,如果下游Operator能夠?qū)⒆约禾幚頎顟B(tài)傳播給上游Operator薛耻,使得上游Operator處理速度慢下來就會(huì)緩解上述問題,比如通過告警的方式通知現(xiàn)有流處理系統(tǒng)存在的問題赏陵。
Flink Web界面上提供了對(duì)運(yùn)行Job的Backpressure行為的監(jiān)控饼齿,它通過使用Sampling線程對(duì)正在運(yùn)行的Task進(jìn)行堆棧跟蹤采樣來實(shí)現(xiàn),具體實(shí)現(xiàn)方式如下圖所示:
JobManager會(huì)反復(fù)調(diào)用一個(gè)Job的Task運(yùn)行所在線程的Thread.getStackTrace()蝙搔,默認(rèn)情況下缕溉,JobManager會(huì)每間隔50ms觸發(fā)對(duì)一個(gè)Job的每個(gè)Task依次進(jìn)行100次堆棧跟蹤調(diào)用,根據(jù)調(diào)用調(diào)用結(jié)果來確定Backpressure吃型,F(xiàn)link是通過計(jì)算得到一個(gè)比值(Radio)來確定當(dāng)前運(yùn)行Job的Backpressure狀態(tài)证鸥。在Web界面上可以看到這個(gè)Radio值,它表示在一個(gè)內(nèi)部方法調(diào)用中阻塞(Stuck)的堆棧跟蹤次數(shù)勤晚,例如枉层,radio=0.01,表示100次中僅有1次方法調(diào)用阻塞赐写。Flink目前定義了如下Backpressure狀態(tài):
OK: 0 <= Ratio <= 0.10
LOW: 0.10 < Ratio <= 0.5
HIGH: 0.5 < Ratio <= 1
另外鸟蜡,F(xiàn)link還提供了3個(gè)參數(shù)來配置Backpressure監(jiān)控行為:
參數(shù)名稱默認(rèn)值說明
jobmanager.web.backpressure.refresh-interval60000默認(rèn)1分鐘,表示采樣統(tǒng)計(jì)結(jié)果刷新時(shí)間間隔
jobmanager.web.backpressure.num-samples100評(píng)估Backpressure狀態(tài)挺邀,所使用的堆棧跟蹤調(diào)用次數(shù)
jobmanager.web.backpressure.delay-between-samples50默認(rèn)50毫秒揉忘,表示對(duì)一個(gè)Job的每個(gè)Task依次調(diào)用的時(shí)間間隔
通過上面?zhèn)€定義的Backpressure狀態(tài),以及調(diào)整相應(yīng)的參數(shù)端铛,可以確定當(dāng)前運(yùn)行的Job的狀態(tài)是否正常泣矛,并且保證不影響JobManager提供服務(wù)。