統(tǒng)計X小時內(nèi)重復的異常操作

????現(xiàn)在在接手一個BI分析項目,最近呢有一個需求培愁,要提供2小時以內(nèi)缓窜,同一客戶對同一功能重復操作十次的日志統(tǒng)計私股,我們在查詢相關(guān)的服務(wù)類上添加了aop恩掷,獲取用戶名稱黄娘,ip优床,操作時間羔巢,操作的報表等等信息竿秆,因為這個需求偏向于審計或者內(nèi)審需求幽钢,并沒有要求完成報表的設(shè)計匪燕,只需要提供異常的日志帽驯,所以利凑,實現(xiàn)的方式就比較的靈活了嫌术,在我尋思咋去做這件事情的時候割按,我們的業(yè)務(wù)小哥适荣,要去了完整的統(tǒng)計信息束凑,EXCEL大法好废恋,花了半小時給整出來了鱼鼓,汗顏拟烫,大體意思上,先按照小時進行統(tǒng)計迄本,后按照相鄰小時累加統(tǒng)計硕淑,但每個月都要手動整理,也迫使我去完成了這部分的需求(羅列幾種方式)嘉赎。


sql直接完成

????因為我們是將這種操作信息存在了數(shù)據(jù)表中置媳,第一反應(yīng)就是直接寫sql(基于mysql的sql)

SELECT
 rfcl1.OP_NAME,
 rfcl1.USER_NAME,
 rfcl1.UNIT_CODE,
 rfcl1.SYSTEM_CODE,
 count(*)
FROM
fr_cpt_log rfcl1,
fr_cpt_log rfcl2
where 
 rfcl1.ip=rfcl2.ip and rfcl1.OP_NAME = rfcl2.OP_NAME
 and ADDTIME(rfcl1.OP_DATE,'2:0:0')>=rfcl2.OP_DATE
 and rfcl1.id<rfcl2.id
 group by  
 rfcl1.OP_NAME,
 rfcl1.USER_NAME,
 rfcl1.UNIT_CODE,
 rfcl1.SYSTEM_CODE
 having count(*)>=10

????sql的核心處理方式,使用自連接的方式以ip和操作名稱為限制條件進行累加計算公条,但這種自連接的累計方式有很大的問題拇囊,我當時是在測試環(huán)境測試并沒有發(fā)現(xiàn)問題,操作的日志只有幾w左右寥袭,但實際在生產(chǎn)環(huán)境進行的時候,發(fā)現(xiàn)跑不出來,不按月切割的情況下報表但操作記錄大概有幾百萬行,而直接進行自連接做笛卡爾積的話窗声,這個數(shù)據(jù)量可以上hadoop了见剩,看了下羹呵,大概數(shù)據(jù)量有個小十萬就不用考慮這種方式去進行了盈简。
????當然去按照時間進行切分,多加一些中間臨時表去做這些事情肯定也是能搞定了坚俗,這里就不多說了。


用緩存計數(shù)

????這個方案偏向于實時累加計算處理了,之前呢為了做限流操作翅楼,寫了個在redis中緩存同一ip操作同一個功能一分鐘超過5次就進行消息提醒的東西(沒辦法BI渲染太耗cpu),這里涉及線上代碼就不貼了管嬉,但實現(xiàn)但方式是相似但胎挎,以IP+OP_NAME為key,進行累加蹬竖,TTL給個2小時旦装,在aop里獲取超過10次就刷到異常表里就ok了眨八。
????這里就是個redis的簡單使用段誊,但是呢,說白了這種日志統(tǒng)計功能沒必要有這么強的實時性,或者說去消耗緩存和服務(wù)器資源青灼,所以并沒去在服務(wù)器上做這些事情檀夹,動生產(chǎn)環(huán)境的代碼和生產(chǎn)環(huán)境加表過于麻煩了。


拉取日志文件督赤,java解決

????因為獲取異常信息,整個過程可能不僅僅是獲取病蛉,還需要考慮的是呈現(xiàn)的方式,所以這里完成了一個獲取生成excel的邏輯析恋,因為查詢?nèi)藛T的組織不同,按照組織信息將操作的異常記錄進行切分
具體的實現(xiàn)步驟為:
1.文件切割


1576671859974.jpg

