Apache Flink——狀態(tài)編程

概述

Flink 處理機(jī)制的核心蝌数,就是“有狀態(tài)的流式計(jì)算”菲宴。不論是簡單聚合贷祈、窗口聚合,還是處理函數(shù)的應(yīng)用喝峦,都會有狀態(tài)的身影出現(xiàn)势誊。

狀態(tài)就如同事務(wù)處理時(shí)數(shù)據(jù)庫中保存的信息一樣,是用來輔助進(jìn)行任務(wù)計(jì)算的數(shù)據(jù)愈犹。而在 Flink 這樣的分布式系統(tǒng)中键科,我們不僅需要定義出狀態(tài)在任務(wù)并行時(shí)的處理方式闻丑,還需要考慮如何持久化保存漩怎、以便發(fā)生故障時(shí)正確地恢復(fù)勋颖。這就需要一套完整的管理機(jī)制來處理所有的狀態(tài)。

一勋锤、Flink 中的狀態(tài)

1.1 有狀態(tài)算子

在流處理中饭玲,數(shù)據(jù)是連續(xù)不斷到來和處理的。每個(gè)任務(wù)進(jìn)行計(jì)算處理時(shí)叁执,可以基于當(dāng)前數(shù)據(jù)直接轉(zhuǎn)換得到輸出結(jié)果茄厘;也可以依賴一些其他數(shù)據(jù)。這些由一個(gè)任務(wù)維護(hù)谈宛,并且用來計(jì)算輸出結(jié)果的所有數(shù)據(jù)次哈,就叫作這個(gè)任務(wù)的狀態(tài)。

在 Flink 中吆录,算子任務(wù)可以分為無狀態(tài)和有狀態(tài)兩種情況窑滞。

無狀態(tài)的算子任務(wù)只需要觀察每個(gè)獨(dú)立事件,根據(jù)當(dāng)前輸入的數(shù)據(jù)直接轉(zhuǎn)換輸出結(jié)果恢筝,例如哀卫,可以將一個(gè)字符串類型的數(shù)據(jù)拆分開作為元組輸出;也可以對數(shù)據(jù)做一些計(jì)算撬槽,比如每個(gè)代表數(shù)量的字段加 1此改。基本轉(zhuǎn)換算子侄柔,如 map共啃、filter、flatMap暂题,計(jì)算時(shí)不依賴其他數(shù)據(jù)勋磕,就都屬于無狀態(tài)的算子。

而有狀態(tài)的算子任務(wù)敢靡,則除當(dāng)前數(shù)據(jù)之外挂滓,還需要一些其他數(shù)據(jù)來得到計(jì)算結(jié)果。這里的“其他數(shù)據(jù)”啸胧,就是所謂的狀態(tài)(state)赶站,最常見的就是之前到達(dá)的數(shù)據(jù),或者由之前數(shù)據(jù)計(jì)算出的某個(gè)結(jié)果纺念。比如贝椿,做求和(sum)計(jì)算時(shí),需要保存之前所有數(shù)據(jù)的和陷谱,這就是狀態(tài)烙博;窗口算子中會保存已經(jīng)到達(dá)的所有數(shù)據(jù)瑟蜈,這些也都是它的狀態(tài)。另外渣窜,如果我們希望檢索到某種“事件模式”(event pattern)铺根,比如“先有下單行為,后有支付行為”乔宿,那么也應(yīng)該把之前的行為保存下來位迂,這同樣屬于狀態(tài)。容易發(fā)現(xiàn)详瑞,之前講過的聚合算子掂林、窗口算子都屬于有狀態(tài)的算子。

如圖所示為有狀態(tài)算子的一般處理流程坝橡,具體步驟如下泻帮。

  • 1 算子任務(wù)接收到上游發(fā)來的數(shù)據(jù);
  • 2 獲取當(dāng)前狀態(tài)计寇;
  • 3 根據(jù)業(yè)務(wù)邏輯進(jìn)行計(jì)算锣杂,更新狀態(tài);
  • 4 得到計(jì)算結(jié)果饲常,輸出發(fā)送到下游任務(wù)蹲堂。

1.2 狀態(tài)的管理

在傳統(tǒng)的事務(wù)型處理架構(gòu)中,這種額外的狀態(tài)數(shù)據(jù)是保存在數(shù)據(jù)庫中的贝淤。而對于實(shí)時(shí)流處理來說柒竞,這樣做需要頻繁讀寫外部數(shù)據(jù)庫,如果數(shù)據(jù)規(guī)模非常大肯定就達(dá)不到性能要求了播聪。所以 Flink 的解決方案是朽基,將狀態(tài)直接保存在內(nèi)存中來保證性能,并通過分布式擴(kuò)展來提高吞吐量离陶。

在 Flink 中稼虎,每一個(gè)算子任務(wù)都可以設(shè)置并行度,從而可以在不同的 slot 上并行運(yùn)行多個(gè)實(shí)例招刨,我們把它們叫作“并行子任務(wù)”霎俩。而狀態(tài)既然在內(nèi)存中,那么就可以認(rèn)為是子任務(wù)實(shí)例上的一個(gè)本地變量沉眶,能夠被任務(wù)的業(yè)務(wù)邏輯訪問和修改打却。

這樣看來狀態(tài)的管理似乎非常簡單,我們直接把它作為一個(gè)對象交給 JVM 就可以了谎倔。然而大數(shù)據(jù)的場景下柳击,我們必須使用分布式架構(gòu)來做擴(kuò)展,在低延遲片习、高吞吐的基礎(chǔ)上還要保證容錯性捌肴,一系列復(fù)雜的問題就會隨之而來了蹬叭。

  • 狀態(tài)的訪問權(quán)限:我們知道 Flink 上的聚合和窗口操作,一般都是基于 KeyedStream 的状知,數(shù)據(jù)會按照 key 的哈希值進(jìn)行分區(qū)秽五,聚合處理的結(jié)果也應(yīng)該是只對當(dāng)前 key 有效。然而同一個(gè)分區(qū)(也就是 slot)上執(zhí)行的任務(wù)實(shí)例试幽,可能會包含多個(gè)key 的數(shù)據(jù)筝蚕,它們同時(shí)訪問和更改本地變量卦碾,就會導(dǎo)致計(jì)算結(jié)果錯誤铺坞。所以這時(shí)狀態(tài)并不是單純的本地變量。

  • 容錯性:也就是故障后的恢復(fù)洲胖。狀態(tài)只保存在內(nèi)存中顯然是不夠穩(wěn)定的济榨,我們需要將它持久化保存,做一個(gè)備份绿映;在發(fā)生故障后可以從這個(gè)備份中恢復(fù)狀態(tài)擒滑。

  • 還應(yīng)該考慮到分布式應(yīng)用的橫向擴(kuò)展性:比如處理的數(shù)據(jù)量增大時(shí),我們應(yīng)該相應(yīng)地對計(jì)算資源擴(kuò)容叉弦,調(diào)大并行度丐一。這時(shí)就涉及到了狀態(tài)的重組調(diào)整。

可見狀態(tài)的管理并不是一件輕松的事淹冰。好在 Flink 作為有狀態(tài)的大數(shù)據(jù)流式處理框架库车,已經(jīng)幫我們搞定了這一切。Flink 有一套完整的狀態(tài)管理機(jī)制樱拴,將底層一些核心功能全部封裝起來柠衍,包括狀態(tài)的高效存儲和訪問、持久化保存和故障恢復(fù)晶乔,以及資源擴(kuò)展時(shí)的調(diào)整珍坊。這樣,我們只需要調(diào)用相應(yīng)的 API 就可以很方便地使用狀態(tài)正罢,或?qū)?yīng)用的容錯機(jī)制進(jìn)行配置阵漏,從而將更多的精力放在業(yè)務(wù)邏輯的開發(fā)上。

1.3 狀態(tài)的分類

1.3.1 托管狀態(tài)(Managed State)和原始狀態(tài)(Raw State)

Flink 的狀態(tài)有兩種:托管狀態(tài)(Managed State)和原始狀態(tài)(Raw State)翻具。托管狀態(tài)就是由 Flink 統(tǒng)一管理的履怯,狀態(tài)的存儲訪問、故障恢復(fù)和重組等一系列問題都由 Flink 實(shí)現(xiàn)呛占,我們只要調(diào)接口就可以虑乖;而原始狀態(tài)則是自定義的,相當(dāng)于就是開辟了一塊內(nèi)存晾虑,需要我們自己管理疹味,實(shí)現(xiàn)狀態(tài)的序列化和故障恢復(fù)仅叫。

具體來講,托管狀態(tài)是由 Flink 的運(yùn)行時(shí)(Runtime)來托管的糙捺;在配置容錯機(jī)制后诫咱,狀態(tài)會自動持久化保存,并在發(fā)生故障時(shí)自動恢復(fù)洪灯。當(dāng)應(yīng)用發(fā)生橫向擴(kuò)展時(shí)坎缭,狀態(tài)也會自動地重組分配到所有的子任務(wù)實(shí)例上。對于具體的狀態(tài)內(nèi)容签钩,F(xiàn)link 也提供了值狀態(tài)(ValueState)掏呼、列表狀態(tài)(ListState)、映射狀態(tài)(MapState)铅檩、聚合狀態(tài)(AggregateState)等多種結(jié)構(gòu)憎夷,內(nèi)部支持各種數(shù)據(jù)類型。聚合昧旨、窗口等算子中內(nèi)置的狀態(tài)拾给,就都是托管狀態(tài);我們也可以在富函數(shù)類(RichFunction)中通過上下文來自定義狀態(tài)兔沃,這些也都是托管狀態(tài)蒋得。

而對比之下,原始狀態(tài)就全部需要自定義了乒疏。Flink 不會對狀態(tài)進(jìn)行任何自動操作额衙,也不知道狀態(tài)的具體數(shù)據(jù)類型,只會把它當(dāng)作最原始的字節(jié)(Byte)數(shù)組來存儲缰雇。我們需要花費(fèi)大量的精力來處理狀態(tài)的管理和維護(hù)入偷。

所以只有在遇到托管狀態(tài)無法實(shí)現(xiàn)的特殊需求時(shí),我們才會考慮使用原始狀態(tài)械哟;一般情況下不推薦使用疏之。絕大多數(shù)應(yīng)用場景,我們都可以用 Flink 提供的算子或者自定義托管狀態(tài)來實(shí)現(xiàn)需求暇咆。

1.3.2 算子狀態(tài)(Operator State)和按鍵分區(qū)狀態(tài)(Keyed State)

接下來的重點(diǎn)就是托管狀態(tài)(Managed State)

在 Flink 中锋爪,一個(gè)算子任務(wù)會按照并行度分為多個(gè)并行子任務(wù)執(zhí)行,而不同的子任務(wù)會占據(jù)不同的任務(wù)槽(task slot)爸业。由于不同的 slot 在計(jì)算資源上是物理隔離的其骄,所以 Flink能管理的狀態(tài)在并行任務(wù)間是無法共享的,每個(gè)狀態(tài)只能針對當(dāng)前子任務(wù)的實(shí)例有效扯旷。

