Flink 狀態(tài)編程

1.流式計算分為無狀態(tài)和有狀態(tài)兩種情況集索。無狀態(tài)的計算觀察每個獨立事件屿愚,并根據(jù)最后一個事件輸出結(jié)果。例如务荆,流處理應(yīng)用程序從傳感器接收水位數(shù)據(jù)妆距,并在水位超過指定高度時發(fā)出警告。有狀態(tài)的計算則會基于多個事件輸出結(jié)果函匕。以下是一些例子娱据。

(1)所有類型的窗口。例如盅惜,計算過去一小時的平均水位吸耿,就是有狀態(tài)的計算。

(2)所有用于復(fù)雜事件處理的狀態(tài)機酷窥。例如咽安,若在一分鐘內(nèi)收到兩個相差20cm以上的水位差讀數(shù),則發(fā)出警告蓬推,這是有狀態(tài)的計算妆棒。

(3)流與流之間的所有關(guān)聯(lián)操作,以及流與靜態(tài)表或動態(tài)表之間的關(guān)聯(lián)操作沸伏,都是有狀態(tài)的計算糕珊。

2.下圖展示了無狀態(tài)流處理和有狀態(tài)流處理的主要區(qū)別。無狀態(tài)流處理分別接收每條數(shù)據(jù)記錄(圖中的黑條)毅糟,然后根據(jù)最新輸入的數(shù)據(jù)生成輸出數(shù)據(jù)(白條)红选。有狀態(tài)流處理會維護狀態(tài)(根據(jù)每條輸入記錄進行更新),并基于最新輸入的記錄和當(dāng)前的狀態(tài)值生成輸出記錄(灰條)姆另。

無狀態(tài)和有狀態(tài)的流處理

上圖中輸入數(shù)據(jù)由黑條表示喇肋。無狀態(tài)流處理每次只轉(zhuǎn)換一條輸入記錄坟乾,并且僅根據(jù)最新的輸入記錄輸出結(jié)果(白條)。有狀態(tài) 流處理維護所有已處理記錄的狀態(tài)值蝶防,并根據(jù)每條新輸入的記錄更新狀態(tài)甚侣,因此輸出記錄(灰條)反映的是綜合考慮多個事件之后的結(jié)果。

3.有狀態(tài)的算子和應(yīng)用程序

Flink內(nèi)置的很多算子间学,數(shù)據(jù)源source殷费,數(shù)據(jù)存儲sink都是有狀態(tài)的,流中的數(shù)據(jù)都是buffer records低葫,會保存一定的元素或者元數(shù)據(jù)详羡。例如: ProcessWindowFunction會緩存輸入流的數(shù)據(jù),ProcessFunction會保存設(shè)置的定時器信息等等嘿悬。

在Flink中实柠,狀態(tài)始終與特定算子相關(guān)聯(lián)∪的總的來說主到,有兩種類型的狀態(tài):

算子狀態(tài)(operator state)

鍵控狀態(tài)(keyed state)

4.算子狀態(tài)(operator state)

算子狀態(tài)的作用范圍限定為算子任務(wù)。這意味著由同一并行任務(wù)所處理的所有數(shù)據(jù)都可以訪問到相同的狀態(tài)躯概,狀態(tài)對于同一任務(wù)而言是共享的登钥。算子狀態(tài)不能由相同或不同算子的另一個任務(wù)訪問。

具有算子狀態(tài)的任務(wù)

Flink為算子狀態(tài)提供三種基本數(shù)據(jù)結(jié)構(gòu):

列表狀態(tài)(List state):將狀態(tài)表示為一組數(shù)據(jù)的列表娶靡。

聯(lián)合列表狀態(tài)(Union list state):也將狀態(tài)表示為數(shù)據(jù)的列表牧牢。它與常規(guī)列表狀態(tài)的區(qū)別在于,在發(fā)生故障時姿锭,或者從保存點(savepoint)啟動應(yīng)用程序時如何恢復(fù)塔鳍。

廣播狀態(tài)(Broadcast state):如果一個算子有多項任務(wù),而它的每項任務(wù)狀態(tài)又都相同呻此,那么這種特殊情況最適合應(yīng)用廣播狀態(tài)

5.鍵控狀態(tài)(keyed state)

