本文整合基于Springboot2.0+,kafka版本
kafka_2.12-2.3.0
折剃,使用org.springframework.kafka
來做的整合
項(xiàng)目目錄結(jié)構(gòu)
pom.xml依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
application.yml配置文件
server:
port: 8081
spring:
kafka:
bootstrap-servers: http://ip1:9092,http://ip2:9092,http://ip3:9092
producer:
retries: 3
acks: all
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: consumer-group1
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 20000
listener:
concurrency: 3
ack-mode: MANUAL
本配置文件是才用的并發(fā)批量消費(fèi)方式督弓, bootstrap-servers是我們集群的機(jī)器地址
生產(chǎn)者controller
@RestController
@Slf4j
public class ProducerController {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/send/{messge}")
public String sendmsg(@PathVariable String messge) {
//建議看一下KafkaTemplate的源碼 很多api 我們可以指定分區(qū)發(fā)送消息
kafkaTemplate.send("test", messge); //使用kafka模板發(fā)送信息
String res = "消息:【" + messge + "】發(fā)送成功 SUCCESS !";
log.info(res);
return res;
}
}
消費(fèi)者監(jiān)聽器
@Component
@Slf4j
public class ConsumerListener {
//建議看一下KafkaListener的源碼 很多api 我們也可以指定分區(qū)消費(fèi)消息
// topicPartitions ={@TopicPartition(topic = "topic1", partitions = { "0", "1" })}
@KafkaListener(topics = "test", groupId = "consumer-group")
public void listen(List<String> list, Acknowledgment ack) {
log.info("本次批量拉取數(shù)量:" + list.size() + " 開始消費(fèi)....");
List<String> msgList = new ArrayList<>();
for (String record : list) {
Optional<?> kafkaMessage = Optional.ofNullable(record);
// 獲取消息
kafkaMessage.ifPresent(o -> msgList.add(o.toString()));
}
if (msgList.size() > 0) {
for (String msg : msgList) {
log.info("開始消費(fèi)消息【" + msg + "】");
}
// 更新索引
// updateES(messages);
}
//手動提交offset
ack.acknowledge();
msgList.clear();
log.info("消費(fèi)結(jié)束");
}
}
我們的消費(fèi)者監(jiān)聽器才用的并發(fā)批量下拉數(shù)據(jù) 才用手動提交方式避免消息丟失
啟動類
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
啟動程序并且生產(chǎn)消息
通啟動日志我們可以看到我們成功連接到kafka集群
kafka生產(chǎn)和消費(fèi)日志信息
這里我們也可以通過批量生產(chǎn)消息 改變配置文件的并發(fā)參數(shù)和批量下拉參數(shù)來做批量并發(fā)消費(fèi)
我們這里topic設(shè)置的為test groupId為consumer-group