我這邊將查詢的日志進行規(guī)整挠唆,用戶名患膛,查詢時間布疙,查詢操作顿涣,查詢的對象4列酝豪,ip因為內(nèi)部跳轉(zhuǎn)原因獲取的不對毙籽,這里不做顯示。
這里我們將文件做切割處理(應(yīng)對巨量的log文件)垮衷,以用戶名+查詢對象做hash計算,這里假定分布的文件為10個雌隅,那么我們輸入輸出流的形式讀取每一行蹦渣,以每一行除10取模,以這個模為新文件名創(chuàng)建文件恕酸,相同的操作+用戶名必定分配到同一文件堪滨,這樣哪小文件進行增量的文件計算可避免oom

 private void preProcess() throws IOException {
        BufferedInputStream fis = null;
        BufferedReader reader = null;  
        try{
            //Path newfile = FileSystems.getDefault().getPath(filename);
            fis = new BufferedInputStream(new FileInputStream(new File(filename)));
            // 用5M的緩沖讀取文本文件
            reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),_5M);

            //假設(shè)文件是10G,那么先根據(jù)hashcode拆成小文件蕊温,再進行讀寫判斷
            //如果不拆分文件袱箱,將ip地址當做key,訪問時間當做value存到hashmap時义矛,
            //當來訪的ip地址足夠多的情況下发笔,內(nèi)存開銷吃不消
            //存放ip的hashcode->accessTimes集合
            Map<String, List<String>> hashcodeMap = new HashMap<String,List<String>>();
            String line = "";
            int count = 0;
            while((line = reader.readLine()) != null){
                line=line.replaceAll("\t","");
                String split[] = line.split(delimiter);
                String reportName;
                String username;
                if(split != null && split.length >= 2){
                    //根據(jù)username的hashcode這樣拆分文件,拆分后的文件大小在1G上下波動
                    //ip+操作內(nèi)容 取哈希
                    username = split[0].replaceAll("\"","");
                    reportName = split[3].replaceAll("\"","");
                    /*magic 為拆分的細粒度*/
                    int serial = (username+reportName).hashCode() % MAGIC;

                    String splitFilename = FILE_PRE + serial;
                    List<String> lines = hashcodeMap.get(splitFilename);
                    if(lines == null){
                        lines = new ArrayList<String>();

                        hashcodeMap.put(splitFilename, lines);
                    }
                    lines.add(line);
                }

                count ++;
                if(count > 0 && count % BATCH_MAGIC == 0){
                    //每1000行刷一次文件
                    for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){
                        //System.out.println(entry.getKey()+"--->"+entry.getValue());
                        //key是hashcode value是本行的字符串
                        DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));
                    }
                    //一次操作1000之后清空凉翻,重新執(zhí)行
                    hashcodeMap.clear();
                }
            }
        }finally {
            reader.close();
            fis.close();
        }
    }

文件append方法

    /**
     * 根據(jù)給出的數(shù)據(jù)了讨,往給定的文件形參中追加一行或者幾行數(shù)據(jù)
     *
     * @param splitFilename
     * @throws IOException
     */
    public static Path appendFile(String splitFilename,
                                  Iterable<? extends CharSequence> accessTimes, Charset cs) throws IOException {
        if(accessTimes != null){
            Path target = Paths.get(splitFilename);
            File file = new File(splitFilename);
            if(!file.exists()){
                createFile(splitFilename);
            }
            return Files.write(target, accessTimes, cs, StandardOpenOption.APPEND);
        }

        return null;
    }

2.讀取制定文件目錄下的小文件,累計2小時內(nèi)的重復操作


1576673347588.jpg
   /**
    * 遞歸執(zhí)行制轰,將2小時內(nèi)訪問超過閾值的ip找出來
    *
    * @param parent
    * @return
    * @throws IOException
    */
   private void recurseFile(Path parent, Map<String,List<Date>> resMap) throws IOException{
       //Path target = Paths.get(dir);
       if(!Files.exists(parent) || !Files.isDirectory(parent)){
           return;
       }
       List<File> fileList= Arrays.asList(new File(root).listFiles());
       for(File file:fileList){
           if(file.getName().startsWith(FILE_PRE)){
               List<String> lines = Files.readAllLines(file.toPath(), Charset.forName("UTF-8"));
               judgeAndcollection(lines,resMap);
/*
               這里的resMap可以里立即處理了前计,沒必要一直進行迭代,結(jié)果集還是全量的垃杖,這里增量的append到excel中
*/
               if(!CollectionUtils.isEmpty(resMap)){
                   Map<String,List<OperationInfo>> operationInfoMap = new HashMap<>();
                   transerToObject(resMap,operationInfoMap);
                   if(!CollectionUtils.isEmpty(operationInfoMap)){
                       operationInfoMap.entrySet().forEach(stringEntry -> {
                           String fileName = result + stringEntry.getKey()+"異常操作日志.xlsx";
                           File fileCheck = new File(fileName);
                           if(fileCheck.exists()){
                               try {
                                   EasyexcelUtils.addExcel(fileName,stringEntry.getValue());
                               } catch (IOException e) {
                                   e.printStackTrace();
                               } catch (InvalidFormatException e) {
                                   e.printStackTrace();
                               }
                           }else{
                               EasyExcel.write(fileName, OperationInfo.class)
                                        .sheet(stringEntry.getKey()).doWrite(stringEntry.getValue());
                           }
                       });
                   }
                   resMap.clear();
               }
           }
       }
   }

