Flink CEP

1 復(fù)雜事件

1.1 引入

在Flink RichFunction&state這篇博文中我們一起學(xué)習(xí)了下如何結(jié)合使用keyBy state和TreeSet在一條無界流中進(jìn)行全局的分組求top n操作,可以解決一些實(shí)時(shí)看板相關(guān)的業(yè)務(wù)問題。在Flink BroadcastStream這篇博文中我們也學(xué)習(xí)到了如何使用廣播流來處理監(jiān)控規(guī)則經(jīng)常變更的程序日志監(jiān)控業(yè)務(wù)糜芳。

那么現(xiàn)在我們又遇到了一個(gè)新的業(yè)務(wù)需求:判斷一個(gè)用戶在點(diǎn)擊了商品之后是否立即進(jìn)行了下單付款操作沧踏,如果是的話將用戶名和點(diǎn)擊時(shí)間以及下單付款時(shí)間都輸出到控制臺呻拌。
我們思考了一下磷蜀,發(fā)現(xiàn)繼續(xù)使用keyBy state操作好像也能夠?qū)崿F(xiàn)這個(gè)業(yè)務(wù)需求包雀,只是我們的判斷邏輯和代碼都會(huì)比較復(fù)雜一些谢谦。邏輯思路如下

1.將數(shù)據(jù)整理成UserAction(ip: String, timestamp: Long, name: String, action: String)
2.通過name對UserAction進(jìn)行keyBy操作,得到KeyedStream[UserAction, String]
3.KeyedStream.process(xxx)此時(shí)在每個(gè)name分組中愿阐,需要一個(gè)keyBy state同時(shí)保存該name對應(yīng)的"click"操作的UserAction的信息
  使用一個(gè)ValueState[UserAction]來保存"click"信息微服,"buy"信息不需要保存
4.這個(gè)時(shí)候碰到的問題就是在每個(gè)name分組如何判斷"click","buy"的UserAction是有順序來到該分組
5.針對于4的問題,可以通過ValueState存儲的信息來進(jìn)行判斷
  a."click","buy"先后來到該name分組
    "click"來到該name分組時(shí)缨历,直接將"click"添加更新進(jìn)ValueState
    "buy"來到該name分組時(shí)以蕴,判斷ValueState是否有存儲"click"
        有則將"click"取出來和當(dāng)前"buy"整合并發(fā)送到下游,且要清空ValueState信息
        沒有直接清空ValueState信息辛孵,不發(fā)送任何數(shù)據(jù)到下游
  b.如果有非"click","buy"信息來到name分組丛肮,均直接清空ValueState信息,不發(fā)送任何數(shù)據(jù)到下游

當(dāng)我們遇到的業(yè)務(wù)并不是很復(fù)雜的時(shí)候使用上邊的思路完全可以解決魄缚,但是如果要是碰到更復(fù)雜一些的業(yè)務(wù)需求可能判斷邏輯就會(huì)越來越復(fù)雜:"click"和"buy"之間可以有"show"操作宝与;"click"和"buy"之間可以有零或者多個(gè)"click"操作。

那怎么能夠使用更簡潔的邏輯和代碼來實(shí)現(xiàn)這種越來越復(fù)雜的問題呢冶匹?

1.2 復(fù)雜事件

我們先來給這樣的業(yè)務(wù)來定這樣一個(gè)描述:檢測和發(fā)現(xiàn)無界事件流中多個(gè)記錄的關(guān)聯(lián)規(guī)則习劫,也就是從無界事件流中得到滿足規(guī)則的復(fù)雜事件。

CEP(Complex Event Processing)就是在無界事件流中檢測事件模式嚼隘,讓我們掌握數(shù)據(jù)中重要的部分诽里。flink CEP是在flink中實(shí)現(xiàn)的復(fù)雜事件處理庫。

01_show.png
目標(biāo):從簡單事件流中發(fā)現(xiàn)一些高階特征
輸入:一個(gè)或者多個(gè)簡單事件構(gòu)成的事件流
處理:檢測簡單事件之間的聯(lián)系飞蛹,多個(gè)事件組合一起符合匹配規(guī)則须肆,將該多個(gè)事件構(gòu)成復(fù)雜事件
輸出:符合規(guī)則的復(fù)雜事件

2 Pattern API

我們先來認(rèn)識一下CEP中Pattern模式吧,也就是規(guī)則的制定桩皿。