而很多有狀態(tài)的操作(比如聚合拯爽、窗口)都是要先做 keyBy 進(jìn)行按鍵分區(qū)的。按鍵分區(qū)之后钧忽,任務(wù)所進(jìn)行的所有計(jì)算都應(yīng)該只針對當(dāng)前 key 有效毯炮,所以狀態(tài)也應(yīng)該按照 key 彼此隔離逼肯。在這種情況下,狀態(tài)的訪問方式又會有所不同桃煎。

基于這樣的想法篮幢,我們又可以將托管狀態(tài)分為兩類:算子狀態(tài)和按鍵分區(qū)狀態(tài)。

(1)算子狀態(tài)(Operator State)

狀態(tài)作用范圍限定為當(dāng)前的算子任務(wù)實(shí)例为迈,也就是只對當(dāng)前并行子任務(wù)實(shí)例有效三椿。這就意味著對于一個(gè)并行子任務(wù),占據(jù)了一個(gè)“分區(qū)”葫辐,它所處理的所有數(shù)據(jù)都會訪問到相同的狀態(tài)搜锰, 狀態(tài)對于同一任務(wù)而言是共享的,如圖所示另患。

算子狀態(tài)可以用在所有算子上氏捞,使用的時(shí)候其實(shí)就跟一個(gè)本地變量沒什么區(qū)別——因?yàn)楸镜刈兞康淖饔糜蛞彩钱?dāng)前任務(wù)實(shí)例彤侍。在使用時(shí)似将,我們還需進(jìn)一步實(shí)現(xiàn)CheckpointedFunction 接口烹俗。

(2)按鍵分區(qū)狀態(tài)(Keyed State)

狀態(tài)是根據(jù)輸入流中定義的鍵(key)來維護(hù)和訪問的颤介,所以只能定義在按鍵分區(qū)流(KeyedStream)中熟妓,也就keyBy之后才可以使用刁愿,如圖所示嘹害。

按鍵分區(qū)狀態(tài)應(yīng)用非常廣泛顽爹。聚合算子必須在 keyBy 之后才能使用纤泵,就是因?yàn)榫酆系慕Y(jié)果是以Keyed State 的形式保存的。另外镜粤,也可以通過富函數(shù)類(Rich Function) 來自定義Keyed State捏题,所以只要提供了富函數(shù)類接口的算子,也都可以使用 Keyed State肉渴。

所以即使是map公荧、filter 這樣無狀態(tài)的基本轉(zhuǎn)換算子,我們也可以通過富函數(shù)類給它們“追加”Keyed State同规,或者實(shí)現(xiàn) CheckpointedFunction 接口來定義Operator State循狰;從這個(gè)角度講, Flink 中所有的算子都可以是有狀態(tài)的券勺,不愧是“有狀態(tài)的流處理”绪钥。

無論是 Keyed State 還是 Operator State,它們都是在本地實(shí)例上維護(hù)的关炼,也就是說每個(gè)并行子任務(wù)維護(hù)著對應(yīng)的狀態(tài)程腹,算子的子任務(wù)之間狀態(tài)不共享。

二儒拂、按鍵分區(qū)狀態(tài)(Keyed State)

在實(shí)際應(yīng)用中寸潦,一般都需要將數(shù)據(jù)按照某個(gè) key 進(jìn)行分區(qū)缀去,然后再進(jìn)行計(jì)算處理;所以最為常見的狀態(tài)類型就是 Keyed State甸祭。之前介紹到 keyBy 之后的聚合缕碎、窗口計(jì)算,算子所持有的狀態(tài)池户,都是Keyed State咏雌。

另外,我們還可以通過富函數(shù)類(Rich Function)對轉(zhuǎn)換算子進(jìn)行擴(kuò)展校焦、實(shí)現(xiàn)自定義功能赊抖, 比如 RichMapFunction、RichFilterFunction寨典。在富函數(shù)中氛雪,我們可以調(diào)用.getRuntimeContext() 獲取當(dāng)前的運(yùn)行時(shí)上下文(RuntimeContext),進(jìn)而獲取到訪問狀態(tài)的句柄耸成;這種富函數(shù)中自定義的狀態(tài)也是 Keyed State报亩。

2.1 基本概念和特點(diǎn)

按鍵分區(qū)狀態(tài)(Keyed State)顧名思義,是任務(wù)按照鍵(key)來訪問和維護(hù)的狀態(tài)井氢。它的特點(diǎn)非常鮮明弦追,就是以key 為作用范圍進(jìn)行隔離。

在進(jìn)行按鍵分區(qū)(keyBy)之后花竞,具有相同鍵的所有數(shù)據(jù)劲件,都會分配到同一個(gè)并行子任務(wù)中;所以如果當(dāng)前任務(wù)定義了狀態(tài)约急,F(xiàn)link 就會在當(dāng)前并行子任務(wù)實(shí)例中零远,為每個(gè)鍵值維護(hù)一個(gè)狀態(tài)的實(shí)例。于是當(dāng)前任務(wù)就會為分配來的所有數(shù)據(jù)厌蔽,按照 key 維護(hù)和處理對應(yīng)的狀態(tài)牵辣。

因?yàn)橐粋€(gè)并行子任務(wù)可能會處理多個(gè) key 的數(shù)據(jù),所以 Flink 需要對 Keyed State 進(jìn)行一些特殊優(yōu)化躺枕。在底層服猪,Keyed State 類似于一個(gè)分布式的映射(map)數(shù)據(jù)結(jié)構(gòu),所有的狀態(tài)會根據(jù) key 保存成鍵值對(key-value)的形式拐云。這樣當(dāng)一條數(shù)據(jù)到來時(shí)罢猪,任務(wù)就會自動將狀態(tài)的訪問范圍限定為當(dāng)前數(shù)據(jù)的key,從 map 存儲中讀取出對應(yīng)的狀態(tài)值叉瘩。所以具有相同 key 的所有數(shù)據(jù)都會到訪問相同的狀態(tài)膳帕,而不同 key 的狀態(tài)之間是彼此隔離的。

這種將狀態(tài)綁定到key 上的方式,相當(dāng)于使得狀態(tài)和流的邏輯分區(qū)一一對應(yīng)了:不會有別的 key 的數(shù)據(jù)來訪問當(dāng)前狀態(tài)危彩;而當(dāng)前狀態(tài)對應(yīng) key 的數(shù)據(jù)也只會訪問這一個(gè)狀態(tài)攒磨,不會分發(fā)到其他分區(qū)去。這就保證了對狀態(tài)的操作都是本地進(jìn)行的汤徽,對數(shù)據(jù)流和狀態(tài)的處理做到了分區(qū)一致性娩缰。

另外,在應(yīng)用的并行度改變時(shí)谒府,狀態(tài)也需要隨之進(jìn)行重組拼坎。不同 key 對應(yīng)的 Keyed State 可以進(jìn)一步組成所謂的鍵組(key groups),每一組都對應(yīng)著一個(gè)并行子任務(wù)完疫。鍵組是 Flink 重新分配 Keyed State 的單元泰鸡,鍵組的數(shù)量就等于定義的最大并行度。當(dāng)算子并行度發(fā)生改變時(shí)壳鹤, Keyed State 就會按照當(dāng)前的并行度重新平均分配盛龄,保證運(yùn)行時(shí)各個(gè)子任務(wù)的負(fù)載相同。

需要注意芳誓,使用Keyed State 必須基于KeyedStream余舶。沒有進(jìn)行 keyBy 分區(qū)的DataStream, 即使轉(zhuǎn)換算子實(shí)現(xiàn)了對應(yīng)的富函數(shù)類兆沙,也不能通過運(yùn)行時(shí)上下文訪問Keyed State欧芽。

2.2 支持的結(jié)構(gòu)類型

實(shí)際應(yīng)用中,需要保存為狀態(tài)的數(shù)據(jù)會有各種各樣的類型葛圃,有時(shí)還需要復(fù)雜的集合類型, 比如列表(List)和映射(Map)憎妙。對于這些常見的用法库正,F(xiàn)link 的按鍵分區(qū)狀態(tài)(Keyed State)提供了足夠的支持。接下來我們就來了解一下 Keyed State 所支持的結(jié)構(gòu)類型.


Managed Keyed State 又分為六種類型

2.2.1 值狀態(tài)(ValueState)

顧名思義厘唾,狀態(tài)中只保存一個(gè)“值”(value)褥符。ValueState<T>本身是一個(gè)接口,源碼中定義如下:

public interface ValueState<T> extends State { 
    T value() throws IOException;
    void update(T value) throws IOException;
}

這里的 T 是泛型抚垃,表示狀態(tài)的數(shù)據(jù)內(nèi)容可以是任何具體的數(shù)據(jù)類型喷楣。如果想要保存一個(gè)長整型值作為狀態(tài),那么類型就是 ValueState<Long>鹤树。

我們可以在代碼中讀寫值狀態(tài)铣焊,實(shí)現(xiàn)對于狀態(tài)的訪問和更新。

  • T value():獲取當(dāng)前狀態(tài)的值罕伯;
  • update(T value):對狀態(tài)進(jìn)行更新曲伊,傳入的參數(shù) value 就是要覆寫的狀態(tài)值。

在具體使用時(shí)追他,為了讓運(yùn)行時(shí)上下文清楚到底是哪個(gè)狀態(tài)坟募,我們還需要創(chuàng)建一個(gè)“狀態(tài)描述器”(StateDescriptor)來提供狀態(tài)的基本信息岛蚤。例如源碼中,ValueState 的狀態(tài)描述器構(gòu)造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) { 
    super(name, typeClass, null);
}

這里需要傳入狀態(tài)的名稱和類型——這跟我們聲明一個(gè)變量時(shí)做的事情完全一樣懈糯。有了這個(gè)描述器涤妒,運(yùn)行時(shí)環(huán)境就可以獲取到狀態(tài)的控制句柄(handler)了。

2.2.2 列表狀態(tài)(ListState)

將需要保存的數(shù)據(jù)赚哗,以列表(List)的形式組織起來届腐。在 ListState<T>接口中同樣有一個(gè)類型參數(shù)T,表示列表中數(shù)據(jù)的類型蜂奸。ListState 也提供了一系列的方法來操作狀態(tài)犁苏,使用方式與一般的List 非常相似。

  • Iterable<T> get():獲取當(dāng)前的列表狀態(tài)扩所,返回的是一個(gè)可迭代類型 Iterable围详;
  • update(List<T> values):傳入一個(gè)列表values,直接對狀態(tài)進(jìn)行覆蓋祖屏;
  • add(T value):在狀態(tài)列表中添加一個(gè)元素 value助赞;
  • addAll(List<T> values):向列表中添加多個(gè)元素,以列表 values 形式傳入袁勺。

類似地雹食,ListState 的狀態(tài)描述器就叫作 ListStateDescriptor,用法跟 ValueStateDescriptor完全一致期丰。

2.2.3 映射狀態(tài)(MapState)

