import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.jd.common.Location;
import com.jd.constant.DataBaseConstant;
import com.jd.constant.Globals;
import com.jd.datasource.MysqlManager;
import com.jd.datasource.MysqlPoolManager;
import com.jd.entity.HoneypotAttack;
import com.jd.util.DateUtils;
import com.jd.util.GeoHelper;
import com.jd.util.IDGeneratorUtils;
import com.jd.util.ListUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.source.kafka.KafkaSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class InternalAssetInterceptor implements Interceptor {
private static final Logger logger = LoggerFactory
.getLogger(InternalAssetInterceptor.class);
private Charset charset = Charsets.UTF_8;
private Context context;
private MysqlPoolManager mysqlPool = null;
int expiredTime = 3600;
private static Map<String, String> internalAssetMap = new HashMap<>();
public InternalAssetInterceptor(Context context){
Integer expired = context.getInteger("dataexpired");
if (expired != null) {
expiredTime = expired;
}
this.context = context;
}
@Override
public void initialize() {
try {
mysqlPool = new MysqlPoolManager(context);
mysqlPool.init();
initAttackDic();
Thread t = new Thread(() -> {
try {
Thread.sleep(expiredTime * 1000);
} catch (Exception e) {}
logger.info("start sync attackInfo");
initAttackDic();
});
t.setDaemon(true);
t.start();
} catch (Exception e) {
logger.error("flume agent init failed", e);
throw new RuntimeException(e);
}
}
private void initAttackDic() {
List<Map<String, Object>> mapList = new MysqlManager(mysqlPool).queryForList("select * from op_np_pin");
for (Map<String, Object> map: mapList) {
try {
String pin = map.get("pin").toString();
String ip = map.get("ip").toString();
internalAssetMap.put(ip, pin);
} catch (Exception e) {
logger.error("sync interval asset error {}", map);
}
}
mapList = null;
logger.info("sync internal asset success");
}
@Override
public Event intercept(Event event){
try {
String oldBody = new String(event.getBody(), charset);
JSONObject body = JSON.parseObject(oldBody);
JSONObject jdcloud_alert = body.getJSONObject("jdcloud_alert");
String floattingIp = jdcloud_alert.getString("floating_ip");
String pin = jdcloud_alert.getString("pin");
if (StringUtils.isEmpty(pin)) {
String depart = internalAssetMap.getOrDefault(floattingIp, "");
jdcloud_alert.put("pin", depart);
}
event.setBody(body.toJSONString().getBytes(charset));
return event;
}catch (Exception e) {
logger.error("Failed to process event " + event, e);
event = null;
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
List<Event> out = Lists.newArrayList();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
private Context context;
@Override
public Interceptor build() {
return new InternalAssetInterceptor(context);
}
@Override
public void configure(Context context) {
this.context = context;
}
}
}
flume自定義Inteceptor
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缩功,“玉大人晴及,你說我怎么就攤上這事〉招浚” “怎么了虑稼?”我有些...
- 文/不壞的土叔 我叫張陵,是天一觀的道長势木。 經(jīng)常有香客問我蛛倦,道長,這世上最難降的妖魔是什么啦桌? 我笑而不...
- 正文 為了忘掉前任溯壶,我火速辦了婚禮,結(jié)果婚禮上甫男,老公的妹妹穿的比我還像新娘且改。我一直安慰自己,他們只是感情好查剖,可當我...
- 文/花漫 我一把揭開白布钾虐。 她就那樣靜靜地躺著,像睡著了一般笋庄。 火紅的嫁衣襯著肌膚如雪效扫。 梳的紋絲不亂的頭發(fā)上,一...
- 文/蒼蘭香墨 我猛地睜開眼摹迷,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了郊供?” 一聲冷哼從身側(cè)響起峡碉,我...
- 正文 年R本政府宣布,位于F島的核電站载城,受9級特大地震影響肌似,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜诉瓦,卻給世界環(huán)境...
- 文/蒙蒙 一川队、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧睬澡,春花似錦固额、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至昔脯,卻和暖如春啄糙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背云稚。 一陣腳步聲響...
推薦閱讀更多精彩內(nèi)容
- 1.1.自定義Sink說明 需求如下:從網(wǎng)絡(luò)端口當中發(fā)送數(shù)據(jù)禁荒,自定義sink,使用sink從網(wǎng)絡(luò)端口接收數(shù)據(jù)角撞,然后...
- 1.1.自定義Source說明 官方提供了自定義source的接口說明: https://flume.apache...
- 1.背景介紹 Flume提供對數(shù)據(jù)進行簡單處理,并寫到各種數(shù)據(jù)接受方(可定制)的能力劣领。Flume有各種自帶的攔截器...
- 1、在官方文檔中查看自定義 2村生、pom.xml 3惊暴、編寫自己的sink 4、 編寫自己的source 5趁桃、打包測試...
- 官網(wǎng)和github上都提供了source包卫病,借助提供的kafkasink代碼油啤,在其中增加分區(qū)邏輯,實現(xiàn)flume讀...