spring+kafka 實(shí)戰(zhàn)開發(fā)例子

一啄巧、注意事項(xiàng)

1沮榜,在config/server.properties文件中有兩個(gè)端口

發(fā)布消息需要指定broker-list:9092(默認(rèn))
修改發(fā)布消息端口在 config/server.properties增加port=端口萍丐,參數(shù)來修改默認(rèn)的9092端口
接收消息是zookeeper的2181端口(默認(rèn))
1顶燕,修改接收消息端口在 config/server.properties的zookeeper.connect蝙寨,將IP和端口修改成
2,同時(shí)找到 zookeeper.properties修改clientPort喜德。
以上1,2兩處修改要確保端口一致山橄,并且要和發(fā)布消息broker端口不能相同。

2舍悯,localhost和9092是producers和consumers默認(rèn)IP和端口

所以如果你要遠(yuǎn)程連接航棱,需要修改默認(rèn)IP地址睡雇,找到config/server.properties
增加:advertised.host.name=123.57.218.39(如果需要遠(yuǎn)程連接,則需要修改IP)

3饮醇,enable.auto.commit參數(shù)問題

在kafka-consumer中enable.auto.commit參數(shù)為FALSE它抱,表示不自動(dòng)提交offset
offset是kafka發(fā)送消息監(jiān)聽,如果為FALSE朴艰,則需要手動(dòng)提交观蓄,否則消息發(fā)送成功后就默認(rèn)TRUE
本例中暫時(shí)沒找到如何手動(dòng)提交。
該場景應(yīng)用于發(fā)送消息成功后祠墅,因?yàn)槌绦虺隽藛栴}蜘腌,會(huì)導(dǎo)致發(fā)送消息的數(shù)據(jù)丟失的BUG


二、異常情況

1饵隙,org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

這種情況一參考注意事項(xiàng)撮珠,就是producers默認(rèn)的localhost,所以你遠(yuǎn)程連接超時(shí)金矛,配置advertised.host.name就可以了

2芯急,F(xiàn)ailed to send messages after 3 tries

這種情況是端口有問題,這里要注意發(fā)布消息端口和接收消息端口要區(qū)分好驶俊,不要混淆娶耍,具體參考注意事項(xiàng)



三、spring+kafka實(shí)例

maven-pom.xml配置信息:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>


<dependency>
<groupId> org.springframework.integration </groupId>
<artifactId> spring-integration-kafka </artifactId>
<version> 2.1.0.RELEASE </version>
</dependency>


<dependency>
<groupId> org.springframework </groupId>
<artifactId> spring-webmvc </artifactId>
<version> 4.3.0.RELEASE </version>
</dependency>


<dependency>
<groupId> org.springframework.kafka </groupId>
<artifactId> spring-kafka </artifactId>
<version>1.1.1.RELEASE</version>
</dependency>



producer生產(chǎn)者配置:
1.如果你的topic沒有設(shè)置名稱饼酿,按照默認(rèn)的topic的名字生成對應(yīng)的數(shù)據(jù)文件夾榕酒。
2.producerListener用來判斷kafka發(fā)送數(shù)據(jù)是否成功以及發(fā)送反饋信息。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 定義producer的參數(shù) -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="123.57.218.39:3456" /><!-- 發(fā)布者的IP和端口 -->
                <entry key="group.id" value="0" />
                <entry key="retries" value="1" />
                <entry key="batch.size" value="16384" />
                <entry key="linger.ms" value="1" />
                <entry key="buffer.memory" value="33554432" />
                <entry key="key.serializer"
                value="org.apache.kafka.common.serialization.StringSerializer" />
                <entry key="value.serializer"
                value="org.apache.kafka.common.serialization.StringSerializer" />
            </map>
        </constructor-arg>
    </bean>

    <!-- 創(chuàng)建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory"
        class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties" />
        </constructor-arg>
    </bean>

    <!-- 創(chuàng)建kafkatemplate bean故俐,使用的時(shí)候想鹰,只需要注入這個(gè)bean,即可使用template的send消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory" />
        <constructor-arg name="autoFlush" value="true" />
        <property name="defaultTopic" value="defaultTopic" />
        <property name="producerListener" ref="producerListener"/>
    </bean>
    
    <bean id="producerListener" class="com.livzon.ydw.kafka.KafkaProducerListener" /> 
</beans>



