Spring Kafka 教程 – spring讀取和發(fā)送kakfa消息

本文代碼格式不好調整脆栋,可以參考本人在其他地方的同篇博文 https://blog.csdn.net/russle/article/details/80296006

Apache Kafka, 分布式消息系統(tǒng)嫂伞,

非常流行。Spring是非常流行的Java快速開發(fā)框架岁诉。將兩者無縫平滑結合起來可以快速實現(xiàn)很多功能。本文件簡要介紹Spring

Kafka肌似,如何使用 KafkaTemplate發(fā)送消息到kafka的broker上义黎, 如何使用“l(fā)istener

container“接收Kafka消息。

1姐呐,Spring Kafka的組成

這一節(jié)我們首先介紹Spring Kafka的各個組成部分殿怜。

1.1? 發(fā)送消息

與 JmsTemplate 或者JdbcTemplate類似,Spring Kafka提供了 KafkaTemplate.? 該模板封裝了Kafka消息生產者并提供各種消息發(fā)送方法曙砂。

消息發(fā)送的各種方法头谜。

```

ListenableFuture> send(Stringtopic, V data);ListenableFuture> send(Stringtopic,Kkey, V data);ListenableFuture> send(Stringtopic, int partition, V data);ListenableFuture> send(Stringtopic, int partition,Kkey, V data);ListenableFuture> send(Message message);

```

1.2 接收消息

要接收消息,我們需要配置MessageListenerContainer并提供一個Message Listener鸠澈,或者使用 @KafkaListener注解柱告。

MessageListenserContainer

MessageListenserContainer 有以下兩個實現(xiàn)類:

KafkaMessageListenerContainer

ConcurrentMessageListenerContainer

KafkaMessageListenerContainer可以讓我們使用單線程消費Kafka topic的消息,而ConcurrentMessageListenerContainer 可以讓我們多線程消費消息笑陈。

@KafkaListener 注解

Spring Kafka提供的 @KafkaListener注解际度,可以讓我們監(jiān)聽某個topic或者topicPattern的消息。

監(jiān)聽符合topicPattern = “topic.*”的所有topic的消息

```

@Component@Slf4jpublicclassCmdReceiver {? ? @KafkaListener(topicPattern ="topic.*")publicvoidlisten(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {? ? ? ? Optional kafkaMessage = Optional.ofNullable(record.value());if(kafkaMessage.isPresent()) {? ? ? ? ? ? Object message = kafkaMessage.get();? ? ? ? ? ? log.info("----------------- record =topic:"+ topic+", "+ record);? ? ? ? ? ? log.info("------------------ message =topic:"+ topic+", "+ message);? ? ? ? }? ? }}

```


監(jiān)聽某個topic的消息publicclassListener {? ? @KafkaListener(id ="id01", topics ="Topic1")publicvoidlisten(String data) {? ? }}

2涵妥, Spring Kafka 例子

下面我們介紹一個具體的例子乖菱, 這個例會發(fā)送和接收指定topic的消息。

準備工作

kafka_2.11-1.1.0.tgz和zookeeper-3.4.10.tar.gz

JDK jdk-8u171-linux-x64.tar.gz

IDE (Eclipse or IntelliJ)

Build tool (Maven? or Gradle)

本文不涉及安裝Kafka的介紹妹笆,請自行搜索块请,或者看官方文檔。

pom文件

也就是我們的依賴包. 這是筆者使用的依賴版本拳缠,僅供參考。

4.0.0com.yqkafkademo1.0-SNAPSHOTorg.springframework.bootspring-boot-starter-parent1.5.12.RELEASEUTF-8UTF-81.8org.springframework.bootspring-boot-starter-weborg.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-testtestorg.springframework.kafkaspring-kafka1.1.8.RELEASEcom.google.code.gsongson2.8.2org.apache.kafkakafka-clients0.10.1.1io.springfoxspringfox-swagger22.7.0io.springfoxspringfox-swagger-ui2.7.0io.springfoxspringfox-spring-web2.7.0com.alibabafastjson1.1.33org.springframework.bootspring-boot-maven-plugin

*KafkaDemoApplication*

我們使用springboot的框架贸弥,這是我們程序的入口點窟坐。

@SpringBootApplicationpublicclassKafkaDemoApplication{privatestaticfinalLogger logger = LoggerFactory.getLogger(KafkaDemoApplication.class);publicstaticvoidmain(String[] args) {? ? ? ? ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);? ? ? ? logger.info("Done start Spring boot");? ? }}

