logback通過配置自動發(fā)送kafka消息并存入es

步驟

logback的AppenderBase和UnsynchronizedAppenderBase

先來段logback配置

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>%d [%-5level][%t][%c][%X{tenant}][%X{requestId}] %m%n</Pattern>
        </encoder>
    </appender>

    <appender name="logfile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>${LOG_HOME}/xxx-%d{yyyy-MM-dd}.log</FileNamePattern>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>[%d{HH:mm:ss:SSS}][%5p][%c:%L] %m%n</pattern>
        </layout>
    </appender>
    
<!--    <appender name="kafka" class="com.xxx.util.logback.UnblockedKafkaAppender"> -->
<!--        <needFilter>true</needFilter> -->
<!--        <includingPackage>com.xxx.mirc.redis.dubbo</includingPackage> -->
<!--    </appender> -->
    <root level="info">
<!--         <appender-ref ref="kafka"/> -->
        <appender-ref ref="logfile"/>
<!--         <appender-ref ref="STDOUT"/> -->
    </root>
</configuration>

上述的appender標(biāo)簽就是配置的logback處理類。有使用logback提供的ConsoleAppender稻据,RollingFileAppender蕾域。同時(shí)也可以自定義擴(kuò)展appender。
logback提供的抽象處理類馋袜。AppenderBase,UnsynchronizedAppenderBase舶斧,用來提供擴(kuò)展支持欣鳖。分析下源碼。

abstract public class UnsynchronizedAppenderBase<E> extends ContextAwareBase implements
    Appender<E> {
  private ThreadLocal<Boolean> guard = new ThreadLocal<Boolean>();
  public void doAppend(E eventObject) {
 }

  abstract protected void append(E eventObject);
}
abstract public class AppenderBase<E> extends ContextAwareBase implements
    Appender<E> {
   private boolean guard = false;
  public void doAppend(E eventObject) {
 }

  public synchronized void doAppend(E eventObject);
}

其實(shí)這兩個(gè)類茴厉,大多代碼都一樣泽台。

實(shí)現(xiàn)的功能都是記錄Status狀態(tài)什荣,然后檢查Appender上的Filter是否滿足條件,最后再調(diào)用子類的doAppend方法怀酷。用到設(shè)計(jì)模式:模板方法稻爬。

但是區(qū)別在于Appender的doAppend方法是synchronized的,UnsynchronizedAppenderBase則是用ThreadLocal的方式存儲guard狀態(tài)值蜕依。
自定義一個(gè)擴(kuò)展類桅锄,實(shí)現(xiàn)發(fā)送kafka消息:

public class UnblockedKafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent>{
    BaseKafkaProducer<LogBackKafkaVo> producer;
    private static Set<String> includeSet = new HashSet<String>();
    private String includingPackage;
    private String kafkaBrokerPath;

    private boolean needFilter=true;

    public boolean isNeedFilter() {
        return needFilter;
    }

    @Override
    protected void append(ILoggingEvent eventObject) {
        if (needFilter) {
            boolean flag=false;
            if(CollectionUtils.isNotEmpty(includeSet)){
                for(String regex:includeSet){
                    if(eventObject.getLoggerName().matches(regex)){
                        flag=true;
                        break;
                    }
                }
            }
            if(!flag)
                return;
        }
        LogBackKafkaVo vo = new LogBackKafkaVo().build(eventObject);
        if (producer != null)
            try {
                producer.sendMsg(vo);
            } catch (Exception e) {
                e.printStackTrace();
            }
    }
    

