基礎(chǔ)概念考察
一、 簡(jiǎn)單介紹一下 Flink
Flink 是一個(gè)框架和分布式處理引擎,用于對(duì)無(wú)界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算命爬。并且 Flink 提供了數(shù)據(jù)分布、容錯(cuò)機(jī)制以及資源管理等核心功能辐脖。
Flink 的特性包括:
支持高吞吐饲宛、低延遲、高性能的流處理支持帶有事件時(shí)間的窗口 (Window) 操作支持有狀態(tài)計(jì)算的 Exactly-once 語(yǔ)義支持高度靈活的窗口 (Window) 操作嗜价,支持基于 time艇抠、count幕庐、session 以及 data-driven 的窗口操作支持具有 Backpressure 功能的持續(xù)流模型支持基于輕量級(jí)分布式快照(Snapshot)實(shí)現(xiàn)的容錯(cuò)一個(gè)運(yùn)行時(shí)同時(shí)支持 Batch on Streaming 處理和 Streaming 處理Flink 在 JVM 內(nèi)部實(shí)現(xiàn)了自己的內(nèi)存管理支持迭代計(jì)算支持程序自動(dòng)優(yōu)化:避免特定情況下 Shuffle、排序等昂貴操作练链,中間結(jié)果有必要進(jìn)行緩存
二翔脱、Flink 相比傳統(tǒng)的 Spark Streaming 有什么區(qū)別?
- 架構(gòu)模型
Spark Streaming 在運(yùn)行時(shí)的主要角色包括:Master、Worker媒鼓、Driver届吁、Executor,F(xiàn)link 在運(yùn)行時(shí)主要包含:Jobmanager绿鸣、Taskmanager 和 Slot疚沐。
- 任務(wù)調(diào)度
Spark Streaming 連續(xù)不斷的生成微小的數(shù)據(jù)批次,構(gòu)建有向無(wú)環(huán)圖 DAG潮模,Spark Streaming 會(huì)依次創(chuàng)建 DStreamGraph亮蛔、JobGenerator、JobScheduler擎厢。
Flink 根據(jù)用戶(hù)提交的代碼生成 StreamGraph究流,經(jīng)過(guò)優(yōu)化生成 JobGraph,然后提交給 JobManager 進(jìn)行處理动遭,JobManager 會(huì)根據(jù) JobGraph 生成 ExecutionGraph芬探,ExecutionGraph 是 Flink 調(diào)度最核心的數(shù)據(jù)結(jié)構(gòu),JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度厘惦。
- 時(shí)間機(jī)制
Spark Streaming 支持的時(shí)間機(jī)制有限偷仿,只支持處理時(shí)間。Flink 支持了流處理程序在時(shí)間上的三個(gè)定義:處理時(shí)間宵蕉、事件時(shí)間酝静、注入時(shí)間。同時(shí)也支持 watermark 機(jī)制來(lái)處理滯后數(shù)據(jù)羡玛。
- 容錯(cuò)機(jī)制
對(duì)于 Spark Streaming 任務(wù)别智,我們可以設(shè)置 checkpoint,然后假如發(fā)生故障并重啟稼稿,我們可以從上次 checkpoint 之處恢復(fù)亿遂,但是這個(gè)行為只能使得數(shù)據(jù)不丟失,可能會(huì)重復(fù)處理渺杉,不能做到恰一次處理語(yǔ)義。
Flink 則使用兩階段提交協(xié)議來(lái)解決這個(gè)問(wèn)題挪钓。
三是越、Flink 組件棧
自下而上,每一層分別代表:Deploy 層:該層主要涉及了 Flink 的部署模式碌上,在上圖中我們可以看出倚评,F(xiàn)link 支持包括 local浦徊、Standalone、Cluster天梧、Cloud 等多種部署模式盔性。Runtime 層:Runtime 層提供了支持 Flink 計(jì)算的核心實(shí)現(xiàn),比如:支持分布式 Stream 處理呢岗、JobGraph 到 ExecutionGraph 的映射冕香、調(diào)度等等,為上層 API 層提供基礎(chǔ)服務(wù)后豫。API 層:API 層主要實(shí)現(xiàn)了面向流(Stream)處理和批(Batch)處理 API悉尾,其中面向流處理對(duì)應(yīng) DataStream API,面向批處理對(duì)應(yīng) DataSet API挫酿,后續(xù)版本构眯,F(xiàn)link 有計(jì)劃將 DataStream 和 DataSet API 進(jìn)行統(tǒng)一。Libraries 層:該層稱(chēng)為 Flink 應(yīng)用框架層早龟,根據(jù) API 層的劃分惫霸,在 API 層之上構(gòu)建的滿(mǎn)足特定應(yīng)用的實(shí)現(xiàn)計(jì)算框架,也分別對(duì)應(yīng)于面向流處理和面向批處理兩類(lèi)葱弟。面向流處理支持:CEP(復(fù)雜事件處理)壹店、基于 SQL-like 的操作(基于 Table 的關(guān)系操作);面向批處理支持:FlinkML(機(jī)器學(xué)習(xí)庫(kù))翘悉、Gelly(圖處理)茫打。
四、Flink集群的角色
TaskManager妖混,JobManager老赤,Client 三種角色。其中 JobManager 扮演著集群中的管理者 Master 的角色制市,它是整個(gè)集群的協(xié)調(diào)者抬旺,負(fù)責(zé)接收 Flink Job,協(xié)調(diào)檢查點(diǎn)祥楣,F(xiàn)ailover 故障恢復(fù)等开财,同時(shí)管理 Flink 集群中從節(jié)點(diǎn) TaskManager。
TaskManager 是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的 Worker误褪,在其上執(zhí)行 Flink Job 的一組 Task责鳍,每個(gè) TaskManager 負(fù)責(zé)管理其所在節(jié)點(diǎn)上的資源信息,如內(nèi)存兽间、磁盤(pán)历葛、網(wǎng)絡(luò),在啟動(dòng)的時(shí)候?qū)①Y源的狀態(tài)向 JobManager 匯報(bào)嘀略。
Client 是 Flink 程序提交的客戶(hù)端恤溶,當(dāng)用戶(hù)提交一個(gè) Flink 程序時(shí)乓诽,會(huì)首先創(chuàng)建一個(gè) Client,該 Client 首先會(huì)對(duì)用戶(hù)提交的 Flink 程序進(jìn)行預(yù)處理咒程,并提交到 Flink 集群中處理鸠天,所以 Client 需要從用戶(hù)提交的 Flink 程序配置中獲取 JobManager 的地址,并建立到 JobManager 的連接帐姻,將 Flink Job 提交給 JobManager稠集。
五、Task Slot 進(jìn)行資源管理
TaskManager 是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的 Worker卖宠,TaskManager 是一個(gè) JVM 進(jìn)程巍杈,并會(huì)以獨(dú)立的線(xiàn)程來(lái)執(zhí)行一個(gè) task 或多個(gè) subtask。為了控制一個(gè) TaskManager 能接受多少個(gè) task扛伍,F(xiàn)link 提出了 Task Slot 的概念筷畦。
簡(jiǎn)單的說(shuō),TaskManager 會(huì)將自己節(jié)點(diǎn)上管理的資源分為不同的 Slot:固定大小的資源子集刺洒。這樣就避免了不同 Job 的 Task 互相競(jìng)爭(zhēng)內(nèi)存資源鳖宾,但是需要主要的是,Slot 只會(huì)做內(nèi)存的隔離逆航。沒(méi)有做 CPU 的隔離鼎文。
Flink 分區(qū)策略
GlobalPartitioner數(shù)據(jù)會(huì)被分發(fā)到下游算子的第一個(gè)實(shí)例中進(jìn)行處理。
ShufflePartitioner數(shù)據(jù)會(huì)被隨機(jī)分發(fā)到下游算子的每一個(gè)實(shí)例中進(jìn)行處理因俐。
RebalancePartitioner數(shù)據(jù)會(huì)被循環(huán)發(fā)送到下游的每一個(gè)實(shí)例中進(jìn)行處理拇惋。
RescalePartitioner這種分區(qū)器會(huì)根據(jù)上下游算子的并行度,循環(huán)的方式輸出到下游算子的每個(gè)實(shí)例抹剩。這里有點(diǎn)難以理解撑帖,假設(shè)上游并行度為 2,編號(hào)為 A 和 B澳眷。下游并行度為 4胡嘿,編號(hào)為 1,2钳踊,3衷敌,4。那么 A 則把數(shù)據(jù)循環(huán)發(fā)送給 1 和 2拓瞪,B 則把數(shù)據(jù)循環(huán)發(fā)送給 3 和 4缴罗。假設(shè)上游并行度為 4,編號(hào)為 A祭埂,B瞒爬,C,D。下游并行度為 2侧但,編號(hào)為 1,2航罗。那么 A 和 B 則把數(shù)據(jù)發(fā)送給 1禀横,C 和 D 則把數(shù)據(jù)發(fā)送給 2。
BroadcastPartitioner廣播分區(qū)會(huì)將上游數(shù)據(jù)輸出到下游算子的每個(gè)實(shí)例中粥血。適合于大數(shù)據(jù)集和小數(shù)據(jù)集做 Jion 的場(chǎng)景柏锄。
ForwardPartitionerForwardPartitioner 用于將記錄輸出到下游本地的算子實(shí)例。它要求上下游算子并行度一樣复亏。簡(jiǎn)單的說(shuō)趾娃,F(xiàn)orwardPartitioner 用來(lái)做數(shù)據(jù)的控制臺(tái)打印。
KeyGroupStreamPartitionerHash 分區(qū)器缔御。會(huì)將數(shù)據(jù)按 Key 的 Hash 值輸出到下游算子實(shí)例中抬闷。
CustomPartitionerWrapper用戶(hù)自定義分區(qū)器。需要用戶(hù)自己實(shí)現(xiàn) Partitioner
接口耕突,來(lái)定義自己的分區(qū)邏輯
Flink 并行度
我們?cè)趯?shí)際生產(chǎn)環(huán)境中可以從四個(gè)不同層面設(shè)置并行度:
操作算子層面(Operator Level)
.map(new RollingAdditionMapper()).setParallelism(10) 將操作算子設(shè)置并行度
執(zhí)行環(huán)境層面(Execution Environment Level)
$FLINK_HOME/bin/flink 的-p參數(shù)修改并行度
客戶(hù)端層面(Client Level)
env.setParallelism(10)
系統(tǒng)層面(System Level)
全局配置在flink-conf.yaml文件中笤成,parallelism.default,默認(rèn)是1:可以設(shè)置默認(rèn)值大一點(diǎn)
需要注意的優(yōu)先級(jí):算子層面>環(huán)境層面>客戶(hù)端層面>系統(tǒng)層面眷茁。
每個(gè)算子的一個(gè)并行度實(shí)例就是一個(gè)subtask-在這里為了區(qū)分暫時(shí)叫做substask炕泳。那么,帶來(lái)很多問(wèn)題上祈,由于flink的taskmanager運(yùn)行task的時(shí)候是每個(gè)task采用一個(gè)單獨(dú)的線(xiàn)程培遵,這就會(huì)帶來(lái)很多線(xiàn)程切換開(kāi)銷(xiāo),進(jìn)而影響吞吐量登刺。為了減輕這種情況籽腕,flink進(jìn)行了優(yōu)化,也即對(duì)subtask進(jìn)行鏈?zhǔn)讲僮魈猎遥準(zhǔn)讲僮鹘Y(jié)束之后得到的task节仿,再作為一個(gè)調(diào)度執(zhí)行單元,放到一個(gè)線(xiàn)程里執(zhí)行掉蔬。
這樣做的好處主要有以下幾點(diǎn):
1.Flink 集群所需的taskslots數(shù)與job中最高的并行度一致廊宪。也就是說(shuō)我們不需要再去計(jì)算一個(gè)程序總共會(huì)起多少個(gè)task了。
2.更容易獲得更充分的資源利用女轿。如果沒(méi)有slot共享箭启,那么非密集型操作source/flatmap就會(huì)占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享蛉迹,將基線(xiàn)的2個(gè)并行度增加到6個(gè)傅寡,能充分利用slot資源,同時(shí)保證每個(gè)TaskManager能平均分配到重的subtasks,比如keyby/window/apply操作就會(huì)均分到申請(qǐng)的所有slot里荐操,這樣slot的負(fù)載就均衡了芜抒。
parallelism 是指 taskmanager 實(shí)際使用的并發(fā)能力。
slot 是指 taskmanager 的并發(fā)執(zhí)行能力.
Flink 有沒(méi)有重啟策略托启?
Flink 實(shí)現(xiàn)了多種重啟策略宅倒。
固定延遲重啟策略(Fixed Delay Restart Strategy)
故障率重啟策略(Failure Rate Restart Strategy)
沒(méi)有重啟策略(No Restart Strategy)
Fallback 重啟策略(Fallback Restart Strategy)
Flink 中分布式緩存
Flink 實(shí)現(xiàn)的分布式緩存和 Hadoop 有異曲同工之妙。目的是在本地讀取文件屯耸,并把他放在 taskmanager 節(jié)點(diǎn)中拐迁,防止 task 重復(fù)拉取。
val env = ExecutionEnvironment.getExecutionEnvironment// register a file from HDFSenv.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")// register a local executable file (script, executable, ...)env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)// define your program and execute...val input: DataSet[String] = ...val result: DataSet[Integer] = input.map(new MyMapper())...env.execute()
Flink 中廣播變量
Flink 是并行的疗绣,計(jì)算過(guò)程可能不在一個(gè) Slot 中進(jìn)行线召,那么有一種情況即:當(dāng)我們需要訪問(wèn)同一份數(shù)據(jù)。那么 Flink 中的廣播變量就是為了解決這種情況多矮。
我們可以把廣播變量理解為是一個(gè)公共的共享變量缓淹,我們可以把一個(gè) dataset 數(shù)據(jù)集廣播出去,然后不同的 task 在節(jié)點(diǎn)上都能夠獲取到工窍,這個(gè)數(shù)據(jù)在每個(gè)節(jié)點(diǎn)上只會(huì)存在一份割卖。
Flink中的窗口
Flink 支持兩種劃分窗口的方式,按照 time 和 count患雏。如果根據(jù)時(shí)間劃分窗口鹏溯,那么它就是一個(gè) time-window 如果根據(jù)數(shù)據(jù)劃分窗口,那么它就是一個(gè) count-window淹仑。
flink 支持窗口的兩個(gè)重要屬性(size 和 interval)
如果 size=interval,那么就會(huì)形成 tumbling-window(無(wú)重疊數(shù)據(jù))如果 size>interval,那么就會(huì)形成 sliding-window(有重疊數(shù)據(jù))
通過(guò)組合可以得出四種基本窗口:
time-tumbling-window 無(wú)重疊數(shù)據(jù)的時(shí)間窗口丙挽,設(shè)置方式舉例:timeWindow(Time.seconds(5))
time-sliding-window 有重疊數(shù)據(jù)的時(shí)間窗口,設(shè)置方式舉例:timeWindow(Time.seconds(5), Time.seconds(3))
count-tumbling-window 無(wú)重疊數(shù)據(jù)的數(shù)量窗口匀借,設(shè)置方式舉例:countWindow(5)
count-sliding-window 有重疊數(shù)據(jù)的數(shù)量窗口颜阐,設(shè)置方式舉例:countWindow(5,3)
Flink中的狀態(tài)存儲(chǔ)
Flink 在做計(jì)算的過(guò)程中經(jīng)常需要存儲(chǔ)中間狀態(tài),來(lái)避免數(shù)據(jù)丟失和狀態(tài)恢復(fù)吓肋。選擇的狀態(tài)存儲(chǔ)策略不同凳怨,會(huì)影響狀態(tài)持久化如何和 checkpoint 交互。
Flink 提供了三種狀態(tài)存儲(chǔ)方式:MemoryStateBackend是鬼、FsStateBackend肤舞、RocksDBStateBackend。
Flink中的時(shí)間窗口
Flink 中的時(shí)間和其他流式計(jì)算系統(tǒng)的時(shí)間一樣分為三類(lèi):事件時(shí)間均蜜,攝入時(shí)間李剖,處理時(shí)間三種。
如果以 EventTime 為基準(zhǔn)來(lái)定義時(shí)間窗口將形成 EventTimeWindow,要求消息本身就應(yīng)該攜帶 EventTime囤耳。如果以 IngesingtTime 為基準(zhǔn)來(lái)定義時(shí)間窗口將形成 IngestingTimeWindow,以 source 的 systemTime 為準(zhǔn)篙顺。如果以 ProcessingTime 基準(zhǔn)來(lái)定義時(shí)間窗口將形成 ProcessingTimeWindow偶芍,以 operator 的 systemTime 為準(zhǔn)。
Flink 中水印
Watermark 是 Apache Flink 為了處理 EventTime 窗口計(jì)算提出的一種機(jī)制, 本質(zhì)上是一種時(shí)間戳德玫。一般來(lái)講 Watermark 經(jīng)常和 Window 一起被用來(lái)處理亂序事件匪蟀。
通過(guò)watermark機(jī)制來(lái)處理out-of-order的問(wèn)題,屬于第一層防護(hù)宰僧,屬于全局性的防護(hù)萄窜,通常說(shuō)的亂序問(wèn)題的解決辦法,就是指這類(lèi)撒桨;
通過(guò)窗口上的allowedLateness機(jī)制來(lái)處理out-of-order的問(wèn)題,屬于第二層防護(hù)键兜,屬于特定window operator的防護(hù)凤类,late element的問(wèn)題就是指這類(lèi)。
Flink Table & SQL ?
TableEnvironment 是 Table API 和 SQL 集成的核心概念普气。
這個(gè)類(lèi)主要用來(lái):
在內(nèi)部 catalog 中注冊(cè)表
注冊(cè)外部 catalog
執(zhí)行 SQL 查詢(xún)
注冊(cè)用戶(hù)定義(標(biāo)量谜疤,表或聚合)函數(shù)
將 DataStream 或 DataSet 轉(zhuǎn)換為表
持有對(duì) ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
一次完整的 SQL 解析過(guò)程如下:
用戶(hù)使用對(duì)外提供 Stream SQL 的語(yǔ)法開(kāi)發(fā)業(yè)務(wù)應(yīng)用
用 calcite 對(duì) StreamSQL 進(jìn)行語(yǔ)法檢驗(yàn),語(yǔ)法檢驗(yàn)通過(guò)后现诀,轉(zhuǎn)換成 calcite 的邏輯樹(shù)節(jié)點(diǎn)夷磕;最終形成 calcite 的邏輯計(jì)劃
采用 Flink 自定義的優(yōu)化規(guī)則和 calcite 火山模型、啟發(fā)式模型共同對(duì)邏輯樹(shù)進(jìn)行優(yōu)化仔沿,生成最優(yōu)的 Flink 物理計(jì)劃
對(duì)物理計(jì)劃采用 janino codegen 生成代碼坐桩,生成用低階 API DataStream 描述的流應(yīng)用,提交到 Flink 平臺(tái)執(zhí)行
Flink 是如何支持批流一體的封锉?
Flink 的開(kāi)發(fā)者認(rèn)為批處理是流處理的一種特殊情況绵跷。批處理是有限的流處理。Flink 使用一個(gè)引擎支持了 DataSet API 和 DataStream API成福。
Flink 中Task如何做到數(shù)據(jù)交換
在一個(gè) Flink Job 中碾局,數(shù)據(jù)需要在不同的 task 中進(jìn)行交換,整個(gè)數(shù)據(jù)交換是有 TaskManager 負(fù)責(zé)的奴艾,TaskManager 的網(wǎng)絡(luò)組件首先從緩沖 buffer 中收集 records净当,然后再發(fā)送。Records 并不是一個(gè)一個(gè)被發(fā)送的蕴潦,是積累一個(gè)批次再發(fā)送像啼,batch 技術(shù)可以更加高效的利用網(wǎng)絡(luò)資源。
Flink 是如何容錯(cuò)的
Flink 實(shí)現(xiàn)容錯(cuò)主要靠強(qiáng)大的 CheckPoint 機(jī)制和 State 機(jī)制品擎。Checkpoint 負(fù)責(zé)定時(shí)制作分布式快照埋合、對(duì)程序中的狀態(tài)進(jìn)行備份;State 用來(lái)存儲(chǔ)計(jì)算過(guò)程中的中間狀態(tài)萄传。
Flink 實(shí)現(xiàn)分布式快照
Flink 的分布式快照是根據(jù) Chandy-Lamport 算法量身定做的甚颂。簡(jiǎn)單來(lái)說(shuō)就是持續(xù)創(chuàng)建分布式數(shù)據(jù)流及其狀態(tài)的一致快照蜜猾。
核心思想是在 input source 端插入 barrier,控制 barrier 的同步來(lái)實(shí)現(xiàn) snapshot 的備份和 exactly-once 語(yǔ)義振诬。
Flink 如何保證 exactly-Once 語(yǔ)義
Flink 通過(guò)實(shí)現(xiàn)兩階段提交和狀態(tài)保存來(lái)實(shí)現(xiàn)端到端的一致性語(yǔ)義蹭睡。分為以下幾個(gè)步驟:
開(kāi)始事務(wù)(beginTransaction)創(chuàng)建一個(gè)臨時(shí)文件夾,來(lái)寫(xiě)把數(shù)據(jù)寫(xiě)入到這個(gè)文件夾里面
預(yù)提交(preCommit)將內(nèi)存中緩存的數(shù)據(jù)寫(xiě)入文件并關(guān)閉
正式提交(commit)將之前寫(xiě)完的臨時(shí)文件放入目標(biāo)目錄下赶么。這代表著最終的數(shù)據(jù)會(huì)有一些延遲
丟棄(abort)丟棄臨時(shí)文件
若失敗發(fā)生在預(yù)提交成功后肩豁,正式提交前”枭耄可以根據(jù)狀態(tài)來(lái)提交預(yù)提交的數(shù)據(jù)清钥,也可刪除預(yù)提交的數(shù)據(jù)。
Flimk 如何做內(nèi)存管理
Flink 并不是將大量對(duì)象存在堆上放闺,而是將對(duì)象都序列化到一個(gè)預(yù)分配的內(nèi)存塊上祟昭。此外,F(xiàn)link 大量的使用了堆外內(nèi)存怖侦。如果需要處理的數(shù)據(jù)超出了內(nèi)存限制篡悟,則會(huì)將部分?jǐn)?shù)據(jù)存儲(chǔ)到硬盤(pán)上。Flink 為了直接操作二進(jìn)制數(shù)據(jù)實(shí)現(xiàn)了自己的序列化框架匾寝。
理論上 Flink 的內(nèi)存管理分為三部分:
Network Buffers:這個(gè)是在 TaskManager 啟動(dòng)的時(shí)候分配的搬葬,這是一組用于緩存網(wǎng)絡(luò)數(shù)據(jù)的內(nèi)存,每個(gè)塊是 32K艳悔,默認(rèn)分配 2048 個(gè)急凰,可以通過(guò)“taskmanager.network.numberOfBuffers”修改
Memory Manage pool:大量的 Memory Segment 塊,用于運(yùn)行時(shí)的算法(Sort/Join/Shuffle 等)很钓,這部分啟動(dòng)的時(shí)候就會(huì)分配香府。下面這段代碼,根據(jù)配置文件中的各種參數(shù)來(lái)計(jì)算內(nèi)存的分配方法码倦。(heap or off-heap企孩,這個(gè)放到下節(jié)談),內(nèi)存的分配支持預(yù)分配和 lazy load袁稽,默認(rèn)懶加載的方式勿璃。
User Code,這部分是除了 Memory Manager 之外的內(nèi)存用于 User code 和 TaskManager 本身的數(shù)據(jù)結(jié)構(gòu)推汽。
Flink 的序列化
link 摒棄了 Java 原生的序列化方法补疑,以獨(dú)特的方式處理數(shù)據(jù)類(lèi)型和序列化,包含自己的類(lèi)型描述符歹撒,泛型類(lèi)型提取和類(lèi)型序列化框架莲组。
TypeInformation 是所有類(lèi)型描述符的基類(lèi)。它揭示了該類(lèi)型的一些基本屬性暖夭,并且可以生成序列化器锹杈。TypeInformation 支持以下幾種類(lèi)型:
BasicTypeInfo: 任意 Java 基本類(lèi)型或 String 類(lèi)型
BasicArrayTypeInfo: 任意 Java 基本類(lèi)型數(shù)組或 String 數(shù)組
WritableTypeInfo: 任意 Hadoop Writable 接口的實(shí)現(xiàn)類(lèi)
TupleTypeInfo: 任意的 Flink Tuple 類(lèi)型(支持 Tuple1 to Tuple25)撵孤。Flink tuples 是固定長(zhǎng)度固定類(lèi)型的 Java Tuple 實(shí)現(xiàn)
CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
PojoTypeInfo: 任意的 POJO (Java or Scala),例如竭望,Java 對(duì)象的所有成員變量邪码,要么是 public 修飾符定義,要么有 getter/setter 方法
GenericTypeInfo: 任意無(wú)法匹配之前幾種類(lèi)型的類(lèi)
Flink中window 出現(xiàn)數(shù)據(jù)傾斜怎么解決咬清?
window 產(chǎn)生數(shù)據(jù)傾斜指的是數(shù)據(jù)在不同的窗口內(nèi)堆積的數(shù)據(jù)量相差過(guò)多闭专。本質(zhì)上產(chǎn)生這種情況的原因是數(shù)據(jù)源頭發(fā)送的數(shù)據(jù)量速度不同導(dǎo)致的。出現(xiàn)這種情況一般通過(guò)兩種方式來(lái)解決:
在數(shù)據(jù)進(jìn)入窗口前做預(yù)聚合
重新設(shè)計(jì)窗口聚合的 key
Flink 任務(wù)延時(shí)高旧烧,如何入手影钉?
在 Flink 的后臺(tái)任務(wù)管理中,我們可以看到 Flink 的哪個(gè)算子和 task 出現(xiàn)了反壓掘剪。最主要的手段是資源調(diào)優(yōu)和算子調(diào)優(yōu)斧拍。資源調(diào)優(yōu)即是對(duì)作業(yè)中的 Operator 的并發(fā)數(shù)(parallelism)、CPU(core)杖小、堆內(nèi)存(heap_memory)等參數(shù)進(jìn)行調(diào)優(yōu)。作業(yè)參數(shù)調(diào)優(yōu)包括:并行度的設(shè)置愚墓,State 的設(shè)置予权,checkpoint 的設(shè)置。
Flink 反壓
Flink 內(nèi)部是基于 producer-consumer 模型來(lái)進(jìn)行消息傳遞的浪册,F(xiàn)link 的反壓設(shè)計(jì)也是基于這個(gè)模型扫腺。Flink 使用了高效有界的分布式阻塞隊(duì)列,就像 Java 通用的阻塞隊(duì)列(BlockingQueue)一樣村象。下游消費(fèi)者消費(fèi)變慢笆环,上游就會(huì)受到阻塞。
Operator Chains(算子鏈)這個(gè)概念你了解嗎
為了更高效地分布式執(zhí)行厚者,F(xiàn)link 會(huì)盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task躁劣。每個(gè) task 在一個(gè)線(xiàn)程中執(zhí)行。將 operators 鏈接成 task 是非常有效的優(yōu)化:它能減少線(xiàn)程之間的切換库菲,減少消息的序列化/反序列化账忘,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時(shí)提高整體的吞吐量熙宇。這就是我們所說(shuō)的算子鏈鳖擒。
Flink 什么情況下才會(huì)把 Operator chain 在一起形成算子鏈?
兩個(gè) operator chain 在一起的的條件:
上下游的并行度一致
下游節(jié)點(diǎn)的入度為 1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入)
上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接烫止,map蒋荚、flatmap、filter 等默認(rèn)是 ALWAYS)
上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接馆蠕,不能與上游鏈接期升,Source 默認(rèn)是 HEAD)
兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
用戶(hù)沒(méi)有禁用 chain
Flink Job 的提交流程
用戶(hù)提交的 Flink Job 會(huì)被轉(zhuǎn)化成一個(gè) DAG 任務(wù)運(yùn)行惊奇,分別是:StreamGraph、JobGraph吓妆、ExecutionGraph赊时,F(xiàn)link 中 JobManager 與 TaskManager,JobManager 與 Client 的交互是基于 Akka 工具包的行拢,是通過(guò)消息驅(qū)動(dòng)祖秒。整個(gè) Flink Job 的提交還包含著 ActorSystem 的創(chuàng)建,JobManager 的啟動(dòng)舟奠,TaskManager 的啟動(dòng)和注冊(cè)竭缝。
Flink "三層圖" 結(jié)構(gòu)
一個(gè) Flink 任務(wù)的 DAG 生成計(jì)算圖大致經(jīng)歷以下三個(gè)過(guò)程:
StreamGraph最接近代碼所表達(dá)的邏輯層面的計(jì)算拓?fù)浣Y(jié)構(gòu),按照用戶(hù)代碼的執(zhí)行順序向 StreamExecutionEnvironment 添加 StreamTransformation 構(gòu)成流式圖沼瘫。
JobGraph從 StreamGraph 生成抬纸,將可以串聯(lián)合并的節(jié)點(diǎn)進(jìn)行合并,設(shè)置節(jié)點(diǎn)之間的邊耿戚,安排資源共享 slot 槽位和放置相關(guān)聯(lián)的節(jié)點(diǎn)湿故,上傳任務(wù)所需的文件,設(shè)置檢查點(diǎn)配置等膜蛔。相當(dāng)于經(jīng)過(guò)部分初始化和優(yōu)化處理的任務(wù)圖坛猪。
ExecutionGraph由 JobGraph 轉(zhuǎn)換而來(lái),包含了任務(wù)具體執(zhí)行所需的內(nèi)容皂股,是最貼近底層實(shí)現(xiàn)的執(zhí)行圖墅茉。
JobManager 在集群中扮演角色
JobManager 負(fù)責(zé)整個(gè) Flink 集群任務(wù)的調(diào)度以及資源的管理,從客戶(hù)端中獲取提交的應(yīng)用呜呐,然后根據(jù)集群中 TaskManager 上 TaskSlot 的使用情況就斤,為提交的應(yīng)用分配相應(yīng)的 TaskSlot 資源并命令 TaskManager 啟動(dòng)從客戶(hù)端中獲取的應(yīng)用。
JobManager 相當(dāng)于整個(gè)集群的 Master 節(jié)點(diǎn)蘑辑,且整個(gè)集群有且只有一個(gè)活躍的 JobManager 洋机,負(fù)責(zé)整個(gè)集群的任務(wù)管理和資源管理。
JobManager 和 TaskManager 之間通過(guò) Actor System 進(jìn)行通信洋魂,獲取任務(wù)執(zhí)行的情況并通過(guò) Actor System 將應(yīng)用的任務(wù)執(zhí)行情況發(fā)送給客戶(hù)端槐秧。
同時(shí)在任務(wù)執(zhí)行的過(guò)程中,F(xiàn)link JobManager 會(huì)觸發(fā) Checkpoint 操作忧设,每個(gè) TaskManager 節(jié)點(diǎn) 收到 Checkpoint 觸發(fā)指令后刁标,完成 Checkpoint 操作,所有的 Checkpoint 協(xié)調(diào)過(guò)程都是在 Fink JobManager 中完成址晕。
當(dāng)任務(wù)完成后膀懈,F(xiàn)link 會(huì)將任務(wù)執(zhí)行的信息反饋給客戶(hù)端,并且釋放掉 TaskManager 中的資源以供下一次提交任務(wù)使用谨垃。
JobManager 在集群中起什么作用启搂?
JobManager 的職責(zé)主要是接收 Flink 作業(yè)硼控,調(diào)度 Task,收集作業(yè)狀態(tài)和管理 TaskManager胳赌。它包含一個(gè) Actor牢撼,并且做如下操作:
RegisterTaskManager: 它由想要注冊(cè)到 JobManager 的 TaskManager 發(fā)送。注冊(cè)成功會(huì)通過(guò) AcknowledgeRegistration 消息進(jìn)行 Ack疑苫。
SubmitJob: 由提交作業(yè)到系統(tǒng)的 Client 發(fā)送熏版。提交的信息是 JobGraph 形式的作業(yè)描述信息。
CancelJob: 請(qǐng)求取消指定 id 的作業(yè)捍掺。成功會(huì)返回 CancellationSuccess撼短,否則返回 CancellationFailure。
UpdateTaskExecutionState: 由 TaskManager 發(fā)送挺勿,用來(lái)更新執(zhí)行節(jié)點(diǎn)(ExecutionVertex)的狀態(tài)曲横。成功則返回 true,否則返回 false不瓶。
RequestNextInputSplit: TaskManager 上的 Task 請(qǐng)求下一個(gè)輸入 split禾嫉,成功則返回 NextInputSplit,否則返回 null蚊丐。
JobStatusChanged: 它意味著作業(yè)的狀態(tài)(RUNNING, CANCELING, FINISHED,等)發(fā)生變化夭织。這個(gè)消息由 ExecutionGraph 發(fā)送。
TaskManager 在集群中扮演的角色
TaskManager 相當(dāng)于整個(gè)集群的 Slave 節(jié)點(diǎn)吠撮,負(fù)責(zé)具體的任務(wù)執(zhí)行和對(duì)應(yīng)任務(wù)在每個(gè)節(jié)點(diǎn)上的資源申請(qǐng)和管理。
客戶(hù)端通過(guò)將編寫(xiě)好的 Flink 應(yīng)用編譯打包讲竿,提交到 JobManager泥兰,然后 JobManager 會(huì)根據(jù)已注冊(cè)在 JobManager 中 TaskManager 的資源情況,將任務(wù)分配給有資源的 TaskManager 節(jié)點(diǎn)题禀,然后啟動(dòng)并運(yùn)行任務(wù)鞋诗。
TaskManager 從 JobManager 接收需要部署的任務(wù),然后使用 Slot 資源啟動(dòng) Task迈嘹,建立數(shù)據(jù)接入的網(wǎng)絡(luò)連接削彬,接收數(shù)據(jù)并開(kāi)始數(shù)據(jù)處理。同時(shí) TaskManager 之間的數(shù)據(jù)交互都是通過(guò)數(shù)據(jù)流的方式進(jìn)行的秀仲。
可以看出融痛,F(xiàn)link 的任務(wù)運(yùn)行其實(shí)是采用多線(xiàn)程的方式,這和 MapReduce 多 JVM 進(jìn)行的方式有很大的區(qū)別神僵,F(xiàn)link 能夠極大提高 CPU 使用效率雁刷,在多個(gè)任務(wù)和 Task 之間通過(guò) TaskSlot 方式共享系統(tǒng)資源,每個(gè) TaskManager 中通過(guò)管理多個(gè) TaskSlot 資源池進(jìn)行對(duì)資源進(jìn)行有效管理保礼。
TaskManager 在集群?jiǎn)?dòng)過(guò)程中起到什么作用沛励?
TaskManager 的啟動(dòng)流程較為簡(jiǎn)單:?jiǎn)?dòng)類(lèi):org.apache.flink.runtime.taskmanager.TaskManager核心啟動(dòng)方法 : selectNetworkInterfaceAndRunTaskManager啟動(dòng)后直接向 JobManager 注冊(cè)自己责语,注冊(cè)完成后,進(jìn)行部分模塊的初始化
Flink 計(jì)算資源調(diào)度如何實(shí)現(xiàn)目派?
TaskManager 中最細(xì)粒度的資源是 Task slot坤候,代表了一個(gè)固定大小的資源子集,每個(gè) TaskManager 會(huì)將其所占有的資源平分給它的 slot企蹭。
通過(guò)調(diào)整 task slot 的數(shù)量白筹,用戶(hù)可以定義 task 之間是如何相互隔離的。每個(gè) TaskManager 有一個(gè) slot练对,也就意味著每個(gè) task 運(yùn)行在獨(dú)立的 JVM 中遍蟋。每個(gè) TaskManager 有多個(gè) slot 的話(huà),也就是說(shuō)多個(gè) task 運(yùn)行在同一個(gè) JVM 中螟凭。
而在同一個(gè) JVM 進(jìn)程中的 task虚青,可以共享 TCP 連接(基于多路復(fù)用)和心跳消息,可以減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸螺男,也能共享一些數(shù)據(jù)結(jié)構(gòu)棒厘,一定程度上減少了每個(gè) task 的消耗。 每個(gè) slot 可以接受單個(gè) task下隧,也可以接受多個(gè)連續(xù) task 組成的 pipeline奢人,如下圖所示摸航,F(xiàn)latMap 函數(shù)占用一個(gè) taskslot亚隙,而 key Agg 函數(shù)和 sink 函數(shù)共用一個(gè) taskslot
Flink 的數(shù)據(jù)抽象及數(shù)據(jù)交換過(guò)程?
Flink 為了避免 JVM 的固有缺陷例如 java 對(duì)象存儲(chǔ)密度低玲销,F(xiàn)GC 影響吞吐和響應(yīng)等土辩,實(shí)現(xiàn)了自主管理內(nèi)存支救。MemorySegment 就是 Flink 的內(nèi)存抽象。默認(rèn)情況下拷淘,一個(gè) MemorySegment 可以被看做是一個(gè) 32kb 大的內(nèi)存塊的抽象各墨。這塊內(nèi)存既可以是 JVM 里的一個(gè) byte[],也可以是堆外內(nèi)存(DirectByteBuffer)启涯。
在 MemorySegment 這個(gè)抽象之上贬堵,F(xiàn)link 在數(shù)據(jù)從 operator 內(nèi)的數(shù)據(jù)對(duì)象在向 TaskManager 上轉(zhuǎn)移,預(yù)備被發(fā)給下個(gè)節(jié)點(diǎn)的過(guò)程中结洼,使用的抽象或者說(shuō)內(nèi)存對(duì)象是 Buffer黎做。
對(duì)接從 Java 對(duì)象轉(zhuǎn)為 Buffer 的中間對(duì)象是另一個(gè)抽象 StreamRecord。
Flink 中分布式快照機(jī)制如何實(shí)現(xiàn)松忍?
Flink 的容錯(cuò)機(jī)制的核心部分是制作分布式數(shù)據(jù)流和操作算子狀態(tài)的一致性快照引几。 這些快照充當(dāng)一致性 checkpoint,系統(tǒng)可以在發(fā)生故障時(shí)回滾。 Flink 用于制作這些快照的機(jī)制在“分布式數(shù)據(jù)流的輕量級(jí)異步快照”中進(jìn)行了描述伟桅。 它受到分布式快照的標(biāo)準(zhǔn) Chandy-Lamport 算法的啟發(fā)敞掘,專(zhuān)門(mén)針對(duì) Flink 的執(zhí)行模型而定制。
barriers 在數(shù)據(jù)流源處被注入并行數(shù)據(jù)流中楣铁【裂悖快照 n 的 barriers 被插入的位置(我們稱(chēng)之為 Sn)是快照所包含的數(shù)據(jù)在數(shù)據(jù)源中最大位置。例如盖腕,在 Apache Kafka 中赫冬,此位置將是分區(qū)中最后一條記錄的偏移量。 將該位置 Sn 報(bào)告給 checkpoint 協(xié)調(diào)器(Flink 的 JobManager)溃列。
然后 barriers 向下游流動(dòng)劲厌。當(dāng)一個(gè)中間操作算子從其所有輸入流中收到快照 n 的 barriers 時(shí),它會(huì)為快照 n 發(fā)出 barriers 進(jìn)入其所有輸出流中听隐。 一旦 sink 操作算子(流式 DAG 的末端)從其所有輸入流接收到 barriers n补鼻,它就向 checkpoint 協(xié)調(diào)器確認(rèn)快照 n 完成。在所有 sink 確認(rèn)快照后雅任,意味快照著已完成风范。
一旦完成快照 n,job 將永遠(yuǎn)不再向數(shù)據(jù)源請(qǐng)求 Sn 之前的記錄沪么,因?yàn)榇藭r(shí)這些記錄(及其后續(xù)記錄)將已經(jīng)通過(guò)整個(gè)數(shù)據(jù)流拓?fù)渑鹦觯布词且呀?jīng)被處理結(jié)束。
FlinkSQL 的是如何實(shí)現(xiàn)的禽车?
構(gòu)建抽象語(yǔ)法樹(shù)的事情交給了 Calcite 去做寇漫。SQL query 會(huì)經(jīng)過(guò) Calcite 解析器轉(zhuǎn)變成 SQL 節(jié)點(diǎn)樹(shù),通過(guò)驗(yàn)證后構(gòu)建成 Calcite 的抽象語(yǔ)法樹(shù)(也就是圖中的 Logical Plan)殉摔。另一邊州胳,Table API 上的調(diào)用會(huì)構(gòu)建成 Table API 的抽象語(yǔ)法樹(shù),并通過(guò) Calcite 提供的 RelBuilder 轉(zhuǎn)變成 Calcite 的抽象語(yǔ)法樹(shù)钦勘。然后依次被轉(zhuǎn)換成邏輯執(zhí)行計(jì)劃和物理執(zhí)行計(jì)劃。
在提交任務(wù)后會(huì)分發(fā)到各個(gè) TaskManager 中運(yùn)行亚亲,在運(yùn)行時(shí)會(huì)使用 Janino 編譯器編譯代碼后運(yùn)行彻采。