鍵控狀態(tài)是根據(jù)輸入數(shù)據(jù)流中定義的鍵(key)來維護和訪問的轮纫。Flink為每個鍵值維護一個狀態(tài)實例,并將具有相同鍵的所有數(shù)據(jù)焚鲜,都分區(qū)到同一個算子任務(wù)中掌唾,這個任務(wù)會維護和處理這個key對應(yīng)的狀態(tài)。當(dāng)任務(wù)處理一條數(shù)據(jù)時忿磅,它會自動將狀態(tài)的訪問范圍限定為當(dāng)前數(shù)據(jù)的key糯彬。因此,具有相同key的所有數(shù)據(jù)都會訪問相同的狀態(tài)葱她。Keyed State很類似于一個分布式的key-value?map數(shù)據(jù)結(jié)構(gòu)撩扒,只能用于KeyedStream(keyBy算子處理之后)。

具有鍵控狀態(tài)的任務(wù)

6.狀態(tài)后端(state backend)

每傳入一條數(shù)據(jù)吨些,有狀態(tài)的算子任務(wù)都會讀取和更新狀態(tài)搓谆。由于有效的狀態(tài)訪問對于處理數(shù)據(jù)的低延遲至關(guān)重要炒辉,因此每個并行任務(wù)都會在本地維護其狀態(tài),以確蓖彀危快速的狀態(tài)訪問辆脸。狀態(tài)的存儲但校、訪問以及維護螃诅,由一個可插入的組件決定,這個組件就叫做狀態(tài)后端(state backend)

狀態(tài)后端主要負(fù)責(zé)兩件事:

1)本地的狀態(tài)管理

2)將檢查點(checkpoint)狀態(tài)寫入遠(yuǎn)程存儲

狀態(tài)后端分類:

(1)MemoryStateBackend

內(nèi)存級的狀態(tài)后端状囱,會將鍵控狀態(tài)作為內(nèi)存中的對象進行管理术裸,將它們存儲在TaskManager的JVM堆上;而將checkpoint存儲在JobManager的內(nèi)存中亭枷。

(2)FsStateBackend

將checkpoint存到遠(yuǎn)程的持久化文件系統(tǒng)(FileSystem)上袭艺。而對于本地狀態(tài),跟MemoryStateBackend一樣叨粘,也會存在TaskManager的JVM堆上猾编。

(3)RocksDBStateBackend

將所有狀態(tài)序列化后,存入本地的RocksDB中存儲升敲。

7.狀態(tài)一致性

當(dāng)在分布式系統(tǒng)中引入狀態(tài)時答倡,自然也引入了一致性問題。一致性實際上是"正確性級別"的另一種說法驴党,也就是說在成功處理故障并恢復(fù)之后得到的結(jié)果瘪撇,與沒有發(fā)生任何故障時得到的結(jié)果相比,前者到底有多正確港庄?舉例來說倔既,假設(shè)要對最近一小時登錄的用戶計數(shù)。在系統(tǒng)經(jīng)歷故障之后鹏氧,計數(shù)結(jié)果是多少渤涌?如果有偏差,是有漏掉的計數(shù)還是重復(fù)計數(shù)把还?

1)一致性級別

在流處理中实蓬,一致性可以分為3個級別:

(1)at-most-once: 這其實是沒有正確性保障的委婉說法——故障發(fā)生之后,計數(shù)結(jié)果可能丟失笨篷。同樣的還有udp瞳秽。

(2)at-least-once: 這表示計數(shù)結(jié)果可能大于正確值,但絕不會小于正確值率翅。也就是說练俐,計數(shù)程序在發(fā)生故障后可能多算,但是絕不會少算冕臭。

(3)exactly-once: 這指的是系統(tǒng)保證在發(fā)生故障后得到的計數(shù)結(jié)果與正確值一致腺晾。

曾經(jīng)燕锥,at-least-once非常流行。第一代流處理器(如Storm和Samza)剛問世時只保證at-least-once悯蝉,原因有二归形。

(1)保證exactly-once的系統(tǒng)實現(xiàn)起來更復(fù)雜。這在基礎(chǔ)架構(gòu)層(決定什么代表正確鼻由,以及exactly-once的范圍是什么)和實現(xiàn)層都很有挑戰(zhàn)性暇榴。

(2)流處理系統(tǒng)的早期用戶愿意接受框架的局限性,并在應(yīng)用層想辦法彌補(例如使應(yīng)用程序具有冪等性蕉世,或者用批量計算層再做一遍計算)蔼紧。