    @Override
    public void start() {
        super.start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                initProducer();
            }
        }).start();
    }
    
    private void initProducer(){
        while (!FileReaderUtils.existsFile("kafka.properties")) {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
        }
        if (needFilter) {
            if (StringUtils.isBlank(includingPackage))
                return;
            for (String s : includingPackage.split(",")) {
                includeSet.add(s+".*");
            }
        }
        producer = new LogBackLoggerProducer();
        try {
            producer.kafkaProducerConfig=producer.initConfig(kafkaBrokerPath);
            producer.setProducer_type(KafkaConstant.ASYNC);
            producer.init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

kafka操作/ 生產(chǎn)者為例

定義底層kafka操作類

public class ProducerProxy<T1, T2> {
    public ProducerProxy(ProducerConfig producerConfig,int size) {
        for(int i=0;i<size;i++){
            Producer<T1, T2> producer=new Producer<T1, T2>(producerConfig);
            prodMap.put(i, producer);
            queue.offer(producer);
        }
    }

    private Map<Integer,Producer<T1, T2>> prodMap=new HashMap<Integer,Producer<T1, T2>>();
    private ConcurrentLinkedQueue<Producer<T1, T2>> queue=new ConcurrentLinkedQueue<Producer<T1, T2>>();
    public void send(List<KeyedMessage<T1, T2>> messages) {
        if (prodMap.isEmpty())
            throw new IllegalStateException("prodMap can not be null");
            int i = java.util.concurrent.ThreadLocalRandom.current().nextInt(
                    prodMap.size());
            prodMap.get(i).send(messages);
    }
    public void close() {
        for(Producer<T1, T2> prod:prodMap.values())
            prod.close();
    }
    
    public void send(KeyedMessage<T1, T2> msg) {
        if (prodMap.isEmpty())
            throw new IllegalStateException("prodMap can not be null");
            int i = java.util.concurrent.ThreadLocalRandom.current().nextInt(
                    prodMap.size());
            prodMap.get(i).send(msg);
    }
    public Producer<T1, T2> pollProducer() {
        return queue.poll();
    }
}

ProducerProxy意義在于:創(chuàng)建多個(gè)producer,調(diào)用時(shí)样眠,隨機(jī)分配友瘤;

public class AbstractKafkaProducer {
    protected static ProducerProxy<String, String> producer;
    protected static ProducerProxy<byte[], byte[]> byteProducer;
    protected static Map<String,AsyncKafkaMessageProducer> kafkaMessageCacheMap = new HashMap<String,AsyncKafkaMessageProducer>(2);
    static Map<String,AbstractKafkaProducer> mapProducer=new HashMap<String,AbstractKafkaProducer>(2);
    private static AbstractKafkaProducer abstractKafkaProducer=new AbstractKafkaProducer();
    public static AbstractKafkaProducer getInstance(){
        return abstractKafkaProducer;
    }
    protected AbstractKafkaProducer(){
        if(!mapProducer.isEmpty())
            return;
        mapProducer.put(KafkaConstant.STRING, StringKafkaProducer.getInstance());
    }
    public void setProducer(ProducerProxy<String, String> producer) {
        AbstractKafkaProducer.producer = producer;
    }
    public void setByteProducer(ProducerProxy<byte[], byte[]> byteProducer) {
        AbstractKafkaProducer.byteProducer = byteProducer;
    }
    public void sendMsg(String prodtype,String serializerType,Object msg,String topic,String... key) throws IOException{
        mapProducer.get(serializerType).sendMsg(prodtype,msg,topic,key);
    }
    
    protected void sendMsg(String prodtype,Object msg,String topic,String... key) throws IOException{
    }
}

提供基礎(chǔ)kafka操作類。

提供基礎(chǔ)kafka操作類

public abstract class BaseKafkaProducer<T> implements KafkaProducer<T> {
    public static ProducerProxy<String, String> getProducer() {
        return producer;
    }

    public static ProducerProxy<byte[], byte[]> getByteProducer() {
        return byteProducer;
    }

    protected Properties initProducer(KafkaProducerConfig kafkaProducerConfig,
            String... home) {
        final Properties props = new Properties();
        
        props.put(KafkaConstant.Producer.metadata_broker_list, kafkaProducerConfig.getMetadata_broker_list());
        ...
        return props;
    }

    /**
     * 發(fā)送管理事件
     * 
     * @throws Exception
     */
    @Override
    public boolean sendMsg(final T t,String... key) throws Exception {
        if (t == null) {
            return false;
        }
        try {
            resetTopic();
            String type = producer_type;
            Object o = generateMsg(t);
            AbstractKafkaProducer
                    .getInstance()
                    .sendMsg(
                            type,
                            KafkaConstant.BYTEENCODER
                                    .equals(produce_serilize_class) ? KafkaConstant.BYTE
                                    : KafkaConstant.STRING, o, topic,key);
            return true;
        } catch (final Exception e) {
            logger.error("send msg to jump mq exception:", e);
            throw e;
        } catch (final Error e) {
            logger.error("send msg to jump mq error:", e);
            throw e;
        }
    }

    public void init(){}

    protected abstract void resetTopic();

    protected Object generateMsg(T t) {
        return t;
    }
}

暴露使用方式

@Configuration
public class DubboKafkaProducerConfiguration  {
    @Bean(name = "dubboLoggerProducer")
    public DubboLoggerProducer dubboLoggerProducer() throws IOException {
        BaseKafkaProducer<DubboInvokeDetail> dubboProducer=new DubboLoggerProducer();
        try {
            dubboProducer.setProducer_type(KafkaConstant.ASYNC);
            dubboProducer.init();
            KafkaDubboUtil.setLogSender((KafkaProducer<DubboInvokeDetail>) dubboProducer);
        } catch (Exception e) {
            LOG.error(e.getMessage(),e);
            return null;
        }
        return (DubboLoggerProducer) dubboProducer;
    }

}

es操作

寫個(gè)kafka消費(fèi)程序檐束,寫入es即可辫秧。

結(jié)果說明

{
    "_index":"logback-2017",
    "_type":"com.xxx.util.logback.LogBackKafkaVo",
    "_id":"hacyp0pdrvtt",
    "_version":1,
    "_score":1,
    "_source":{
        "argumentArray":"[]/r/n",
        "callerDataArray":"",
        "formattedMessage":"不存在memberno[]",
        "level":"INFO",
        "loggerContextVO":"LoggerContextVO{name='default', propertyMap={HOSTNAME=wx-test}, birthTime=1495017602946}",
        "loggerName":"com.xxx.Object",
        "shardId":25,
        "status":0,
        "threadName":"ConsumeMessageThread_10",
        "timeStamp":"2017-05-17 18:52:26"
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市厢塘,隨后出現(xiàn)的幾起案子茶没,更是在濱河造成了極大的恐慌,老刑警劉巖晚碾,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抓半,死亡現(xiàn)場離奇詭異,居然都是意外死亡格嘁,警方通過查閱死者的電腦和手機(jī)笛求,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來糕簿,“玉大人探入,你說我怎么就攤上這事《” “怎么了蜂嗽?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長殃恒。 經(jīng)常有香客問我植旧,道長,這世上最難降的妖魔是什么离唐? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任病附,我火速辦了婚禮,結(jié)果婚禮上亥鬓,老公的妹妹穿的比我還像新娘完沪。我一直安慰自己,他們只是感情好嵌戈,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布覆积。 她就那樣靜靜地躺著听皿,像睡著了一般。 火紅的嫁衣襯著肌膚如雪宽档。 梳的紋絲不亂的頭發(fā)上写穴,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天,我揣著相機(jī)與錄音雌贱,去河邊找鬼。 笑死偿短,一個(gè)胖子當(dāng)著我的面吹牛欣孤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播昔逗,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼降传,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了勾怒?” 一聲冷哼從身側(cè)響起婆排,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎笔链,沒想到半個(gè)月后段只,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鉴扫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年赞枕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片坪创。...
    茶點(diǎn)故事閱讀 39,981評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡炕婶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出莱预,到底是詐尸還是另有隱情柠掂,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布依沮,位于F島的核電站涯贞,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏悉抵。R本人自食惡果不足惜肩狂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望姥饰。 院中可真熱鬧傻谁,春花似錦、人聲如沸列粪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至态蒂,卻和暖如春杭措,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背钾恢。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工手素, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人瘩蚪。 一個(gè)月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓泉懦,卻偏偏與公主長得像,于是被迫代替她去往敵國和親疹瘦。 傳聞我的和親對象是個(gè)殘疾皇子崩哩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)言沐,斷路器邓嘹,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • 在應(yīng)用程序中添加日志記錄總的來說基于三個(gè)目的:監(jiān)視代碼中變量的變化情況,周期性的記錄到文件中供其他應(yīng)用進(jìn)行統(tǒng)計(jì)分析...
    時(shí)待吾閱讀 5,049評論 1 13
  • 在應(yīng)用程序中添加日志記錄總的來說基于三個(gè)目的:監(jiān)視代碼中變量的變化情況险胰,周期性的記錄到文件中供其他應(yīng)用進(jìn)行統(tǒng)計(jì)分析...
    時(shí)待吾閱讀 4,985評論 0 6
  • 這次分享的內(nèi)容是最近在孩子學(xué)校家長會上的一次發(fā)言汹押。 在接到這個(gè)光榮的“政治任務(wù)”時(shí),正好在埋頭苦寫部門工作總結(jié)及思...
    牛爸與波波俠閱讀 333評論 0 0
  • 人生即將過半,回想前半生缨睡,除了學(xué)業(yè)不斷精進(jìn)與家庭幸福以外鸟悴,其他很多生命的維度都是非常欠缺的。最近奖年,對自己過往的人生...
    Sunny之生命管理閱讀 1,263評論 7 15