ProducerConfig

其實我們可以可以不用編寫KafkaProducerConfig,直接使用KafkaTemplate(當然前提是我們要設置好producer需要的配置項,例如spring.kafka.bootstrap-servers, spring.kafka.producer.key-serializer, spring.kafka.producer.retries等等)

@Configuration@EnableKafkapublicclassKafkaProducerConfig{@BeanpublicProducerFactoryproducerFactory() {returnnewDefaultKafkaProducerFactory<>(producerConfigs());? ? }@BeanpublicMapproducerConfigs() {? ? ? ? Map props =newHashMap<>();? ? ? ? props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092(根據(jù)實際情況修改)");? ? ? ? props.put(ProducerConfig.RETRIES_CONFIG,0);? ? ? ? props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);? ? ? ? props.put(ProducerConfig.LINGER_MS_CONFIG,1);? ? ? ? props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);? ? ? ? props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);? ? ? ? props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);returnprops;? ? }@BeanpublicKafkaTemplatekafkaTemplate() {returnnewKafkaTemplate(producerFactory());? ? }}

KafkaConsumerConfig

同理哲鸳,其實我們可以可以不用編寫KafkaConsumerConfig臣疑,直接使用 @KafkaListener(當然前提是我們要設置好consumer需要的配置項,例如spring.kafka.bootstrap-servers, spring.kafka.consumer.key-deserializer, spring.kafka.consumer.group-id徙菠、spring.kafka.consumer.auto-offset-reset等等)

@Configuration@EnableKafkapublicclassKafkaConsumerConfig{@BeanKafkaListenerContainerFactory> kafkaListenerContainerFactory() {? ? ? ? ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory<>();? ? ? ? factory.setConsumerFactory(consumerFactory());? ? ? ? factory.setConcurrency(3);? ? ? ? factory.getContainerProperties().setPollTimeout(3000);returnfactory;? ? }@BeanpublicConsumerFactoryconsumerFactory() {returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());? ? }@BeanpublicMapconsumerConfigs() {? ? ? ? Map propsMap =newHashMap<>();? ? ? ? propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092(根據(jù)實際情況修改)");? ? ? ? propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);? ? ? ? propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");? ? ? ? propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");? ? ? ? propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);? ? ? ? propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);? ? ? ? propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");? ? ? ? propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");returnpropsMap;? ? }@BeanpublicMyListenerlistener() {returnnewMyListener();? ? }}

定義了ProducerConfig和ConsumerConfig后我們需要實現(xiàn)具體的生產者和消費者讯沈。

本文的KafkaListenerContainerFactory 中使用了ConcurrentKafkaListenerContainer, 我們將使用多線程消費消息婿奔。

注意消息代理的地址是localhost:9092缺狠,

需要根據(jù)實際情況修改。需要特別注意的是萍摊,我在windows運行程序挤茄,kafka在我的linux虛擬機,

我需要配置windows的hosts文件冰木,配置虛擬機hostname和ip的映射穷劈,例如192.168.119.131? ? ? ?

ubuntu01

開發(fā)Listener

我們來開發(fā)自己的Listener監(jiān)聽具體的topic, 這里例子中我們監(jiān)聽以topic開頭的主題踊沸,不做其他業(yè)務歇终,只是打印出來。

@Component@Slf4jpublicclassMyListener{? ? @KafkaListener(topicPattern ="topic.*")publicvoidlisten(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {? ? ? ? Optional kafkaMessage = Optional.ofNullable(record.value());if(kafkaMessage.isPresent()) {? ? ? ? ? ? Object message = kafkaMessage.get();? ? ? ? ? ? log.info("------------------ message =topic:"+ topic+", "+ message);? ? ? ? }? ? }}

開發(fā)producer

我在程序中增加了controller逼龟,這樣我們可以通過controller給topic發(fā)消息评凝。consumer一直在監(jiān)聽,只要有消息發(fā)送過去审轮,就會打印出來肥哎。controller中調用了ProducerServiceImpl , 具體代碼比較簡單就不再羅列疾渣。

我們producerServiceImpl主要是有這句篡诽, 通過KafkaTemplate 發(fā)送消息。

@Autowired

private KafkaTemplate template;

@ServicepublicclassProducerServiceImplimplementsProducerService{privatestaticfinalLogger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);privateGson gson =newGsonBuilder().create();@AutowiredprivateKafkaTemplate template;//發(fā)送消息方法publicvoidsendJson(String topic, String json) {? ? ? ? JSONObject jsonObj = JSON.parseObject(json);? ? ? ? jsonObj.put("topic", topic);? ? ? ? jsonObj.put("ts", System.currentTimeMillis() +"");? ? ? ? logger.info("json+++++++++++++++++++++? message = {}", jsonObj.toJSONString());? ? ? ? ListenableFuture> future = template.send(topic, jsonObj.toJSONString());? ? ? ? future.addCallback(newListenableFutureCallback>() {@OverridepublicvoidonSuccess(SendResult result) {? ? ? ? ? ? ? ? System.out.println("msg OK."+ result.toString());? ? ? ? ? ? }@OverridepublicvoidonFailure(Throwable ex) {? ? ? ? ? ? ? ? System.out.println("msg send failed: ");? ? ? ? ? ? }? ? ? ? });? ? }

運行程序

運行第一步榴捡,確保Kafka broker配置正確杈女,筆者的程序在Windows10機器上,Kafka在虛擬機上吊圾,因為我的地址是192.168.119.129:9092, 而不是localhost:9092.

運行第二步驟达椰,在IDEA中選中KafkaDemoApplication ,? 單擊鼠標右鍵,選擇 Run KafkaDemoApplication

效果圖

kafka段命令行接收到的消息

3项乒,總結

Spring Kafka提供了很好的集成啰劲,我們只需配置properties文件,就可以直接使用KafkaTemplate發(fā)送消息檀何,使用@KafkaListener監(jiān)聽消息蝇裤。

參考文檔:

https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/_reference.html#kafka-template

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末廷支,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子栓辜,更是在濱河造成了極大的恐慌恋拍,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件藕甩,死亡現(xiàn)場離奇詭異施敢,居然都是意外死亡,警方通過查閱死者的電腦和手機狭莱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門僵娃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人贩毕,你說我怎么就攤上這事悯许。” “怎么了辉阶?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵先壕,是天一觀的道長。 經常有香客問我谆甜,道長垃僚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任规辱,我火速辦了婚禮谆棺,結果婚禮上,老公的妹妹穿的比我還像新娘罕袋。我一直安慰自己改淑,他們只是感情好,可當我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布浴讯。 她就那樣靜靜地躺著朵夏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪榆纽。 梳的紋絲不亂的頭發(fā)上仰猖,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天,我揣著相機與錄音奈籽,去河邊找鬼饥侵。 笑死,一個胖子當著我的面吹牛衣屏,可吹牛的內容都是我干的躏升。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼狼忱,長吁一口氣:“原來是場噩夢啊……” “哼煮甥!你這毒婦竟也來了盗温?” 一聲冷哼從身側響起藕赞,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤成肘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后斧蜕,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體双霍,經...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年批销,在試婚紗的時候發(fā)現(xiàn)自己被綠了洒闸。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡均芽,死狀恐怖丘逸,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情掀宋,我是刑警寧澤深纲,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站劲妙,受9級特大地震影響湃鹊,放射性物質發(fā)生泄漏。R本人自食惡果不足惜镣奋,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一币呵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧侨颈,春花似錦余赢、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至温赔,卻和暖如春蛤奢,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背陶贼。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工啤贩, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人拜秧。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓痹屹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親枉氮。 傳聞我的和親對象是個殘疾皇子志衍,可洞房花燭夜當晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理暖庄,服務發(fā)現(xiàn),斷路器楼肪,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • Kafka入門經典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,812評論 4 54
  • 一培廓、基本概念 介紹 Kafka是一個分布式的、可分區(qū)的春叫、可復制的消息系統(tǒng)肩钠。它提供了普通消息系統(tǒng)的功能,但具有自己獨...
    ITsupuerlady閱讀 1,627評論 0 9
  • 前言 在微服務架構的系統(tǒng)中暂殖,我們通常會使用輕量級的消息代理來構建一個共用的消息主題讓系統(tǒng)中所有微服務實例都連接上來...
    Chandler_玨瑜閱讀 6,569評論 2 39
  • 真正的讀書价匠,須得你捧起完整漫長的字句,心無旁騖地走進作者設定的世界里呛每,與千年前的古人對話踩窖,觀別人的不可思議人生,去...
    舊城夢境exo閱讀 224評論 0 0