Flink 作為底層的流處理框架袄简。主要出于以下幾點(diǎn)原因:
第一钩骇,F(xiàn)link 是一個(gè)純流式系統(tǒng)豺谈,吞吐量實(shí)際測(cè)試可達(dá) 100K EPS郑象。而不像某些框架是用 mini batch 的模式來達(dá)到所謂的流式處理的;
第二茬末,面對(duì)不同的用戶數(shù)據(jù)格式厂榛,我們必須支持多種數(shù)據(jù)源,這一點(diǎn)上 Flink 內(nèi)置的對(duì)多種數(shù)據(jù)源的支持(CSV丽惭,Kafka击奶,Hbase,Text责掏,Socket 數(shù)據(jù)等)也為用戶數(shù)據(jù)的接入提供了便利柜砾;
第三,F(xiàn)link 強(qiáng)大的窗口機(jī)制(包括翻轉(zhuǎn)窗口换衬,滑動(dòng)窗口痰驱,session 窗口,全窗口以及允許用戶自定義窗口)可以滿足復(fù)雜的業(yè)務(wù)邏輯瞳浦,使得用戶可以編寫復(fù)雜的業(yè)務(wù)規(guī)則担映;
第四,F(xiàn)link 內(nèi)置的 RocksDB 數(shù)據(jù)存儲(chǔ)格式使其數(shù)據(jù)處速度快且資源消耗少叫潦,在 Checkpoint 上起到了至關(guān)重要的作用蝇完;
第五,F(xiàn)link 對(duì)算子(operator)的高可控性诅挑,使得用戶可以靈活添加刪除或更改算子行為四敞。這一點(diǎn)對(duì)于動(dòng)態(tài)部署有著至關(guān)重要的意義。
規(guī)則引擎方面我們有兩個(gè)選擇:
Flink 原生 CEP 組件和 Drools 規(guī)則引擎拔妥。那么兩者各有什么優(yōu)勢(shì)和劣勢(shì)呢忿危?首先我們看一下 Flink CEP。當(dāng)前穩(wěn)定的 Flink1.3 版本的 CEP 是一套極具通用性没龙、易于使用的實(shí)時(shí)流式事件處理方案铺厨。作為 Flink 的原生組件,省去了第三方庫與 Flink 配合使用時(shí)可能會(huì)導(dǎo)致的各種問題硬纤。但其功能現(xiàn)階段看來還比較基礎(chǔ)解滓,不能表達(dá)復(fù)雜的業(yè)務(wù)場(chǎng)景,同時(shí)它不能夠做到動(dòng)態(tài)更新(這是一個(gè)痛點(diǎn))筝家。具體如何解決我們稍后會(huì)看到洼裤。
什么是 Drools?
Drools 是一套基于 JVM 的溪王,實(shí)現(xiàn)了 RETE 算法的規(guī)則引擎腮鞍。它可以將多變的規(guī)則從硬編碼中解放出來值骇,以規(guī)則腳本的形式存在。右邊圖中顯示的是一個(gè)典型的 Drools 規(guī)則的定義方式移国≈ù瘢可以看到,其語義與 Java 非常類似迹缀。既可以導(dǎo)入既有的 Java POJO(圖中 Person 類)使碾,也可以在規(guī)則文件中直接定義類(EventA)。when 語句中是具體的判斷條件祝懂,then 語句中是滿足判斷條件之后所做的操作票摇。操作可以是任意的,不僅限于對(duì)滿足條件的那個(gè)對(duì)象進(jìn)行操作嫂易。比如你可以在 then 里調(diào)用某個(gè) Java 類的方法兄朋,或者調(diào)用某個(gè)全局變量×担總之,可以在 Drools 規(guī)則文件中 import Java 類傅事,然后對(duì)其進(jìn)行操作缕允。
Drools 有些什么優(yōu)缺點(diǎn)呢?
它最大優(yōu)勢(shì)在于語法規(guī)則簡單蹭越,類似 Java障本,編寫門檻不高、能夠無縫化與 Java 集成响鹃,且用戶可以對(duì) Drools 規(guī)則進(jìn)行動(dòng)態(tài)配置驾霜。但這套方案也存在著自己的不足之處:例如其內(nèi)置聚合功能速度緩慢,不適合我們自身或者客戶使用場(chǎng)景下的大量聚合操作任務(wù)买置。另外粪糙,其內(nèi)置事件序列處理機(jī)制也需要消耗大量內(nèi)存資源。
下面我們來看一個(gè)具體的例子忿项。
可以看到我們這里有一條檢測(cè) VPN 可疑行為的規(guī)則蓉冈。規(guī)則當(dāng)中包含三條判斷條件。
第一條 metric 用來判斷一小時(shí)能登錄失敗的次數(shù)轩触。
第二條演示的是用戶與設(shè)備之間的實(shí)體關(guān)系寞酿,表達(dá)式 expression == “[vpn.user, vpn.device]”說明了這一點(diǎn)。
第三條演示的是在序列算法下異常值大于 50 的行為脱柱。
最后會(huì)將滿足條件的三個(gè)行為收集起來發(fā)送給下游的模塊伐弹。下游模塊可以是另一個(gè)算子,或者是持久化結(jié)果的 DB榨为。
有了 Flink 作為流計(jì)算引擎惨好,有了 Drools 作為規(guī)則引擎煌茴,那么我們?nèi)绾螌烧呓Y(jié)合放到一個(gè)系統(tǒng)里發(fā)揮作用呢。我們需要做的是將源數(shù)據(jù)輸入到 Flink 生成所謂的事件流昧狮,同時(shí)將 Drools 規(guī)則文本讀取到 Flink 生成所謂的規(guī)則流景馁。而 Flink 中提供了一個(gè) CoFlatMapFunction 可以將兩個(gè)流結(jié)合起來進(jìn)行分析。在這個(gè) function 里我們所要做的就是將在 Flink 里結(jié)合機(jī)器學(xué)習(xí)算法計(jì)算出來的結(jié)果與 Drools 規(guī)則進(jìn)行匹配逗鸣。
但事實(shí)上合住,這個(gè)方案在實(shí)際運(yùn)行當(dāng)中會(huì)有一些性能上的問題。這些問題主要表現(xiàn)在長周期行為的分析上撒璧。比如透葛,機(jī)器學(xué)習(xí)算法需要對(duì)長周期行為(數(shù)據(jù)往往跨越三個(gè)月)進(jìn)行計(jì)算,得出異常值卿樱。那么這種情況下我們需要維護(hù)算法生成的長周期行為的狀態(tài)僚害。具體方法可以是直接保存在 Drools Engine 中,或者將其保存在外部 DB 中繁调,再或者可以利用 Flink 的 stateful operator 來維護(hù)狀態(tài)萨蚕。但現(xiàn)有情況下,每種方法都多多少少會(huì)有一些問題蹄胰。接下來我們看看具體問題都有哪些岳遥。
需要保存過往窗口的狀態(tài),作為中間結(jié)果送入 Drools 規(guī)則引擎進(jìn)行計(jì)算裕寨。Flink 內(nèi)置的窗口機(jī)制在窗口結(jié)束時(shí)會(huì)清除窗口狀態(tài)浩蓉。 Flink內(nèi)置的RocksDB存儲(chǔ)結(jié)構(gòu)在窗口清理時(shí)會(huì)自動(dòng)刪除數(shù)據(jù)。 Flink產(chǎn)生的長周期聚合結(jié)果被送入 Drool 規(guī)則引擎進(jìn)行匹配的時(shí)候往往會(huì)消耗大量內(nèi)存宾袜∧硌蓿可以看到,主要的痛點(diǎn)就在于中間結(jié)果的維護(hù)和資源消耗的問題庆猫。面對(duì)這些問題我們可以嘗試以下的做法认轨。
首先想到的是用 redis,memcached 之類的 KV store 來保存中間結(jié)果阅悍。但實(shí)際測(cè)試結(jié)果表明好渠,它們的性能趕不上 Flink 的速度。所以在追求高吞吐量的情況下节视,此方法行不通拳锚。其次,可以通過修改 Flink RockDB backend 的源碼來解決窗口清理時(shí)自動(dòng)刪除數(shù)據(jù)的問題寻行。同時(shí)為了保證過期數(shù)據(jù)不擠壓霍掺,需要引入“TTL”(time to live)屬性,是的 rocksdb 在超時(shí)的時(shí)候自動(dòng)刪除過期數(shù)據(jù)。內(nèi)存問題主要是由 Drools 引擎引起的杆烁。因?yàn)槊恳粭l事件與規(guī)則匹配都會(huì)生成一個(gè) Fact牙丽,默認(rèn)情況下 fact 無論是否匹配,Drools 都不會(huì)立刻刪除它兔魂。你必須手動(dòng)的刪除它烤芦。但當(dāng)事件數(shù)量過大或者規(guī)則數(shù)量過大時(shí),即使你手動(dòng)刪除沒有匹配的 fact析校,可能也會(huì)出現(xiàn)某一時(shí)間段大量 fact 存在于內(nèi)存中的情況构罗。所以可行的辦法是設(shè)定閾值來控制內(nèi)存中允許同時(shí)存在的 fact 的數(shù)量,同時(shí)清理失效的 fact智玻∷爝螅或者也可以盡量保持規(guī)則簡單化。復(fù)雜的聚合規(guī)則交給 Flink 去做吊奢。
可以看到盖彭,以上方案所產(chǎn)生的性能問題主要在于 Drools。其實(shí)除了以上的方案页滚,我們還有一個(gè) Plan B召边。Flink1.4 Snapshot 版本增加了一些新功能。利用這些新功能裹驰,我們可以直接使用 Flink CEP 并做到動(dòng)態(tài)更新掌实。這些功能主要包括:新版本加入了對(duì)算子粒度的操作。我們可以 checkpoint 某一個(gè)算子的狀態(tài)邦马。同時(shí) Flink CEP 中新增了 pattern group 的概念⊙缏簦可以將多個(gè)規(guī)則 pattern 歸為同一個(gè) group滋将。這樣增加了規(guī)則的表達(dá)能力。利用這些功能症昏,我們重新設(shè)計(jì)了一個(gè)系統(tǒng)來實(shí)現(xiàn)規(guī)則的動(dòng)態(tài)更新随闽。下面我們來看一下新設(shè)計(jì)的工作流程。
簡單來講肝谭,整個(gè)工作流程就是用戶更新規(guī)則掘宪,新規(guī)則被翻譯成 Java 源碼,然后編譯并打包成可執(zhí)行 jar攘烛,這個(gè)時(shí)候系統(tǒng)將觸發(fā) Flink 的 Savepoint魏滚,保存當(dāng)前 operator 的狀態(tài),然后 cancel 當(dāng)前運(yùn)行的 Flink Job坟漱,然后把新生成的 jar 發(fā)布到 Flink 上去鼠次,同時(shí)讀取最新的 operator 狀態(tài),恢復(fù)整個(gè)系統(tǒng)的運(yùn)行。值得提出的一點(diǎn)是腥寇,根據(jù)規(guī)則文件里規(guī)則的數(shù)量和復(fù)雜度成翩。我們可以劃分規(guī)則生成多個(gè) jar 發(fā)布到 Flink 上。這樣單個(gè) job 的負(fù)載就不至于過高赦役。這種動(dòng)態(tài)生成規(guī)則代碼的方式擴(kuò)展性和并發(fā)性更出色麻敌,不存在單一大負(fù)載算子。缺陷在于從 Savepoint 到整個(gè)流程恢復(fù)會(huì)有數(shù)秒延遲掂摔。