[TOC]
一寂殉、概念
什么是 CEP:
- 復(fù)合事件處理(Complex Event Processing,CEP)
- Flink cep 是在 flink 中實(shí)現(xiàn)的一個(gè)復(fù)雜事件處理庫(kù)
- 一個(gè)或多個(gè)簡(jiǎn)單事件構(gòu)成的事件流通過一定的規(guī)則匹配毁嗦,然后輸出用戶得到的數(shù)據(jù)--滿足規(guī)則的復(fù)雜事件
CEP 的特征如下:
目標(biāo):從有序的簡(jiǎn)單事件流中發(fā)現(xiàn)一些高階特征
輸入:一個(gè)或多個(gè)簡(jiǎn)單事件構(gòu)成的事件流
處理:識(shí)別簡(jiǎn)單事件之間的內(nèi)在聯(lián)系往枷,多個(gè)符合一定規(guī)則的簡(jiǎn)單事件構(gòu)成復(fù)雜事件佛嬉;
輸出:滿足規(guī)則的復(fù)雜事件
市場(chǎng)上有多種 CEP 的解決方案涂乌,例如Spark械筛、Samza敢艰、Beam等舵匾,但他們都沒有提供專門的庫(kù)支持俊抵。然而,F(xiàn)link提供了專門的CEP庫(kù)坐梯。
Flink CEP 包含如下組件:Event Stream徽诲、Pattern定義、Pattern檢測(cè)和生成Alert吵血。
- 首先谎替,開發(fā)人員要在 DataStream 流上定義出模式條件
- 之后 Flink CEP 引擎進(jìn)行模式檢測(cè),必要時(shí)生成警告
簡(jiǎn)單來說一下蹋辅,其實(shí)可以把使用 flink CEP 當(dāng)做平時(shí)用的正則表達(dá)式钱贯,cep中的 Pattern 就是定義的正則表達(dá)式,flink 中的DataStream 就是正則表達(dá)式中待匹配的字符串侦另,flink 通過DataStream 和 自定義的Pattern進(jìn)行匹配喷舀,生成一個(gè)經(jīng)過過濾之后的DataStream。
基于自定義的pattern淋肾,可以做很多工作硫麻,比如監(jiān)控報(bào)警、風(fēng)控樊卓、反爬等等拿愧。
二、核心--Pattern API
處理事件的規(guī)則碌尔,被叫作模式(Pattern)浇辜。Flink CEP提供了Pattern API用于對(duì)輸入流數(shù)據(jù)進(jìn)行復(fù)雜事件規(guī)則定義,用來提取符合規(guī)則的事件序列唾戚。
模式大致分為兩類:
個(gè)體模式(Individual Patterns):組成復(fù)雜規(guī)則的每一個(gè)單獨(dú)的模式定義柳洋,就是個(gè)體模式。
start.times(3).where(_.behavior.startsWith("fav"))
組合模式(Combining Patterns叹坦,也叫模式序列):很多個(gè)體模式組合起來熊镣,就形成了整個(gè)的模式序列。
val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
.next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
.followedBy("end").where(_.getName == "end")
2.1募书、個(gè)體模式
個(gè)體模式包括單例模式和循環(huán)模式绪囱。單例模式只接收一個(gè)事件,而循環(huán)模式可以接收多個(gè)事件莹捡。
2.1.1鬼吵、量詞
可以在一個(gè)個(gè)體模式后追加量詞,也就是指定循環(huán)次數(shù)篮赢。
// 匹配出現(xiàn)4次
start.time(4)
// 匹配出現(xiàn)0次或4次
start.time(4).optional
// 匹配出現(xiàn)2齿椅、3或4次
start.time(2,4)
// 匹配出現(xiàn)2琉挖、3或4次,并且盡可能多地重復(fù)匹配
start.time(2,4).greedy
// 匹配出現(xiàn)1次或多次
start.oneOrMore
// 匹配出現(xiàn)0涣脚、2或多次示辈,并且盡可能多地重復(fù)匹配
start.timesOrMore(2).optional.greedy
2.1.2、條件
每個(gè)模式都需要指定觸發(fā)條件涩澡,作為模式是否接受事件進(jìn)入的判斷依據(jù)顽耳。
CEP中的個(gè)體模式主要通過調(diào)用.where()、.or()和.until()來指定條件妙同。按不同的調(diào)用方式射富,可以分成以下幾類:
簡(jiǎn)單條件:通過.where()方法對(duì)事件中的字段進(jìn)行判斷篩選,決定是否接收該事件
start.where(event=>event.getName.startsWith("foo"))
組合條件:將簡(jiǎn)單的條件進(jìn)行合并粥帚,or()方法表示或邏輯相連胰耗,where的直接組合就相當(dāng)于與and。
Pattern.where(event => …/*some condition*/).or(event => /*or condition*/)
終止條件:如果使用了oneOrMore或者oneOrMore.optional芒涡,建議使用.until()作為終止條件柴灯,以便清理狀態(tài)。
迭代條件:能夠?qū)δJ街八薪邮盏氖录M(jìn)行處理费尽,調(diào)用.where((value,ctx) => {…})赠群,可以調(diào)用ctx.getEventForPattern("name")
2.2、組合模式
了解了獨(dú)立模式旱幼,現(xiàn)在看看如何將它們組合成一個(gè)完整的模式序列查描。
模式序列必須以初始模式開始,如下所示:
val start : Pattern[Event, _] = Pattern.begin("start")
接下來柏卤,可以通過指定它們之間所需的連續(xù)條件冬三,為模式序列添加更多模式。 Flink CEP 支持事件之間以下形式的鄰接:
- 嚴(yán)格連續(xù)性(Strict Contiguity):預(yù)期所有匹配事件一個(gè)接一個(gè)地出現(xiàn)缘缚,中間沒有任何不匹配的事件勾笆。
- 寬松連續(xù)性(Relaxed Contiguity):忽略匹配的事件之間出現(xiàn)的不匹配事件。
- 非確定性寬松連續(xù)性(Non-Deterministic Relaxed Contiguity):進(jìn)一步放寬鄰接桥滨,允許忽略一些匹配事件的其他匹配窝爪。
要在連續(xù)模式之間應(yīng)用它們,可以使用:
- next():用于嚴(yán)格連續(xù)
- followBy():用于寬松連續(xù)性
- followAyAny():用于非確定性寬松連續(xù)性
除了以上模式序列外该园,還可以定義“不希望出現(xiàn)某種近鄰關(guān)系”:
- notNext():不想讓某個(gè)事件嚴(yán)格緊鄰前一個(gè)事件發(fā)生
- notFollowedBy():不想讓某個(gè)事件在兩個(gè)其他事件類型之間的任何位置
需要注意:
- 所有模式序列必須以.begin()開始
- 模式序列不能以.notFollowedBy()結(jié)束
- “not”類型的模式不能被optional所修飾
- 可以為模式指定時(shí)間約束酸舍,用來要求在多長(zhǎng)時(shí)間內(nèi)匹配有效:next.within(Time.seconds(10))
val start : Pattern[Event, _] = Pattern.begin("start")
// strict contiguity
val strict: Pattern[Event, _] = start.next("middle").where(...)
// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
// NOT pattern with strict contiguity
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)
// NOT pattern with relaxed contiguity
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
寬松的連續(xù)性(Relaxed contiguity )意味著僅匹配第一個(gè)匹配事件,而具有非確定性的松弛連續(xù)性(non-deterministic relaxed contiguity)里初,將為同一個(gè)開始發(fā)出多個(gè)匹配。 例如模式“a b”忽舟,給定事件序列“a”双妨,“c”淮阐,“b1”,“b2”將給出以下結(jié)果:
- “a”和“b”之間的嚴(yán)格連續(xù)性:{}(不匹配)刁品,“a”之后的“c”導(dǎo)致“a”被丟棄泣特。
- “a”和“b”之間的寬松連續(xù)性:{a b1},因?yàn)閷捤傻倪B續(xù)性被視為“跳過非匹配事件直到下一個(gè)匹配事件”挑随。
- “a”和“b”之間的非確定性寬松連續(xù)性:{a b1}状您,{a b2},因?yàn)檫@是最一般的形式兜挨。
也可以為模式定義時(shí)間約束以使其有效膏孟。 例如,可以通過pattern.within()方法定義模式應(yīng)在10秒內(nèi)發(fā)生拌汇。 處理和事件時(shí)間都支持時(shí)間模式柒桑。
注意模式:序列只能有一個(gè)時(shí)間約束。 如果在不同的單獨(dú)模式上定義了多個(gè)這樣的約束噪舀,則應(yīng)用最小的約束魁淳。
2.2.1、循環(huán)模式中的連續(xù)性
可以在循環(huán)模式中應(yīng)用與上一節(jié)中討論的相同的連續(xù)條件与倡。
連續(xù)性將應(yīng)用于接受到這種模式的元素之間界逛。 為了舉例說明上述情況,模式序列“a b + c”(“a”后跟一個(gè)或多個(gè)“b”的任何(非確定性寬松)序列纺座,后跟“c”)息拜,輸入“a” “,”“b1”比驻,“d1”该溯,“b2”,“d2”别惦,“b3”“c”將產(chǎn)生以下結(jié)果:
- 嚴(yán)格連續(xù)性:{a b3 c} - “b1”之后的“d1”導(dǎo)致“b1”被丟棄狈茉,“b2”因“d2”而發(fā)生同樣的情況。
- 寬松的連續(xù)性:{a b1 c}掸掸,{a b1 b2 c}氯庆,{a b1 b2 b3 c},{a b2 c}扰付,{a b2 b3 c}堤撵,{a b3 c} - “d”被忽略。
- 非確定性寬松鄰接:{a b1 c}羽莺,{a b1 b2 c}实昨,{a b1 b3 c},{a b1 b2 b3 c}盐固,{a b2 c}荒给,{a b2 b3 c}丈挟,{a b3 c} - 注意{a b1 b3 c},這是寬松“b”之間鄰接的結(jié)果志电。
對(duì)于循環(huán)模式(例如oneOrMore()和times())曙咽,默認(rèn)是寬松的連續(xù)性。 如果想要嚴(yán)格的連續(xù)性挑辆,必須使用continuous()調(diào)用顯式指定它例朱,如果想要非確定性的松弛連續(xù)性,可以使用allowCombinations()調(diào)用鱼蝉。
2.2.2洒嗤、模式操作
consecutive()
與oneOrMore()和times()結(jié)合使用,并在匹配事件之間強(qiáng)加嚴(yán)格的連續(xù)性蚀乔,即任何不匹配的元素都會(huì)中斷匹配(像next())烁竭。
如果不應(yīng)用,則使用松弛的連續(xù)性(如followBy())吉挣。
Pattern.begin("start").where(_.getName().equals("c"))
.followedBy("middle").where(_.getName().equals("a"))
.oneOrMore().consecutive()
.followedBy("end1").where(_.getName().equals("b"))
為輸入序列生成以下匹配項(xiàng):C D A1 A2 A3 D A4 B.
- 嚴(yán)格連續(xù)應(yīng)用:{C A1 B}派撕,{C A1 A2 B},{C A1 A2 A3 B}
- 沒有嚴(yán)格連續(xù)應(yīng)用:{C A1 B}睬魂,{C A1 A2 B}终吼,{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
allowCombinations()
與oneOrMore()和times()一起使用氯哮,并在匹配事件之間強(qiáng)加非確定性的松散連續(xù)性(像followAyAny())际跪。
如果不應(yīng)用,則使用寬松的連續(xù)性(像followBy())喉钢。
Pattern.begin("start").where(_.getName().equals("c"))
.followedBy("middle").where(_.getName().equals("a"))
.oneOrMore().allowCombinations()
.followedBy("end1").where(_.getName().equals("b"))
將為輸入序列生成以下匹配項(xiàng):C D A1 A2 A3 D A4 B.
- 啟用combinations:{C A1 B}姆打,{C A1 A2 B},{C A1 A3 B}肠虽,{C A1 A4 B}幔戏,{C A1 A2 A3 B},{C A1 A2 A4 B}税课,{C A1 A3 A4 B}闲延,{C A1 A2 A3 A4 B}
- 未啟用combinations:{C A1 B},{C A1 A2 B}韩玩,{C A1 A2 A3 B}垒玲,{C A1 A2 A3 A4 B}
2.3、模式的檢測(cè)
指定要查找的模式序列后找颓,就可以將其應(yīng)用于輸入流以檢測(cè)潛在匹配合愈。調(diào)用CEP.pattern(),給定輸入流和模式,就能得到一個(gè)PatternStream想暗。
val input:DataStream[Event] = …
val pattern:Pattern[Event,_] = …
val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)
2.4妇汗、匹配事件的提取
創(chuàng)建PatternStream之后帘不,就可以應(yīng)用select或者flatSelect方法说莫,從檢測(cè)到的事件序列中提取事件了。
select()方法需要輸入一個(gè)select function作為參數(shù)寞焙,每個(gè)成功匹配的事件序列都會(huì)調(diào)用它储狭。
select()以一個(gè)Map[String,Iterable[IN]]來接收匹配到的事件序列,其中key就是每個(gè)模式的名稱捣郊,而value就是所有接收到的事件的Iterable類型辽狈。
def selectFn(pattern : Map[String,Iterable[IN]]):OUT={
val startEvent = pattern.get("start").get.next
val endEvent = pattern.get("end").get.next
OUT(startEvent, endEvent)
}
2.5、處理超時(shí)部分模式
每當(dāng)模式具有通過within關(guān)鍵字附加的窗口長(zhǎng)度時(shí)呛牲,部分事件序列可能因?yàn)槌^窗口長(zhǎng)度而被丟棄刮萌。 要對(duì)超時(shí)的部分匹配進(jìn)行操作,可以使用TimedOutPartialMatchHandler接口娘扩。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
@Override
public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
...
}
@Override
public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
IN startEvent = match.get("start").get(0);
ctx.output(outputTag, T(startEvent));
}
}
三着茸、demo
這是來自尚硅谷的一個(gè)例子:檢測(cè)一個(gè)用戶在3秒內(nèi)連續(xù)登陸失敗。
首先要導(dǎo)入依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
import java.util
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
// 輸入的登錄事件樣例類
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
// 輸出的異常報(bào)警信息樣例類
case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)
/**
* @author w1992wishes 2020/7/28 16:29
*/
object LoginFailWithCep {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// 1. 讀取事件數(shù)據(jù)琐旁,創(chuàng)建簡(jiǎn)單事件流
val loginEventStream = env.readTextFile("E:\\project\\my_project\\daily-summary\\flink\\flink-details\\src\\main\\resources\\LoginLog.csv")
.map(data => {
val dataArray = data.split(",")
LoginEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {
override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L
})
.keyBy(_.userId)
// 2. 定義匹配模式
val loginFailPattern = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail")
.next("next").where(_.eventType == "fail")
.within(Time.seconds(3))
// 3. 在事件流上應(yīng)用模式涮阔,得到一個(gè)pattern stream
val patternStream = CEP.pattern(loginEventStream, loginFailPattern)
// 4. 從pattern stream上應(yīng)用select function,檢出匹配事件序列
val LoginFailDataStream = patternStream.select(new LoginFailMatch())
LoginFailDataStream.print()
env.execute("login fail with cep job")
}
}
class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning] {
override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
val firstFail = map.get("begin").iterator().next()
val lastFail = map.get("next").iterator().next()
Warning(firstFail.userId, firstFail.eventTime, lastFail.eventTime, "login fail!")
}
}
LoginLog.csv 內(nèi)容如下:
5402,83.149.11.115,success,1558430815
23064,66.249.3.15,fail,1558430826
5692,80.149.25.29,fail,1558430833
7233,86.226.15.75,success,1558430832
5692,80.149.25.29,success,1558430840
29607,66.249.73.135,success,1558430841
1035,83.149.9.216,fail,1558430842
1035,83.149.9.216,fail,1558430846
1035,83.149.9.216,fail,1558430843
1035,83.149.24.26,fail,1558430844
7328,193.114.45.13,success,1558430848
29607,66.249.73.135,success,1558430847
2133,50.16.19.13,success,1558430857
6745,66.249.73.185,success,1558430859
76456,110.136.166.128,success,1558430853
8345,46.105.14.53,success,1558430855
76456,110.136.166.128,success,1558430857
76456,110.136.166.128,success,1558430854
76456,110.136.166.128,fail,1558430859
76456,110.136.166.128,success,1558430861
3464,123.125.71.35,success,1558430860
76456,110.136.166.128,success,1558430865
65322,50.150.204.184,success,1558430866
23565,207.241.237.225,fail,1558430862
8455,200.49.190.101,success,1558430867
8455,200.49.190.100,success,1558430865
8455,200.49.190.101,success,1558430869
8455,200.49.190.101,success,1558430872
32031,66.249.73.185,success,1558430875
12018,66.249.73.135,success,1558430874
12018,66.249.73.135,success,1558430879
12018,66.249.73.135,success,1558430881
21419,67.214.178.190,success,1558430882
21419,67.214.178.190,success,1558430880
23565,207.241.237.220,success,1558430881
2386,46.105.14.53,success,1558430883
23565,207.241.237.227,success,1558430884
83419,91.177.205.119,success,1558430881
83419,91.177.205.119,fail,1558430882
83419,91.177.205.119,success,1558430885
83419,91.177.205.119,fail,1558430886
83419,91.177.205.119,success,1558430884
83419,91.177.205.119,success,1558430886
4325,26.249.73.15,success,1558430888
2123,207.241.237.228,success,1558430887
21083,207.241.237.101,success,1558430889
13490,87.169.99.232,success,1558430886
93765,209.85.238.199,success,1558430890
93765,209.85.238.199,success,1558430892