pom.xml文件
<dependencies>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka: KafkaTemplate, @KafkaListener, NewTopic beans -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Lombok (e.g. @Slf4j); marked optional so it is not passed on to dependents -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test support, test scope only -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Gson, used to serialize the message payload to JSON -->
    <!-- NOTE(review): no versions are declared in this fragment; presumably they are
         managed by a Spring Boot parent/BOM elsewhere in the pom - confirm. -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
    </dependency>
</dependencies>
-
生產者(發布消息)
application.yml文件
server:
  port: 8091
spring:
  application:
    name: producer
  kafka:
    # Kafka connection settings
    bootstrap-servers: 192.168.75.149:9092 # Kafka broker set up locally on CentOS 7
    producer:
      acks: all
      client-id: producer-demo
      retries: 0
      # Upper bound of one record batch, expressed as a size in bytes (not a message count)
      batch-size: 16384
      buffer-memory: 33554432
      # Serializers for the message key and the message value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
KafkaConfig
/**
 * Kafka configuration that declares the topic (name, partition count, replica
 * count) to be created when the application starts. Per the original author's
 * note, a topic that already exists is left untouched.
 *
 * @author wushiyi
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Name of the topic declared by this configuration. */
    private static final String TOPIC_NAME = "userInfo";

    /** Number of partitions requested for the topic. */
    private static final int PARTITION_COUNT = 1;

    /** Number of replicas requested for the topic. */
    private static final short REPLICA_COUNT = 1;

    /**
     * Declares the {@code userInfo} topic as a bean.
     *
     * @return the topic definition with one partition and one replica
     */
    @Bean
    public NewTopic initNewTopic() {
        return new NewTopic(TOPIC_NAME, PARTITION_COUNT, REPLICA_COUNT);
    }
}
發布消息
/**
 * Publishes user information to the {@code userInfo} Kafka topic as a JSON string.
 *
 * @author wushiyi
 */
@Service
@Slf4j
public class UserService {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /** JSON serializer for outgoing payloads; assigned once and never replaced. */
    private final Gson gson = new GsonBuilder().create();

    /**
     * Builds a sample user, logs its JSON form and sends that same JSON string
     * to the {@code userInfo} topic.
     */
    public void send() {
        User user = new User("ZhangSan", "男", "12345678901");
        // Serialize once so the logged text and the sent payload are the same string
        // and the object is not converted to JSON twice.
        String payload = gson.toJson(user);
        log.info("向kafka推送用戶信息:{}", payload);
        this.kafkaTemplate.send("userInfo", payload);
    }
}
調用接口,控制台打印
-
消費者(訂閱消息)
application.yml文件
server:
  port: 8092
spring:
  application:
    name: consumer
  kafka:
    # Kafka connection settings
    bootstrap-servers: 192.168.75.149:9092
    consumer:
      # Default consumer group id
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      # NOTE(review): this interval only applies when enable-auto-commit is true,
      # so it has no effect with the setting above - confirm which one is intended.
      auto-commit-interval: 5000
      # Deserializers for the message key and the message value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
消費
/**
 * Kafka listener component subscribed to the {@code userInfo} topic; it logs
 * every message it receives.
 *
 * @author wushiyi
 */
@Slf4j
@Component
public class UserConsumer {

    /**
     * Handles one message from the {@code userInfo} topic by writing it to the log.
     *
     * @param message the received message value as a string
     */
    @KafkaListener(topics = "userInfo")
    public void userConsumer(String message) {
        log.info("receive msg {}", message);
    }
}
訂閱結果
-
補充
producer 啟動出現以下報錯時,請檢查防火墻是否開啟,把防火墻關閉即可。