Docker installation
We recommend installing with docker-compose.
Here is the docker-compose.yml:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: 172.31.245.238
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./kafka-log:/kafka
- ALLOW_PLAINTEXT_LISTENER: allows the PLAINTEXT listener.
- depends_on: - zookeeper: kafka depends on zookeeper.
- KAFKA_ADVERTISED_LISTENERS is the list of addresses advertised for the Kafka broker; Kafka sends them to clients on the initial connection. The format is PLAINTEXT://host:port. Here the container's port 9092 is mapped to port 9092 on the host, and 0.0.0.0 listens on all addresses (unverified). In this compose file the advertised address is derived from KAFKA_ADVERTISED_HOST_NAME instead.
- KAFKA_ADVERTISED_HOST_NAME: 172.31.245.238: the address of the host machine.
- KAFKA_CREATE_TOPICS: "test:1:1": pre-creates the topic test with 1 partition and 1 replica.
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181: points Kafka at the zookeeper service.
Steps
(1) Create the YAML file above.
(2) In the directory containing that file, run:
docker-compose build
docker-compose up -d
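To check that the broker is reachable from the host, a minimal smoke test with Kafka's AdminClient can list the topics. This is a sketch only (the class name is illustrative); the IP and the pre-created test topic come from the compose file above, and the kafka-clients library is pulled in transitively by spring-kafka:
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class KafkaSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The advertised host from docker-compose; adjust to your own host IP
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.245.238:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Should print the pre-created "test" topic
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}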
A simple Spring Boot example
Dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configuration
###########【Kafka cluster】###########
spring.kafka.bootstrap-servers=172.31.245.238:9092
###########【Producer configuration】###########
# Number of retries
spring.kafka.producer.retries=0
# Ack level: how many partition replicas must be written before the producer receives an ack (0, 1, or all/-1)
spring.kafka.producer.acks=1
# Batch size in bytes
spring.kafka.producer.batch-size=16384
# Send delay
spring.kafka.producer.properties.linger.ms=0
# The producer sends a batch to Kafka once it has accumulated batch-size bytes of messages,
# or once linger.ms has elapsed. With linger.ms=0 each message is sent as soon as it is
# received, so batch-size effectively has no effect.
# Producer buffer size
spring.kafka.producer.buffer-memory=33554432
# Serializer classes provided by Kafka
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Custom partitioner
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

###########【Consumer configuration】###########
# Default consumer group ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# Whether to auto-commit offsets
spring.kafka.consumer.enable-auto-commit=true
# Auto-commit interval (how long after a message is received before its offset is committed)
spring.kafka.consumer.properties.auto.commit.interval.ms=1000
# How to reset the offset when Kafka has no initial offset, or the offset is out of range:
# earliest: reset to the smallest offset in the partition;
# latest: reset to the latest offset (consume only data newly produced to the partition);
# none: throw an exception if any partition has no committed offset;
spring.kafka.consumer.auto-offset-reset=latest
# Consumer session timeout (a rebalance is triggered if the consumer sends no heartbeat within this time)
spring.kafka.consumer.properties.session.timeout.ms=120000
# Consumer request timeout
spring.kafka.consumer.properties.request.timeout.ms=180000
# Deserializer classes provided by Kafka
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Don't fail application startup when a listened-to topic does not exist
spring.kafka.listener.missing-topics-fatal=false
# Enable batch consumption
# spring.kafka.listener.type=batch
# Maximum number of messages consumed per batch
# spring.kafka.consumer.max-poll-records=50
In spring.kafka.bootstrap-servers=172.31.245.238:9092, 172.31.245.238 is the KAFKA_ADVERTISED_HOST_NAME value from the docker-compose file. Note that bootstrap-servers entries are plain host:port pairs; the PLAINTEXT:// scheme belongs in the broker's listener configuration, not here.
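For reference, the spring.kafka.producer.* keys map onto Kafka's native ProducerConfig settings. Here is a minimal hand-built producer using the same values (a sketch only, with an illustrative class name; Spring Boot's auto-configuration already does this for you):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.245.238:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "1");                // spring.kafka.producer.acks
        props.put(ProducerConfig.RETRIES_CONFIG, 0);               // spring.kafka.producer.retries
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // spring.kafka.producer.batch-size
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);             // spring.kafka.producer.properties.linger.ms
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // spring.kafka.producer.buffer-memory
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // close() flushes any pending sends
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "hello"));
        }
    }
}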
Producer:
@RestController
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Send a string directly
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("test", normalMessage);
    }

    // Convert an object to a JSON string, then send it
    @GetMapping("/kafka/sendTopic2")
    public void sendMessage2() {
        User user = User.builder()
                .name("yuanwei")
                .password("1234")
                .age(12)
                .build();
        try {
            String message = JacksonUtil.objToJson(user);
            kafkaTemplate.send("test", message);
        } catch (Exception e) {
            log.error(e.toString());
        }
    }
}
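KafkaTemplate#send is asynchronous; to observe success or failure, a callback can be attached to the returned future. A minimal sketch for the producer above (the endpoint path is illustrative), assuming spring-kafka 2.x where send() returns a ListenableFuture; in 3.x it returns a CompletableFuture, so whenComplete would be used instead:
// Send with a result callback
@GetMapping("/kafka/callback/{message}")
public void sendMessage3(@PathVariable("message") String message) {
    kafkaTemplate.send("test", message).addCallback(
            // On success, log where the record landed
            result -> log.info("sent ok: topic={}, partition={}, offset={}",
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset()),
            // On failure, log the exception
            ex -> log.error("send failed", ex));
}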
Consumer:
@Component
@Slf4j
public class KafkaConsumer {

    // Simple listener: print which topic and partition the message came from, plus its value.
    // The two listeners use distinct (illustrative) group IDs so that each receives every
    // message; with the same group ID they would compete for the topic's partitions.
    @KafkaListener(topics = {"test"}, groupId = "simpleGroup")
    public void onMessage1(ConsumerRecord<?, ?> record) {
        System.out.println("simple consume: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }

    // Convert the consumed JSON string back into an object.
    @KafkaListener(topics = {"test"}, groupId = "objectGroup")
    public void onMessage2(ConsumerRecord<?, ?> record) {
        System.out.println("object consume (raw): " + record.topic() + "-" + record.partition() + "-" + record.value());
        try {
            User user = (User) JacksonUtil.jsonToObj(new User(), (String) record.value());
            System.out.println("object consume (parsed): " + record.topic() + "-" + record.partition() + "-" + user);
        } catch (Exception e) {
            log.error(e.toString());
        }
    }
}
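If the commented-out batch settings in the configuration above (spring.kafka.listener.type=batch and spring.kafka.consumer.max-poll-records) are enabled, the listener method signature changes to receive a list of records. A minimal sketch to add to the KafkaConsumer class (the group ID is illustrative; it also needs an import of java.util.List):
// Batch consumption: the container hands over up to max-poll-records messages at once
@KafkaListener(topics = {"test"}, groupId = "batchGroup")
public void onBatch(List<ConsumerRecord<?, ?>> records) {
    System.out.println("batch size: " + records.size());
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println("batch consume: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }
}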
JSON conversion utility class
public class JacksonUtil {
    /*
     * 001. Convert a JSON string into an object
     * @param: a template object (only its class is used), the JSON string
     * @return: Object
     */
    public static Object jsonToObj(Object obj, String jsonStr) throws JsonParseException, JsonMappingException, IOException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(jsonStr, obj.getClass());
    }

    /*
     * 002. Convert an object into JSON
     * @param: the object
     * @return: the JSON string
     */
    public static String objToJson(Object obj) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(obj);
    }
}
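Since jsonToObj only uses the class of the object passed in, a generic variant (illustrative) avoids both the throwaway new User() and the cast at the call site:
// Generic deserialization: pass the target class instead of a template instance
public static <T> T jsonToObj(Class<T> clazz, String jsonStr) throws IOException {
    return new ObjectMapper().readValue(jsonStr, clazz);
}
// Usage: User user = JacksonUtil.jsonToObj(User.class, (String) record.value());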
The User object
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private String name;
    private String password;
    private Integer age;
}