POM configuration
```
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
```
Code setup
Producer
```
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

// BaseController, RequestHolder, getNetJson() and JSONObject come from this
// project's own code base and are kept as-is here.
@RestController
public class KafkaController extends BaseController {

    // Server-side API logger
    protected static final Logger logger = LoggerFactory.getLogger(KafkaController.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @CrossOrigin(origins = "*", maxAge = 3600)
    @RequestMapping(value = "/send", method = RequestMethod.GET)
    @ResponseBody
    public JSONObject send(HttpServletRequest request, HttpServletResponse response) throws Exception {
        logger.info(String.valueOf(RequestHolder.getId()));
        String msg = request.getParameter("msg");
        try {
            // Asynchronously publish the request parameter to the "topic" topic
            kafkaTemplate.send("topic", msg);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("return_code", "0");
            return jsonObject;
        } catch (Exception e) {
            logger.error("KafkaController.send error.", e);
            return getNetJson();
        }
    }
}
```
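Note that `kafkaTemplate.send()` is asynchronous, so the handler above returns `return_code 0` before the broker has acknowledged the write. A minimal sketch of attaching a result callback, assuming the spring-kafka 2.x API where `send()` returns a `ListenableFuture` (in 3.x it returns a `CompletableFuture` instead):
```
// Sketch only: log the broker acknowledgement instead of assuming success.
// Assumes spring-kafka 2.x, where send() returns ListenableFuture<SendResult<K, V>>.
kafkaTemplate.send("topic", msg).addCallback(
        result -> logger.info("sent, offset = {}", result.getRecordMetadata().offset()),
        ex -> logger.error("send failed", ex));
```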
Consumer
```
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaReceiver {

    protected static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);

    // Invoked for every record published to "topic"
    @KafkaListener(topics = {"topic"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.debug("Received: topic = {}, value = {}", record.topic(), record.value());
    }
}
```
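If only the payload matters, `@KafkaListener` can also inject the deserialized value directly instead of the full `ConsumerRecord`. A minimal variant (the class name `SimpleKafkaReceiver` is illustrative), assuming the String deserializer configured below:
```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleKafkaReceiver {

    private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaReceiver.class);

    // The String value is extracted from the record and passed in directly
    @KafkaListener(topics = {"topic"})
    public void listen(String message) {
        logger.debug("Received: {}", message);
    }
}
```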
application.properties configuration
```
#============== kafka ===================
# Kafka broker addresses; multiple brokers may be listed, comma-separated
spring.kafka.bootstrap-servers=10.241.95.105:9092,10.241.95.106:9092,10.241.95.107:9092
#=============== producer =======================
spring.kafka.producer.retries=0
# Upper bound, in bytes, of a batch of records sent in one request
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# Default consumer group id; within one group id, only one consumer receives each message
spring.kafka.consumer.group-id=kafka-group1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
PS
Kafka does not remove a message from the log once it has been consumed. When a consumer with a new group id joins, previously published messages are delivered to it again (here because auto-offset-reset=earliest is set above). Messages are persisted on disk and are only deleted once the configured retention period expires.
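For example, two listeners in different consumer groups each receive their own copy of every retained message (a sketch; the class and group names are illustrative):
```
// Sketch: each distinct group id gets its own copy of every retained message.
@Component
public class GroupDemoReceiver {

    @KafkaListener(topics = "topic", groupId = "group-a")
    public void listenA(String message) {
        System.out.println("group-a got: " + message);
    }

    // A group id that joins later re-reads from the earliest retained offset,
    // because auto-offset-reset=earliest is configured above.
    @KafkaListener(topics = "topic", groupId = "group-b")
    public void listenB(String message) {
        System.out.println("group-b got: " + message);
    }
}
```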
Default retention (broker setting):
```
# Message retention period: 7 days
log.retention.hours=168
```