Flink CEP的基石:NFA-b自動(dòng)機(jī)原理簡(jiǎn)介

前言

Flink的復(fù)雜事件處理(complex event processing, CEP)庫(kù)能夠在無(wú)界數(shù)據(jù)流中通過(guò)匹配定義好的事件模式來(lái)發(fā)現(xiàn)一系列事件之間的關(guān)聯(lián)規(guī)律奸汇,從而有效支持趨勢(shì)分析、風(fēng)險(xiǎn)監(jiān)控既忆、欺詐檢測(cè)等業(yè)務(wù)場(chǎng)景柠座。它提供了一套簡(jiǎn)單易用邑雅、表達(dá)性強(qiáng)的API,例如愚隧,在10秒的時(shí)間窗口內(nèi)檢測(cè)事件的報(bào)警級(jí)別:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val partitionedInput = sourceStream.keyBy(event => event.getId)

// start[] -> middle[name = 'error'] -> .. -> end[name = 'critical'] within 10 secs
val pattern = Pattern.begin[Event]("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)
val alerts = patternStream.select(createAlert(_))

具體的用法可參見(jiàn)官方文檔蒂阱。

那么,F(xiàn)link CEP是采用什么方法匹配事件規(guī)則的呢狂塘?在源碼注釋中录煤,可以得知它是基于《Efficient Pattern Matching over Event Streams》這篇論文的思想實(shí)現(xiàn)的。該論文提出了一種在事件流上進(jìn)行高效模式匹配的方法荞胡,即帶匹配緩存的非確定有限狀態(tài)機(jī)妈踊,又稱為NFAb自動(dòng)機(jī)。本文先介紹NFAb自動(dòng)機(jī)的相關(guān)原理泪漂,在之后的文章中廊营,再結(jié)合源碼講解Flink CEP庫(kù)的具體實(shí)現(xiàn)歪泳。

NFAb自動(dòng)機(jī)的定義與構(gòu)造

在大學(xué)《形式語(yǔ)言與自動(dòng)機(jī)》課程中,我們都學(xué)習(xí)過(guò)非確定有限狀態(tài)機(jī)(NFA)露筒,用一句話概括就是:對(duì)于每個(gè)<狀態(tài),輸入符號(hào)>二元組呐伞,其狀態(tài)轉(zhuǎn)移可以有多個(gè),而不是確定的一個(gè)慎式。NFAb自動(dòng)機(jī)的形式化定義與普通NFA略有不同伶氢,為五元組:

A = (Q, E, θ, q1, F)

其中:

  • Q為狀態(tài)集合;
  • E為表示狀態(tài)轉(zhuǎn)移的有向邊集合瘪吏;
  • θ為表示狀態(tài)轉(zhuǎn)移的公式集合癣防,與E共同作用;
  • q1表示起始狀態(tài)掌眠;
  • F表示結(jié)束狀態(tài)蕾盯。

下面通過(guò)實(shí)例來(lái)構(gòu)造NFAb自動(dòng)機(jī)。先通過(guò)SASE+語(yǔ)言(一種專門用來(lái)描述CEP pattern的通用語(yǔ)言)定義如下的股票趨勢(shì)匹配模式:

PATTERN SEQ(Stock+ a[], Stock b)
WHERE skip_till_next_match(a[], b) {
        [symbol]  // 表示只考慮相同的事件類型蓝丙,此處恒為真
    AND a[1].volume > 1000
    AND a[i].price > avg(a[..i-1].price)
    AND b.volume < 80% * a[a.LEN].volume
} WITHIN 1 hour   // 時(shí)間窗口

該模式以1小時(shí)作為時(shí)間窗口的長(zhǎng)度级遭,以“交易量大于1000”作為匹配序列的起始,且要求序列中股票的最近價(jià)格必須高于之前所有交易價(jià)格的均值迅腔。當(dāng)檢測(cè)到該股票的交易量下跌到最近一次交易量的80%以下時(shí)装畅,匹配成功結(jié)束靠娱。

根據(jù)上面的條件沧烈,構(gòu)造出NFAb自動(dòng)機(jī),如下圖所示像云。這也是Flink CEP中NFACompiler組件需要做的事情锌雀。

注意∧符號(hào)表示與,∨符號(hào)表示或迅诬,┐符號(hào)表示非

匹配序列a[]的生成實(shí)際上就是構(gòu)造符合謂詞約束的事件的正閉包Stock+ a[](克林閉包去掉ε)腋逆。也就是說(shuō),a[1]是上述自動(dòng)機(jī)的起始狀態(tài)(交易量大于1000)侈贷,a[i]是正在構(gòu)造正閉包的狀態(tài)(最近價(jià)格高于之前所有交易價(jià)格的均值)惩歉。而b是從閉包中跳出并匹配下一事件的狀態(tài)(交易量下跌到最近一次交易量的80%以下)。

NFAb自動(dòng)機(jī)的每個(gè)狀態(tài)都有各自的匹配緩存俏蛮,用于在運(yùn)行時(shí)存儲(chǔ)當(dāng)前的匹配結(jié)果撑蚌。關(guān)于匹配緩存的細(xì)節(jié),后文會(huì)講到搏屑。

狀態(tài)轉(zhuǎn)移語(yǔ)義

復(fù)雜事件的匹配過(guò)程本質(zhì)上就是輸入事件流驅(qū)動(dòng)NFAb自動(dòng)機(jī)進(jìn)行狀態(tài)轉(zhuǎn)移的過(guò)程争涌。根據(jù)θ集合定義的條件,在有向邊集合E上可以定義4種狀態(tài)轉(zhuǎn)移語(yǔ)義辣恋。

  • begin:消費(fèi)輸入事件亮垫,存入緩存模软,并轉(zhuǎn)移到下一個(gè)狀態(tài);
  • take:消費(fèi)輸入事件饮潦,存入緩存燃异,并保持當(dāng)前狀態(tài);
  • ignore:忽略輸入事件继蜡,不存入緩存特铝,并保持當(dāng)前狀態(tài);
  • proceed:感知輸入事件壹瘟,轉(zhuǎn)移到下一個(gè)狀態(tài)鲫剿,同時(shí)保留該事件給下一個(gè)狀態(tài)處理。

結(jié)合這4種狀態(tài)轉(zhuǎn)移語(yǔ)義稻轨,我們就可以讀懂上圖中的轉(zhuǎn)移公式了灵莲。Flink CEP的StateTransitionAction定義中沒(méi)有begin語(yǔ)義,僅有take殴俱、ignore和proceed語(yǔ)義政冻,但是它和NFAb自動(dòng)機(jī)是等價(jià)的,之后分析源碼時(shí)將會(huì)看到线欲。

事件選擇策略

所謂事件選擇策略明场,就是指選擇符合條件的事件進(jìn)入正閉包——即擴(kuò)展匹配序列的方法。在時(shí)間窗口的限制之內(nèi)李丰,常用的有以下三種策略苦锨。

  • strict(嚴(yán)格連續(xù)):嚴(yán)格按順序選擇所有符合條件的事件,途中不能出現(xiàn)不符合條件的事件趴泌,對(duì)應(yīng)Flink CEP API中的Pattern.next()/notNext()舟舒;
  • skip till next match(寬松連續(xù)):按順序選擇所有符合條件的事件,而途中不符合條件的事件被忽略嗜憔,對(duì)應(yīng)Flink CEP API中的Pattern.followedBy()/notFollowedBy()秃励。上述SASE+語(yǔ)言描述的pattern使用的就是這個(gè)策略;
  • skip till any match(可變寬松連續(xù)):在skip till next match的基礎(chǔ)上吉捶,還允許忽略一些符合條件的事件夺鲜,以盡量延長(zhǎng)匹配序列的長(zhǎng)度,對(duì)應(yīng)Flink CEP API中的Pattern.followedByAny()呐舔。

以skip till next match策略為例币励,給出如下的示例數(shù)據(jù),可以產(chǎn)生3個(gè)匹配序列R1滋早、R2榄审、R3,如圖所示杆麸。

共享版本匹配緩存

仍然考慮上一節(jié)的圖搁进,回顧一下a[i]狀態(tài)的take和proceed轉(zhuǎn)移邏輯:

θ*a[i]_take = θa[i]_take ∧ a[i].time<a[1].time+1 hour
θ*a[i]_proceed = θb_begin ∨ (?θ*a[i]_take ∧ ?θ*a[i]_ignore)

可見(jiàn)浪感,在e6到達(dá)NFA時(shí),可以同時(shí)滿足a[i]_take和a[i]_proceed的轉(zhuǎn)移(這里正好體現(xiàn)出了NFA的不確定性)饼问,所以原本的一個(gè)序列會(huì)在此分裂成兩個(gè):其中一個(gè)(R1)終止匹配影兽,另一個(gè)(R3)繼續(xù)匹配。同理莱革,當(dāng)e3到達(dá)NFA時(shí)峻堰,同時(shí)滿足a[1]_begin和a[i]_take的轉(zhuǎn)移,所以又會(huì)出現(xiàn)一個(gè)序列R2盅视。

由上可知捐名,這些序列之間的重合是比較大的,如果都按原樣存儲(chǔ)在匹配緩存中闹击,會(huì)造成比較大的膨脹镶蹋。為了避免這個(gè)問(wèn)題,論文中設(shè)計(jì)了一種科學(xué)的緩存結(jié)構(gòu)赏半,稱為shared versioned match buffer贺归,即“共享版本匹配緩存”,如下圖所示断箫。

其中圖a拂酣、b、c是原始的R1仲义、R2婶熬、R3緩存,圖d則是整合在一起的共享版本緩存光坝。它會(huì)將所有序列的前向指針附加上一個(gè)版本號(hào)(采用杜威十進(jìn)制法尸诽,點(diǎn)號(hào)分隔),并且遵循以下兩個(gè)規(guī)則:

  • 遷移到下一個(gè)狀態(tài)時(shí)盯另,版本號(hào)增加一位,如a[1]狀態(tài)的版本號(hào)是1(為了符合習(xí)慣寫作1.0)洲赵,a[i]狀態(tài)的版本號(hào)是1.0鸳惯、1.1,b狀態(tài)的版本號(hào)是1.0.0叠萍、1.1.0……以此類推芝发;
  • 當(dāng)序列發(fā)生分裂時(shí),處于當(dāng)前狀態(tài)的版本號(hào)位加1苛谷。例如e3事件產(chǎn)生了2.0版本辅鲸,e6事件產(chǎn)生了1.1版本。

依照這種規(guī)則腹殿,就可以根據(jù)前向指針上版本號(hào)的遞增規(guī)律和前綴來(lái)回溯出正確的序列了独悴。Flink CEP中將此緩存設(shè)計(jì)為SharedBuffer類例书,但是版本的設(shè)計(jì)有些不同,之后再提刻炒。

計(jì)算狀態(tài)

對(duì)于每一個(gè)序列决采,NFAb自動(dòng)機(jī)還需要維護(hù)一些最基礎(chǔ)的狀態(tài)數(shù)據(jù),以方便執(zhí)行狀態(tài)轉(zhuǎn)移和匹配邏輯坟奥,論文中將其稱為computation state树瞭,即計(jì)算狀態(tài)“基礎(chǔ)的計(jì)算狀態(tài)結(jié)構(gòu)如下圖所示晒喷,包含以下數(shù)據(jù)項(xiàng):

  • 當(dāng)前的版本號(hào);
  • 當(dāng)前的狀態(tài)访敌;
  • 指向匹配緩存中最近一個(gè)事件的指針厨埋;
  • 整個(gè)序列的起始時(shí)間;
  • 其他必要的上下文數(shù)據(jù)存儲(chǔ)捐顷。以股票趨勢(shì)數(shù)據(jù)為例荡陷,會(huì)維護(hù)正閉包內(nèi)的事件數(shù)、價(jià)格之和以及交易量等迅涮。

Flink CEP框架用ComputationState類來(lái)維護(hù)計(jì)算狀態(tài)废赞,大體思路與論文相同。

The End

有一段時(shí)間沒(méi)認(rèn)真讀過(guò)論文了叮姑,大腦還是需要鍛煉的唉地。

民那晚安。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末传透,一起剝皮案震驚了整個(gè)濱河市耘沼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌朱盐,老刑警劉巖群嗤,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異兵琳,居然都是意外死亡狂秘,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門躯肌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)者春,“玉大人,你說(shuō)我怎么就攤上這事清女∏蹋” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)拴袭。 經(jīng)常有香客問(wèn)我读第,道長(zhǎng),這世上最難降的妖魔是什么稻扬? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任卦方,我火速辦了婚禮,結(jié)果婚禮上泰佳,老公的妹妹穿的比我還像新娘盼砍。我一直安慰自己,他們只是感情好逝她,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布浇坐。 她就那樣靜靜地躺著,像睡著了一般黔宛。 火紅的嫁衣襯著肌膚如雪近刘。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,292評(píng)論 1 301
  • 那天臀晃,我揣著相機(jī)與錄音觉渴,去河邊找鬼。 笑死徽惋,一個(gè)胖子當(dāng)著我的面吹牛案淋,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播险绘,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼踢京,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了宦棺?” 一聲冷哼從身側(cè)響起瓣距,我...
    開(kāi)封第一講書(shū)人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎代咸,沒(méi)想到半個(gè)月后蹈丸,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡侣背,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年白华,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贩耐。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖厦取,靈堂內(nèi)的尸體忽然破棺而出潮太,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布铡买,位于F島的核電站更鲁,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏奇钞。R本人自食惡果不足惜澡为,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望景埃。 院中可真熱鬧媒至,春花似錦、人聲如沸谷徙。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)完慧。三九已至谋旦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間屈尼,已是汗流浹背册着。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留脾歧,地道東北人甲捏。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像涨椒,于是被迫代替她去往敵國(guó)和親摊鸡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354