????現(xiàn)在在接手一個BI分析項目,最近呢有一個需求培愁,要提供2小時以內(nèi)缓窜,同一客戶對同一功能重復操作十次的日志統(tǒng)計私股,我們在查詢相關(guān)的服務(wù)類上添加了aop恩掷,獲取用戶名稱黄娘,ip优床,操作時間羔巢,操作的報表等等信息竿秆,因為這個需求偏向于審計或者內(nèi)審需求幽钢,并沒有要求完成報表的設(shè)計匪燕,只需要提供異常的日志帽驯,所以利凑,實現(xiàn)的方式就比較的靈活了嫌术,在我尋思咋去做這件事情的時候割按,我們的業(yè)務(wù)小哥适荣,要去了完整的統(tǒng)計信息束凑,EXCEL大法好废恋,花了半小時給整出來了鱼鼓,汗顏拟烫,大體意思上,先按照小時進行統(tǒng)計迄本,后按照相鄰小時累加統(tǒng)計硕淑,但每個月都要手動整理,也迫使我去完成了這部分的需求(羅列幾種方式)嘉赎。
sql直接完成
????因為我們是將這種操作信息存在了數(shù)據(jù)表中置媳,第一反應(yīng)就是直接寫sql(基于mysql的sql)
SELECT
rfcl1.OP_NAME,
rfcl1.USER_NAME,
rfcl1.UNIT_CODE,
rfcl1.SYSTEM_CODE,
count(*)
FROM
fr_cpt_log rfcl1,
fr_cpt_log rfcl2
where
rfcl1.ip=rfcl2.ip and rfcl1.OP_NAME = rfcl2.OP_NAME
and ADDTIME(rfcl1.OP_DATE,'2:0:0')>=rfcl2.OP_DATE
and rfcl1.id<rfcl2.id
group by
rfcl1.OP_NAME,
rfcl1.USER_NAME,
rfcl1.UNIT_CODE,
rfcl1.SYSTEM_CODE
having count(*)>=10
????sql的核心處理方式,使用自連接的方式以ip和操作名稱為限制條件進行累加計算公条,但這種自連接的累計方式有很大的問題拇囊,我當時是在測試環(huán)境測試并沒有發(fā)現(xiàn)問題,操作的日志只有幾w左右寥袭,但實際在生產(chǎn)環(huán)境進行的時候,發(fā)現(xiàn)跑不出來,不按月切割的情況下報表但操作記錄大概有幾百萬行,而直接進行自連接做笛卡爾積的話窗声,這個數(shù)據(jù)量可以上hadoop了见剩,看了下羹呵,大概數(shù)據(jù)量有個小十萬就不用考慮這種方式去進行了盈简。
????當然去按照時間進行切分,多加一些中間臨時表去做這些事情肯定也是能搞定了坚俗,這里就不多說了。
用緩存計數(shù)
????這個方案偏向于實時累加計算處理了,之前呢為了做限流操作翅楼,寫了個在redis中緩存同一ip操作同一個功能一分鐘超過5次就進行消息提醒的東西(沒辦法BI渲染太耗cpu),這里涉及線上代碼就不貼了管嬉,但實現(xiàn)但方式是相似但胎挎,以IP+OP_NAME為key,進行累加蹬竖,TTL給個2小時旦装,在aop里獲取超過10次就刷到異常表里就ok了眨八。
????這里就是個redis的簡單使用段誊,但是呢,說白了這種日志統(tǒng)計功能沒必要有這么強的實時性,或者說去消耗緩存和服務(wù)器資源青灼,所以并沒去在服務(wù)器上做這些事情檀夹,動生產(chǎn)環(huán)境的代碼和生產(chǎn)環(huán)境加表過于麻煩了。
拉取日志文件督赤,java解決
????因為獲取異常信息,整個過程可能不僅僅是獲取病蛉,還需要考慮的是呈現(xiàn)的方式,所以這里完成了一個獲取生成excel的邏輯析恋,因為查詢?nèi)藛T的組織不同,按照組織信息將操作的異常記錄進行切分
具體的實現(xiàn)步驟為:
1.文件切割
我這邊將查詢的日志進行規(guī)整挠唆,用戶名患膛,查詢時間布疙,查詢操作顿涣,查詢的對象4列酝豪,ip因為內(nèi)部跳轉(zhuǎn)原因獲取的不對毙籽,這里不做顯示。
這里我們將文件做切割處理(應(yīng)對巨量的log文件)垮衷,以用戶名+查詢對象做hash計算,這里假定分布的文件為10個雌隅,那么我們輸入輸出流的形式讀取每一行蹦渣,以每一行除10取模,以這個模為新文件名創(chuàng)建文件恕酸,相同的操作+用戶名必定分配到同一文件堪滨,這樣哪小文件進行增量的文件計算可避免oom
private void preProcess() throws IOException {
BufferedInputStream fis = null;
BufferedReader reader = null;
try{
//Path newfile = FileSystems.getDefault().getPath(filename);
fis = new BufferedInputStream(new FileInputStream(new File(filename)));
// 用5M的緩沖讀取文本文件
reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),_5M);
//假設(shè)文件是10G,那么先根據(jù)hashcode拆成小文件蕊温,再進行讀寫判斷
//如果不拆分文件袱箱,將ip地址當做key,訪問時間當做value存到hashmap時义矛,
//當來訪的ip地址足夠多的情況下发笔,內(nèi)存開銷吃不消
//存放ip的hashcode->accessTimes集合
Map<String, List<String>> hashcodeMap = new HashMap<String,List<String>>();
String line = "";
int count = 0;
while((line = reader.readLine()) != null){
line=line.replaceAll("\t","");
String split[] = line.split(delimiter);
String reportName;
String username;
if(split != null && split.length >= 2){
//根據(jù)username的hashcode這樣拆分文件,拆分后的文件大小在1G上下波動
//ip+操作內(nèi)容 取哈希
username = split[0].replaceAll("\"","");
reportName = split[3].replaceAll("\"","");
/*magic 為拆分的細粒度*/
int serial = (username+reportName).hashCode() % MAGIC;
String splitFilename = FILE_PRE + serial;
List<String> lines = hashcodeMap.get(splitFilename);
if(lines == null){
lines = new ArrayList<String>();
hashcodeMap.put(splitFilename, lines);
}
lines.add(line);
}
count ++;
if(count > 0 && count % BATCH_MAGIC == 0){
//每1000行刷一次文件
for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){
//System.out.println(entry.getKey()+"--->"+entry.getValue());
//key是hashcode value是本行的字符串
DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));
}
//一次操作1000之后清空凉翻,重新執(zhí)行
hashcodeMap.clear();
}
}
}finally {
reader.close();
fis.close();
}
}
文件append方法
/**
* 根據(jù)給出的數(shù)據(jù)了讨,往給定的文件形參中追加一行或者幾行數(shù)據(jù)
*
* @param splitFilename
* @throws IOException
*/
public static Path appendFile(String splitFilename,
Iterable<? extends CharSequence> accessTimes, Charset cs) throws IOException {
if(accessTimes != null){
Path target = Paths.get(splitFilename);
File file = new File(splitFilename);
if(!file.exists()){
createFile(splitFilename);
}
return Files.write(target, accessTimes, cs, StandardOpenOption.APPEND);
}
return null;
}
2.讀取制定文件目錄下的小文件,累計2小時內(nèi)的重復操作
/**
* 遞歸執(zhí)行制轰,將2小時內(nèi)訪問超過閾值的ip找出來
*
* @param parent
* @return
* @throws IOException
*/
private void recurseFile(Path parent, Map<String,List<Date>> resMap) throws IOException{
//Path target = Paths.get(dir);
if(!Files.exists(parent) || !Files.isDirectory(parent)){
return;
}
List<File> fileList= Arrays.asList(new File(root).listFiles());
for(File file:fileList){
if(file.getName().startsWith(FILE_PRE)){
List<String> lines = Files.readAllLines(file.toPath(), Charset.forName("UTF-8"));
judgeAndcollection(lines,resMap);
/*
這里的resMap可以里立即處理了前计,沒必要一直進行迭代,結(jié)果集還是全量的垃杖,這里增量的append到excel中
*/
if(!CollectionUtils.isEmpty(resMap)){
Map<String,List<OperationInfo>> operationInfoMap = new HashMap<>();
transerToObject(resMap,operationInfoMap);
if(!CollectionUtils.isEmpty(operationInfoMap)){
operationInfoMap.entrySet().forEach(stringEntry -> {
String fileName = result + stringEntry.getKey()+"異常操作日志.xlsx";
File fileCheck = new File(fileName);
if(fileCheck.exists()){
try {
EasyexcelUtils.addExcel(fileName,stringEntry.getValue());
} catch (IOException e) {
e.printStackTrace();
} catch (InvalidFormatException e) {
e.printStackTrace();
}
}else{
EasyExcel.write(fileName, OperationInfo.class)
.sheet(stringEntry.getKey()).doWrite(stringEntry.getValue());
}
});
}
resMap.clear();
}
}
}
}
此處的核心處理邏輯就是在遍歷小文件的時候男杈,判斷并收集超過操作閾值的對象
/**
* 根據(jù)從較小文件讀上來的每行ip accessTimes進行判斷符合條件的ip
* 并放入resMap
*
* @param lines
* @param resMap
*/
private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) {
if(lines != null){
//ip->List<String>accessTimes
Map<String,List<String>> judgeMap = new HashMap<String,List<String>>();
for(String line : lines){
line = line.trim();
line=line.replaceAll("\t","");
String split[] = line.split(delimiter);
String userName =split[0];
String opt =split[3].replaceAll("\"","");
List<String> accessTimes = judgeMap.get(userName+"#"+opt);
if(accessTimes == null){
accessTimes = new ArrayList<String>();
}
accessTimes.add(split[1]);
judgeMap.put(userName+"#"+opt, accessTimes);
}
if(judgeMap.size() == 0){
return;
}
for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){
List<String> acessTimes = entry.getValue();
//相同ip,先判斷整體大于10個
if(acessTimes != null && acessTimes.size() >= 10){
//開始判斷在List集合中调俘,120分鐘內(nèi)訪問超過MAGIC=10
List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 120 * 60 * 1000, 10);
if(attackTimes != null){
resMap.put(entry.getKey(), attackTimes);
}
}
}
}
}
將小文件的行進行格式化為map<user_name+opt,op_time>
再在hashmap中進行遍歷匯總
public static List<Date> attackList(List<String> dateStrs,long intervalDate,int magic){
if(dateStrs == null || dateStrs.size() < magic){
return null;
}
List<Date> dates = new ArrayList<Date>();
for(String date : dateStrs){
if(date != null && !"".equals(date))
dates.add(stringToDate(date,"dd/MM/yyyy hh:mm:ss"));
}
Collections.sort(dates);
return attackTimes(dates,intervalDate,magic);
}
這里對日期格式進行了轉(zhuǎn)換
/**
* 判斷在間隔時間內(nèi)伶棒,是否有大于magic的上限的數(shù)據(jù)集合,
* 如果有彩库,則返回滿足條件的集合
* 如果找不到滿足條件的肤无,就返回null
*
* @param sequenceDates 已經(jīng)按照時間順序排序了的數(shù)組
* @param intervalDate
* @param magic
* @return
*/
public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){
if(sequenceDates == null || sequenceDates.size() < magic){
return null;
}
List<Date> res = new ArrayList<Date>();
for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){
Date souceDate = sequenceDates.get(x);
Date dateAfter5 = new Date(souceDate.getTime() + intervalDate);
res.add(souceDate);
for(int i = x + 1;i< sequenceDates.size();i++){
Date compareDate = sequenceDates.get(i);
if(compareDate.before(dateAfter5)){
res.add(compareDate);
}else
break;
}
if(res.size() >= magic)
return res;
else
res.clear();
}
return null;
}
然后就是個循環(huán)累計的過程
3.拿到獲取信息進行文件渲染
利用hadoop進行處理
手頭現(xiàn)在剛好有個大數(shù)據(jù)集群,目前這部分數(shù)據(jù)的實時性不高侧巨,可以利用ETL拉到hive里跑舅锄,目前沒有需求去做,準備做司忱。