本文代碼格式不好調整脆栋,可以參考本人在其他地方的同篇博文 https://blog.csdn.net/russle/article/details/80296006
Apache Kafka, 分布式消息系統(tǒng)嫂伞,
非常流行。Spring是非常流行的Java快速開發(fā)框架岁诉。將兩者無縫平滑結合起來可以快速實現(xiàn)很多功能。本文件簡要介紹Spring
Kafka肌似,如何使用 KafkaTemplate發(fā)送消息到kafka的broker上义黎, 如何使用“l(fā)istener
container“接收Kafka消息。
1姐呐,Spring Kafka的組成
這一節(jié)我們首先介紹Spring Kafka的各個組成部分殿怜。
1.1? 發(fā)送消息
與 JmsTemplate 或者JdbcTemplate類似,Spring Kafka提供了 KafkaTemplate.? 該模板封裝了Kafka消息生產者并提供各種消息發(fā)送方法曙砂。
消息發(fā)送的各種方法头谜。
```
ListenableFuture> send(Stringtopic, V data);ListenableFuture> send(Stringtopic,Kkey, V data);ListenableFuture> send(Stringtopic, int partition, V data);ListenableFuture> send(Stringtopic, int partition,Kkey, V data);ListenableFuture> send(Message message);
```
1.2 接收消息
要接收消息,我們需要配置MessageListenerContainer并提供一個Message Listener鸠澈,或者使用 @KafkaListener注解柱告。
MessageListenserContainer
MessageListenserContainer 有以下兩個實現(xiàn)類:
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer可以讓我們使用單線程消費Kafka topic的消息,而ConcurrentMessageListenerContainer 可以讓我們多線程消費消息笑陈。
@KafkaListener 注解
Spring Kafka提供的 @KafkaListener注解际度,可以讓我們監(jiān)聽某個topic或者topicPattern的消息。
監(jiān)聽符合topicPattern = “topic.*”的所有topic的消息
```
@Component@Slf4jpublicclassCmdReceiver {? ? @KafkaListener(topicPattern ="topic.*")publicvoidlisten(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {? ? ? ? Optional kafkaMessage = Optional.ofNullable(record.value());if(kafkaMessage.isPresent()) {? ? ? ? ? ? Object message = kafkaMessage.get();? ? ? ? ? ? log.info("----------------- record =topic:"+ topic+", "+ record);? ? ? ? ? ? log.info("------------------ message =topic:"+ topic+", "+ message);? ? ? ? }? ? }}
```
監(jiān)聽某個topic的消息publicclassListener {? ? @KafkaListener(id ="id01", topics ="Topic1")publicvoidlisten(String data) {? ? }}
2涵妥, Spring Kafka 例子
下面我們介紹一個具體的例子乖菱, 這個例會發(fā)送和接收指定topic的消息。
準備工作:
kafka_2.11-1.1.0.tgz和zookeeper-3.4.10.tar.gz
JDK jdk-8u171-linux-x64.tar.gz
IDE (Eclipse or IntelliJ)
Build tool (Maven? or Gradle)
本文不涉及安裝Kafka的介紹妹笆,請自行搜索块请,或者看官方文檔。
pom文件:
也就是我們的依賴包. 這是筆者使用的依賴版本拳缠,僅供參考。
4.0.0com.yqkafkademo1.0-SNAPSHOTorg.springframework.bootspring-boot-starter-parent1.5.12.RELEASEUTF-8UTF-81.8org.springframework.bootspring-boot-starter-weborg.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-testtestorg.springframework.kafkaspring-kafka1.1.8.RELEASEcom.google.code.gsongson2.8.2org.apache.kafkakafka-clients0.10.1.1io.springfoxspringfox-swagger22.7.0io.springfoxspringfox-swagger-ui2.7.0io.springfoxspringfox-spring-web2.7.0com.alibabafastjson1.1.33org.springframework.bootspring-boot-maven-plugin
*KafkaDemoApplication*
我們使用springboot的框架贸弥,這是我們程序的入口點窟坐。
@SpringBootApplicationpublicclassKafkaDemoApplication{privatestaticfinalLogger logger = LoggerFactory.getLogger(KafkaDemoApplication.class);publicstaticvoidmain(String[] args) {? ? ? ? ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);? ? ? ? logger.info("Done start Spring boot");? ? }}
ProducerConfig
其實我們可以可以不用編寫KafkaProducerConfig,直接使用KafkaTemplate(當然前提是我們要設置好producer需要的配置項,例如spring.kafka.bootstrap-servers, spring.kafka.producer.key-serializer, spring.kafka.producer.retries等等)
@Configuration@EnableKafkapublicclassKafkaProducerConfig{@BeanpublicProducerFactoryproducerFactory() {returnnewDefaultKafkaProducerFactory<>(producerConfigs());? ? }@BeanpublicMapproducerConfigs() {? ? ? ? Map props =newHashMap<>();? ? ? ? props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092(根據(jù)實際情況修改)");? ? ? ? props.put(ProducerConfig.RETRIES_CONFIG,0);? ? ? ? props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);? ? ? ? props.put(ProducerConfig.LINGER_MS_CONFIG,1);? ? ? ? props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);? ? ? ? props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);? ? ? ? props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);returnprops;? ? }@BeanpublicKafkaTemplatekafkaTemplate() {returnnewKafkaTemplate(producerFactory());? ? }}
KafkaConsumerConfig
同理哲鸳,其實我們可以可以不用編寫KafkaConsumerConfig臣疑,直接使用 @KafkaListener(當然前提是我們要設置好consumer需要的配置項,例如spring.kafka.bootstrap-servers, spring.kafka.consumer.key-deserializer, spring.kafka.consumer.group-id徙菠、spring.kafka.consumer.auto-offset-reset等等)
@Configuration@EnableKafkapublicclassKafkaConsumerConfig{@BeanKafkaListenerContainerFactory> kafkaListenerContainerFactory() {? ? ? ? ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory<>();? ? ? ? factory.setConsumerFactory(consumerFactory());? ? ? ? factory.setConcurrency(3);? ? ? ? factory.getContainerProperties().setPollTimeout(3000);returnfactory;? ? }@BeanpublicConsumerFactoryconsumerFactory() {returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());? ? }@BeanpublicMapconsumerConfigs() {? ? ? ? Map propsMap =newHashMap<>();? ? ? ? propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092(根據(jù)實際情況修改)");? ? ? ? propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);? ? ? ? propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");? ? ? ? propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");? ? ? ? propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);? ? ? ? propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);? ? ? ? propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");? ? ? ? propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");returnpropsMap;? ? }@BeanpublicMyListenerlistener() {returnnewMyListener();? ? }}
定義了ProducerConfig和ConsumerConfig后我們需要實現(xiàn)具體的生產者和消費者讯沈。
本文的KafkaListenerContainerFactory 中使用了ConcurrentKafkaListenerContainer, 我們將使用多線程消費消息婿奔。
注意消息代理的地址是localhost:9092缺狠,
需要根據(jù)實際情況修改。需要特別注意的是萍摊,我在windows運行程序挤茄,kafka在我的linux虛擬機,
我需要配置windows的hosts文件冰木,配置虛擬機hostname和ip的映射穷劈,例如192.168.119.131? ? ? ?
ubuntu01
開發(fā)Listener
我們來開發(fā)自己的Listener監(jiān)聽具體的topic, 這里例子中我們監(jiān)聽以topic開頭的主題踊沸,不做其他業(yè)務歇终,只是打印出來。
@Component@Slf4jpublicclassMyListener{? ? @KafkaListener(topicPattern ="topic.*")publicvoidlisten(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {? ? ? ? Optional kafkaMessage = Optional.ofNullable(record.value());if(kafkaMessage.isPresent()) {? ? ? ? ? ? Object message = kafkaMessage.get();? ? ? ? ? ? log.info("------------------ message =topic:"+ topic+", "+ message);? ? ? ? }? ? }}
開發(fā)producer
我在程序中增加了controller逼龟,這樣我們可以通過controller給topic發(fā)消息评凝。consumer一直在監(jiān)聽,只要有消息發(fā)送過去审轮,就會打印出來肥哎。controller中調用了ProducerServiceImpl , 具體代碼比較簡單就不再羅列疾渣。
我們producerServiceImpl主要是有這句篡诽, 通過KafkaTemplate 發(fā)送消息。
@Autowired
private KafkaTemplate template;
@ServicepublicclassProducerServiceImplimplementsProducerService{privatestaticfinalLogger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);privateGson gson =newGsonBuilder().create();@AutowiredprivateKafkaTemplate template;//發(fā)送消息方法publicvoidsendJson(String topic, String json) {? ? ? ? JSONObject jsonObj = JSON.parseObject(json);? ? ? ? jsonObj.put("topic", topic);? ? ? ? jsonObj.put("ts", System.currentTimeMillis() +"");? ? ? ? logger.info("json+++++++++++++++++++++? message = {}", jsonObj.toJSONString());? ? ? ? ListenableFuture> future = template.send(topic, jsonObj.toJSONString());? ? ? ? future.addCallback(newListenableFutureCallback>() {@OverridepublicvoidonSuccess(SendResult result) {? ? ? ? ? ? ? ? System.out.println("msg OK."+ result.toString());? ? ? ? ? ? }@OverridepublicvoidonFailure(Throwable ex) {? ? ? ? ? ? ? ? System.out.println("msg send failed: ");? ? ? ? ? ? }? ? ? ? });? ? }
運行程序
運行第一步榴捡,確保Kafka broker配置正確杈女,筆者的程序在Windows10機器上,Kafka在虛擬機上吊圾,因為我的地址是192.168.119.129:9092, 而不是localhost:9092.
運行第二步驟达椰,在IDEA中選中KafkaDemoApplication ,? 單擊鼠標右鍵,選擇 Run KafkaDemoApplication
效果圖
kafka段命令行接收到的消息
3项乒,總結
Spring Kafka提供了很好的集成啰劲,我們只需配置properties文件,就可以直接使用KafkaTemplate發(fā)送消息檀何,使用@KafkaListener監(jiān)聽消息蝇裤。
參考文檔:
https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/_reference.html#kafka-template