1.流式計算分為無狀態(tài)和有狀態(tài)兩種情況集索。無狀態(tài)的計算觀察每個獨立事件屿愚,并根據(jù)最后一個事件輸出結(jié)果。例如务荆,流處理應(yīng)用程序從傳感器接收水位數(shù)據(jù)妆距,并在水位超過指定高度時發(fā)出警告。有狀態(tài)的計算則會基于多個事件輸出結(jié)果函匕。以下是一些例子娱据。
(1)所有類型的窗口。例如盅惜,計算過去一小時的平均水位吸耿,就是有狀態(tài)的計算。
(2)所有用于復(fù)雜事件處理的狀態(tài)機酷窥。例如咽安,若在一分鐘內(nèi)收到兩個相差20cm以上的水位差讀數(shù),則發(fā)出警告蓬推,這是有狀態(tài)的計算妆棒。
(3)流與流之間的所有關(guān)聯(lián)操作,以及流與靜態(tài)表或動態(tài)表之間的關(guān)聯(lián)操作沸伏,都是有狀態(tài)的計算糕珊。
2.下圖展示了無狀態(tài)流處理和有狀態(tài)流處理的主要區(qū)別。無狀態(tài)流處理分別接收每條數(shù)據(jù)記錄(圖中的黑條)毅糟,然后根據(jù)最新輸入的數(shù)據(jù)生成輸出數(shù)據(jù)(白條)红选。有狀態(tài)流處理會維護狀態(tài)(根據(jù)每條輸入記錄進行更新),并基于最新輸入的記錄和當(dāng)前的狀態(tài)值生成輸出記錄(灰條)姆另。
上圖中輸入數(shù)據(jù)由黑條表示喇肋。無狀態(tài)流處理每次只轉(zhuǎn)換一條輸入記錄坟乾,并且僅根據(jù)最新的輸入記錄輸出結(jié)果(白條)。有狀態(tài) 流處理維護所有已處理記錄的狀態(tài)值蝶防,并根據(jù)每條新輸入的記錄更新狀態(tài)甚侣,因此輸出記錄(灰條)反映的是綜合考慮多個事件之后的結(jié)果。
3.有狀態(tài)的算子和應(yīng)用程序
Flink內(nèi)置的很多算子间学,數(shù)據(jù)源source殷费,數(shù)據(jù)存儲sink都是有狀態(tài)的,流中的數(shù)據(jù)都是buffer records低葫,會保存一定的元素或者元數(shù)據(jù)详羡。例如: ProcessWindowFunction會緩存輸入流的數(shù)據(jù),ProcessFunction會保存設(shè)置的定時器信息等等嘿悬。
在Flink中实柠,狀態(tài)始終與特定算子相關(guān)聯(lián)∪的總的來說主到,有兩種類型的狀態(tài):
算子狀態(tài)(operator state)
鍵控狀態(tài)(keyed state)
4.算子狀態(tài)(operator state)
算子狀態(tài)的作用范圍限定為算子任務(wù)。這意味著由同一并行任務(wù)所處理的所有數(shù)據(jù)都可以訪問到相同的狀態(tài)躯概,狀態(tài)對于同一任務(wù)而言是共享的登钥。算子狀態(tài)不能由相同或不同算子的另一個任務(wù)訪問。
Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu):
列表狀態(tài)(List state):將狀態(tài)表示為一組數(shù)據(jù)的列表娶靡。
聯(lián)合列表狀態(tài)(Union list state):也將狀態(tài)表示為數(shù)據(jù)的列表牧牢。它與常規(guī)列表狀態(tài)的區(qū)別在于,在發(fā)生故障時姿锭,或者從保存點(savepoint)啟動應(yīng)用程序時如何恢復(fù)塔鳍。
廣播狀態(tài)(Broadcast state):如果一個算子有多項任務(wù),而它的每項任務(wù)狀態(tài)又都相同呻此,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)
5.鍵控狀態(tài)(keyed state)
鍵控狀態(tài)是根據(jù)輸入數(shù)據(jù)流中定義的鍵(key)來維護和訪問的轮纫。Flink為每個鍵值維護一個狀態(tài)實例,并將具有相同鍵的所有數(shù)據(jù)焚鲜,都分區(qū)到同一個算子任務(wù)中掌唾,這個任務(wù)會維護和處理這個key對應(yīng)的狀態(tài)。當(dāng)任務(wù)處理一條數(shù)據(jù)時忿磅,它會自動將狀態(tài)的訪問范圍限定為當(dāng)前數(shù)據(jù)的key糯彬。因此,具有相同key的所有數(shù)據(jù)都會訪問相同的狀態(tài)葱她。Keyed State很類似于一個分布式的key-value?map數(shù)據(jù)結(jié)構(gòu)撩扒,只能用于KeyedStream(keyBy算子處理之后)。
6.狀態(tài)后端(state backend)
每傳入一條數(shù)據(jù)吨些,有狀態(tài)的算子任務(wù)都會讀取和更新狀態(tài)搓谆。由于有效的狀態(tài)訪問對于處理數(shù)據(jù)的低延遲至關(guān)重要炒辉,因此每個并行任務(wù)都會在本地維護其狀態(tài),以確蓖彀危快速的狀態(tài)訪問辆脸。狀態(tài)的存儲但校、訪問以及維護螃诅,由一個可插入的組件決定,這個組件就叫做狀態(tài)后端(state backend)
狀態(tài)后端主要負(fù)責(zé)兩件事:
1)本地的狀態(tài)管理
2)將檢查點(checkpoint)狀態(tài)寫入遠(yuǎn)程存儲
狀態(tài)后端分類:
(1)MemoryStateBackend
內(nèi)存級的狀態(tài)后端状囱,會將鍵控狀態(tài)作為內(nèi)存中的對象進行管理术裸,將它們存儲在TaskManager的JVM堆上;而將checkpoint存儲在JobManager的內(nèi)存中亭枷。
(2)FsStateBackend
將checkpoint存到遠(yuǎn)程的持久化文件系統(tǒng)(FileSystem)上袭艺。而對于本地狀態(tài),跟MemoryStateBackend一樣叨粘,也會存在TaskManager的JVM堆上猾编。
(3)RocksDBStateBackend
將所有狀態(tài)序列化后,存入本地的RocksDB中存儲升敲。
7.狀態(tài)一致性
當(dāng)在分布式系統(tǒng)中引入狀態(tài)時答倡,自然也引入了一致性問題。一致性實際上是"正確性級別"的另一種說法驴党,也就是說在成功處理故障并恢復(fù)之后得到的結(jié)果瘪撇,與沒有發(fā)生任何故障時得到的結(jié)果相比,前者到底有多正確港庄?舉例來說倔既,假設(shè)要對最近一小時登錄的用戶計數(shù)。在系統(tǒng)經(jīng)歷故障之后鹏氧,計數(shù)結(jié)果是多少渤涌?如果有偏差,是有漏掉的計數(shù)還是重復(fù)計數(shù)把还?
1)一致性級別
在流處理中实蓬,一致性可以分為3個級別:
(1)at-most-once: 這其實是沒有正確性保障的委婉說法——故障發(fā)生之后,計數(shù)結(jié)果可能丟失笨篷。同樣的還有udp瞳秽。
(2)at-least-once: 這表示計數(shù)結(jié)果可能大于正確值,但絕不會小于正確值率翅。也就是說练俐,計數(shù)程序在發(fā)生故障后可能多算,但是絕不會少算冕臭。
(3)exactly-once: 這指的是系統(tǒng)保證在發(fā)生故障后得到的計數(shù)結(jié)果與正確值一致腺晾。
曾經(jīng)燕锥,at-least-once非常流行。第一代流處理器(如Storm和Samza)剛問世時只保證at-least-once悯蝉,原因有二归形。
(1)保證exactly-once的系統(tǒng)實現(xiàn)起來更復(fù)雜。這在基礎(chǔ)架構(gòu)層(決定什么代表正確鼻由,以及exactly-once的范圍是什么)和實現(xiàn)層都很有挑戰(zhàn)性暇榴。
(2)流處理系統(tǒng)的早期用戶愿意接受框架的局限性,并在應(yīng)用層想辦法彌補(例如使應(yīng)用程序具有冪等性蕉世,或者用批量計算層再做一遍計算)蔼紧。
最先保證exactly-once的系統(tǒng)(Storm Trident和Spark Streaming)在性能和表現(xiàn)力這兩個方面付出了很大的代價。為了保證exactly-once狠轻,這些系統(tǒng)無法單獨地對每條記錄運用應(yīng)用邏輯奸例,而是同時處理多條(一批)記錄,保證對每一批的處理要么全部成功向楼,要么全部失敗查吊。這就導(dǎo)致在得到結(jié)果前,必須等待一批記錄處理結(jié)束湖蜕。因此逻卖,用戶經(jīng)常不得不使用兩個流處理框架(一個用來保證exactly-once,另一個用來對每個元素做低延遲處理)重荠,結(jié)果使基礎(chǔ)設(shè)施更加復(fù)雜箭阶。曾經(jīng),用戶不得不在保證exactly-once與獲得低延遲和效率之間權(quán)衡利弊戈鲁。Flink避免了這種權(quán)衡仇参。
Flink的一個重大價值在于,它既保證了exactly-once婆殿,也具有低延遲和高吞吐的處理能力诈乒。
從根本上說,F(xiàn)link通過使自身滿足所有需求來避免權(quán)衡婆芦,它是業(yè)界的一次意義重大的技術(shù)飛躍怕磨。盡管這在外行看來很神奇,但是一旦了解消约,就會恍然大悟肠鲫。
2)端到端(end-to-end)狀態(tài)一致性
目前我們看到的一致性保證都是由流處理器實現(xiàn)的,也就是說都是在 Flink 流處理器內(nèi)部保證的或粮;而在真實應(yīng)用中导饲,流處理應(yīng)用除了流處理器以外還包含了數(shù)據(jù)源(例如 Kafka)和輸出到持久化系統(tǒng)。
端到端的一致性保證,意味著結(jié)果的正確性貫穿了整個流處理應(yīng)用的始終渣锦;每一個組件都保證了它自己的一致性硝岗,整個端到端的一致性級別取決于所有組件中一致性最弱的組件。具體可以劃分如下:
1)source端 —— 需要外部源可重設(shè)數(shù)據(jù)的讀取位置
2)link內(nèi)部 —— 依賴checkpoint
3)sink端 —— 需要保證從故障恢復(fù)時袋毙,數(shù)據(jù)不會重復(fù)寫入外部系統(tǒng)
而對于sink端型檀,又有兩種具體的實現(xiàn)方式:冪等(Idempotent)寫入和事務(wù)性(Transactional)寫入。
4)冪等寫入
所謂冪等操作听盖,是說一個操作胀溺,可以重復(fù)執(zhí)行很多次,但只導(dǎo)致一次結(jié)果更改媳溺,也就是說月幌,后面再重復(fù)執(zhí)行就不起作用了碍讯。
5)事務(wù)寫入
需要構(gòu)建事務(wù)來寫入外部系統(tǒng)悬蔽,構(gòu)建的事務(wù)對應(yīng)著 checkpoint,等到 checkpoint 真正完成的時候捉兴,才把所有對應(yīng)的結(jié)果寫入 sink 系統(tǒng)中蝎困。
對于事務(wù)性寫入,具體又有兩種實現(xiàn)方式:預(yù)寫日志(WAL)和兩階段提交(2PC)倍啥。
8.檢查點(checkpoint)
Flink具體如何保證exactly-once呢? 它使用一種被稱為"檢查點"(checkpoint)的特性禾乘,在出現(xiàn)故障時將系統(tǒng)重置回正確狀態(tài)。下面通過簡單的類比來解釋檢查點的作用虽缕。
9.Flink+Kafka如何實現(xiàn)端到端的exactly-once語義
我們知道始藕,端到端的狀態(tài)一致性的實現(xiàn),需要每一個組件都實現(xiàn)氮趋,對于Flink + Kafka的數(shù)據(jù)管道系統(tǒng)(Kafka進伍派、Kafka出)而言,各組件怎樣保證exactly-once語義呢剩胁?
1)內(nèi)部 —— 利用checkpoint機制诉植,把狀態(tài)存盤,發(fā)生故障的時候可以恢復(fù)昵观,保證內(nèi)部的狀態(tài)一致性
2)source —— kafka consumer作為source晾腔,可以將偏移量保存下來,如果后續(xù)任務(wù)出現(xiàn)了故障啊犬,恢復(fù)的時候可以由連接器重置偏移量灼擂,重新消費數(shù)據(jù),保證一致性
3)sink —— kafka producer作為sink觉至,采用兩階段提交 sink剔应,需要實現(xiàn)一個TwoPhaseCommitSinkFunction
內(nèi)部的checkpoint機制我們已經(jīng)有了了解,那source和sink具體又是怎樣運行的呢?接下來我們逐步做一個分析领斥。
我們知道Flink由JobManager協(xié)調(diào)各個TaskManager進行checkpoint存儲嫉到,checkpoint保存在 StateBackend中,默認(rèn)StateBackend是內(nèi)存級的月洛,也可以改為文件級的進行持久化保存何恶。
當(dāng)checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數(shù)據(jù)流嚼黔;barrier會在算子間傳遞下去细层。
每個算子會對當(dāng)前的狀態(tài)做個快照,保存到狀態(tài)后端唬涧。對于source任務(wù)而言疫赎,就會把當(dāng)前的offset作為狀態(tài)保存起來。下次從checkpoint恢復(fù)時碎节,source任務(wù)可以重新提交偏移量捧搞,從上次保存的位置開始重新消費數(shù)據(jù)。
每個內(nèi)部的transform 任務(wù)遇到 barrier 時狮荔,都會把狀態(tài)存到 checkpoint 里胎撇。
sink 任務(wù)首先把數(shù)據(jù)寫入外部 kafka,這些數(shù)據(jù)都屬于預(yù)提交的事務(wù)(還不能被消費)殖氏;當(dāng)遇到 barrier 時晚树,把狀態(tài)保存到狀態(tài)后端,并開啟新的預(yù)提交事務(wù)雅采。
當(dāng)所有算子任務(wù)的快照完成爵憎,也就是這次的 checkpoint 完成時,JobManager 會向所有任務(wù)發(fā)通知婚瓜,確認(rèn)這次 checkpoint 完成宝鼓。
當(dāng)sink 任務(wù)收到確認(rèn)通知,就會正式提交之前的事務(wù)闰渔,kafka 中未確認(rèn)的數(shù)據(jù)就改為“已確認(rèn)”席函,數(shù)據(jù)就真正可以被消費了。
所以我們看到冈涧,執(zhí)行過程實際上是一個兩段式提交茂附,每個算子執(zhí)行完成,會進行“預(yù)提交”督弓,直到執(zhí)行完sink操作营曼,會發(fā)起“確認(rèn)提交”,如果執(zhí)行失敗愚隧,預(yù)提交會放棄掉蒂阱。
具體的兩階段提交步驟總結(jié)如下:
1)第一條數(shù)據(jù)來了之后,開啟一個 kafka 的事務(wù)(transaction),正常寫入 kafka 分區(qū)日志但標(biāo)記為未提交录煤,這就是“預(yù)提交”
2)觸發(fā) checkpoint 操作鳄厌,barrier從 source 開始向下傳遞,遇到 barrier 的算子將狀態(tài)存入狀態(tài)后端妈踊,并通知jobmanager
3)sink 連接器收到 barrier了嚎,保存當(dāng)前狀態(tài),存入 checkpoint廊营,通知 jobmanager歪泳,并開啟下一階段的事務(wù),用于提交下個檢查點的數(shù)據(jù)
4)jobmanager 收到所有任務(wù)的通知露筒,發(fā)出確認(rèn)信息呐伞,表示 checkpoint 完成
5)sink 任務(wù)收到 jobmanager 的確認(rèn)信息,正式提交這段時間的數(shù)據(jù)
6)外部kafka關(guān)閉事務(wù)慎式,提交的數(shù)據(jù)可以正常消費了伶氢。