Flink-8.Flink CEP-惡意登錄檢測

package com.ctgu.flink.project;

import com.ctgu.flink.entity.LoginEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.List;
import java.util.Map;


public class Flink_Sql_Login_CEP {
    public static void main(String[] args) throws Exception {

        long start = System.currentTimeMillis();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> dataStream = env.readTextFile("data/LoginLog.csv");

        DataStream<LoginEvent> loginDataStream = dataStream
                .filter(line -> line.split(",").length >= 4)
                .map(line -> {
                    String[] s = line.split(",");
                    return new LoginEvent(Long.valueOf(s[0]), s[1], s[2], Long.valueOf(s[3]) * 1000);
                }).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));

        Pattern<LoginEvent, LoginEvent> loginFailPattern =
                Pattern.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equals(loginEvent.getLoginState());
                    }
                }).next("secondFail").where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return "fail".equals(loginEvent.getLoginState());
                    }
                }).within(Time.seconds(2));

        PatternStream<LoginEvent> patternStream =
                CEP.pattern(loginDataStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        patternStream.select(new MyPatternSelectFunction()).print();

        env.execute("Table SQL");

        System.out.println("耗時: " + (System.currentTimeMillis() - start) / 1000);

    }

    public static class MyPatternSelectFunction
            implements PatternSelectFunction<LoginEvent, Tuple4<Long, Long, Long, String>> {

        @Override
        public Tuple4<Long, Long, Long, String> select(Map<String, List<LoginEvent>> map) throws Exception {
            LoginEvent firstFail = map.get("firstFail").iterator().next();
            LoginEvent secondFail = map.get("secondFail").get(0);
            return new Tuple4<>(firstFail.getUserId(), firstFail.getTimestamp(),
                    secondFail.getTimestamp(), "fail "+ map.size() + " times");
        }
    }

}

測試data

5402,83.149.11.115,success,1558430815
23064,66.249.3.15,fail,1558430826
5692,80.149.25.29,fail,1558430833
7233,86.226.15.75,success,1558430832
5692,80.149.25.29,success,1558430840
29607,66.249.73.135,success,1558430841
1035,83.149.9.216,fail,1558430842
1035,83.149.9.216,success,1558430845
1035,83.149.9.216,fail,1558430843
1035,83.149.9.216,success,1558430846
1035,83.149.24.26,fail,1558430844
7328,193.114.45.13,success,1558430848
29607,66.249.73.135,success,1558430847
2133,50.16.19.13,success,1558430857
6745,66.249.73.185,success,1558430859
76456,110.136.166.128,success,1558430853
8345,46.105.14.53,success,1558430855
76456,110.136.166.128,success,1558430857
76456,110.136.166.128,success,1558430854
76456,110.136.166.128,fail,1558430859
76456,110.136.166.128,success,1558430861
3464,123.125.71.35,success,1558430860
76456,110.136.166.128,success,1558430865
65322,50.150.204.184,success,1558430866
23565,207.241.237.225,fail,1558430862
8455,200.49.190.101,success,1558430867
8455,200.49.190.100,success,1558430865
8455,200.49.190.101,success,1558430869
8455,200.49.190.101,success,1558430872
32031,66.249.73.185,success,1558430875
12018,66.249.73.135,success,1558430874
12018,66.249.73.135,success,1558430879
12018,66.249.73.135,success,1558430881
21419,67.214.178.190,success,1558430882
21419,67.214.178.190,success,1558430880
23565,207.241.237.220,success,1558430881
2386,46.105.14.53,success,1558430883
23565,207.241.237.227,success,1558430884
83419,91.177.205.119,success,1558430881
83419,91.177.205.119,fail,1558430882
83419,91.177.205.119,success,1558430885
83419,91.177.205.119,fail,1558430886
83419,91.177.205.119,success,1558430884
83419,91.177.205.119,success,1558430886
4325,26.249.73.15,success,1558430888
2123,207.241.237.228,success,1558430887
21083,207.241.237.101,success,1558430889
13490,87.169.99.232,success,1558430886
93765,209.85.238.199,success,1558430890
93765,209.85.238.199,success,1558430892
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市麸锉,隨后出現(xiàn)的幾起案子韩脏,更是在濱河造成了極大的恐慌,老刑警劉巖咱揍,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機挤巡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來酷麦,“玉大人矿卑,你說我怎么就攤上這事∥秩模” “怎么了母廷?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長糊肤。 經(jīng)常有香客問我琴昆,道長,這世上最難降的妖魔是什么馆揉? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任业舍,我火速辦了婚禮,結(jié)果婚禮上升酣,老公的妹妹穿的比我還像新娘舷暮。我一直安慰自己,他們只是感情好噩茄,可當我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布下面。 她就那樣靜靜地躺著,像睡著了一般绩聘。 火紅的嫁衣襯著肌膚如雪沥割。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天凿菩,我揣著相機與錄音驯遇,去河邊找鬼。 笑死蓄髓,一個胖子當著我的面吹牛叉庐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播会喝,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼陡叠,長吁一口氣:“原來是場噩夢啊……” “哼玩郊!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起枉阵,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤译红,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后兴溜,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體侦厚,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年拙徽,在試婚紗的時候發(fā)現(xiàn)自己被綠了刨沦。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡膘怕,死狀恐怖想诅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情岛心,我是刑警寧澤来破,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站忘古,受9級特大地震影響徘禁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜髓堪,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一晌坤、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧旦袋,春花似錦骤菠、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至祭阀,卻和暖如春鹉戚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背专控。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工抹凳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人伦腐。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓赢底,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子幸冻,可洞房花燭夜當晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容