此處的核心處理邏輯就是在遍歷小文件的時候男杈,判斷并收集超過操作閾值的對象

    /**
     * 根據(jù)從較小文件讀上來的每行ip accessTimes進行判斷符合條件的ip
     * 并放入resMap
     *
     * @param lines
     * @param resMap
     */
    private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) {
        if(lines != null){
            //ip->List<String>accessTimes
            Map<String,List<String>> judgeMap = new HashMap<String,List<String>>();
            for(String line : lines){
                line = line.trim();
                line=line.replaceAll("\t","");
                String split[] = line.split(delimiter);

                String userName =split[0];
                String opt =split[3].replaceAll("\"","");
                List<String> accessTimes = judgeMap.get(userName+"#"+opt);
                if(accessTimes == null){
                    accessTimes = new ArrayList<String>();
                }
                accessTimes.add(split[1]);
                judgeMap.put(userName+"#"+opt, accessTimes);
            }

            if(judgeMap.size() == 0){
                return;
            }

            for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){
                List<String> acessTimes = entry.getValue();
                //相同ip,先判斷整體大于10個
                if(acessTimes != null && acessTimes.size() >= 10){
                    //開始判斷在List集合中调俘,120分鐘內(nèi)訪問超過MAGIC=10
                    List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 120 * 60 * 1000, 10);
                    if(attackTimes != null){
                        resMap.put(entry.getKey(), attackTimes);
                    }
                }
            }
        }
    }

將小文件的行進行格式化為map<user_name+opt,op_time>
再在hashmap中進行遍歷匯總

    public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){
        if(dateStrs == null || dateStrs.size() < magic){
            return null;
        }

        List<Date> dates = new ArrayList<Date>();
        for(String date : dateStrs){
            if(date != null && !"".equals(date))
                dates.add(stringToDate(date,"dd/MM/yyyy hh:mm:ss"));
        }

        Collections.sort(dates);
        return attackTimes(dates,intervalDate,magic);
    }

這里對日期格式進行了轉(zhuǎn)換

    /**
     * 判斷在間隔時間內(nèi)伶棒,是否有大于magic的上限的數(shù)據(jù)集合,
     * 如果有彩库,則返回滿足條件的集合
     * 如果找不到滿足條件的肤无,就返回null
     *
     * @param sequenceDates 已經(jīng)按照時間順序排序了的數(shù)組
     * @param intervalDate
     * @param magic
     * @return
     */
    public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){
        if(sequenceDates == null || sequenceDates.size() < magic){
            return null;
        }

        List<Date> res = new ArrayList<Date>();
        for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
            Date souceDate = sequenceDates.get(x);
            Date dateAfter5 = new Date(souceDate.getTime() + intervalDate);
            res.add(souceDate);
            for(int i = x + 1;i< sequenceDates.size();i++){
                Date compareDate = sequenceDates.get(i);
                if(compareDate.before(dateAfter5)){
                    res.add(compareDate);
                }else
                    break;
            }
            if(res.size() >= magic)
                return res;
            else
                res.clear();
        }
        return null;
    }

然后就是個循環(huán)累計的過程

3.拿到獲取信息進行文件渲染


1576673147212.jpg

利用hadoop進行處理

手頭現(xiàn)在剛好有個大數(shù)據(jù)集群,目前這部分數(shù)據(jù)的實時性不高侧巨,可以利用ETL拉到hive里跑舅锄,目前沒有需求去做,準備做司忱。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末皇忿,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子坦仍,更是在濱河造成了極大的恐慌鳍烁,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件繁扎,死亡現(xiàn)場離奇詭異幔荒,居然都是意外死亡糊闽,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進店門爹梁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來右犹,“玉大人,你說我怎么就攤上這事姚垃∧盍矗” “怎么了?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵积糯,是天一觀的道長掂墓。 經(jīng)常有香客問我,道長看成,這世上最難降的妖魔是什么君编? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮川慌,結(jié)果婚禮上吃嘿,老公的妹妹穿的比我還像新娘。我一直安慰自己窘游,他們只是感情好唠椭,可當我...
    茶點故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著忍饰,像睡著了一般贪嫂。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上艾蓝,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天力崇,我揣著相機與錄音,去河邊找鬼赢织。 笑死亮靴,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的于置。 我是一名探鬼主播茧吊,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼八毯!你這毒婦竟也來了搓侄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤话速,失蹤者是張志新(化名)和其女友劉穎讶踪,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體泊交,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡乳讥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年柱查,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片云石。...
    茶點故事閱讀 40,861評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡唉工,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出留晚,到底是詐尸還是另有隱情酵紫,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布错维,位于F島的核電站,受9級特大地震影響橄唬,放射性物質(zhì)發(fā)生泄漏赋焕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一仰楚、第九天 我趴在偏房一處隱蔽的房頂上張望隆判。 院中可真熱鬧,春花似錦僧界、人聲如沸侨嘀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咬腕。三九已至,卻和暖如春葬荷,著一層夾襖步出監(jiān)牢的瞬間涨共,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工宠漩, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留举反,地道東北人。 一個月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓扒吁,卻偏偏與公主長得像火鼻,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子雕崩,可洞房花燭夜當晚...
    茶點故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容