把一些鍵值對(key-value)作為狀態(tài)整體保存起來群叶,可以認(rèn)為就是一組 key-value 映射的列表。對應(yīng)的 MapState<UK, UV>接口中钝荡,就會有 UK街立、UV 兩個(gè)泛型,分別表示保存的 key 和 value 的類型埠通。同樣赎离,MapState 提供了操作映射狀態(tài)的方法,與 Map 的使用非常類似端辱。

  • UV get(UK key):傳入一個(gè) key 作為參數(shù)梁剔,查詢對應(yīng)的 value 值;
  • put(UK key, UV value):傳入一個(gè)鍵值對舞蔽,更新 key 對應(yīng)的 value 值荣病;
  • putAll(Map<UK, UV> map):將傳入的映射 map 中所有的鍵值對,全部添加到映射狀態(tài)中喷鸽;
  • remove(UK key):將指定 key 對應(yīng)的鍵值對刪除众雷;
  • boolean contains(UK key):判斷是否存在指定的 key,返回一個(gè) boolean 值。

另外砾省,MapState 也提供了獲取整個(gè)映射相關(guān)信息的方法:

  • Iterable<Map.Entry<UK, UV>> entries():獲取映射狀態(tài)中所有的鍵值對鸡岗;
  • Iterable<UK> keys():獲取映射狀態(tài)中所有的鍵(key),返回一個(gè)可迭代 Iterable 類型编兄;
  • Iterable<UV> values():獲取映射狀態(tài)中所有的值(value)轩性,返回一個(gè)可迭代 Iterable類型;
  • boolean isEmpty():判斷映射是否為空狠鸳,返回一個(gè) boolean 值揣苏。

2.2.4 歸約狀態(tài)(ReducingState)

類似于值狀態(tài)(Value),不過需要對添加進(jìn)來的所有數(shù)據(jù)進(jìn)行歸約件舵,將歸約聚合之后的值作為狀態(tài)保存下來卸察。ReducintState<T>這個(gè)接口調(diào)用的方法類似于 ListState,只不過它保存的只是一個(gè)聚合值铅祸,所以調(diào)用.add()方法時(shí)坑质,不是在狀態(tài)列表里添加元素,而是直接把新數(shù)據(jù)和之前的狀態(tài)進(jìn)行歸約临梗,并用得到的結(jié)果更新狀態(tài)涡扼。

歸約邏輯的定義,是在歸約狀態(tài)描述器(ReducingStateDescriptor)中盟庞,通過傳入一個(gè)歸約函數(shù)(ReduceFunction)來實(shí)現(xiàn)的吃沪。這里的歸約函數(shù),就是我們之前介紹 reduce 聚合算子時(shí)講到的 ReduceFunction什猖,所以狀態(tài)類型跟輸入的數(shù)據(jù)類型是一樣的票彪。

public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {
    ......
}

這里的描述器有三個(gè)參數(shù),其中第二個(gè)參數(shù)就是定義了歸約聚合邏輯的ReduceFunction卸伞, 另外兩個(gè)參數(shù)則是狀態(tài)的名稱和類型抹镊。

2.2.5 聚合狀態(tài)(AggregatingState)

與歸約狀態(tài)非常類似,聚合狀態(tài)也是一個(gè)值荤傲,用來保存添加進(jìn)來的所有數(shù)據(jù)的聚合結(jié)果。與 ReducingState 不同的是颈渊,它的聚合邏輯是由在描述器中傳入一個(gè)更加一般化的聚合函數(shù)(AggregateFunction)來定義的遂黍;這也就是之前我們講過的 AggregateFunction,里面通過一個(gè)累加器(Accumulator)來表示狀態(tài)俊嗽,所以聚合的狀態(tài)類型可以跟添加進(jìn)來的數(shù)據(jù)類型完全不同雾家,使用更加靈活。

同樣地绍豁,AggregatingState 接口調(diào)用方法也與ReducingState 相同芯咧,調(diào)用.add()方法添加元素時(shí),會直接使用指定的AggregateFunction 進(jìn)行聚合并更新狀態(tài)。

2.3 代碼實(shí)現(xiàn)

2.3.1 整體介紹

在 Flink 中敬飒,狀態(tài)始終是與特定算子相關(guān)聯(lián)的邪铲;算子在使用狀態(tài)前首先需要“注冊”,其實(shí)就是告訴 Flink 當(dāng)前上下文中定義狀態(tài)的信息无拗,這樣運(yùn)行時(shí)的 Flink 才能知道算子有哪些狀態(tài)带到。

狀態(tài)的注冊,主要是通過“狀態(tài)描述器”(StateDescriptor)來實(shí)現(xiàn)的英染。狀態(tài)描述器中最重要的內(nèi)容揽惹,就是狀態(tài)的名稱(name)和類型(type)。我們知道 Flink 中的狀態(tài)四康,可以認(rèn)為是加了一些復(fù)雜操作的內(nèi)存中的變量搪搏;而當(dāng)我們在代碼中聲明一個(gè)局部變量時(shí),都需要指定變量類型和名稱闪金,名稱就代表了變量在內(nèi)存中的地址疯溺,類型則指定了占據(jù)內(nèi)存空間的大小。同樣地毕泌, 我們一旦指定了名稱和類型喝检,F(xiàn)link 就可以在運(yùn)行時(shí)準(zhǔn)確地在內(nèi)存中找到對應(yīng)的狀態(tài),進(jìn)而返回狀態(tài)對象供我們使用了撼泛。所以在一個(gè)算子中挠说,我們也可以定義多個(gè)狀態(tài),只要它們的名稱不同就可以了愿题。

另外损俭,狀態(tài)描述器中還可能需要傳入一個(gè)用戶自定義函數(shù)(user-defined-function,UDF)潘酗,用來說明處理邏輯杆兵,比如前面提到的 ReduceFunction 和AggregateFunction。

以 ValueState 為例仔夺,我們可以定義值狀態(tài)描述器如下:

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>( 
    "my state", // 狀態(tài)名稱
    Types.LONG // 狀態(tài)類型
);

這里我們定義了一個(gè)叫作“my state”的長整型 ValueState 的描述器琐脏。

代碼中完整的操作是,首先定義出狀態(tài)描述器缸兔;然后調(diào)用.getRuntimeContext()方法獲取運(yùn)行時(shí)上下文日裙;繼而調(diào)用 RuntimeContext 的獲取狀態(tài)的方法,將狀態(tài)描述器傳入惰蜜,就可以得到對應(yīng)的狀態(tài)了昂拂。

因?yàn)闋顟B(tài)的訪問需要獲取運(yùn)行時(shí)上下文,這只能在富函數(shù)類(Rich Function)中獲取到抛猖, 所以自定義的Keyed State 只能在富函數(shù)中使用格侯。當(dāng)然鼻听,底層的處理函數(shù)(Process Function) 本身繼承了AbstractRichFunction 抽象類,所以也可以使用联四。

在富函數(shù)中撑碴,調(diào)用.getRuntimeContext()方法獲取到運(yùn)行時(shí)上下文之后,RuntimeContext 有以下幾個(gè)獲取狀態(tài)的方法:

ValueState<T> getState(ValueStateDescriptor<T>) 
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>) 
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)

對于不同結(jié)構(gòu)類型的狀態(tài)碎连,只要傳入對應(yīng)的描述器灰羽、調(diào)用對應(yīng)的方法就可以了。
獲取到狀態(tài)對象之后鱼辙,就可以調(diào)用它們各自的方法進(jìn)行讀寫操作了廉嚼。另外,所有類型的狀態(tài)都有一個(gè)方法.clear()倒戏,用于清除當(dāng)前狀態(tài)怠噪。

代碼中使用狀態(tài)的整體結(jié)構(gòu)如下:

public static class MyFlatMapFunction extends RichFlatMapFunction<Long, String> {
    // 聲明狀態(tài)
    private transient ValueState<Long> state;


    @Override
    public void open(Configuration config) {
        // 在 open 生命周期方法中獲取狀態(tài)
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
            "my state", // 狀態(tài)名稱
            Types.LONG // 狀態(tài)類型
        );

        state = getRuntimeContext().getState(descriptor);

    }


    @Override
    public void flatMap(Long input, Collector<String> out) throws Exception {
        // 訪問狀態(tài)
        Long currentState = state.value();
        currentState += 1;  // 狀態(tài)數(shù)值加 1
        // 更新狀態(tài)
        state.update(currentState); if (currentState >= 100) {
            out.collect(“state: ” + currentState); state.clear();   // 清空狀態(tài)
        }

    }
}

因?yàn)?RichFlatmapFunction 中的.flatmap()是每來一條數(shù)據(jù)都會調(diào)用一次的,所以我們不應(yīng)該在這里調(diào)用運(yùn)行時(shí)上下文的.getState()方法杜跷,而是在生命周期方法.open()中獲取狀態(tài)對象傍念。另外還有一個(gè)問題,我們獲取到的狀態(tài)對象也需要有一個(gè)變量名稱 state(注意這里跟狀態(tài)的名稱 my state 不同)葛闷,但這個(gè)變量不應(yīng)該在 open 中聲明——否則在.flatmap()里就訪問不到了憋槐。所以我們還需要在外面直接把它定義為類的屬性,這樣就可以在不同的方法中通用了淑趾。而在外部又不能直接獲取狀態(tài)阳仔,因?yàn)榫幾g時(shí)是無法拿到運(yùn)行時(shí)上下文的。所以最終的解決方案就變成了: 在外部聲明狀態(tài)對象扣泊,在open 生命周期方法中通過運(yùn)行時(shí)上下文獲取狀態(tài)近范。

這里需要注意,這種方式定義的都是 Keyed State延蟹,它對于每個(gè) key 都會保存一份狀態(tài)實(shí)例评矩。所以對狀態(tài)進(jìn)行讀寫操作時(shí),獲取到的狀態(tài)跟當(dāng)前輸入數(shù)據(jù)的 key 有關(guān)阱飘;只有相同 key 的數(shù)據(jù)斥杜,才會操作同一個(gè)狀態(tài),不同 key 的數(shù)據(jù)訪問到的狀態(tài)值是不同的沥匈。而且上面提到的.clear()方法果录,也只會清除當(dāng)前 key 對應(yīng)的狀態(tài)。

另外咐熙,狀態(tài)不一定都存儲在內(nèi)存中,也可以放在磁盤或其他地方辨萍,具體的位置是由一個(gè)可配置的組件來管理的棋恼,這個(gè)組件叫作“狀態(tài)后端”(State Backend)返弹。關(guān)于狀態(tài)后端,我們后面詳細(xì)介紹爪飘。

下面我們給出不同類型的狀態(tài)的應(yīng)用實(shí)例义起。

2.3.2 值狀態(tài)(ValueState)

我們這里會使用用戶 id 來進(jìn)行分流,然后分別統(tǒng)計(jì)每個(gè)用戶的 pv 數(shù)據(jù)师崎,由于我們并不想每次 pv 加一默终,就將統(tǒng)計(jì)結(jié)果發(fā)送到下游去,所以這里我們注冊了一個(gè)定時(shí)器犁罩,用來隔一段時(shí)間發(fā)送 pv 的統(tǒng)計(jì)結(jié)果齐蔽,這樣對下游算子的壓力不至于太大。具體實(shí)現(xiàn)方式是定義一個(gè)用來保存定時(shí)器時(shí)間戳的值狀態(tài)變量床估。當(dāng)定時(shí)器觸發(fā)并向下游發(fā)送數(shù)據(jù)以后含滴,便清空儲存定時(shí)器時(shí)間戳的狀態(tài)變量,這樣當(dāng)新的數(shù)據(jù)到來時(shí)丐巫,發(fā)現(xiàn)并沒有定時(shí)器存在谈况,就可以注冊新的定時(shí)器了, 注冊完定時(shí)器之后將定時(shí)器的時(shí)間戳繼續(xù)保存在狀態(tài)變量中递胧。