最先保證exactly-once的系統(tǒng)(Storm Trident和Spark Streaming)在性能和表現(xiàn)力這兩個方面付出了很大的代價。為了保證exactly-once狠轻,這些系統(tǒng)無法單獨地對每條記錄運用應(yīng)用邏輯奸例,而是同時處理多條(一批)記錄,保證對每一批的處理要么全部成功向楼,要么全部失敗查吊。這就導(dǎo)致在得到結(jié)果前,必須等待一批記錄處理結(jié)束湖蜕。因此逻卖,用戶經(jīng)常不得不使用兩個流處理框架(一個用來保證exactly-once,另一個用來對每個元素做低延遲處理)重荠,結(jié)果使基礎(chǔ)設(shè)施更加復(fù)雜箭阶。曾經(jīng),用戶不得不在保證exactly-once與獲得低延遲和效率之間權(quán)衡利弊戈鲁。Flink避免了這種權(quán)衡仇参。

Flink的一個重大價值在于,它既保證了exactly-once婆殿,也具有低延遲和高吞吐的處理能力诈乒。

從根本上說,F(xiàn)link通過使自身滿足所有需求來避免權(quán)衡婆芦,它是業(yè)界的一次意義重大的技術(shù)飛躍怕磨。盡管這在外行看來很神奇,但是一旦了解消约,就會恍然大悟肠鲫。

2)端到端(end-to-end)狀態(tài)一致性

目前我們看到的一致性保證都是由流處理器實現(xiàn)的,也就是說都是在 Flink 流處理器內(nèi)部保證的或粮;而在真實應(yīng)用中导饲,流處理應(yīng)用除了流處理器以外還包含了數(shù)據(jù)源(例如 Kafka)和輸出到持久化系統(tǒng)。

端到端的一致性保證,意味著結(jié)果的正確性貫穿了整個流處理應(yīng)用的始終渣锦;每一個組件都保證了它自己的一致性硝岗,整個端到端的一致性級別取決于所有組件中一致性最弱的組件。具體可以劃分如下:

1)source端 —— 需要外部源可重設(shè)數(shù)據(jù)的讀取位置

2)link內(nèi)部 —— 依賴checkpoint

3)sink端 —— 需要保證從故障恢復(fù)時袋毙,數(shù)據(jù)不會重復(fù)寫入外部系統(tǒng)

而對于sink端型檀,又有兩種具體的實現(xiàn)方式:冪等(Idempotent)寫入和事務(wù)性(Transactional)寫入。

4)冪等寫入

所謂冪等操作听盖,是說一個操作胀溺,可以重復(fù)執(zhí)行很多次,但只導(dǎo)致一次結(jié)果更改媳溺,也就是說月幌,后面再重復(fù)執(zhí)行就不起作用了碍讯。

5)事務(wù)寫入

需要構(gòu)建事務(wù)來寫入外部系統(tǒng)悬蔽,構(gòu)建的事務(wù)對應(yīng)著 checkpoint,等到 checkpoint 真正完成的時候捉兴,才把所有對應(yīng)的結(jié)果寫入 sink 系統(tǒng)中蝎困。

對于事務(wù)性寫入,具體又有兩種實現(xiàn)方式:預(yù)寫日志(WAL)和兩階段提交(2PC)倍啥。

二階段提交

8.檢查點(checkpoint)

Flink具體如何保證exactly-once呢? 它使用一種被稱為"檢查點"(checkpoint)的特性禾乘,在出現(xiàn)故障時將系統(tǒng)重置回正確狀態(tài)。下面通過簡單的類比來解釋檢查點的作用虽缕。

9.Flink+Kafka如何實現(xiàn)端到端的exactly-once語義

我們知道始藕,端到端的狀態(tài)一致性的實現(xiàn),需要每一個組件都實現(xiàn)氮趋,對于Flink + Kafka的數(shù)據(jù)管道系統(tǒng)(Kafka進伍派、Kafka出)而言,各組件怎樣保證exactly-once語義呢剩胁?

1)內(nèi)部 —— 利用checkpoint機制诉植,把狀態(tài)存盤,發(fā)生故障的時候可以恢復(fù)昵观,保證內(nèi)部的狀態(tài)一致性

2)source —— kafka consumer作為source晾腔,可以將偏移量保存下來,如果后續(xù)任務(wù)出現(xiàn)了故障啊犬,恢復(fù)的時候可以由連接器重置偏移量灼擂,重新消費數(shù)據(jù),保證一致性

3)sink —— kafka producer作為sink觉至,采用兩階段提交 sink剔应,需要實現(xiàn)一個TwoPhaseCommitSinkFunction

內(nèi)部的checkpoint機制我們已經(jīng)有了了解,那source和sink具體又是怎樣運行的呢?接下來我們逐步做一個分析领斥。

我們知道Flink由JobManager協(xié)調(diào)各個TaskManager進行checkpoint存儲嫉到,checkpoint保存在 StateBackend中,默認(rèn)StateBackend是內(nèi)存級的月洛,也可以改為文件級的進行持久化保存何恶。