consumer配置:
1.使用kafka的listener進(jìn)行消息消費(fèi)監(jiān)聽药版,如果有消費(fèi)消息進(jìn)入會(huì)自動(dòng)調(diào)用OnMessage方法進(jìn)行消息消費(fèi)以及后續(xù)業(yè)務(wù)處理辑舷。
2.如果要配置多個(gè)topic,需要?jiǎng)?chuàng)建新的消費(fèi)者容器槽片,然后統(tǒng)一指向listner的消息處理類何缓,統(tǒng)一讓這個(gè)類進(jìn)行后續(xù)業(yè)務(wù)處理。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xmlns:context="http://www.springframework.org/schema/context"
     xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
     http://www.springframework.org/schema/tx 
     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd 
     http://www.springframework.org/schema/jee 
     http://www.springframework.org/schema/jee/spring-jee-3.0.xsd 
     http://www.springframework.org/schema/context 
      http://www.springframework.org/schema/context/spring-context-3.0.xsd">
       
    
    <!-- 定義consumer的參數(shù) -->
     <bean id="consumerProperties" class="java.util.HashMap">
         <constructor-arg>
             <map>
                 <entry key="bootstrap.servers" value="123.57.218.39:3456"/><!-- 消費(fèi)者的IP和端口 -->
                 <entry key="group.id" value="0"/>
                 <entry key="enable.auto.commit" value="false"/>
                 <entry key="auto.commit.interval.ms" value="1000"/>
                 <entry key="session.timeout.ms" value="15000"/>
                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
             </map>
         </constructor-arg>
     </bean>
     
     <!-- 創(chuàng)建consumerFactory bean -->
     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
         <constructor-arg>
             <ref bean="consumerProperties"/>
         </constructor-arg>
     </bean>
     
     <!-- 實(shí)際執(zhí)行消息消費(fèi)的類 -->
     <bean id="messageListernerConsumerService" class="com.livzon.ydw.kafka.KafkaConsumerServer"/>
     
     <!-- 消費(fèi)者容器配置信息还栓,如果有多個(gè)TOPIC則配置多個(gè)容器信息碌廓,如下圖注釋處 -->
     <bean id="containerProperties_trade" class="org.springframework.kafka.listener.config.ContainerProperties">
         <constructor-arg value="wyyMsgCallBack"/><!-- 這里寫的是發(fā)布者的TOPIC -->
         <property name="messageListener" ref="messageListernerConsumerService"/>
     </bean>
     <!-- <bean id="containerProperties_other" class="org.springframework.kafka.listener.config.ContainerProperties">
         <constructor-arg value="other_test_topic"/>
         <property name="messageListener" ref="messageListernerConsumerService"/>
     </bean> -->
     
     <!-- 創(chuàng)建messageListenerContainer bean,使用的時(shí)候剩盒,只需要注入這個(gè)bean -->
     <bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" 
         init-method="doStart">
         <constructor-arg ref="consumerFactory"/>
         <constructor-arg ref="containerProperties_trade"/>
     </bean>
     
     <!-- <bean id="messageListenerContainer_other" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" 
         init-method="doStart">
         <constructor-arg ref="consumerFactory"/>
         <constructor-arg ref="containerProperties_other"/>
     </bean> -->
     
</beans>



xml文件加載:

1 <import resource="classpath:kafkaConsumer.xml" />
2 <import resource="classpath:kafkaProducer.xml" />



具體代碼實(shí)現(xiàn):
constant.java  //常量類谷婆,常量數(shù)據(jù)

package com.livzon.ydw.kafka;

/**
 * kafkaMessageConstant
 * @author yangdw
 *
 */
public class KafkaMesConstant {

    public static final String SUCCESS_CODE = "00000";
    public static final String SUCCESS_MES = "成功";
    
    //kakfa-code
    public static final String KAFKA_SEND_ERROR_CODE = "30001";
    public static final String KAFKA_NO_RESULT_CODE = "30002";
    public static final String KAFKA_NO_OFFSET_CODE = "30003";
    
    //kakfa-mes
    public static final String KAFKA_SEND_ERROR_MES = "發(fā)送消息超時(shí),聯(lián)系相關(guān)技術(shù)人員";
    public static final String KAFKA_NO_RESULT_MES = "未查詢到返回結(jié)果,聯(lián)系相關(guān)技術(shù)人員";
    public static final String KAFKA_NO_OFFSET_MES = "未查到返回?cái)?shù)據(jù)的offset,聯(lián)系相關(guān)技術(shù)人員";
    
    
}



