1 復(fù)雜事件
1.1 引入
在Flink RichFunction&state這篇博文中我們一起學(xué)習(xí)了下如何結(jié)合使用keyBy state和TreeSet在一條無界流中進(jìn)行全局的分組求top n操作,可以解決一些實(shí)時(shí)看板相關(guān)的業(yè)務(wù)問題。在Flink BroadcastStream這篇博文中我們也學(xué)習(xí)到了如何使用廣播流來處理監(jiān)控規(guī)則經(jīng)常變更的程序日志監(jiān)控業(yè)務(wù)糜芳。
那么現(xiàn)在我們又遇到了一個(gè)新的業(yè)務(wù)需求:判斷一個(gè)用戶在點(diǎn)擊了商品之后是否立即進(jìn)行了下單付款操作沧踏,如果是的話將用戶名和點(diǎn)擊時(shí)間以及下單付款時(shí)間都輸出到控制臺呻拌。
我們思考了一下磷蜀,發(fā)現(xiàn)繼續(xù)使用keyBy state操作好像也能夠?qū)崿F(xiàn)這個(gè)業(yè)務(wù)需求包雀,只是我們的判斷邏輯和代碼都會(huì)比較復(fù)雜一些谢谦。邏輯思路如下
1.將數(shù)據(jù)整理成UserAction(ip: String, timestamp: Long, name: String, action: String)
2.通過name對UserAction進(jìn)行keyBy操作,得到KeyedStream[UserAction, String]
3.KeyedStream.process(xxx)此時(shí)在每個(gè)name分組中愿阐,需要一個(gè)keyBy state同時(shí)保存該name對應(yīng)的"click"操作的UserAction的信息
使用一個(gè)ValueState[UserAction]來保存"click"信息微服,"buy"信息不需要保存
4.這個(gè)時(shí)候碰到的問題就是在每個(gè)name分組如何判斷"click","buy"的UserAction是有順序來到該分組
5.針對于4的問題,可以通過ValueState存儲的信息來進(jìn)行判斷
a."click","buy"先后來到該name分組
"click"來到該name分組時(shí)缨历,直接將"click"添加更新進(jìn)ValueState
"buy"來到該name分組時(shí)以蕴,判斷ValueState是否有存儲"click"
有則將"click"取出來和當(dāng)前"buy"整合并發(fā)送到下游,且要清空ValueState信息
沒有直接清空ValueState信息辛孵,不發(fā)送任何數(shù)據(jù)到下游
b.如果有非"click","buy"信息來到name分組丛肮,均直接清空ValueState信息,不發(fā)送任何數(shù)據(jù)到下游
當(dāng)我們遇到的業(yè)務(wù)并不是很復(fù)雜的時(shí)候使用上邊的思路完全可以解決魄缚,但是如果要是碰到更復(fù)雜一些的業(yè)務(wù)需求可能判斷邏輯就會(huì)越來越復(fù)雜:"click"和"buy"之間可以有"show"操作宝与;"click"和"buy"之間可以有零或者多個(gè)"click"操作。
那怎么能夠使用更簡潔的邏輯和代碼來實(shí)現(xiàn)這種越來越復(fù)雜的問題呢冶匹?
1.2 復(fù)雜事件
我們先來給這樣的業(yè)務(wù)來定這樣一個(gè)描述:檢測和發(fā)現(xiàn)無界事件流中多個(gè)記錄的關(guān)聯(lián)規(guī)則习劫,也就是從無界事件流中得到滿足規(guī)則的復(fù)雜事件。
CEP(Complex Event Processing)就是在無界事件流中檢測事件模式嚼隘,讓我們掌握數(shù)據(jù)中重要的部分诽里。flink CEP是在flink中實(shí)現(xiàn)的復(fù)雜事件處理庫。
目標(biāo):從簡單事件流中發(fā)現(xiàn)一些高階特征
輸入:一個(gè)或者多個(gè)簡單事件構(gòu)成的事件流
處理:檢測簡單事件之間的聯(lián)系飞蛹,多個(gè)事件組合一起符合匹配規(guī)則须肆,將該多個(gè)事件構(gòu)成復(fù)雜事件
輸出:符合規(guī)則的復(fù)雜事件
2 Pattern API
我們先來認(rèn)識一下CEP中Pattern模式吧,也就是規(guī)則的制定桩皿。
val start: Pattern[X, X] = Pattern.begin[X]("start")
Pattern根據(jù)模式的組合種類,分為了三種
2.1 個(gè)體模式(Individual Patterns)
組成復(fù)雜規(guī)則的每一個(gè)單獨(dú)的模式定義幢炸,就是“個(gè)體模式”
start.where(condition: F => Boolean)
- 量詞
個(gè)體模式根據(jù)接收同一種事件的數(shù)量又可以分為“單例模式”和“循環(huán)模式“泄隔,我們通過一個(gè)“量詞”來指定接受同一種事件的數(shù)量。
start.times(2) // 必須2次
start.times(2, 5) // 2宛徊,3佛嬉,4或者5次都可以
start.times(2, 5).greedy // 2逻澳,3,4或者5次暖呕,并且盡可能的重復(fù)匹配
start.times(2).optional // 0或者2次
start.oneOrMore // 1次或者多次
start.timesOrMore(2).optional.greedy// 0, 2或者多次斜做,并且盡可能的重復(fù)匹配
- 條件
個(gè)體模式的條件,可以在一個(gè)個(gè)體模式上判斷使用多個(gè)條件湾揽,只有當(dāng)條件都滿足的情況下才算匹配成功
// 組合條件 (參考sql中的 where or)
.where() // 條件相連為and
.or() // 條件相連為or
// 終止條件
.until() // 當(dāng)使用了oneOrMore或者oneOrMore.optional時(shí)需要進(jìn)行終止瓤逼,以便清楚狀態(tài)
// 迭代條件
.where(condition: (F, Context[F]) => Boolean) // 調(diào)用上下文對前邊接收的事件進(jìn)行處理
// ctx.getEventsForPattern("start")
2.2 組合模式(Combining Patterns)
多個(gè)個(gè)體模式組合起來就形成了一個(gè)組合模式
Pattern.begin[X]("start").where(condition: F => Boolean)
.next("middle").where(condition: F => Boolean)
注意:組合模式必須由一個(gè)“開始個(gè)體模式”開始
val start: Pattern[X, X] = Pattern.begin[X]("start")
2.3 模式組(Groups of Patterns)
將一個(gè)組合模式作為條件嵌套在個(gè)體模式里,成為一組模式
3 近鄰
當(dāng)我們在對事件流進(jìn)行復(fù)雜事件處理库物,有時(shí)我們需要兩個(gè)事件在流中必須相連霸旗,有時(shí)只要兩個(gè)事件出現(xiàn)在流中不必須相連中間可以間隔一個(gè)其他事件。也就是我們對多個(gè)事件組成規(guī)則嚴(yán)格性的寬容度戚揭,那么如何來表達(dá)這個(gè)容忍度呢诱告?使用近鄰。
嚴(yán)格近鄰:所有事件嚴(yán)格按照順序進(jìn)行民晒,中間沒有任何不匹配的事件精居。使用next()指定
對于模式"a next b",事件流[a, c, b, d]潜必,沒有匹配
寬松近鄰:允許中間出現(xiàn)不匹配事件靴姿。使用followedBy()指定
對于模式"a followedBy b",事件流[a, c, b, d]刮便,匹配為 [a, b]
非確定寬松近鄰:一個(gè)匹配的事件能夠再次使用空猜。使用followedByAny()指定
對于模式"a followedByAny b",事件流[a, c, b1, b2]恨旱,匹配為 [a, b1],[a, b2]
4 示例
此處我們使用flink CEP來實(shí)現(xiàn) 1.1引入 中提到的業(yè)務(wù)需求
1 CEP并不包含在flink中辈毯,使用前需要自己導(dǎo)入cep jar包
<!-- 使用cep需要引入該jar包,
scala語言編寫要導(dǎo)入 artifactId前綴為 flink-cep-scala -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.12</artifactId>
<version>1.9.1</version>
</dependency>
2 定義Pattern模式且必須要以Pattern.begin("xxx")開始搜贤,"xxx"為別名谆沃,在后邊PatternStream.select()中獲取符合模式的數(shù)據(jù)時(shí)會(huì)使用到。
val pattern = Pattern.begin[T]("start").where(...).next("middle").where(...)
// 定義一個(gè)cep pattern模式仪芒。此處復(fù)雜的事務(wù)為:用戶click后馬上進(jìn)行buy操作
val pattern: Pattern[UserAction, UserAction] =
Pattern.begin[UserAction]("start").where(new SimpleCondition[UserAction] {
override def filter(t: UserAction): Boolean = t.action.equals("click")
})
.next("middle").where(new SimpleCondition[UserAction] {
override def filter(t: UserAction): Boolean = t.action.equals("buy")
})
// 獲取一個(gè)普通的流
val input: KeyedStream[UserAction, String] = env.addSource(
new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), initProps()))
.map { line =>
val strs: Array[String] = line.split(",")
UserAction(strs(0), strs(1).toLong, strs(2), strs(3)) // 將記錄轉(zhuǎn)換為UserAction類型
}.keyBy(_.name)
// 將我們定義好的cep pattern應(yīng)用于這個(gè)普通的流
val patternStream: PatternStream[UserAction] = CEP.pattern(input, pattern)
// 通過select算子獲取符合pattern的事務(wù)數(shù)據(jù)唁影,并打印結(jié)果
patternStream.select(new PatternSelectFunction[UserAction, String] {
override def select(map: util.Map[String, util.List[UserAction]]): String = {
val click: UserAction = map.get("start").iterator().next()
val buy: UserAction = map.get("middle").iterator().next()
// 打印用戶的名稱,點(diǎn)擊和購買的時(shí)間
s"name: ${click.name}, click: ${click.timestamp}, buy: ${buy.timestamp}"
}
}).print()
示例代碼中在Pattern指定時(shí)掂名,我們使用next()來表示需要得到是符合嚴(yán)格近鄰的復(fù)雜事件据沈,所以"click","order","buy"這樣的復(fù)雜事件并不符合模式也不會(huì)被輸出打印到控制臺。
此博文饺蔑,我們只是簡單的介紹了下CEP及其使用方式锌介,但是CEP的知識遠(yuǎn)不遠(yuǎn)不止這些。后邊有機(jī)會(huì)的話我們再一起深入學(xué)習(xí),此處就不進(jìn)行展開了孔祸。
下一篇博文我們就來一起學(xué)習(xí)一下flink中的三種時(shí)間語義和窗口機(jī)制吧隆敢。