val start: Pattern[X, X] = Pattern.begin[X]("start")

Pattern根據(jù)模式的組合種類,分為了三種

2.1 個(gè)體模式(Individual Patterns)

組成復(fù)雜規(guī)則的每一個(gè)單獨(dú)的模式定義幢炸,就是“個(gè)體模式”

start.where(condition: F => Boolean)
  • 量詞

個(gè)體模式根據(jù)接收同一種事件的數(shù)量又可以分為“單例模式”和“循環(huán)模式“泄隔,我們通過一個(gè)“量詞”來指定接受同一種事件的數(shù)量。

start.times(2)                      // 必須2次
start.times(2, 5)                   // 2宛徊,3佛嬉,4或者5次都可以
start.times(2, 5).greedy            // 2逻澳,3,4或者5次暖呕,并且盡可能的重復(fù)匹配
start.times(2).optional             // 0或者2次
start.oneOrMore                     // 1次或者多次
start.timesOrMore(2).optional.greedy// 0, 2或者多次斜做,并且盡可能的重復(fù)匹配
  • 條件

個(gè)體模式的條件,可以在一個(gè)個(gè)體模式上判斷使用多個(gè)條件湾揽,只有當(dāng)條件都滿足的情況下才算匹配成功

// 組合條件 (參考sql中的 where or)
.where()    // 條件相連為and
.or()       // 條件相連為or
// 終止條件
.until()    // 當(dāng)使用了oneOrMore或者oneOrMore.optional時(shí)需要進(jìn)行終止瓤逼,以便清楚狀態(tài)
// 迭代條件
.where(condition: (F, Context[F]) => Boolean) // 調(diào)用上下文對前邊接收的事件進(jìn)行處理
// ctx.getEventsForPattern("start")

2.2 組合模式(Combining Patterns)

多個(gè)個(gè)體模式組合起來就形成了一個(gè)組合模式

Pattern.begin[X]("start").where(condition: F => Boolean)
        .next("middle").where(condition: F => Boolean)

注意:組合模式必須由一個(gè)“開始個(gè)體模式”開始

val start: Pattern[X, X] = Pattern.begin[X]("start")

2.3 模式組(Groups of Patterns)

將一個(gè)組合模式作為條件嵌套在個(gè)體模式里,成為一組模式

3 近鄰

當(dāng)我們在對事件流進(jìn)行復(fù)雜事件處理库物,有時(shí)我們需要兩個(gè)事件在流中必須相連霸旗,有時(shí)只要兩個(gè)事件出現(xiàn)在流中不必須相連中間可以間隔一個(gè)其他事件。也就是我們對多個(gè)事件組成規(guī)則嚴(yán)格性的寬容度戚揭,那么如何來表達(dá)這個(gè)容忍度呢诱告?使用近鄰。

嚴(yán)格近鄰:所有事件嚴(yán)格按照順序進(jìn)行民晒,中間沒有任何不匹配的事件精居。使用next()指定

對于模式"a next b",事件流[a, c, b, d]潜必,沒有匹配
02_yange.png

寬松近鄰:允許中間出現(xiàn)不匹配事件靴姿。使用followedBy()指定

對于模式"a followedBy b",事件流[a, c, b, d]刮便,匹配為 [a, b]
03_kuansong.png

非確定寬松近鄰:一個(gè)匹配的事件能夠再次使用空猜。使用followedByAny()指定

對于模式"a followedByAny b",事件流[a, c, b1, b2]恨旱,匹配為 [a, b1],[a, b2]
04_feiqueding_kuansong.png

4 示例

此處我們使用flink CEP來實(shí)現(xiàn) 1.1引入 中提到的業(yè)務(wù)需求

1 CEP并不包含在flink中辈毯,使用前需要自己導(dǎo)入cep jar包

    <!-- 使用cep需要引入該jar包,
        scala語言編寫要導(dǎo)入 artifactId前綴為 flink-cep-scala -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>

2 定義Pattern模式且必須要以Pattern.begin("xxx")開始搜贤,"xxx"為別名谆沃,在后邊PatternStream.select()中獲取符合模式的數(shù)據(jù)時(shí)會(huì)使用到。