kafkaProducerListener.java  //生產(chǎn)者監(jiān)聽(監(jiān)聽生產(chǎn)者消息情況)-打印日志

package com.livzon.ydw.kafka;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;

/**
 * kafkaProducer監(jiān)聽器,在producer配置文件中開啟
 * @author yangdw
 *
 */
@SuppressWarnings("rawtypes")
public class KafkaProducerListener implements ProducerListener{
    protected final Logger LOG = LoggerFactory.getLogger("KafkaProducerListener");
    /**
     * 發(fā)送消息成功后調(diào)用
     */
    @Override
    public void onSuccess(String topic, Integer partition, Object key,
            Object value, RecordMetadata recordMetadata) {
        LOG.info("==========kafka發(fā)送數(shù)據(jù)成功(日志開始)==========");
        LOG.info("----------topic:"+topic);
        LOG.info("----------partition:"+partition);
        LOG.info("----------key:"+key);
        LOG.info("----------value:"+value);
        LOG.info("----------RecordMetadata:"+recordMetadata);
        LOG.info("~~~~~~~~~~kafka發(fā)送數(shù)據(jù)成功(日志結(jié)束)~~~~~~~~~~");
    }

    /**
     * 發(fā)送消息錯(cuò)誤后調(diào)用
     */
    @Override
    public void onError(String topic, Integer partition, Object key,
            Object value, Exception exception) {
        LOG.info("==========kafka發(fā)送數(shù)據(jù)錯(cuò)誤(日志開始)==========");
        LOG.info("----------topic:"+topic);
        LOG.info("----------partition:"+partition);
        LOG.info("----------key:"+key);
        LOG.info("----------value:"+value);
        LOG.info("----------Exception:"+exception);
        LOG.info("~~~~~~~~~~kafka發(fā)送數(shù)據(jù)錯(cuò)誤(日志結(jié)束)~~~~~~~~~~");
        exception.printStackTrace();
    }

    /**
     * 方法返回值代表是否啟動(dòng)kafkaProducer監(jiān)聽器
     */
    @Override
    public boolean isInterestedInSuccess() {
        LOG.info("http:///kafkaProducer監(jiān)聽器啟動(dòng)///");
        return true;
    }

}



KafkaProducerServer //生產(chǎn)者實(shí)現(xiàn)類

package com.livzon.ydw.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;

import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import com.google.gson.Gson;

import net.sf.json.JSONObject;

/**
 * kafkaProducer模板 使用此模板發(fā)送消息
 * 
 * @author yangdw
 *
 */
@Service
public class KafkaProducerServer {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * kafka發(fā)送消息模板
     * 
     * @param topic
     *            主題
     * @param value
     *            messageValue
     * @param ifPartition
     *            是否使用分區(qū) 0是\1不是
     * @param partitionNum
     *            分區(qū)數(shù) 如果是否使用分區(qū)為0,分區(qū)數(shù)必須大于0
     * @param role
     *            角色:bbc app erp...
     */
    public Map<String, Object> sndMesForTemplate(String topic, Object value, String ifPartition, Integer partitionNum,
            String role) {
        String key = role + "-" + value.hashCode();
        Gson gson = new Gson();
        String valueString = gson.toJson(value);
        if (ifPartition.equals("0")) {
            // 表示使用分區(qū)
            int partitionIndex = getPartitionIndex(key, partitionNum);

            ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(topic, partitionIndex, key,
                    valueString);

            Map<String, Object> res = checkProRecord(result);
            return res;
        } else {
            ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(topic, key, valueString);
            Map<String, Object> res = checkProRecord(result);
            return res;
        }
    }

    /**
     * 根據(jù)key值獲取分區(qū)索引
     * 
     * @param key
     * @param partitionNum
     * @return
     */
    private int getPartitionIndex(String key, int partitionNum) {
        if (key == null) {
            Random random = new Random();
            return random.nextInt(partitionNum);
        } else {
            int result = Math.abs(key.hashCode()) % partitionNum;
            return result;
        }
    }