import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.util.Collector;


public class PeriodicPvExample{

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream.print("input");

        // 統(tǒng)計(jì)每個(gè)用戶的pv碑韵,隔一段時(shí)間(10s)輸出一次結(jié)果
        stream.keyBy(data -> data.user)
                .process(new PeriodicPvResult())
                .print();

        env.execute();
    }

    // 注冊定時(shí)器,周期性輸出pv
    public static class PeriodicPvResult extends KeyedProcessFunction<String ,Event, String>{
        // 定義兩個(gè)狀態(tài)缎脾,保存當(dāng)前pv值祝闻,以及定時(shí)器時(shí)間戳
        ValueState<Long> countState;
        ValueState<Long> timerTsState;

        @Override
        public void open(Configuration parameters) throws Exception {
            countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));
        }

        @Override
        public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
            // 更新count值
            Long count = countState.value();
            if (count == null){
                countState.update(1L);
            } else {
                countState.update(count + 1);
            }
            // 注冊定時(shí)器
            if (timerTsState.value() == null){
                ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);
                timerTsState.update(value.timestamp + 10 * 1000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect(ctx.getCurrentKey() + " pv: " + countState.value());
            // 清空狀態(tài)
            timerTsState.clear();
        }
    }
}

2.3.3 列表狀態(tài)(ListState)

在 Flink SQL 中,支持兩條流的全量 Join赊锚,語法如下:

SELECT * FROM A INNER JOIN B WHERE A.id = B.id治筒;

這樣一條 SQL 語句要慎用,因?yàn)?Flink 會將 A 流和 B 流的所有數(shù)據(jù)都保存下來舷蒲,然后進(jìn)行 Join耸袜。不過在這里我們可以用列表狀態(tài)變量來實(shí)現(xiàn)一下這個(gè) SQL 語句的功能。代碼如下:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class TwoStreamFullJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env
                .fromElements(
                        Tuple3.of("a", "stream-1", 1000L),
                        Tuple3.of("b", "stream-1", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
                                        return t.f2;
                                    }
                                })
                );

        SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env
                .fromElements(
                        Tuple3.of("a", "stream-2", 3000L),
                        Tuple3.of("b", "stream-2", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
                                        return t.f2;
                                    }
                                })
                );

        stream1.keyBy(r -> r.f0)
                .connect(stream2.keyBy(r -> r.f0))
                .process(new CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    private ListState<Tuple3<String, String, Long>> stream1ListState;
                    private ListState<Tuple3<String, String, Long>> stream2ListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        stream1ListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<Tuple3<String, String, Long>>("stream1-list", Types.TUPLE(Types.STRING, Types.STRING))
                        );
                        stream2ListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<Tuple3<String, String, Long>>("stream2-list", Types.TUPLE(Types.STRING, Types.STRING))
                        );
                    }

                    @Override
                    public void processElement1(Tuple3<String, String, Long> left, Context context, Collector<String> collector) throws Exception {
                        stream1ListState.add(left);
                        for (Tuple3<String, String, Long> right : stream2ListState.get()) {
                            collector.collect(left + " => " + right);
                        }
                    }

                    @Override
                    public void processElement2(Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {
                        stream2ListState.add(right);
                        for (Tuple3<String, String, Long> left : stream1ListState.get()) {
                            collector.collect(left + " => " + right);
                        }
                    }
                })
                .print();

        env.execute();
    }
}

2.3.4 映射狀態(tài)(MapState)

映射狀態(tài)的用法和 Java 中的 HashMap 很相似牲平。在這里我們可以通過 MapState 的使用來探索一下窗口的底層實(shí)現(xiàn)堤框,也就是我們要用映射狀態(tài)來完整模擬窗口的功能。這里我們模擬一個(gè)滾動窗口纵柿。我們要計(jì)算的是每一個(gè) url 在每一個(gè)窗口中的 pv 數(shù)據(jù)蜈抓。我們之前使用增量聚合和全窗口聚合結(jié)合的方式實(shí)現(xiàn)過這個(gè)需求。這里我們用 MapState 再來實(shí)現(xiàn)一下昂儒。

import com.atguigu.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.util.Collector;

import java.sql.Timestamp;


// 使用KeyedProcessFunction模擬滾動窗口
public class FakeWindowExample {


    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        // 統(tǒng)計(jì)每10s窗口內(nèi)沟使,每個(gè)url的pv
        stream.keyBy(data -> data.url)
                .process(new FakeWindowResult(10000L))
                .print();

        env.execute();
    }

    public static class FakeWindowResult extends KeyedProcessFunction<String, Event, String>{
        // 定義屬性,窗口長度
        private Long windowSize;

        public FakeWindowResult(Long windowSize) {
            this.windowSize = windowSize;
        }

        // 聲明狀態(tài)渊跋,用map保存pv值(窗口start腊嗡,count)
        MapState<Long, Long> windowPvMapState;

        @Override
        public void open(Configuration parameters) throws Exception {
            windowPvMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("window-pv", Long.class, Long.class));
        }

        @Override
        public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
            // 每來一條數(shù)據(jù)着倾,就根據(jù)時(shí)間戳判斷屬于哪個(gè)窗口
            Long windowStart = value.timestamp / windowSize * windowSize;
            Long windowEnd = windowStart + windowSize;

            // 注冊 end -1 的定時(shí)器,窗口觸發(fā)計(jì)算
            ctx.timerService().registerEventTimeTimer(windowEnd - 1);

            // 更新狀態(tài)中的pv值
            if (windowPvMapState.contains(windowStart)){
                Long pv = windowPvMapState.get(windowStart);
                windowPvMapState.put(windowStart, pv + 1);
            } else {
                windowPvMapState.put(windowStart, 1L);
            }
        }

        // 定時(shí)器觸發(fā)燕少,直接輸出統(tǒng)計(jì)的pv結(jié)果
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            Long windowEnd = timestamp + 1;
            Long windowStart = windowEnd - windowSize;
            Long pv = windowPvMapState.get(windowStart);
            out.collect( "url: " + ctx.getCurrentKey()
                    + " 訪問量: " + pv
                    + " 窗口:" + new Timestamp(windowStart) + " ~ " + new Timestamp(windowEnd));

            // 模擬窗口的銷毀卡者,清除map中的key
            windowPvMapState.remove(windowStart);
        }
    }    
}

2.3.5 聚合狀態(tài)(AggregatingState)

我們舉一個(gè)簡單的例子,對用戶點(diǎn)擊事件流每 5 個(gè)數(shù)據(jù)統(tǒng)計(jì)一次平均時(shí)間戳客们。這是一個(gè)類似計(jì)數(shù)窗口( CountWindow) 求平均值的計(jì)算崇决, 這里我們可以使用一個(gè)有聚合狀態(tài)的RichFlatMapFunction 來實(shí)現(xiàn)檐盟。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

public class AverageTimestampExample {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );


        // 統(tǒng)計(jì)每個(gè)用戶的點(diǎn)擊頻次秤掌,到達(dá)5次就輸出統(tǒng)計(jì)結(jié)果
        stream.keyBy(data -> data.user)
                .flatMap(new AvgTsResult())
                .print();

        env.execute();
    }

    public static class AvgTsResult extends RichFlatMapFunction<Event, String>{
        // 定義聚合狀態(tài),用來計(jì)算平均時(shí)間戳
        AggregatingState<Event, Long> avgTsAggState;

        // 定義一個(gè)值狀態(tài)汇鞭,用來保存當(dāng)前用戶訪問頻次
        ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) throws Exception {
            avgTsAggState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>(
                    "avg-ts",
                    new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
                        @Override
                        public Tuple2<Long, Long> createAccumulator() {
                            return Tuple2.of(0L, 0L);
                        }

                        @Override
                        public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {
                            return Tuple2.of(accumulator.f0 + value.timestamp, accumulator.f1 + 1);
                        }

                        @Override
                        public Long getResult(Tuple2<Long, Long> accumulator) {
                            return accumulator.f0 / accumulator.f1;
                        }

                        @Override
                        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                            return null;
                        }
                    },
                    Types.TUPLE(Types.LONG, Types.LONG)
            ));

            countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
        }

        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
            Long count = countState.value();
            if (count == null){
                count = 1L;
            } else {
                count ++;
            }

            countState.update(count);
            avgTsAggState.add(value);

            // 達(dá)到5次就輸出結(jié)果凄敢,并清空狀態(tài)
            if (count == 5){
                out.collect(value.user + " 平均時(shí)間戳:" + new Timestamp(avgTsAggState.get()));
                countState.clear();
            }
        }
    }
}

2.4. 狀態(tài)生存時(shí)間(TTL)

實(shí)際應(yīng)用中碌冶,很多狀態(tài)會隨著時(shí)間的推移逐漸增長,如果不加以限制涝缝,最終就會導(dǎo)致存儲空間的耗盡扑庞。一個(gè)優(yōu)化的思路是直接在代碼中調(diào)用.clear()方法去清除狀態(tài),但是有時(shí)候我們的邏輯要求不能直接清除拒逮。這時(shí)就需要配置一個(gè)狀態(tài)的“生存時(shí)間”(time-to-live罐氨,TTL),當(dāng)狀態(tài)在內(nèi)存中存在的時(shí)間超出這個(gè)值時(shí)滩援,就將它清除栅隐。

具體實(shí)現(xiàn)上,如果用一個(gè)進(jìn)程不停地掃描所有狀態(tài)看是否過期玩徊,顯然會占用大量資源做無用功租悄。狀態(tài)的失效其實(shí)不需要立即刪除,所以我們可以給狀態(tài)附加一個(gè)屬性恩袱,也就是狀態(tài)的“失效時(shí)間”泣棋。狀態(tài)創(chuàng)建的時(shí)候,設(shè)置 失效時(shí)間 = 當(dāng)前時(shí)間 + TTL畔塔;之后如果有對狀態(tài)的訪問和修改潭辈,我們可以再對失效時(shí)間進(jìn)行更新;當(dāng)設(shè)置的清除條件被觸發(fā)時(shí)(比如澈吨,狀態(tài)被訪問的時(shí)候把敢,或者每隔一段時(shí)間掃描一次失效狀態(tài)),就可以判斷狀態(tài)是否失效谅辣、從而進(jìn)行清除了修赞。

配置狀態(tài)的 TTL 時(shí),需要創(chuàng)建一個(gè) StateTtlConfig 配置對象桑阶,然后調(diào)用狀態(tài)描述器的.enableTimeToLive()方法啟動TTL 功能榔组。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);

stateDescriptor.enableTimeToLive(ttlConfig);

