1.添加依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
2.application.properties 中加入配置
#kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3.創(chuàng)建一個(gè)控制器昔逗,用來接收請求測試
package com.zyw.springboot.web;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("kafka")
public class KakfaController {
????@Autowired
? ? private KafkaTemplatekafkaTemplate;
????@RequestMapping("send/{message}")
????public String send(@PathVariable String message){
????????kafkaTemplate.send("test",message);
????????return "消息發(fā)出成功擂错,消息內(nèi)容:" + message;
? ? }
}
4.創(chuàng)建一個(gè)監(jiān)聽來獲取消息
package com.zyw.springboot.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
????????@KafkaListener(topics ="test")
????????public void listener(ConsumerRecord record){
????????????????System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
????????}
}