    /**
     * 檢查發(fā)送返回結(jié)果record
     * 
     * @param res
     * @return
     */
    @SuppressWarnings("rawtypes")
    private Map<String, Object> checkProRecord(ListenableFuture<SendResult<String, Object>> res) {
        Map<String, Object> m = new HashMap<String, Object>();
        if (res != null) {
            try {
                SendResult r = res.get();// 檢查result結(jié)果集
                // 檢查recordMetadata的offset數(shù)據(jù),不檢查producerRecord
                Long offsetIndex = r.getRecordMetadata().offset();
                if (offsetIndex != null && offsetIndex >= 0) {
                    m.put("code", KafkaMesConstant.SUCCESS_CODE);
                    m.put("message", KafkaMesConstant.SUCCESS_MES);
                    return m;
                } else {
                    m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
                    m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
                    return m;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
                m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
                return m;
            } catch (ExecutionException e) {
                e.printStackTrace();
                m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
                m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
                return m;
            }
        } else {
            m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
            m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
            return m;
        }
    }

}

KafkaConsumerServer //消費(fèi)者實(shí)現(xiàn)類

package com.livzon.ydw.kafka;

import javax.annotation.Resource;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;

import com.livzon.ydw.service.MsgService;

import net.sf.json.JSONObject;

/**
 * kafka監(jiān)聽器啟動(dòng) 自動(dòng)監(jiān)聽是否有消息需要消費(fèi)
 * 
 * @author yangdw
 *
 */
public class KafkaConsumerServer implements MessageListener<String, Object> {
    protected final Logger LOGGER = LoggerFactory.getLogger("KafkaConsumerServer");

    @Resource
    private MsgService msgService;

    /**
     * 監(jiān)聽器自動(dòng)執(zhí)行該方法 消費(fèi)消息 自動(dòng)提交offset 執(zhí)行業(yè)務(wù)代碼 (high level api
     * 不提供offset管理波材,不能指定offset進(jìn)行消費(fèi))
     */
    @Override
    public void onMessage(ConsumerRecord<String, Object> record) {
        String topic = record.topic(); 
        String value = (String) record.value();
        LOGGER.info("=============kafka監(jiān)聽消息開始=============topic:" + topic + "=======value" + value);

        JSONObject json = JSONObject.fromObject(value);
        LOGGER.info("*****************解析回調(diào)消息json**********************json:" + json);
        try {
            msgService.msgHandle(json);
        } catch (Exception ex) {
            LOGGER.error("***********KAFKA消息監(jiān)聽異常" + ex.getMessage() + "***********", ex);
        }
        LOGGER.info("=============kafka監(jiān)聽消息開結(jié)束=============topic:" + topic + "=======value" + value);
    }

}



注:此文是參考https://www.cnblogs.com/wangb0402/p/6187796.html地址寫的
因?yàn)閰⒖紩r(shí)有很多問題股淡,所以重新歸納一份做自己的總結(jié),同樣分享出來廷区,同時(shí)也非常感謝原作者的分享唯灵!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市隙轻,隨后出現(xiàn)的幾起案子埠帕,更是在濱河造成了極大的恐慌,老刑警劉巖玖绿,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件敛瓷,死亡現(xiàn)場離奇詭異,居然都是意外死亡斑匪,警方通過查閱死者的電腦和手機(jī)呐籽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蚀瘸,“玉大人狡蝶,你說我怎么就攤上這事≈” “怎么了贪惹?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長寂嘉。 經(jīng)常有香客問我奏瞬,道長,這世上最難降的妖魔是什么泉孩? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任硼端,我火速辦了婚禮,結(jié)果婚禮上棵譬,老公的妹妹穿的比我還像新娘显蝌。我一直安慰自己预伺,他們只是感情好订咸,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著酬诀,像睡著了一般脏嚷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瞒御,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天父叙,我揣著相機(jī)與錄音,去河邊找鬼。 笑死趾唱,一個(gè)胖子當(dāng)著我的面吹牛涌乳,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼倡鲸!你這毒婦竟也來了拷呆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤跟磨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體躬贡,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年眼坏,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了拂玻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,096評論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宰译,死狀恐怖纺讲,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情囤屹,我是刑警寧澤熬甚,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站肋坚,受9級特大地震影響乡括,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜智厌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一诲泌、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧铣鹏,春花似錦敷扫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至合溺,卻和暖如春卒密,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背棠赛。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工哮奇, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留膛腐,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓鼎俘,卻偏偏與公主長得像哲身,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子贸伐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容