這里用到了幾個(gè)配置項(xiàng):

  • .newBuilder()
    狀態(tài)TTL 配置的構(gòu)造器方法熙尉,必須調(diào)用,返回一個(gè) Builder 之后再調(diào)用.build()方法就可以得到 StateTtlConfig 了搓扯。方法需要傳入一個(gè) Time 作為參數(shù),這就是設(shè)定的狀態(tài)生存時(shí)間包归。

  • .setUpdateType()
    設(shè)置更新類型锨推。更新類型指定了什么時(shí)候更新狀態(tài)失效時(shí)間,這里的 OnCreateAndWrite 表示只有創(chuàng)建狀態(tài)和更改狀態(tài)(寫操作)時(shí)更新失效時(shí)間公壤。另一種類型OnReadAndWrite 則表示無論讀寫操作都會更新失效時(shí)間换可,也就是只要對狀態(tài)進(jìn)行了訪問,就表明它是活躍的厦幅,從而延長生存時(shí)間沾鳄。這個(gè)配置默認(rèn)為 OnCreateAndWrite。

  • .setStateVisibility()
    設(shè)置狀態(tài)的可見性确憨。所謂的“狀態(tài)可見性”译荞,是指因?yàn)榍宄僮鞑⒉皇菍?shí)時(shí)的,所以當(dāng)狀態(tài)過期之后還有可能基于存在休弃,這時(shí)如果對它進(jìn)行訪問吞歼,能否正常讀取到就是一個(gè)問題了。這里設(shè)置的NeverReturnExpired 是默認(rèn)行為塔猾,表示從不返回過期值篙骡,也就是只要過期就認(rèn)為它已經(jīng)被清除了,應(yīng)用不能繼續(xù)讀日傻椤糯俗;這在處理會話或者隱私數(shù)據(jù)時(shí)比較重要。對應(yīng)的另一種配置是ReturnExpireDefNotCleanedUp睦擂,就是如果過期狀態(tài)還存在得湘,就返回它的值。

除此之外祈匙,TTL 配置還可以設(shè)置在保存檢查點(diǎn)(checkpoint)時(shí)觸發(fā)清除操作忽刽,或者配置增量的清理(incremental cleanup),還可以針對 RocksDB 狀態(tài)后端使用壓縮過濾器(compaction filter)進(jìn)行后臺清理夺欲。

這里需要注意跪帝,目前的 TTL 設(shè)置只支持處理時(shí)間。另外些阅,所有集合類型的狀態(tài)(例如ListState伞剑、MapState)在設(shè)置 TTL 時(shí),都是針對每一項(xiàng)(per-entry)元素的市埋。也就是說黎泣,一個(gè)列表狀態(tài)中的每一個(gè)元素恕刘,都會以自己的失效時(shí)間來進(jìn)行清理,而不是整個(gè)列表一起清理抒倚。

三褐着、算子狀態(tài)(Operator State)

3.1 基本概念和特點(diǎn)

算子狀態(tài)(Operator State)就是一個(gè)算子并行實(shí)例上定義的狀態(tài),作用范圍被限定為當(dāng)前算子任務(wù)托呕。算子狀態(tài)跟數(shù)據(jù)的 key 無關(guān)含蓉,所以不同 key 的數(shù)據(jù)只要被分發(fā)到同一個(gè)并行子任務(wù), 就會訪問到同一個(gè)Operator State项郊。

算子狀態(tài)的實(shí)際應(yīng)用場景不如Keyed State 多馅扣,一般用在 Source 或 Sink 等與外部系統(tǒng)連接的算子上,或者完全沒有 key 定義的場景着降。比如 Flink 的 Kafka 連接器中差油,就用到了算子狀態(tài)。在我們給 Source 算子設(shè)置并行度后任洞,Kafka 消費(fèi)者的每一個(gè)并行實(shí)例蓄喇,都會為對應(yīng)的主題(topic)分區(qū)維護(hù)一個(gè)偏移量, 作為算子狀態(tài)保存起來侈咕。這在保證 Flink 應(yīng)用“精確一次”(exactly-once)狀態(tài)一致性時(shí)非常有用公罕。

當(dāng)算子的并行度發(fā)生變化時(shí),算子狀態(tài)也支持在并行的算子任務(wù)實(shí)例之間做重組分配耀销。根據(jù)狀態(tài)的類型不同楼眷,重組分配的方案也會不同。

3.2 狀態(tài)類型

算子狀態(tài)也支持不同的結(jié)構(gòu)類型熊尉,主要有三種:ListState罐柳、UnionListState 和 BroadcastState。

3.2.1 列表狀態(tài)(ListState)

與 Keyed State 中的 ListState 一樣狰住,將狀態(tài)表示為一組數(shù)據(jù)的列表张吉。

與 Keyed State 中的列表狀態(tài)的區(qū)別是:在算子狀態(tài)的上下文中,不會按鍵(key)分別處理狀態(tài)催植,所以每一個(gè)并行子任務(wù)上只會保留一個(gè)“列表”(list)肮蛹,也就是當(dāng)前并行子任務(wù)上所有狀態(tài)項(xiàng)的集合。列表中的狀態(tài)項(xiàng)就是可以重新分配的最細(xì)粒度创南,彼此之間完全獨(dú)立伦忠。

當(dāng)算子并行度進(jìn)行縮放調(diào)整時(shí),算子的列表狀態(tài)中的所有元素項(xiàng)會被統(tǒng)一收集起來稿辙,相當(dāng)于把多個(gè)分區(qū)的列表合并成了一個(gè)“大列表”昆码,然后再均勻地分配給所有并行任務(wù)。這種“均勻分配”的具體方法就是“輪詢”(round-robin),與之前介紹的 rebanlance 數(shù)據(jù)傳輸方式類似赋咽,是通過逐一“發(fā)牌”的方式將狀態(tài)項(xiàng)平均分配的旧噪。這種方式也叫作“平均分割重組”(even-split redistribution)。

算子狀態(tài)中不會存在“鍵組”(key group)這樣的結(jié)構(gòu)脓匿,所以為了方便重組分配淘钟,就把它直接定義成了“列表”( list)。這也就解釋了亦镶,為什么算子狀態(tài)中沒有最簡單的值狀態(tài)(ValueState)日月。

3.2.2 聯(lián)合列表狀態(tài)(UnionListState)

與 ListState 類似,聯(lián)合列表狀態(tài)也會將狀態(tài)表示為一個(gè)列表缤骨。它與常規(guī)列表狀態(tài)的區(qū)別在于,算子并行度進(jìn)行縮放調(diào)整時(shí)對于狀態(tài)的分配方式不同尺借。

UnionListState 的重點(diǎn)就在于“聯(lián)合”(union)绊起。在并行度調(diào)整時(shí),常規(guī)列表狀態(tài)是輪詢分配狀態(tài)項(xiàng)燎斩,而聯(lián)合列表狀態(tài)的算子則會直接廣播狀態(tài)的完整列表虱歪。這樣,并行度縮放之后的并行子任務(wù)就獲取到了聯(lián)合后完整的“大列表”栅表,可以自行選擇要使用的狀態(tài)項(xiàng)和要丟棄的狀態(tài)項(xiàng)笋鄙。這種分配也叫作“聯(lián)合重組”(union redistribution)。如果列表中狀態(tài)項(xiàng)數(shù)量太多怪瓶,為資源和效率考慮一般不建議使用聯(lián)合重組的方式萧落。

3.2.3 廣播狀態(tài)(BroadcastState)

有時(shí)我們希望算子并行子任務(wù)都保持同一份“全局”狀態(tài),用來做統(tǒng)一的配置和規(guī)則設(shè)定洗贰。這時(shí)所有分區(qū)的所有數(shù)據(jù)都會訪問到同一個(gè)狀態(tài)找岖,狀態(tài)就像被“廣播”到所有分區(qū)一樣,這種特殊的算子狀態(tài)敛滋,就叫作廣播狀態(tài)(BroadcastState)许布。

因?yàn)閺V播狀態(tài)在每個(gè)并行子任務(wù)上的實(shí)例都一樣,所以在并行度調(diào)整的時(shí)候就比較簡單绎晃, 只要復(fù)制一份到新的并行任務(wù)就可以實(shí)現(xiàn)擴(kuò)展蜜唾;而對于并行度縮小的情況,可以將多余的并行子任務(wù)連同狀態(tài)直接砍掉——因?yàn)闋顟B(tài)都是復(fù)制出來的庶艾,并不會丟失袁余。

在底層,廣播狀態(tài)是以類似映射結(jié)構(gòu)(map)的鍵值對(key-value)來保存的落竹,必須基于一個(gè)“廣播流”(BroadcastStream)來創(chuàng)建泌霍。

3.3 代碼實(shí)現(xiàn)

狀態(tài)從本質(zhì)上來說就是算子并行子任務(wù)實(shí)例上的一個(gè)特殊本地變量。它的特殊之處就在于 Flink 會提供完整的管理機(jī)制,來保證它的持久化保存朱转,以便發(fā)生故障時(shí)進(jìn)行狀態(tài)恢復(fù)蟹地;另外還可以針對不同的 key 保存獨(dú)立的狀態(tài)實(shí)例。按鍵分區(qū)狀態(tài)(Keyed State)對這兩個(gè)功能都要考慮藤为;而算子狀態(tài)(Operator State)并不考慮 key 的影響怪与,所以主要任務(wù)就是要讓 Flink 了解狀態(tài)的信息、將狀態(tài)數(shù)據(jù)持久化后保存到外部存儲空間缅疟。

看起來算子狀態(tài)的使用應(yīng)該更加簡單才對分别。不過仔細(xì)思考又會發(fā)現(xiàn)一個(gè)問題:我們對狀態(tài)進(jìn)行持久化保存的目的是為了故障恢復(fù);在發(fā)生故障存淫、重啟應(yīng)用后耘斩,數(shù)據(jù)還會被發(fā)往之前分配的分區(qū)嗎?顯然不是桅咆,因?yàn)椴⑿卸瓤赡馨l(fā)生了調(diào)整括授,不論是按鍵(key)的哈希值分區(qū),還是直接輪詢(round-robin)分區(qū)岩饼,數(shù)據(jù)分配到的分區(qū)都會發(fā)生變化荚虚。這很好理解,當(dāng)打牌的人數(shù)從 3 個(gè)增加到 4 個(gè)時(shí)籍茧,即使牌的次序不變版述,輪流發(fā)到每個(gè)人手里的牌也會不同。數(shù)據(jù)分區(qū)發(fā)生變化寞冯,帶來的問題就是渴析,怎么保證原先的狀態(tài)跟故障恢復(fù)后數(shù)據(jù)的對應(yīng)關(guān)系呢?

對于Keyed State 這個(gè)問題很好解決:狀態(tài)都是跟 key 相關(guān)的简十,而相同key 的數(shù)據(jù)不管發(fā)往哪個(gè)分區(qū)檬某,總是會全部進(jìn)入一個(gè)分區(qū)的;于是只要將狀態(tài)也按照 key 的哈希值計(jì)算出對應(yīng)的分區(qū)螟蝙,進(jìn)行重組分配就可以了恢恼。恢復(fù)狀態(tài)后繼續(xù)處理數(shù)據(jù)胰默,就總能按照 key 找到對應(yīng)之前的狀態(tài)场斑,就保證了結(jié)果的一致性。所以 Flink 對Keyed State 進(jìn)行了非常完善的包裝牵署,我們不需實(shí)現(xiàn)任何接口就可以直接使用漏隐。