當(dāng)checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數(shù)據(jù)流嚼黔;barrier會在算子間傳遞下去细层。

每個算子會對當(dāng)前的狀態(tài)做個快照,保存到狀態(tài)后端唬涧。對于source任務(wù)而言疫赎,就會把當(dāng)前的offset作為狀態(tài)保存起來。下次從checkpoint恢復(fù)時碎节,source任務(wù)可以重新提交偏移量捧搞,從上次保存的位置開始重新消費數(shù)據(jù)。

每個內(nèi)部的transform 任務(wù)遇到 barrier 時狮荔,都會把狀態(tài)存到 checkpoint 里胎撇。

sink 任務(wù)首先把數(shù)據(jù)寫入外部 kafka,這些數(shù)據(jù)都屬于預(yù)提交的事務(wù)(還不能被消費)殖氏;當(dāng)遇到 barrier 時晚树,把狀態(tài)保存到狀態(tài)后端,并開啟新的預(yù)提交事務(wù)雅采。

當(dāng)所有算子任務(wù)的快照完成爵憎,也就是這次的 checkpoint 完成時,JobManager 會向所有任務(wù)發(fā)通知婚瓜,確認(rèn)這次 checkpoint 完成宝鼓。

當(dāng)sink 任務(wù)收到確認(rèn)通知,就會正式提交之前的事務(wù)闰渔,kafka 中未確認(rèn)的數(shù)據(jù)就改為“已確認(rèn)”席函,數(shù)據(jù)就真正可以被消費了。

所以我們看到冈涧,執(zhí)行過程實際上是一個兩段式提交茂附,每個算子執(zhí)行完成,會進行“預(yù)提交”督弓,直到執(zhí)行完sink操作营曼,會發(fā)起“確認(rèn)提交”,如果執(zhí)行失敗愚隧,預(yù)提交會放棄掉蒂阱。

具體的兩階段提交步驟總結(jié)如下:

1)第一條數(shù)據(jù)來了之后,開啟一個 kafka 的事務(wù)(transaction),正常寫入 kafka 分區(qū)日志但標(biāo)記為未提交录煤,這就是“預(yù)提交”

2)觸發(fā) checkpoint 操作鳄厌,barrier從 source 開始向下傳遞,遇到 barrier 的算子將狀態(tài)存入狀態(tài)后端妈踊,并通知jobmanager

3)sink 連接器收到 barrier了嚎,保存當(dāng)前狀態(tài),存入 checkpoint廊营,通知 jobmanager歪泳,并開啟下一階段的事務(wù),用于提交下個檢查點的數(shù)據(jù)

4)jobmanager 收到所有任務(wù)的通知露筒,發(fā)出確認(rèn)信息呐伞,表示 checkpoint 完成

5)sink 任務(wù)收到 jobmanager 的確認(rèn)信息,正式提交這段時間的數(shù)據(jù)

6)外部kafka關(guān)閉事務(wù)慎式,提交的數(shù)據(jù)可以正常消費了伶氢。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市瞬捕,隨后出現(xiàn)的幾起案子鞍历,更是在濱河造成了極大的恐慌,老刑警劉巖肪虎,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異惧蛹,居然都是意外死亡扇救,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門香嗓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來迅腔,“玉大人,你說我怎么就攤上這事靠娱〔琢遥” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵像云,是天一觀的道長锌雀。 經(jīng)常有香客問我,道長迅诬,這世上最難降的妖魔是什么腋逆? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮侈贷,結(jié)果婚禮上惩歉,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好撑蚌,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布上遥。 她就那樣靜靜地躺著,像睡著了一般争涌。 火紅的嫁衣襯著肌膚如雪露该。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天第煮,我揣著相機與錄音解幼,去河邊找鬼。 笑死包警,一個胖子當(dāng)著我的面吹牛撵摆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播害晦,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼特铝,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了壹瘟?” 一聲冷哼從身側(cè)響起鲫剿,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎稻轨,沒想到半個月后灵莲,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡殴俱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年政冻,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片线欲。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡氧腰,死狀恐怖参萄,靈堂內(nèi)的尸體忽然破棺而出渣刷,到底是詐尸還是另有隱情苗桂,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布趴泌,位于F島的核電站舟舒,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏踱讨。R本人自食惡果不足惜魏蔗,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望痹筛。 院中可真熱鬧莺治,春花似錦廓鞠、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至榄审,卻和暖如春砌们,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背搁进。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工浪感, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人饼问。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓影兽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親莱革。 傳聞我的和親對象是個殘疾皇子峻堰,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355