基礎(chǔ)概念考察
一柜思、 簡單介紹一下 Flink
Flink 是一個框架和分布式處理引擎稠诲,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計算。并且 Flink 提供了數(shù)據(jù)分布铜跑、容錯機(jī)制以及資源管理等核心功能因妙。
Flink 的特性包括:
支持高吞吐痰憎、低延遲、高性能的流處理支持帶有事件時間的窗口 (Window) 操作支持有狀態(tài)計算的 Exactly-once 語義支持高度靈活的窗口 (Window) 操作攀涵,支持基于 time信殊、count、session 以及 data-driven 的窗口操作支持具有 Backpressure 功能的持續(xù)流模型支持基于輕量級分布式快照(Snapshot)實現(xiàn)的容錯一個運行時同時支持 Batch on Streaming 處理和 Streaming 處理Flink 在 JVM 內(nèi)部實現(xiàn)了自己的內(nèi)存管理支持迭代計算支持程序自動優(yōu)化:避免特定情況下 Shuffle汁果、排序等昂貴操作,中間結(jié)果有必要進(jìn)行緩存
二玲躯、Flink 相比傳統(tǒng)的 Spark Streaming 有什么區(qū)別?
- 架構(gòu)模型
Spark Streaming 在運行時的主要角色包括:Master据德、Worker、Driver跷车、Executor棘利,F(xiàn)link 在運行時主要包含:Jobmanager、Taskmanager 和 Slot朽缴。
- 任務(wù)調(diào)度
Spark Streaming 連續(xù)不斷的生成微小的數(shù)據(jù)批次善玫,構(gòu)建有向無環(huán)圖 DAG,Spark Streaming 會依次創(chuàng)建 DStreamGraph密强、JobGenerator茅郎、JobScheduler。
Flink 根據(jù)用戶提交的代碼生成 StreamGraph或渤,經(jīng)過優(yōu)化生成 JobGraph系冗,然后提交給 JobManager 進(jìn)行處理,JobManager 會根據(jù) JobGraph 生成 ExecutionGraph薪鹦,ExecutionGraph 是 Flink 調(diào)度最核心的數(shù)據(jù)結(jié)構(gòu)掌敬,JobManager 根據(jù) ExecutionGraph 對 Job 進(jìn)行調(diào)度惯豆。
- 時間機(jī)制
Spark Streaming 支持的時間機(jī)制有限,只支持處理時間奔害。Flink 支持了流處理程序在時間上的三個定義:處理時間楷兽、事件時間、注入時間华临。同時也支持 watermark 機(jī)制來處理滯后數(shù)據(jù)芯杀。
- 容錯機(jī)制
對于 Spark Streaming 任務(wù),我們可以設(shè)置 checkpoint银舱,然后假如發(fā)生故障并重啟瘪匿,我們可以從上次 checkpoint 之處恢復(fù),但是這個行為只能使得數(shù)據(jù)不丟失寻馏,可能會重復(fù)處理棋弥,不能做到恰一次處理語義。
Flink 則使用兩階段提交協(xié)議來解決這個問題诚欠。
三顽染、Flink 組件棧
自下而上,每一層分別代表:Deploy 層:該層主要涉及了 Flink 的部署模式轰绵,在上圖中我們可以看出粉寞,F(xiàn)link 支持包括 local、Standalone左腔、Cluster唧垦、Cloud 等多種部署模式。Runtime 層:Runtime 層提供了支持 Flink 計算的核心實現(xiàn)液样,比如:支持分布式 Stream 處理振亮、JobGraph 到 ExecutionGraph 的映射、調(diào)度等等鞭莽,為上層 API 層提供基礎(chǔ)服務(wù)坊秸。API 層:API 層主要實現(xiàn)了面向流(Stream)處理和批(Batch)處理 API,其中面向流處理對應(yīng) DataStream API澎怒,面向批處理對應(yīng) DataSet API褒搔,后續(xù)版本,F(xiàn)link 有計劃將 DataStream 和 DataSet API 進(jìn)行統(tǒng)一喷面。Libraries 層:該層稱為 Flink 應(yīng)用框架層星瘾,根據(jù) API 層的劃分,在 API 層之上構(gòu)建的滿足特定應(yīng)用的實現(xiàn)計算框架惧辈,也分別對應(yīng)于面向流處理和面向批處理兩類死相。面向流處理支持:CEP(復(fù)雜事件處理)、基于 SQL-like 的操作(基于 Table 的關(guān)系操作)咬像;面向批處理支持:FlinkML(機(jī)器學(xué)習(xí)庫)算撮、Gelly(圖處理)生宛。
四、Flink集群的角色
TaskManager肮柜,JobManager陷舅,Client 三種角色。其中 JobManager 扮演著集群中的管理者 Master 的角色审洞,它是整個集群的協(xié)調(diào)者莱睁,負(fù)責(zé)接收 Flink Job,協(xié)調(diào)檢查點芒澜,F(xiàn)ailover 故障恢復(fù)等仰剿,同時管理 Flink 集群中從節(jié)點 TaskManager。
TaskManager 是實際負(fù)責(zé)執(zhí)行計算的 Worker痴晦,在其上執(zhí)行 Flink Job 的一組 Task南吮,每個 TaskManager 負(fù)責(zé)管理其所在節(jié)點上的資源信息,如內(nèi)存誊酌、磁盤部凑、網(wǎng)絡(luò),在啟動的時候?qū)①Y源的狀態(tài)向 JobManager 匯報碧浊。
Client 是 Flink 程序提交的客戶端涂邀,當(dāng)用戶提交一個 Flink 程序時,會首先創(chuàng)建一個 Client箱锐,該 Client 首先會對用戶提交的 Flink 程序進(jìn)行預(yù)處理比勉,并提交到 Flink 集群中處理,所以 Client 需要從用戶提交的 Flink 程序配置中獲取 JobManager 的地址驹止,并建立到 JobManager 的連接敷搪,將 Flink Job 提交給 JobManager。
五幢哨、Task Slot 進(jìn)行資源管理
TaskManager 是實際負(fù)責(zé)執(zhí)行計算的 Worker,TaskManager 是一個 JVM 進(jìn)程嫂便,并會以獨立的線程來執(zhí)行一個 task 或多個 subtask捞镰。為了控制一個 TaskManager 能接受多少個 task,F(xiàn)link 提出了 Task Slot 的概念毙替。
簡單的說岸售,TaskManager 會將自己節(jié)點上管理的資源分為不同的 Slot:固定大小的資源子集。這樣就避免了不同 Job 的 Task 互相競爭內(nèi)存資源厂画,但是需要主要的是凸丸,Slot 只會做內(nèi)存的隔離。沒有做 CPU 的隔離袱院。
Flink 分區(qū)策略
GlobalPartitioner數(shù)據(jù)會被分發(fā)到下游算子的第一個實例中進(jìn)行處理屎慢。
ShufflePartitioner數(shù)據(jù)會被隨機(jī)分發(fā)到下游算子的每一個實例中進(jìn)行處理瞭稼。
RebalancePartitioner數(shù)據(jù)會被循環(huán)發(fā)送到下游的每一個實例中進(jìn)行處理。
RescalePartitioner這種分區(qū)器會根據(jù)上下游算子的并行度腻惠,循環(huán)的方式輸出到下游算子的每個實例环肘。這里有點難以理解,假設(shè)上游并行度為 2集灌,編號為 A 和 B悔雹。下游并行度為 4,編號為 1欣喧,2腌零,3,4唆阿。那么 A 則把數(shù)據(jù)循環(huán)發(fā)送給 1 和 2益涧,B 則把數(shù)據(jù)循環(huán)發(fā)送給 3 和 4。假設(shè)上游并行度為 4酷鸦,編號為 A饰躲,B,C臼隔,D嘹裂。下游并行度為 2,編號為 1摔握,2寄狼。那么 A 和 B 則把數(shù)據(jù)發(fā)送給 1,C 和 D 則把數(shù)據(jù)發(fā)送給 2氨淌。
BroadcastPartitioner廣播分區(qū)會將上游數(shù)據(jù)輸出到下游算子的每個實例中泊愧。適合于大數(shù)據(jù)集和小數(shù)據(jù)集做 Jion 的場景。
ForwardPartitionerForwardPartitioner 用于將記錄輸出到下游本地的算子實例盛正。它要求上下游算子并行度一樣删咱。簡單的說,F(xiàn)orwardPartitioner 用來做數(shù)據(jù)的控制臺打印豪筝。
KeyGroupStreamPartitionerHash 分區(qū)器痰滋。會將數(shù)據(jù)按 Key 的 Hash 值輸出到下游算子實例中。
CustomPartitionerWrapper用戶自定義分區(qū)器续崖。需要用戶自己實現(xiàn) Partitioner
接口敲街,來定義自己的分區(qū)邏輯
Flink 并行度
我們在實際生產(chǎn)環(huán)境中可以從四個不同層面設(shè)置并行度:
操作算子層面(Operator Level)
.map(new RollingAdditionMapper()).setParallelism(10) 將操作算子設(shè)置并行度
執(zhí)行環(huán)境層面(Execution Environment Level)
$FLINK_HOME/bin/flink 的-p參數(shù)修改并行度
客戶端層面(Client Level)
env.setParallelism(10)
系統(tǒng)層面(System Level)
全局配置在flink-conf.yaml文件中,parallelism.default严望,默認(rèn)是1:可以設(shè)置默認(rèn)值大一點
需要注意的優(yōu)先級:算子層面>環(huán)境層面>客戶端層面>系統(tǒng)層面多艇。
每個算子的一個并行度實例就是一個subtask-在這里為了區(qū)分暫時叫做substask。那么像吻,帶來很多問題峻黍,由于flink的taskmanager運行task的時候是每個task采用一個單獨的線程复隆,這就會帶來很多線程切換開銷舆声,進(jìn)而影響吞吐量熟空。為了減輕這種情況,flink進(jìn)行了優(yōu)化视哑,也即對subtask進(jìn)行鏈?zhǔn)讲僮髡竺妫準(zhǔn)讲僮鹘Y(jié)束之后得到的task轻局,再作為一個調(diào)度執(zhí)行單元,放到一個線程里執(zhí)行样刷。
這樣做的好處主要有以下幾點:
1.Flink 集群所需的taskslots數(shù)與job中最高的并行度一致仑扑。也就是說我們不需要再去計算一個程序總共會起多少個task了。
2.更容易獲得更充分的資源利用置鼻。如果沒有slot共享镇饮,那么非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享箕母,將基線的2個并行度增加到6個储藐,能充分利用slot資源,同時保證每個TaskManager能平均分配到重的subtasks嘶是,比如keyby/window/apply操作就會均分到申請的所有slot里钙勃,這樣slot的負(fù)載就均衡了。
parallelism 是指 taskmanager 實際使用的并發(fā)能力聂喇。
slot 是指 taskmanager 的并發(fā)執(zhí)行能力.
Flink 有沒有重啟策略辖源?
Flink 實現(xiàn)了多種重啟策略。
固定延遲重啟策略(Fixed Delay Restart Strategy)
故障率重啟策略(Failure Rate Restart Strategy)
沒有重啟策略(No Restart Strategy)
Fallback 重啟策略(Fallback Restart Strategy)
Flink 中分布式緩存
Flink 實現(xiàn)的分布式緩存和 Hadoop 有異曲同工之妙希太。目的是在本地讀取文件克饶,并把他放在 taskmanager 節(jié)點中,防止 task 重復(fù)拉取誊辉。
val env = ExecutionEnvironment.getExecutionEnvironment// register a file from HDFSenv.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")// register a local executable file (script, executable, ...)env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)// define your program and execute...val input: DataSet[String] = ...val result: DataSet[Integer] = input.map(new MyMapper())...env.execute()
Flink 中廣播變量
Flink 是并行的矾湃,計算過程可能不在一個 Slot 中進(jìn)行,那么有一種情況即:當(dāng)我們需要訪問同一份數(shù)據(jù)堕澄。那么 Flink 中的廣播變量就是為了解決這種情況邀跃。
我們可以把廣播變量理解為是一個公共的共享變量,我們可以把一個 dataset 數(shù)據(jù)集廣播出去奈偏,然后不同的 task 在節(jié)點上都能夠獲取到,這個數(shù)據(jù)在每個節(jié)點上只會存在一份躯护。
Flink中的窗口
Flink 支持兩種劃分窗口的方式惊来,按照 time 和 count。如果根據(jù)時間劃分窗口棺滞,那么它就是一個 time-window 如果根據(jù)數(shù)據(jù)劃分窗口裁蚁,那么它就是一個 count-window矢渊。
flink 支持窗口的兩個重要屬性(size 和 interval)
如果 size=interval,那么就會形成 tumbling-window(無重疊數(shù)據(jù))如果 size>interval,那么就會形成 sliding-window(有重疊數(shù)據(jù))
通過組合可以得出四種基本窗口:
time-tumbling-window 無重疊數(shù)據(jù)的時間窗口,設(shè)置方式舉例:timeWindow(Time.seconds(5))
time-sliding-window 有重疊數(shù)據(jù)的時間窗口枉证,設(shè)置方式舉例:timeWindow(Time.seconds(5), Time.seconds(3))
count-tumbling-window 無重疊數(shù)據(jù)的數(shù)量窗口矮男,設(shè)置方式舉例:countWindow(5)
count-sliding-window 有重疊數(shù)據(jù)的數(shù)量窗口,設(shè)置方式舉例:countWindow(5,3)
Flink中的狀態(tài)存儲
Flink 在做計算的過程中經(jīng)常需要存儲中間狀態(tài)室谚,來避免數(shù)據(jù)丟失和狀態(tài)恢復(fù)毡鉴。選擇的狀態(tài)存儲策略不同,會影響狀態(tài)持久化如何和 checkpoint 交互秒赤。
Flink 提供了三種狀態(tài)存儲方式:MemoryStateBackend猪瞬、FsStateBackend、RocksDBStateBackend入篮。
Flink中的時間窗口
Flink 中的時間和其他流式計算系統(tǒng)的時間一樣分為三類:事件時間陈瘦,攝入時間,處理時間三種潮售。
如果以 EventTime 為基準(zhǔn)來定義時間窗口將形成 EventTimeWindow,要求消息本身就應(yīng)該攜帶 EventTime痊项。如果以 IngesingtTime 為基準(zhǔn)來定義時間窗口將形成 IngestingTimeWindow,以 source 的 systemTime 為準(zhǔn)。如果以 ProcessingTime 基準(zhǔn)來定義時間窗口將形成 ProcessingTimeWindow酥诽,以 operator 的 systemTime 為準(zhǔn)鞍泉。
Flink 中水印
Watermark 是 Apache Flink 為了處理 EventTime 窗口計算提出的一種機(jī)制, 本質(zhì)上是一種時間戳。一般來講 Watermark 經(jīng)常和 Window 一起被用來處理亂序事件盆均。
通過watermark機(jī)制來處理out-of-order的問題塞弊,屬于第一層防護(hù),屬于全局性的防護(hù)泪姨,通常說的亂序問題的解決辦法游沿,就是指這類;
通過窗口上的allowedLateness機(jī)制來處理out-of-order的問題肮砾,屬于第二層防護(hù)诀黍,屬于特定window operator的防護(hù),late element的問題就是指這類仗处。
Flink Table & SQL ?
TableEnvironment 是 Table API 和 SQL 集成的核心概念眯勾。
這個類主要用來:
在內(nèi)部 catalog 中注冊表
注冊外部 catalog
執(zhí)行 SQL 查詢
注冊用戶定義(標(biāo)量,表或聚合)函數(shù)
將 DataStream 或 DataSet 轉(zhuǎn)換為表
持有對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
一次完整的 SQL 解析過程如下:
用戶使用對外提供 Stream SQL 的語法開發(fā)業(yè)務(wù)應(yīng)用
用 calcite 對 StreamSQL 進(jìn)行語法檢驗婆誓,語法檢驗通過后吃环,轉(zhuǎn)換成 calcite 的邏輯樹節(jié)點;最終形成 calcite 的邏輯計劃
采用 Flink 自定義的優(yōu)化規(guī)則和 calcite 火山模型洋幻、啟發(fā)式模型共同對邏輯樹進(jìn)行優(yōu)化郁轻,生成最優(yōu)的 Flink 物理計劃
對物理計劃采用 janino codegen 生成代碼,生成用低階 API DataStream 描述的流應(yīng)用,提交到 Flink 平臺執(zhí)行
Flink 是如何支持批流一體的好唯?
Flink 的開發(fā)者認(rèn)為批處理是流處理的一種特殊情況竭沫。批處理是有限的流處理。Flink 使用一個引擎支持了 DataSet API 和 DataStream API骑篙。
Flink 中Task如何做到數(shù)據(jù)交換
在一個 Flink Job 中蜕提,數(shù)據(jù)需要在不同的 task 中進(jìn)行交換,整個數(shù)據(jù)交換是有 TaskManager 負(fù)責(zé)的靶端,TaskManager 的網(wǎng)絡(luò)組件首先從緩沖 buffer 中收集 records谎势,然后再發(fā)送。Records 并不是一個一個被發(fā)送的躲查,是積累一個批次再發(fā)送它浅,batch 技術(shù)可以更加高效的利用網(wǎng)絡(luò)資源。
Flink 是如何容錯的
Flink 實現(xiàn)容錯主要靠強(qiáng)大的 CheckPoint 機(jī)制和 State 機(jī)制镣煮。Checkpoint 負(fù)責(zé)定時制作分布式快照姐霍、對程序中的狀態(tài)進(jìn)行備份;State 用來存儲計算過程中的中間狀態(tài)典唇。
Flink 實現(xiàn)分布式快照
Flink 的分布式快照是根據(jù) Chandy-Lamport 算法量身定做的镊折。簡單來說就是持續(xù)創(chuàng)建分布式數(shù)據(jù)流及其狀態(tài)的一致快照。
核心思想是在 input source 端插入 barrier介衔,控制 barrier 的同步來實現(xiàn) snapshot 的備份和 exactly-once 語義恨胚。
Flink 如何保證 exactly-Once 語義
Flink 通過實現(xiàn)兩階段提交和狀態(tài)保存來實現(xiàn)端到端的一致性語義。分為以下幾個步驟:
開始事務(wù)(beginTransaction)創(chuàng)建一個臨時文件夾炎咖,來寫把數(shù)據(jù)寫入到這個文件夾里面
預(yù)提交(preCommit)將內(nèi)存中緩存的數(shù)據(jù)寫入文件并關(guān)閉
正式提交(commit)將之前寫完的臨時文件放入目標(biāo)目錄下赃泡。這代表著最終的數(shù)據(jù)會有一些延遲
丟棄(abort)丟棄臨時文件
若失敗發(fā)生在預(yù)提交成功后,正式提交前乘盼∩埽可以根據(jù)狀態(tài)來提交預(yù)提交的數(shù)據(jù),也可刪除預(yù)提交的數(shù)據(jù)绸栅。
Flimk 如何做內(nèi)存管理
Flink 并不是將大量對象存在堆上级野,而是將對象都序列化到一個預(yù)分配的內(nèi)存塊上。此外粹胯,F(xiàn)link 大量的使用了堆外內(nèi)存蓖柔。如果需要處理的數(shù)據(jù)超出了內(nèi)存限制,則會將部分?jǐn)?shù)據(jù)存儲到硬盤上风纠。Flink 為了直接操作二進(jìn)制數(shù)據(jù)實現(xiàn)了自己的序列化框架况鸣。
理論上 Flink 的內(nèi)存管理分為三部分:
Network Buffers:這個是在 TaskManager 啟動的時候分配的,這是一組用于緩存網(wǎng)絡(luò)數(shù)據(jù)的內(nèi)存竹观,每個塊是 32K镐捧,默認(rèn)分配 2048 個,可以通過“taskmanager.network.numberOfBuffers”修改
Memory Manage pool:大量的 Memory Segment 塊,用于運行時的算法(Sort/Join/Shuffle 等)愤估,這部分啟動的時候就會分配。下面這段代碼速址,根據(jù)配置文件中的各種參數(shù)來計算內(nèi)存的分配方法玩焰。(heap or off-heap,這個放到下節(jié)談)芍锚,內(nèi)存的分配支持預(yù)分配和 lazy load昔园,默認(rèn)懶加載的方式。
User Code并炮,這部分是除了 Memory Manager 之外的內(nèi)存用于 User code 和 TaskManager 本身的數(shù)據(jù)結(jié)構(gòu)默刚。
Flink 的序列化
link 摒棄了 Java 原生的序列化方法,以獨特的方式處理數(shù)據(jù)類型和序列化逃魄,包含自己的類型描述符荤西,泛型類型提取和類型序列化框架。
TypeInformation 是所有類型描述符的基類伍俘。它揭示了該類型的一些基本屬性邪锌,并且可以生成序列化器。TypeInformation 支持以下幾種類型:
BasicTypeInfo: 任意 Java 基本類型或 String 類型
BasicArrayTypeInfo: 任意 Java 基本類型數(shù)組或 String 數(shù)組
WritableTypeInfo: 任意 Hadoop Writable 接口的實現(xiàn)類
TupleTypeInfo: 任意的 Flink Tuple 類型(支持 Tuple1 to Tuple25)癌瘾。Flink tuples 是固定長度固定類型的 Java Tuple 實現(xiàn)
CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
PojoTypeInfo: 任意的 POJO (Java or Scala)觅丰,例如,Java 對象的所有成員變量妨退,要么是 public 修飾符定義妇萄,要么有 getter/setter 方法
GenericTypeInfo: 任意無法匹配之前幾種類型的類
Flink中window 出現(xiàn)數(shù)據(jù)傾斜怎么解決?
window 產(chǎn)生數(shù)據(jù)傾斜指的是數(shù)據(jù)在不同的窗口內(nèi)堆積的數(shù)據(jù)量相差過多咬荷。本質(zhì)上產(chǎn)生這種情況的原因是數(shù)據(jù)源頭發(fā)送的數(shù)據(jù)量速度不同導(dǎo)致的冠句。出現(xiàn)這種情況一般通過兩種方式來解決:
在數(shù)據(jù)進(jìn)入窗口前做預(yù)聚合
重新設(shè)計窗口聚合的 key
Flink 任務(wù)延時高,如何入手萍丐?
在 Flink 的后臺任務(wù)管理中轩端,我們可以看到 Flink 的哪個算子和 task 出現(xiàn)了反壓。最主要的手段是資源調(diào)優(yōu)和算子調(diào)優(yōu)逝变。資源調(diào)優(yōu)即是對作業(yè)中的 Operator 的并發(fā)數(shù)(parallelism)基茵、CPU(core)、堆內(nèi)存(heap_memory)等參數(shù)進(jìn)行調(diào)優(yōu)壳影。作業(yè)參數(shù)調(diào)優(yōu)包括:并行度的設(shè)置拱层,State 的設(shè)置,checkpoint 的設(shè)置宴咧。
Flink 反壓
Flink 內(nèi)部是基于 producer-consumer 模型來進(jìn)行消息傳遞的根灯,F(xiàn)link 的反壓設(shè)計也是基于這個模型。Flink 使用了高效有界的分布式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣烙肺。下游消費者消費變慢纳猪,上游就會受到阻塞。
Operator Chains(算子鏈)這個概念你了解嗎
為了更高效地分布式執(zhí)行桃笙,F(xiàn)link 會盡可能地將 operator 的 subtask 鏈接(chain)在一起形成 task氏堤。每個 task 在一個線程中執(zhí)行。將 operators 鏈接成 task 是非常有效的優(yōu)化:它能減少線程之間的切換搏明,減少消息的序列化/反序列化鼠锈,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時提高整體的吞吐量星著。這就是我們所說的算子鏈购笆。
Flink 什么情況下才會把 Operator chain 在一起形成算子鏈?
兩個 operator chain 在一起的的條件:
上下游的并行度一致
下游節(jié)點的入度為 1 (也就是說下游節(jié)點沒有來自其他節(jié)點的輸入)
上下游節(jié)點都在同一個 slot group 中(下面會解釋 slot group)
下游節(jié)點的 chain 策略為 ALWAYS(可以與上下游鏈接虚循,map同欠、flatmap、filter 等默認(rèn)是 ALWAYS)
上游節(jié)點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接横缔,不能與上游鏈接行您,Source 默認(rèn)是 HEAD)
兩個節(jié)點間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
用戶沒有禁用 chain
Flink Job 的提交流程
用戶提交的 Flink Job 會被轉(zhuǎn)化成一個 DAG 任務(wù)運行,分別是:StreamGraph剪廉、JobGraph娃循、ExecutionGraph,F(xiàn)link 中 JobManager 與 TaskManager斗蒋,JobManager 與 Client 的交互是基于 Akka 工具包的捌斧,是通過消息驅(qū)動。整個 Flink Job 的提交還包含著 ActorSystem 的創(chuàng)建泉沾,JobManager 的啟動捞蚂,TaskManager 的啟動和注冊。
Flink "三層圖" 結(jié)構(gòu)
一個 Flink 任務(wù)的 DAG 生成計算圖大致經(jīng)歷以下三個過程:
StreamGraph最接近代碼所表達(dá)的邏輯層面的計算拓?fù)浣Y(jié)構(gòu)跷究,按照用戶代碼的執(zhí)行順序向 StreamExecutionEnvironment 添加 StreamTransformation 構(gòu)成流式圖姓迅。
JobGraph從 StreamGraph 生成,將可以串聯(lián)合并的節(jié)點進(jìn)行合并俊马,設(shè)置節(jié)點之間的邊丁存,安排資源共享 slot 槽位和放置相關(guān)聯(lián)的節(jié)點,上傳任務(wù)所需的文件柴我,設(shè)置檢查點配置等解寝。相當(dāng)于經(jīng)過部分初始化和優(yōu)化處理的任務(wù)圖。
ExecutionGraph由 JobGraph 轉(zhuǎn)換而來艘儒,包含了任務(wù)具體執(zhí)行所需的內(nèi)容聋伦,是最貼近底層實現(xiàn)的執(zhí)行圖夫偶。
JobManager 在集群中扮演角色
JobManager 負(fù)責(zé)整個 Flink 集群任務(wù)的調(diào)度以及資源的管理,從客戶端中獲取提交的應(yīng)用觉增,然后根據(jù)集群中 TaskManager 上 TaskSlot 的使用情況兵拢,為提交的應(yīng)用分配相應(yīng)的 TaskSlot 資源并命令 TaskManager 啟動從客戶端中獲取的應(yīng)用。
JobManager 相當(dāng)于整個集群的 Master 節(jié)點逾礁,且整個集群有且只有一個活躍的 JobManager 卵佛,負(fù)責(zé)整個集群的任務(wù)管理和資源管理。
JobManager 和 TaskManager 之間通過 Actor System 進(jìn)行通信敞斋,獲取任務(wù)執(zhí)行的情況并通過 Actor System 將應(yīng)用的任務(wù)執(zhí)行情況發(fā)送給客戶端。
同時在任務(wù)執(zhí)行的過程中疾牲,F(xiàn)link JobManager 會觸發(fā) Checkpoint 操作植捎,每個 TaskManager 節(jié)點 收到 Checkpoint 觸發(fā)指令后,完成 Checkpoint 操作阳柔,所有的 Checkpoint 協(xié)調(diào)過程都是在 Fink JobManager 中完成焰枢。
當(dāng)任務(wù)完成后,F(xiàn)link 會將任務(wù)執(zhí)行的信息反饋給客戶端舌剂,并且釋放掉 TaskManager 中的資源以供下一次提交任務(wù)使用济锄。
JobManager 在集群中起什么作用?
JobManager 的職責(zé)主要是接收 Flink 作業(yè)霍转,調(diào)度 Task荐绝,收集作業(yè)狀態(tài)和管理 TaskManager。它包含一個 Actor避消,并且做如下操作:
RegisterTaskManager: 它由想要注冊到 JobManager 的 TaskManager 發(fā)送低滩。注冊成功會通過 AcknowledgeRegistration 消息進(jìn)行 Ack。
SubmitJob: 由提交作業(yè)到系統(tǒng)的 Client 發(fā)送岩喷。提交的信息是 JobGraph 形式的作業(yè)描述信息恕沫。
CancelJob: 請求取消指定 id 的作業(yè)。成功會返回 CancellationSuccess纱意,否則返回 CancellationFailure婶溯。
UpdateTaskExecutionState: 由 TaskManager 發(fā)送,用來更新執(zhí)行節(jié)點(ExecutionVertex)的狀態(tài)偷霉。成功則返回 true迄委,否則返回 false。
RequestNextInputSplit: TaskManager 上的 Task 請求下一個輸入 split类少,成功則返回 NextInputSplit跑筝,否則返回 null。
JobStatusChanged: 它意味著作業(yè)的狀態(tài)(RUNNING, CANCELING, FINISHED,等)發(fā)生變化瞒滴。這個消息由 ExecutionGraph 發(fā)送曲梗。
TaskManager 在集群中扮演的角色
TaskManager 相當(dāng)于整個集群的 Slave 節(jié)點赞警,負(fù)責(zé)具體的任務(wù)執(zhí)行和對應(yīng)任務(wù)在每個節(jié)點上的資源申請和管理。
客戶端通過將編寫好的 Flink 應(yīng)用編譯打包虏两,提交到 JobManager愧旦,然后 JobManager 會根據(jù)已注冊在 JobManager 中 TaskManager 的資源情況,將任務(wù)分配給有資源的 TaskManager 節(jié)點定罢,然后啟動并運行任務(wù)笤虫。
TaskManager 從 JobManager 接收需要部署的任務(wù),然后使用 Slot 資源啟動 Task祖凫,建立數(shù)據(jù)接入的網(wǎng)絡(luò)連接琼蚯,接收數(shù)據(jù)并開始數(shù)據(jù)處理。同時 TaskManager 之間的數(shù)據(jù)交互都是通過數(shù)據(jù)流的方式進(jìn)行的惠况。
可以看出遭庶,F(xiàn)link 的任務(wù)運行其實是采用多線程的方式,這和 MapReduce 多 JVM 進(jìn)行的方式有很大的區(qū)別稠屠,F(xiàn)link 能夠極大提高 CPU 使用效率峦睡,在多個任務(wù)和 Task 之間通過 TaskSlot 方式共享系統(tǒng)資源,每個 TaskManager 中通過管理多個 TaskSlot 資源池進(jìn)行對資源進(jìn)行有效管理权埠。
TaskManager 在集群啟動過程中起到什么作用榨了?
TaskManager 的啟動流程較為簡單:啟動類:org.apache.flink.runtime.taskmanager.TaskManager核心啟動方法 : selectNetworkInterfaceAndRunTaskManager啟動后直接向 JobManager 注冊自己,注冊完成后攘蔽,進(jìn)行部分模塊的初始化
Flink 計算資源調(diào)度如何實現(xiàn)龙屉?
TaskManager 中最細(xì)粒度的資源是 Task slot,代表了一個固定大小的資源子集满俗,每個 TaskManager 會將其所占有的資源平分給它的 slot叔扼。
通過調(diào)整 task slot 的數(shù)量,用戶可以定義 task 之間是如何相互隔離的漫雷。每個 TaskManager 有一個 slot瓜富,也就意味著每個 task 運行在獨立的 JVM 中。每個 TaskManager 有多個 slot 的話降盹,也就是說多個 task 運行在同一個 JVM 中与柑。
而在同一個 JVM 進(jìn)程中的 task,可以共享 TCP 連接(基于多路復(fù)用)和心跳消息蓄坏,可以減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸价捧,也能共享一些數(shù)據(jù)結(jié)構(gòu),一定程度上減少了每個 task 的消耗涡戳。 每個 slot 可以接受單個 task结蟋,也可以接受多個連續(xù) task 組成的 pipeline,如下圖所示渔彰,F(xiàn)latMap 函數(shù)占用一個 taskslot嵌屎,而 key Agg 函數(shù)和 sink 函數(shù)共用一個 taskslot
Flink 的數(shù)據(jù)抽象及數(shù)據(jù)交換過程推正?
Flink 為了避免 JVM 的固有缺陷例如 java 對象存儲密度低,F(xiàn)GC 影響吞吐和響應(yīng)等宝惰,實現(xiàn)了自主管理內(nèi)存植榕。MemorySegment 就是 Flink 的內(nèi)存抽象。默認(rèn)情況下尼夺,一個 MemorySegment 可以被看做是一個 32kb 大的內(nèi)存塊的抽象尊残。這塊內(nèi)存既可以是 JVM 里的一個 byte[],也可以是堆外內(nèi)存(DirectByteBuffer)淤堵。
在 MemorySegment 這個抽象之上寝衫,F(xiàn)link 在數(shù)據(jù)從 operator 內(nèi)的數(shù)據(jù)對象在向 TaskManager 上轉(zhuǎn)移,預(yù)備被發(fā)給下個節(jié)點的過程中拐邪,使用的抽象或者說內(nèi)存對象是 Buffer慰毅。
對接從 Java 對象轉(zhuǎn)為 Buffer 的中間對象是另一個抽象 StreamRecord。
Flink 中分布式快照機(jī)制如何實現(xiàn)庙睡?
Flink 的容錯機(jī)制的核心部分是制作分布式數(shù)據(jù)流和操作算子狀態(tài)的一致性快照。 這些快照充當(dāng)一致性 checkpoint技俐,系統(tǒng)可以在發(fā)生故障時回滾乘陪。 Flink 用于制作這些快照的機(jī)制在“分布式數(shù)據(jù)流的輕量級異步快照”中進(jìn)行了描述。 它受到分布式快照的標(biāo)準(zhǔn) Chandy-Lamport 算法的啟發(fā)雕擂,專門針對 Flink 的執(zhí)行模型而定制啡邑。
barriers 在數(shù)據(jù)流源處被注入并行數(shù)據(jù)流中【模快照 n 的 barriers 被插入的位置(我們稱之為 Sn)是快照所包含的數(shù)據(jù)在數(shù)據(jù)源中最大位置谤逼。例如,在 Apache Kafka 中仇穗,此位置將是分區(qū)中最后一條記錄的偏移量流部。 將該位置 Sn 報告給 checkpoint 協(xié)調(diào)器(Flink 的 JobManager)。
然后 barriers 向下游流動纹坐。當(dāng)一個中間操作算子從其所有輸入流中收到快照 n 的 barriers 時枝冀,它會為快照 n 發(fā)出 barriers 進(jìn)入其所有輸出流中。 一旦 sink 操作算子(流式 DAG 的末端)從其所有輸入流接收到 barriers n耘子,它就向 checkpoint 協(xié)調(diào)器確認(rèn)快照 n 完成果漾。在所有 sink 確認(rèn)快照后,意味快照著已完成谷誓。
一旦完成快照 n绒障,job 將永遠(yuǎn)不再向數(shù)據(jù)源請求 Sn 之前的記錄,因為此時這些記錄(及其后續(xù)記錄)將已經(jīng)通過整個數(shù)據(jù)流拓?fù)浜赐幔布词且呀?jīng)被處理結(jié)束户辱。
FlinkSQL 的是如何實現(xiàn)的鸵钝?
構(gòu)建抽象語法樹的事情交給了 Calcite 去做。SQL query 會經(jīng)過 Calcite 解析器轉(zhuǎn)變成 SQL 節(jié)點樹焕妙,通過驗證后構(gòu)建成 Calcite 的抽象語法樹(也就是圖中的 Logical Plan)蒋伦。另一邊,Table API 上的調(diào)用會構(gòu)建成 Table API 的抽象語法樹焚鹊,并通過 Calcite 提供的 RelBuilder 轉(zhuǎn)變成 Calcite 的抽象語法樹痕届。然后依次被轉(zhuǎn)換成邏輯執(zhí)行計劃和物理執(zhí)行計劃。
在提交任務(wù)后會分發(fā)到各個 TaskManager 中運行末患,在運行時會使用 Janino 編譯器編譯代碼后運行研叫。
Flink 解決時差8 小時
時區(qū)問題解決方案比較多吧,要想不傷筋動骨璧针,主要介紹以下三種:
flink端不做處理嚷炉。也即是在讀取數(shù)據(jù)的時候加上8小時的offset。
使用udf等算子給時間戳加上8小時的offset探橱。
sink內(nèi)部做處理申屹。
public class UTC2Local extends ScalarFunction {
public Timestamp eval(Timestamp s) {
long timestamp = s.getTime() + 28800000;
return new Timestamp(timestamp);
}
}
tEnv.registerFunction("utc2local",new UTC2Local());
Table table1 = tEnv.sqlQuery("select count(number),utc2local(TUMBLE_END(proctime, INTERVAL '1' HOUR)) from res group by TUMBLE(proctime, INTERVAL '1' HOUR)");