而對于 Operator State 來說就會有所不同。因?yàn)椴淮嬖?key奴迅,所有數(shù)據(jù)發(fā)往哪個(gè)分區(qū)是不可預(yù)測的青责;也就是說,當(dāng)發(fā)生故障重啟之后,我們不能保證某個(gè)數(shù)據(jù)跟之前一樣脖隶,進(jìn)入到同一個(gè)并行子任務(wù)扁耐、訪問同一個(gè)狀態(tài)。所以 Flink 無法直接判斷該怎樣保存和恢復(fù)狀態(tài)产阱,而是提供了接口婉称,讓我們根據(jù)業(yè)務(wù)需求自行設(shè)計(jì)狀態(tài)的快照保存(snapshot)和恢復(fù)(restore)邏輯。

3.3.1 CheckpointedFunction 接口

在 Flink 中构蹬,對狀態(tài)進(jìn)行持久化保存的快照機(jī)制叫作“檢查點(diǎn)”(Checkpoint)王暗。于是使用算子狀態(tài)時(shí),就需要對檢查點(diǎn)的相關(guān)操作進(jìn)行定義庄敛,實(shí)現(xiàn)一個(gè) CheckpointedFunction 接口俗壹。

CheckpointedFunction 接口在源碼中定義如下:

public interface CheckpointedFunction {
// 保存狀態(tài)快照到檢查點(diǎn)時(shí),調(diào)用這個(gè)方法
void snapshotState(FunctionSnapshotContext context) throws Exception
// 初始化狀態(tài)時(shí)調(diào)用這個(gè)方法藻烤,也會在恢復(fù)狀態(tài)時(shí)調(diào)用
void    initializeState(FunctionInitializationContext context) throws Exception;

每次應(yīng)用保存檢查點(diǎn)做快照時(shí)策肝,都會調(diào)用.snapshotState()方法,將狀態(tài)進(jìn)行外部持久化隐绵。而在算子任務(wù)進(jìn)行初始化時(shí),會調(diào)用. initializeState()方法拙毫。這又有兩種情況:一種是整個(gè)應(yīng)用第一次運(yùn)行依许,這時(shí)狀態(tài)會被初始化為一個(gè)默認(rèn)值(default value);另一種是應(yīng)用重啟時(shí)缀蹄,從檢查點(diǎn)(checkpoint)或者保存點(diǎn)(savepoint)中讀取之前狀態(tài)的快照峭跳,并賦給本地狀態(tài)。所以缺前, 接口中的.snapshotState()方法定義了檢查點(diǎn)的快照保存邏輯蛀醉,而. initializeState()方法不僅定義了初始化邏輯,也定義了恢復(fù)邏輯衅码。

這里需要注意拯刁,CheckpointedFunction 接口中的兩個(gè)方法,分別傳入了一個(gè)上下文(context)作為參數(shù)逝段。不同的是垛玻,.snapshotState()方法拿到的是快照的上下文 FunctionSnapshotContext, 它可以提供檢查點(diǎn)的相關(guān)信息奶躯,不過無法獲取狀態(tài)句柄帚桩;而. initializeState()方法拿到的是FunctionInitializationContext,這是函數(shù)類進(jìn)行初始化時(shí)的上下文嘹黔,是真正的“運(yùn)行時(shí)上下文”账嚎。FunctionInitializationContext 中提供了“算子狀態(tài)存儲”(OperatorStateStore)和“按鍵分區(qū)狀態(tài)
存儲(” KeyedStateStore),在這兩個(gè)存儲對象中可以非常方便地獲取當(dāng)前任務(wù)實(shí)例中的 Operator State 和Keyed State。例如:

ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(
    "buffered-elements",
    Types.of(String)
    );
ListState<String> checkpointedState = context.getOperatorStateStore().getListState(descriptor);

我們看到郭蕉,算子狀態(tài)的注冊和使用跟 Keyed State 非常類似疼邀,也是需要先定義一個(gè)狀態(tài)描述器(StateDescriptor),告訴 Flink 當(dāng)前狀態(tài)的名稱和類型恳不,然后從上下文提供的算子狀態(tài)存儲(OperatorStateStore)中獲取對應(yīng)的狀態(tài)對象檩小。如果想要從 KeyedStateStore 中獲取 Keyed State也是一樣的,前提是必須基于定義了 key 的KeyedStream烟勋,這和富函數(shù)類中的方式并不矛盾规求。通過這里的描述可以發(fā)現(xiàn),CheckpointedFunction 是 Flink 中非常底層的接口卵惦,它為有狀態(tài)的流處理提供了靈活且豐富的應(yīng)用阻肿。

3.3.2 示例代碼

接下來我們舉一個(gè)算子狀態(tài)的應(yīng)用案例。在下面的例子中沮尿,自定義的 SinkFunction 會在CheckpointedFunction 中進(jìn)行數(shù)據(jù)緩存丛塌,然后統(tǒng)一發(fā)送到下游。這個(gè)例子演示了列表狀態(tài)的平均分割重組(event-split redistribution)畜疾。

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSinkExample {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(10000L);
//        env.setStateBackend(new EmbeddedRocksDBStateBackend());

//        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(""));

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMinPauseBetweenCheckpoints(500);
        checkpointConfig.setCheckpointTimeout(60000);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.enableUnalignedCheckpoints();


        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream.print("input");

        // 批量緩存輸出
        stream.addSink(new BufferingSink(10));

        env.execute();
    }

    public static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {
        private final int threshold;
        private transient ListState<Event> checkpointedState;
        private List<Event> bufferedElements;

        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }

