隨著spring boot 1.5版本的發(fā)布扛邑,在spring項(xiàng)目中與kafka集成更為簡(jiǎn)便。
引入依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
具體spring-kafka的版本由spring boot的當(dāng)前版本決定绑改。
application.properties配置文件
spring.kafka.bootstrap-servers=192.168.1.107:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
最簡(jiǎn)化的配置僅需指定kafka主機(jī)和消息者組名即可芥牌。這里使用的是單節(jié)點(diǎn)kafka,集群環(huán)境中配置多個(gè)kafka主機(jī)地址即可寸爆。例如:
spring.kafka.bootstrap-servers=192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092
以下4項(xiàng)配置指定消息key和消息體的編解碼方式替废。
spring.kafka.consumer.key-deserializer
spring.kafka.consumer.value-deserializer
spring.kafka.producer.key-serializer
spring.kafka.producer.value-serializer
消息對(duì)象
import java.util.Date;
public class Message {
private Long id;
private String msg;
private Date sendTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}
消息生產(chǎn)者
import java.util.Date;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@Component
public class Sender {
@Autowired
private KafkaTemplate kafkaTemplate;
private Gson gson = new GsonBuilder().create();
public void sendMessage(){
Message m = new Message();
m.setId(System.currentTimeMillis());
m.setMsg(UUID.randomUUID().toString());
m.setSendTime(new Date());
kafkaTemplate.send("test1", gson.toJson(m));
}
}
消息消費(fèi)者
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@Component
public class Receiver {
private Gson gson = new GsonBuilder().create();
@KafkaListener(topics = "test1")
public void processMessage(String content) {
Message m = gson.fromJson(content, Message.class);
}
}
運(yùn)行
@SpringBootApplication
public class AppStart {
public static void main(String[] args) throws InterruptedException {
ApplicationContext app = SpringApplication.run(AppStart.class, args);
while(true){
Sender sender = app.getBean(Sender.class);
sender.sendMessage();
Thread.sleep(500);
}
}
}
通過(guò)上面的示例可以發(fā)現(xiàn),相對(duì)于spring boot 1.4.x版本,1.5集成kafka主要是將以前需要手工編碼進(jìn)行設(shè)置的kafka配置改由spring配置文件定義绎晃。
注意
我使用的spring boot版本是1.5.1,spring-kafka版本1.1.2,jdk1.8蜜唾,該組合似乎不支持低版本的kafka杂曲。之前我使用kafka版本為2.11-0.10.0.0,向kafka發(fā)送消息時(shí)一直產(chǎn)生異常,后來(lái)升級(jí)kafka版本至2.11-0.10.2.0故障消失袁余。由于測(cè)試時(shí)間有限擎勘,未作進(jìn)一步分析。希望查明原因的同學(xué)能私信我颖榜。謝謝棚饵。