(1)簡介及應(yīng)用場景:
復(fù)雜事件處理(CEP)既是把不同的數(shù)據(jù)看做不同的事件,并且通過分析事件之間的關(guān)系建立起一套事件關(guān)系序列庫适刀。利用過濾,聚合煤蹭,關(guān)聯(lián)性笔喉,依賴,層次等技術(shù)硝皂,最終實(shí)現(xiàn)由簡單關(guān)系產(chǎn)生高級(jí)事件關(guān)系常挚。
復(fù)雜事件主要應(yīng)用場景:主要用于信用卡欺詐檢測、用戶風(fēng)險(xiǎn)檢測稽物、設(shè)備故障檢測奄毡、攻擊行為分析等領(lǐng)域。
Flink CEP能夠利用的場景較多贝或,在實(shí)際業(yè)務(wù)場景中也有了廣泛的使用案例與經(jīng)驗(yàn)積累吼过。比如
在可編程方面,F(xiàn)link同時(shí)推出了Flink SQL CEP傀缩,開發(fā)者可以通過較為屬性的SQL語法快速構(gòu)建各類CEP事件組合應(yīng)用那先。
Flink CEP原理說明:
(2)Flink CEP匹配模式介紹:
在Flink CEP中匹配模式分為嚴(yán)格近鄰模式和寬松近鄰模式。嚴(yán)格近鄰模式的事件必須是緊密連接的赡艰,寬松近鄰事件可以無需緊密連接售淡,如下圖:
(3)Flink CEP SQL語法介紹:
(3.1)Flink CEP SQL樣例:
String sql = "SELECT * " +
"FROM CEP_SQL_3 " +
" MATCH_RECOGNIZE ( " +
" PARTITION BY symbol " + //分組
" ORDER BY rowtime " + //排序
" MEASURES " + //定義如何根據(jù)匹配成功的輸入事件構(gòu)造輸出事件
" LISTAGG(CAST(e3.id as varchar),',') as ids,"+
" AVG(e1.price) as avgPrice,"+
// " START_ROW.rowtime AS start_tstamp, " +
" LAST(e1.rowtime) AS bottom_tstamp, " + //第一次的事件時(shí)間為end_timestamp
" LAST(e3.rowtime) AS end_tstamp " + //最新的事件時(shí)間為end_timestamp
" ONE ROW PER MATCH " + //匹配成功輸出一條
" AFTER MATCH SKIP PAST LAST ROW " + //匹配后跳轉(zhuǎn)到下一行
" PATTERN ( e1 e2 e3{1}) WITHIN INTERVAL '2' MINUTE" + //定義事件組
" DEFINE " + //定義每個(gè)事件的匹配條件
" e1 AS " +
" e1.price = 25 , " +
" e2 AS " +
" e2.price = 18 ," +
" e3 AS " +
" e3.price = 15 " +
" ) MR";
(3.2)Flink CEP匹配規(guī)則:貪婪詞量和勉強(qiáng)詞量
Concatenation-像(AB)這樣的模式意味著A和B之間的連接是嚴(yán)格的。因此,在它們之間不能存在沒有映射到A或B的行揖闸。
Quantifiers-修改可以映射到模式變量的行數(shù)揍堕。
- 0或者多行
-
1或者多行
? 0或者1行
{n} 嚴(yán)格n行(n>0)
{n,} n或者更多行(n≥O)
{n,m} 在n到m(包含)行之間(0≤n≤m,0<m)
{,m}一在0到m(包含)行之間(m>0)
(3.3)匹配策略
SKIP PAST LAST ROW -匹配成功之后汤纸,從匹配成功的事件序列中的最后?個(gè)事件的下?個(gè)事件開始進(jìn)?下?次匹配衩茸。
SKIP TO NEXT ROW -匹配成功之后,從匹配成功的事件序列中的第?個(gè)事件的下?個(gè)事件開始進(jìn)?下?次匹配贮泞。(默認(rèn)模式)
SKIP TO LAST variable -匹配成功之后楞慈,從匹配成功的事件序列中最后?個(gè)對(duì)應(yīng)于變量的事件開始進(jìn)行下?次匹 配。
SKIP TO FIRST variable -匹配成功之后啃擦,從匹配成功的事件序列中第?個(gè)對(duì)應(yīng)于變量的事件開始進(jìn)行下?次匹配囊蓝。