        @Override
        public void invoke(Event value, Context context) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Event element: bufferedElements) {
                    // 輸出到外部系統(tǒng)赴邻,這里用控制臺打印模擬
                    System.out.println(element);
                }
                System.out.println("==========輸出完畢=========");
                bufferedElements.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            // 把當(dāng)前局部變量中的所有元素寫入到檢查點(diǎn)中
            for (Event element : bufferedElements) {
                checkpointedState.add(element);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Event> descriptor = new ListStateDescriptor<>(
                    "buffered-elements",
                    Types.POJO(Event.class));

            checkpointedState = context.getOperatorStateStore().getListState(descriptor);

            // 如果是從故障中恢復(fù),就將ListState中的所有元素添加到局部變量中
            if (context.isRestored()) {
                for (Event element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
}

當(dāng)初始化好狀態(tài)對象后啡捶,我們可以通過調(diào)用. isRestored()方法判斷是否是從故障中恢復(fù)姥敛。在代碼中 BufferingSink 初始化時(shí),恢復(fù)出的 ListState 的所有元素會添加到一個(gè)局部變量bufferedElements 中瞎暑,以后進(jìn)行檢查點(diǎn)快照時(shí)就可以直接使用了彤敛。在調(diào)用.snapshotState()時(shí),直接清空 ListState了赌,然后把當(dāng)前局部變量中的所有元素寫入到檢查點(diǎn)中墨榄。

對于不同類型的算子狀態(tài),需要調(diào)用不同的獲取狀態(tài)對象的接口勿她,對應(yīng)地也就會使用不同的狀態(tài)分配重組算法袄秩。比如獲取列表狀態(tài)時(shí),調(diào)用.getListState() 會使用最簡單的 平均分割重組(even-split redistribution)算法嫂拴;而獲取聯(lián)合列表狀態(tài)時(shí)播揪,調(diào)用的是.getUnionListState() , 對應(yīng)就會使用聯(lián)合重組(union redistribution) 算法筒狠。

四猪狈、廣播狀態(tài)(Broadcast State)

4.1 基本用法

讓所有并行子任務(wù)都持有同一份狀態(tài),也就意味著一旦狀態(tài)有變化辩恼,所以子任務(wù)上的實(shí)例都要更新雇庙。什么時(shí)候會用到這樣的廣播狀態(tài)呢谓形?

一個(gè)最為普遍的應(yīng)用,就是“動態(tài)配置”或者“動態(tài)規(guī)則”疆前。我們在處理流數(shù)據(jù)時(shí)寒跳,有時(shí)會基于一些配置(configuration)或者規(guī)則(rule)。簡單的配置當(dāng)然可以直接讀取配置文件竹椒,一次加載童太,永久有效;但數(shù)據(jù)流是連續(xù)不斷的胸完,如果這配置隨著時(shí)間推移還會動態(tài)變化书释,那又該怎么辦呢?

一個(gè)簡單的想法是赊窥,定期掃描配置文件爆惧,發(fā)現(xiàn)改變就立即更新。但這樣就需要另外啟動一個(gè)掃描進(jìn)程锨能,如果掃描周期太長扯再,配置更新不及時(shí)就會導(dǎo)致結(jié)果錯誤;如果掃描周期太短址遇,又會耗費(fèi)大量資源做無用功熄阻。解決的辦法,還是流處理的“事件驅(qū)動”思路——我們可以將這動態(tài)的配置數(shù)據(jù)看作一條流倔约,將這條流和本身要處理的數(shù)據(jù)流進(jìn)行連接(connect)饺律,就可以實(shí)時(shí)地更新配置進(jìn)行計(jì)算了。

由于配置或者規(guī)則數(shù)據(jù)是全局有效的跺株,我們需要把它廣播給所有的并行子任務(wù)。而子任務(wù)需要把它作為一個(gè)算子狀態(tài)保存起來脖卖,以保證故障恢復(fù)后處理結(jié)果是一致的乒省。這時(shí)的狀態(tài),就是一個(gè)典型的廣播狀態(tài)畦木。我們知道袖扛,廣播狀態(tài)與其他算子狀態(tài)的列表(list)結(jié)構(gòu)不同,底層是以鍵值對(key-value)形式描述的十籍,所以其實(shí)就是一個(gè)映射狀態(tài)(MapState)蛆封。

在代碼上,可以直接調(diào)用 DataStream 的.broadcast()方法勾栗,傳入一個(gè)“映射狀態(tài)描述器”
(MapStateDescriptor)說明狀態(tài)的名稱和類型惨篱,就可以得到一個(gè)“廣播流”(BroadcastStream);進(jìn)而將要處理的數(shù)據(jù)流與這條廣播流進(jìn)行連接( connect )围俘, 就會得到“ 廣播連接流”(BroadcastConnectedStream)砸讳。注意廣播狀態(tài)只能用在廣播連接流中琢融。

關(guān)于廣播連接流,已經(jīng)在前面做過介紹簿寂,這里可以復(fù)習(xí)一下:

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
DataStream<String> output = stream
        .connect(ruleBroadcastStream)
        .process( new BroadcastProcessFunction<>() {...} );

這里我們定義了一個(gè)“規(guī)則流”ruleStream漾抬,里面的數(shù)據(jù)表示了數(shù)據(jù)流 stream 處理的規(guī)則, 規(guī)則的數(shù)據(jù)類型定義為Rule常遂。于是需要先定義一個(gè) MapStateDescriptor 來描述廣播狀態(tài)纳令,然后傳入 ruleStream.broadcast()得到廣播流,接著用 stream 和廣播流進(jìn)行連接克胳。這里狀態(tài)描述器中的 key 類型為 String平绩,就是為了區(qū)分不同的狀態(tài)值而給定的 key 的名稱。

對 于 廣 播 連 接 流 調(diào) 用 .process() 方 法 毯欣, 可 以 傳 入 “ 廣 播 處 理 函 數(shù) ” KeyedBroadcastProcessFunction 或者BroadcastProcessFunction 來進(jìn)行處理計(jì)算馒过。廣播處理函數(shù)里面有兩個(gè)方法.processElement()和.processBroadcastElement(),源碼中定義如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT>   extends BaseBroadcastProcessFunction {
    ...

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    ...

}

這里的.processElement()方法酗钞,處理的是正常數(shù)據(jù)流腹忽,第一個(gè)參數(shù) value 就是當(dāng)前到來的流數(shù)據(jù);而.processBroadcastElement()方法就相當(dāng)于是用來處理廣播流的砚作,它的第一個(gè)參數(shù) value 就是廣播流中的規(guī)則或者配置數(shù)據(jù)窘奏。兩個(gè)方法第二個(gè)參數(shù)都是一個(gè)上下文 ctx,都可以通過調(diào)用.getBroadcastState()方法獲取到當(dāng)前的廣播狀態(tài)葫录;區(qū)別在于着裹,.processElement()方法里的上下文是“ 只讀” 的( ReadOnly ), 因此獲取到的廣播狀態(tài)也只能讀取不能更改米同;而.processBroadcastElement()方法里的 Context 則沒有限制骇扇,可以根據(jù)當(dāng)前廣播流中的數(shù)據(jù)更新狀態(tài)。

Rule rule = ctx.getBroadcastState( new MapStateDescriptor<>("rules", Types.String,Types.POJO(Rule.class))).get("my rule");

通過調(diào)用 ctx.getBroadcastState()方法面粮,傳入一個(gè) MapStateDescriptor少孝,就可以得到當(dāng)前的叫作“rules”的廣播狀態(tài);調(diào)用它的.get()方法熬苍,就可以取出其中“my rule”對應(yīng)的值進(jìn)行計(jì)算處理稍走。

4.2 代碼實(shí)例

接下來我們舉一個(gè)廣播狀態(tài)的應(yīng)用案例〔竦祝考慮在電商應(yīng)用中婿脸,往往需要判斷用戶先后發(fā)生的行為的“組合模式”,比如“登錄-下單”或者“登錄-支付”柄驻,檢測出這些連續(xù)的行為進(jìn)行統(tǒng)計(jì)狐树,就可以了解平臺的運(yùn)用狀況以及用戶的行為習(xí)慣。

具體代碼如下:

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 讀取用戶行為事件流
        DataStreamSource<Action> actionStream = env.fromElements(
                new Action("Alice", "login"),
                new Action("Alice", "pay"),
                new Action("Bob", "login"),
                new Action("Bob", "buy")
        );

        // 定義行為模式流鸿脓,代表了要檢測的標(biāo)準(zhǔn)
        DataStreamSource<Pattern> patternStream = env
                .fromElements(
                        new Pattern("login", "pay"),
                        new Pattern("login", "buy")
                );

        // 定義廣播狀態(tài)的描述器褪迟,創(chuàng)建廣播流
        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>(
                "patterns", Types.VOID, Types.POJO(Pattern.class));
        BroadcastStream<Pattern> bcPatterns = patternStream.broadcast(bcStateDescriptor);

        // 將事件流和廣播流連接起來冗恨,進(jìn)行處理
        DataStream<Tuple2<String, Pattern>> matches = actionStream
                .keyBy(data -> data.userId)
                .connect(bcPatterns)
                .process(new PatternEvaluator());

        matches.print();

        env.execute();
    }

    public static class PatternEvaluator
            extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {

        // 定義一個(gè)值狀態(tài),保存上一次用戶行為
        ValueState<String> prevActionState;

        @Override
        public void open(Configuration conf) {
            prevActionState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastAction", Types.STRING));
        }

        @Override
        public void processBroadcastElement(
                Pattern pattern,
                Context ctx,
                Collector<Tuple2<String, Pattern>> out) throws Exception {

            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(
                    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));

            // 將廣播狀態(tài)更新為當(dāng)前的pattern
            bcState.put(null, pattern);
        }

        @Override
        public void processElement(Action action, ReadOnlyContext ctx,
                                   Collector<Tuple2<String, Pattern>> out) throws Exception {
            Pattern pattern = ctx.getBroadcastState(
                    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);

            String prevAction = prevActionState.value();
            if (pattern != null && prevAction != null) {
                // 如果前后兩次行為都符合模式定義味赃,輸出一組匹配
                if (pattern.action1.equals(prevAction) && pattern.action2.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }
            // 更新狀態(tài)
            prevActionState.update(action.action);
        }
    }

    // 定義用戶行為事件POJO類
    public static class Action {
        public String userId;
        public String action;

        public Action() {
        }

        public Action(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId=" + userId +
                    ", action='" + action + '\'' +
                    '}';
        }
    }

    // 定義行為模式POJO類掀抹,包含先后發(fā)生的兩個(gè)行為
    public static class Pattern {
        public String action1;
        public String action2;

        public Pattern() {
        }

        public Pattern(String action1, String action2) {
            this.action1 = action1;
            this.action2 = action2;
        }

        @Override
        public String toString() {
            return "Pattern{" +
                    "action1='" + action1 + '\'' +
                    ", action2='" + action2 + '\'' +
                    '}';
        }
    }
}

這里我們將檢測的行為模式定義為 POJO 類 Pattern,里面包含了連續(xù)的兩個(gè)行為心俗。由于廣播狀態(tài)中只保存了一個(gè) Pattern傲武,并不關(guān)心 MapState 中的 key,所以也可以直接將 key 的類型指定為 Void城榛,具體值就是 null揪利。在具體的操作過程中,我們將廣播流中的 Pattern 數(shù)據(jù)保存為廣播變量狠持;在行為數(shù)據(jù) Action 到來之后讀取當(dāng)前廣播變量疟位,確定行為模式,并將之前的一次行為保存為一個(gè) ValueState——這是針對當(dāng)前用戶的狀態(tài)保存喘垂,所以用到了 Keyed State甜刻。檢測到如果前一次行為與 Pattern 中的 action1 相同,而當(dāng)前行為與 action2 相同正勒,則發(fā)現(xiàn)了匹配模式的一組行為得院,輸出檢測結(jié)果。

五章贞、狀態(tài)持久化和狀態(tài)后端

在 Flink 的狀態(tài)管理機(jī)制中祥绞,很重要的一個(gè)功能就是對狀態(tài)進(jìn)行持久化(persistence)保存,這樣就可以在發(fā)生故障后進(jìn)行重啟恢復(fù)鸭限。Flink 對狀態(tài)進(jìn)行持久化的方式蜕径,就是將當(dāng)前所有分布式狀態(tài)進(jìn)行“快照”保存,寫入一個(gè)“檢查點(diǎn)”(checkpoint)或者保存點(diǎn)(savepoint)保存到外部存儲系統(tǒng)中败京。具體的存儲介質(zhì)丧荐,一般是分布式文件系統(tǒng)(distributed file system)。

5.1 檢查點(diǎn)(Checkpoint)

有狀態(tài)流應(yīng)用中的檢查點(diǎn)(checkpoint)景醇,其實(shí)就是所有任務(wù)的狀態(tài)在某個(gè)時(shí)間點(diǎn)的一個(gè)快照(一份拷貝)瑟俭。簡單來講,就是一次“存盤”,讓我們之前處理數(shù)據(jù)的進(jìn)度不要丟掉赔桌。在一個(gè)流應(yīng)用程序運(yùn)行時(shí),F(xiàn)link 會定期保存檢查點(diǎn)砾赔,在檢查點(diǎn)中會記錄每個(gè)算子的 id 和狀態(tài)旦事;如果發(fā)生故障,F(xiàn)link 就會用最近一次成功保存的檢查點(diǎn)來恢復(fù)應(yīng)用的狀態(tài)族吻,重新啟動處理流程帽借, 就如同“讀檔”一樣珠增。

如果保存檢查點(diǎn)之后又處理了一些數(shù)據(jù),然后發(fā)生了故障砍艾,那么重啟恢復(fù)狀態(tài)之后這些數(shù)據(jù)帶來的狀態(tài)改變會丟失蒂教。為了讓最終處理結(jié)果正確,我們還需要讓源(Source)算子重新讀取這些數(shù)據(jù)脆荷,再次處理一遍凝垛。這就需要流的數(shù)據(jù)源具有“數(shù)據(jù)重放”的能力,一個(gè)典型的例子就是Kafka蜓谋,我們可以通過保存消費(fèi)數(shù)據(jù)的偏移量梦皮、故障重啟后重新提交來實(shí)現(xiàn)數(shù)據(jù)的重放。這是對“至少一次”(at least once)狀態(tài)一致性的保證桃焕,如果希望實(shí)現(xiàn)“精確一次”(exactly once)的一致性剑肯,還需要數(shù)據(jù)寫入外部系統(tǒng)時(shí)的相關(guān)保證。關(guān)于這部分內(nèi)容我們會在后面繼續(xù)討論观堂。

默認(rèn)情況下让网, 檢查點(diǎn)是被禁用的, 需要在代碼中手動開啟型将。直接調(diào)用執(zhí)行環(huán)境的.enableCheckpointing()方法就可以開啟檢查點(diǎn)寂祥。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();

env.enableCheckpointing(1000);

這里傳入的參數(shù)是檢查點(diǎn)的間隔時(shí)間,單位為毫秒七兜。關(guān)于檢查點(diǎn)的詳細(xì)配置丸凭,后面會詳細(xì)講。

除了檢查點(diǎn)之外腕铸,F(xiàn)link 還提供了“保存點(diǎn)”(savepoint)的功能惜犀。保存點(diǎn)在原理和形式上跟檢查點(diǎn)完全一樣,也是狀態(tài)持久化保存的一個(gè)快照狠裹;區(qū)別在于虽界,保存點(diǎn)是自定義的鏡像保存,所以不會由 Flink 自動創(chuàng)建涛菠,而需要用戶手動觸發(fā)莉御。這在有計(jì)劃地停止、重啟應(yīng)用時(shí)非常有用俗冻。

5.2 狀態(tài)后端(State Backends)

檢查點(diǎn)的保存離不開 JobManager 和 TaskManager礁叔,以及外部存儲系統(tǒng)的協(xié)調(diào)。在應(yīng)用進(jìn)行檢查點(diǎn)保存時(shí)迄薄,首先會由 JobManager 向所有 TaskManager 發(fā)出觸發(fā)檢查點(diǎn)的命令琅关; TaskManger 收到之后,將當(dāng)前任務(wù)的所有狀態(tài)進(jìn)行快照保存讥蔽,持久化到遠(yuǎn)程的存儲介質(zhì)中涣易; 完成之后向 JobManager 返回確認(rèn)信息画机。這個(gè)過程是分布式的,當(dāng) JobManger 收到所有TaskManager 的返回信息后新症,就會確認(rèn)當(dāng)前檢查點(diǎn)成功保存步氏,如圖所示。而這一切工作的協(xié)調(diào)账劲,就需要一個(gè)“專職人員”來完成戳护。

