一掷空、Maven準(zhǔn)備
二、配置文件
三炬称、發(fā)送者
四汁果、接收者
五涡拘、消息類
六玲躯、在虛擬機(jī)中,查看接受者是否接受成功
一鳄乏、Maven準(zhǔn)備
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
二跷车、配置文件
#============== kafka ===================
# 指定kafka 代理地址,可以多個(gè)
spring.kafka.bootstrap-servers=192.168.71.129:9092
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量發(fā)送消息的數(shù)量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默認(rèn)消費(fèi)者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
三橱野、發(fā)送者
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
Message message = new Message();
message.setId( System.currentTimeMillis() );
message.setMsg( respStr );
message.setSendTime( new Date() );
kafkaTemplate.send( "producersToConsumers_logs", gson.toJson( message ) );
四朽缴、接受者
@KafkaListener(topics = {"producersToConsumers_logs"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> optional = Optional.ofNullable( record.value() );
if (optional.isPresent()) {
Object msg = optional.get();
log.info( "record:{}", record );
log.info( "message:{}", msg );
}
}
五、消息類
@Data
public class Message {
private long id;
private String msg;
private Date SendTime;
}
六水援、在虛擬機(jī)中密强,查看接受者是否接受成功
命令:
######啟動(dòng)zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#####啟動(dòng)kafka服務(wù)
bin/kafka-server-start.sh config/server.properties
#####查看集合
bin/kafka-topics.sh --list --zookeeper localhost:2181
#####消費(fèi)者查看信息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic producersToConsumers_logs --from-beginning
如圖
注:配置文件只是做了簡(jiǎn)單的測(cè)試配置,在實(shí)際業(yè)務(wù)生成中蜗元,要根據(jù)業(yè)務(wù)要求或渤,再詳細(xì)配置
參考官網(wǎng)API:https://www.springcloud.cc/apache-kafka-zhcn.html#api