val pattern = Pattern.begin[T]("start").where(...).next("middle").where(...)
    // 定義一個(gè)cep pattern模式仪芒。此處復(fù)雜的事務(wù)為:用戶click后馬上進(jìn)行buy操作
    val pattern: Pattern[UserAction, UserAction] =
      Pattern.begin[UserAction]("start").where(new SimpleCondition[UserAction] {
        override def filter(t: UserAction): Boolean = t.action.equals("click")
      })
        .next("middle").where(new SimpleCondition[UserAction] {
        override def filter(t: UserAction): Boolean = t.action.equals("buy")
      })
    // 獲取一個(gè)普通的流
    val input: KeyedStream[UserAction, String] = env.addSource(
        new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), initProps()))
      .map { line => 
        val strs: Array[String] = line.split(",")
        UserAction(strs(0), strs(1).toLong, strs(2), strs(3)) // 將記錄轉(zhuǎn)換為UserAction類型
      }.keyBy(_.name)
    // 將我們定義好的cep pattern應(yīng)用于這個(gè)普通的流
    val patternStream: PatternStream[UserAction] = CEP.pattern(input, pattern)
    // 通過select算子獲取符合pattern的事務(wù)數(shù)據(jù)唁影,并打印結(jié)果
    patternStream.select(new PatternSelectFunction[UserAction, String] {
      override def select(map: util.Map[String, util.List[UserAction]]): String = {
        val click: UserAction = map.get("start").iterator().next()
        val buy: UserAction = map.get("middle").iterator().next()
        // 打印用戶的名稱,點(diǎn)擊和購買的時(shí)間
        s"name: ${click.name}, click: ${click.timestamp}, buy: ${buy.timestamp}"
      }
    }).print()
05_cep_demo.png

示例代碼中在Pattern指定時(shí)掂名,我們使用next()來表示需要得到是符合嚴(yán)格近鄰的復(fù)雜事件据沈,所以"click","order","buy"這樣的復(fù)雜事件并不符合模式也不會(huì)被輸出打印到控制臺。

此博文饺蔑,我們只是簡單的介紹了下CEP及其使用方式锌介,但是CEP的知識遠(yuǎn)不遠(yuǎn)不止這些。后邊有機(jī)會(huì)的話我們再一起深入學(xué)習(xí),此處就不進(jìn)行展開了孔祸。
下一篇博文我們就來一起學(xué)習(xí)一下flink中的三種時(shí)間語義和窗口機(jī)制吧隆敢。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市崔慧,隨后出現(xiàn)的幾起案子拂蝎,更是在濱河造成了極大的恐慌,老刑警劉巖惶室,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件温自,死亡現(xiàn)場離奇詭異,居然都是意外死亡拇涤,警方通過查閱死者的電腦和手機(jī)捣作,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鹅士,“玉大人券躁,你說我怎么就攤上這事〉糁眩” “怎么了也拜?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長趾痘。 經(jīng)常有香客問我慢哈,道長,這世上最難降的妖魔是什么永票? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任卵贱,我火速辦了婚禮,結(jié)果婚禮上侣集,老公的妹妹穿的比我還像新娘键俱。我一直安慰自己,他們只是感情好世分,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布编振。 她就那樣靜靜地躺著,像睡著了一般臭埋。 火紅的嫁衣襯著肌膚如雪踪央。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天瓢阴,我揣著相機(jī)與錄音畅蹂,去河邊找鬼。 笑死荣恐,一個(gè)胖子當(dāng)著我的面吹牛魁莉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼旗唁,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了痹束?” 一聲冷哼從身側(cè)響起检疫,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎祷嘶,沒想到半個(gè)月后屎媳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡论巍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年烛谊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嘉汰。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡丹禀,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出鞋怀,到底是詐尸還是另有隱情双泪,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布密似,位于F島的核電站琼锋,受9級特大地震影響盯漂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一艳悔、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧附较,春花似錦刃泡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至掖看,卻和暖如春匣距,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背哎壳。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工毅待, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人归榕。 一個(gè)月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓尸红,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子外里,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 光消失在地平線 街燈明了怎爵, 星星睜開了眼睛, 夜盅蝗,如稠墨般鳖链。 唯獨(dú), 璀璨的燈迷失了我的眼墩莫。 觸手的光明芙委, 一片冰...
    風(fēng)中的旅行閱讀 197評論 0 0
  • 坐標(biāo)/蛟橋園 最近看了一些書和電影,沒人交談總感覺悶悶的狂秦,想到了簡書灌侣,重新裝回來,也想把每天的一些小事情記錄下來裂问,...
    就是蘇蘇呀閱讀 121評論 1 1