在 Flink 中,狀態(tài)的存儲瀑焦、訪問以及維護(hù)腌且,都是由一個(gè)可插拔的組件決定的,這個(gè)組件就叫作狀態(tài)后端(state backend)榛瓮。狀態(tài)后端主要負(fù)責(zé)兩件事:一是本地的狀態(tài)管理铺董,二是將檢查點(diǎn)(checkpoint)寫入遠(yuǎn)程的持久化存儲。

5.2.1 狀態(tài)后端的分類

狀態(tài)后端是一個(gè)“開箱即用”的組件禀晓,可以在不改變應(yīng)用程序邏輯的情況下獨(dú)立配置精续。Flink 中提供了兩類不同的狀態(tài)后端,一種是“哈希表狀態(tài)后端”(HashMapStateBackend)粹懒,另一種是“內(nèi)嵌 RocksDB 狀態(tài)后端”(EmbeddedRocksDBStateBackend)重付。如果沒有特別配置,系統(tǒng)默認(rèn)的狀態(tài)后端是HashMapStateBackend凫乖。

1确垫、哈希表狀態(tài)后端(HashMapStateBackend)

這種方式就是我們之前所說的,把狀態(tài)存放在內(nèi)存里帽芽。具體實(shí)現(xiàn)上删掀,哈希表狀態(tài)后端在內(nèi)部會直接把狀態(tài)當(dāng)作對象(objects),保存在 Taskmanager 的 JVM 堆(heap)上导街。普通的狀態(tài)披泪,以及窗口中收集的數(shù)據(jù)和觸發(fā)器(triggers),都會以鍵值對(key-value)的形式存儲起來搬瑰,所以底層是一個(gè)哈希表(HashMap)款票,這種狀態(tài)后端也因此得名。
對于檢查點(diǎn)的保存泽论,一般是放在持久化的分布式文件系統(tǒng)(file system)中艾少,也可以通過配置“檢查點(diǎn)存儲”(CheckpointStorage)來另外指定。

HashMapStateBackend 是將本地狀態(tài)全部放入內(nèi)存的佩厚,這樣可以獲得最快的讀寫速度,使計(jì)算性能達(dá)到最佳说订;代價(jià)則是內(nèi)存的占用抄瓦。它適用于具有大狀態(tài)潮瓶、長窗口、大鍵值狀態(tài)的作業(yè)钙姊, 對所有高可用性設(shè)置也是有效的毯辅。

2、內(nèi)嵌RocksDB 狀態(tài)后端(EmbeddedRocksDBStateBackend)

RocksDB 是一種內(nèi)嵌的 key-value 存儲介質(zhì)煞额,可以把數(shù)據(jù)持久化到本地硬盤思恐。配置EmbeddedRocksDBStateBackend 后,會將處理中的數(shù)據(jù)全部放入 RocksDB 數(shù)據(jù)庫中膊毁,RocksDB 默認(rèn)存儲在TaskManager 的本地?cái)?shù)據(jù)目錄里胀莹。

與 HashMapStateBackend 直接在堆內(nèi)存中存儲對象不同,這種方式下狀態(tài)主要是放在RocksDB 中的婚温。數(shù)據(jù)被存儲為序列化的字節(jié)數(shù)組(Byte Arrays)描焰,讀寫操作需要序列化/反序列化,因此狀態(tài)的訪問性能要差一些栅螟。另外荆秦,因?yàn)樽隽诵蛄谢琸ey 的比較也會按照字節(jié)進(jìn)行力图, 而不是直接調(diào)用.hashCode()和.equals()方法步绸。

對于檢查點(diǎn),同樣會寫入到遠(yuǎn)程的持久化文件系統(tǒng)中吃媒。
EmbeddedRocksDBStateBackend 始終執(zhí)行的是異步快照瓤介,也就是不會因?yàn)楸4鏅z查點(diǎn)而阻塞數(shù)據(jù)的處理;而且它還提供了增量式保存檢查點(diǎn)的機(jī)制晓折,這在很多情況下可以大大提升保存效率惑朦。

由于它會把狀態(tài)數(shù)據(jù)落盤,而且支持增量化的檢查點(diǎn)漓概,所以在狀態(tài)非常大漾月、窗口非常長、鍵/值狀態(tài)很大的應(yīng)用場景中是一個(gè)好選擇胃珍,同樣對所有高可用性設(shè)置有效梁肿。

5.2.2 如何選擇正確的狀態(tài)后端

HashMap 和RocksDB 兩種狀態(tài)后端最大的區(qū)別,就在于本地狀態(tài)存放在哪里:前者是內(nèi)存觅彰,后者是 RocksDB吩蔑。在實(shí)際應(yīng)用中,選擇那種狀態(tài)后端填抬,主要是需要根據(jù)業(yè)務(wù)需求在處理性能和應(yīng)用的擴(kuò)展性上做一個(gè)選擇烛芬。

HashMapStateBackend 是內(nèi)存計(jì)算,讀寫速度非常快赘娄;但是仆潮,狀態(tài)的大小會受到集群可用內(nèi)存的限制,如果應(yīng)用的狀態(tài)隨著時(shí)間不停地增長遣臼,就會耗盡內(nèi)存資源性置。

而 RocksDB 是硬盤存儲,所以可以根據(jù)可用的磁盤空間進(jìn)行擴(kuò)展揍堰,而且是唯一支持增量檢查點(diǎn)的狀態(tài)后端鹏浅,所以它非常適合于超級海量狀態(tài)的存儲。不過由于每個(gè)狀態(tài)的讀寫都需要做序列化/反序列化屏歹,而且可能需要直接從磁盤讀取數(shù)據(jù)隐砸,這就會導(dǎo)致性能的降低,平均讀寫性能要比HashMapStateBackend 慢一個(gè)數(shù)量級西采。

實(shí)際應(yīng)用就是權(quán)衡利弊后的取舍凰萨。最理想的當(dāng)然是處理速度快且內(nèi)存不受限制可以處理海量狀態(tài),那就需要非常大的內(nèi)存資源了械馆,這會導(dǎo)致成本超出項(xiàng)目預(yù)算胖眷。比起花更多的錢,稍慢的處理速度或者稍小的處理規(guī)模霹崎,更容易接受一點(diǎn)珊搀。

5.2.3 狀態(tài)后端的配置

在不做配置的時(shí)候,應(yīng)用程序使用的默認(rèn)狀態(tài)后端是由集群配置文件 flink-conf.yaml 中指定的尾菇,配置的鍵名稱為 state.backend境析。這個(gè)默認(rèn)配置對集群上運(yùn)行的所有作業(yè)都有效,我們可以通過更改配置值來改變默認(rèn)的狀態(tài)后端派诬。另外劳淆,我們還可以在代碼中為當(dāng)前作業(yè)單獨(dú)配置狀態(tài)后端,這個(gè)配置會覆蓋掉集群配置文件的默認(rèn)值默赂。

1沛鸵、配置默認(rèn)的狀態(tài)后端

在flink-conf.yaml中,可以使用 state.backend 來配置默認(rèn)狀態(tài)后端缆八。
配置項(xiàng)的可能值為hashmap曲掰,這樣配置的就是HashMapStateBackend;也可以是rocksdb奈辰,這樣配置的就是 EmbeddedRocksDBStateBackend栏妖。另外,也可以是一個(gè)實(shí)現(xiàn)了狀態(tài)后端工廠StateBackendFactory 的類的完全限定類名奖恰。
下面是一個(gè)配置HashMapStateBackend 的例子:

# 默認(rèn)狀態(tài)后端
state.backend: hashmap # 存放檢查點(diǎn)的文件路徑
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

這里的state.checkpoints.dir配置項(xiàng)吊趾,定義了狀態(tài)后端將檢查點(diǎn)和元數(shù)據(jù)寫入的目錄宛裕。

2、為每個(gè)作業(yè)(Per-job)單獨(dú)配置狀態(tài)后端

每個(gè)作業(yè)獨(dú)立的狀態(tài)后端论泛,可以在代碼中续滋,基于作業(yè)的執(zhí)行環(huán)境直接設(shè)置。代碼如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

上面代碼設(shè)置的是HashMapStateBackend孵奶,如果想要設(shè)置 EmbeddedRocksDBStateBackend,可以用下面的配置方式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意蜡峰,如果想在 IDE 中使用 EmbeddedRocksDBStateBackend了袁,需要為 Flink 項(xiàng)目添加依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>1.15.0</version>
</dependency>

而由于 Flink 發(fā)行版中默認(rèn)就包含了RocksDB,所以只要我們的代碼中沒有使用 RocksDB的相關(guān)內(nèi)容湿颅,就不需要引入這個(gè)依賴载绿。即使我們在 flink-conf.yaml 配置文件中設(shè)定了state.backend 為 rocksdb,也可以直接正常運(yùn)行油航,并且使用 RocksDB 作為狀態(tài)后端崭庸。

參考:
https://blog.csdn.net/mengxianglong123/article/details/123938304

https://blog.csdn.net/ks_1998/article/details/125424108

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市谊囚,隨后出現(xiàn)的幾起案子怕享,更是在濱河造成了極大的恐慌,老刑警劉巖镰踏,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件函筋,死亡現(xiàn)場離奇詭異,居然都是意外死亡奠伪,警方通過查閱死者的電腦和手機(jī)跌帐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绊率,“玉大人谨敛,你說我怎么就攤上這事÷朔瘢” “怎么了脸狸?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長顽聂。 經(jīng)常有香客問我肥惭,道長,這世上最難降的妖魔是什么紊搪? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任蜜葱,我火速辦了婚禮,結(jié)果婚禮上耀石,老公的妹妹穿的比我還像新娘牵囤。我一直安慰自己爸黄,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布揭鳞。 她就那樣靜靜地躺著炕贵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪野崇。 梳的紋絲不亂的頭發(fā)上称开,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天,我揣著相機(jī)與錄音乓梨,去河邊找鬼鳖轰。 笑死,一個(gè)胖子當(dāng)著我的面吹牛扶镀,可吹牛的內(nèi)容都是我干的蕴侣。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼臭觉,長吁一口氣:“原來是場噩夢啊……” “哼昆雀!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蝠筑,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤狞膘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后什乙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體客冈,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年稳强,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了场仲。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,795評論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡退疫,死狀恐怖渠缕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情褒繁,我是刑警寧澤亦鳞,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站棒坏,受9級特大地震影響燕差,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜坝冕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一徒探、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧喂窟,春花似錦测暗、人聲如沸央串。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽质和。三九已至,卻和暖如春稚字,著一層夾襖步出監(jiān)牢的瞬間饲宿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工胆描, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留褒傅,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓袄友,卻偏偏與公主長得像,于是被迫代替她去往敵國和親霹菊。 傳聞我的和親對象是個(gè)殘疾皇子剧蚣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容