flume自定義Inteceptor

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.jd.common.Location;
import com.jd.constant.DataBaseConstant;
import com.jd.constant.Globals;
import com.jd.datasource.MysqlManager;
import com.jd.datasource.MysqlPoolManager;
import com.jd.entity.HoneypotAttack;
import com.jd.util.DateUtils;
import com.jd.util.GeoHelper;
import com.jd.util.IDGeneratorUtils;
import com.jd.util.ListUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.source.kafka.KafkaSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class InternalAssetInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory
            .getLogger(InternalAssetInterceptor.class);

    private Charset charset = Charsets.UTF_8;
    private Context context;
    private MysqlPoolManager mysqlPool = null;
    int expiredTime = 3600;
    private static Map<String, String> internalAssetMap = new HashMap<>();



    public InternalAssetInterceptor(Context context){
        Integer expired = context.getInteger("dataexpired");
        if (expired != null) {
            expiredTime = expired;
        }
        this.context = context;
    }

    @Override
    public void initialize() {
        try {
            mysqlPool = new MysqlPoolManager(context);
            mysqlPool.init();
            initAttackDic();
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(expiredTime * 1000);
                } catch (Exception e) {}
                logger.info("start sync attackInfo");
                initAttackDic();
            });
            t.setDaemon(true);
            t.start();
        } catch (Exception e) {
            logger.error("flume agent init failed", e);
            throw new RuntimeException(e);
        }
    }

    private void initAttackDic() {
        List<Map<String, Object>> mapList = new MysqlManager(mysqlPool).queryForList("select * from op_np_pin");
        for (Map<String, Object> map: mapList) {
            try {
                String pin = map.get("pin").toString();
                String ip = map.get("ip").toString();
                internalAssetMap.put(ip, pin);
            } catch (Exception e) {
                logger.error("sync interval asset error {}", map);
            }
        }
        mapList = null;
        logger.info("sync internal asset success");
    }

    @Override
    public Event intercept(Event event){
        try {
            String oldBody = new String(event.getBody(), charset);
            JSONObject body = JSON.parseObject(oldBody);
            JSONObject jdcloud_alert = body.getJSONObject("jdcloud_alert");
            String floattingIp = jdcloud_alert.getString("floating_ip");
            String pin = jdcloud_alert.getString("pin");
            if (StringUtils.isEmpty(pin)) {
                String depart = internalAssetMap.getOrDefault(floattingIp, "");
                jdcloud_alert.put("pin", depart);
            }
            event.setBody(body.toJSONString().getBytes(charset));
            return event;
        }catch (Exception e) {
            logger.error("Failed to process event " + event, e);
            event = null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = Lists.newArrayList();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        private Context context;

        @Override
        public Interceptor build() {
            return new InternalAssetInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌谓厘,老刑警劉巖均唉,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡健蕊,警方通過查閱死者的電腦和手機菱阵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缩功,“玉大人晴及,你說我怎么就攤上這事〉招浚” “怎么了虑稼?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長势木。 經(jīng)常有香客問我蛛倦,道長,這世上最難降的妖魔是什么啦桌? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任溯壶,我火速辦了婚禮,結(jié)果婚禮上甫男,老公的妹妹穿的比我還像新娘且改。我一直安慰自己,他們只是感情好查剖,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布钾虐。 她就那樣靜靜地躺著,像睡著了一般笋庄。 火紅的嫁衣襯著肌膚如雪效扫。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天直砂,我揣著相機與錄音菌仁,去河邊找鬼。 笑死静暂,一個胖子當著我的面吹牛济丘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播洽蛀,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼摹迷,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了郊供?” 一聲冷哼從身側(cè)響起峡碉,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎驮审,沒想到半個月后鲫寄,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吉执,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年地来,在試婚紗的時候發(fā)現(xiàn)自己被綠了戳玫。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡未斑,死狀恐怖咕宿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情颂碧,我是刑警寧澤荠列,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站载城,受9級特大地震影響肌似,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜诉瓦,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一川队、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧睬澡,春花似錦固额、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至昔脯,卻和暖如春啄糙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背云稚。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工隧饼, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人静陈。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓燕雁,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鲸拥。 傳聞我的和親對象是個殘疾皇子拐格,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容