1. 前言
本文為 Kafka 入門筆記努潘,主要包括 Kafka 單節(jié)點(diǎn)部署、生產(chǎn)消費(fèi)消息,以及新手踩坑記錄疯坤。
Kafka 作為大數(shù)據(jù)必備組件报慕、消息中間件必學(xué)的 Apache 頂級(jí)開源項(xiàng)目,服務(wù)穩(wěn)定贴膘、高吞吐的流數(shù)據(jù)處理平臺(tái)卖子。具體介紹可查看文末參考文檔略号。
目前 簡(jiǎn)書 暫不支持 markdown 收縮語句塊刑峡,對(duì)于文章展示方面略有問題
1.1. 可查看美觀版本
文檔:Kafka 入門手記.md
鏈接:http://note.youdao.com/noteshare?id=d1c65daf3c137f29a860b5efd5dff944&sub=F4E6A5E5FB3946499E7C4C3E6022E1DC
2. Kafka單節(jié)點(diǎn)部署
2.1. 下載
選擇合適版本即可,這里選擇最新版本玄柠。Linux 環(huán)境突梦。
2.2. 解壓
Kafka 安裝包 后綴為 .tgz
, 解壓即可羽利。
tar -zxvf kafka_package.tgz
其中宫患,kafka_package.tgz
是 Kafka 安裝包名稱。
2.2.1. zookeeper 安裝
Kafka 依賴于 zookeeper(ZK) 支持这弧,本文直接采用 Kafka 安裝包自帶的 zookeeper娃闲。
也可以單獨(dú)部署 zookeeper,使用方式一樣匾浪。對(duì)于 生產(chǎn)環(huán)境皇帮,建議單獨(dú)搭建 zookeeper。
2.3. 配置
2.3.1. zookeeper
進(jìn)入 Kafka 解壓包根目錄蛋辈,config
文件夾下的即為 Kafka 提供的默認(rèn)配置文件属拾。
此處我們修改下 zookeeper.properties
的 host
信息。此處修改 host
配置冷溶,主要是為了避免在使用 Java 客戶端連接解析為 localhost
host.name=your_ip
advertised.host.name=your_ip
其中渐白,your_ip
配置為服務(wù)器外網(wǎng) IP。
2.3.2. Kafka
同樣在 config
配置文件下修改 server.properties
文件即可逞频。
主要配置如下:
# 監(jiān)聽
listeners=PLAINTEXT://服務(wù)器內(nèi)網(wǎng)IP:9092
# 以下兩項(xiàng)類似 zookeeper 配置
advertised.listeners=PLAINTEXT://服務(wù)器外網(wǎng)IP:9092
host.name=外網(wǎng)IP
# zookeeper 連接信息
zookeeper.connect=zookeeper服務(wù)器IP:2181
2.4. 啟動(dòng)
先啟動(dòng) zookeeper纯衍,因?yàn)?Kafka 啟動(dòng)的時(shí)候會(huì)連接注冊(cè) zookeeper。
2.4.1. zookeeper
啟動(dòng) zookeeper苗胀,在 Kafka 根目錄執(zhí)行
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
上述方式襟诸,會(huì)占有 shell 客戶端窗口,如果想后臺(tái)啟動(dòng)柒巫,添加參數(shù) daemon
即可
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
2.4.2. kafka
類似 zookeeper 啟動(dòng)励堡。
./bin/kafka-server-start.sh ./config/server.properties
后臺(tái)啟動(dòng):
./bin/kafka-server-start.sh -daemon ./config/server.properties
此時(shí) Kafka 單節(jié)點(diǎn)部署就已經(jīng)完成了,通過 ps -ef | grep zookeeper
, ps -ef | grep kafka
看到對(duì)應(yīng)進(jìn)程堡掏,證明啟動(dòng)成功应结。
2.5. topic
2.5.1. 創(chuàng)建 topic
通過 Kafka 提供的腳本文件,即可創(chuàng)建。在 Kafka 根目錄下執(zhí)行:
./config/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic_name
1)指定 zk 為 localhost:2181
2)副本因子為 1鹅龄,即不需要副本
3)partition 數(shù)量為 3
4)topic 名稱為 top_name
2.5.2. 查看 topic
./config/kafka-topics.sh --list --zookeeper localhost:2181
- list 命令用于查看
2)需要指定 zk
2.6. 生產(chǎn)消費(fèi)
此處直接通過 Kafka 提供的簡(jiǎn)單客戶端進(jìn)行生產(chǎn)消費(fèi)數(shù)據(jù)揩慕。
2.6.1. 生產(chǎn)
1、啟動(dòng)簡(jiǎn)單 producer
./config/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name
1) --broker-list 指定 Kafka 的地址及端口
2)--topic 指定具體 topic_name
2扮休、 生產(chǎn)消息
直接在 producer 窗口輸入消息即可迎卤,消息是否發(fā)送成功,直接在 consumer 窗口即可查看玷坠。
2.6.2. 消費(fèi)
1蜗搔、啟動(dòng)簡(jiǎn)單 consumer
./config/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_name --from-beginning
1)--bootstrap-server 指定 Kafka 地址及端口
2)-- topic 指定 topic
3)--from-beginning 表示指定從 offset 從頭開始消費(fèi)
2、消費(fèi)數(shù)據(jù)
直接在 producer 窗口發(fā)送消息八堡,然后切換至 consumer 窗口樟凄,查看是否成功消費(fèi)消息。
3. Java Client 生產(chǎn)消費(fèi)
此步基于 SpringBoot 進(jìn)行搭建 demo 項(xiàng)目兄渺。 SpringBoot 版本為 2.x
3.1. 新建 SpringBoot 項(xiàng)目
直接通過 spring.starter 創(chuàng)建即可缝龄。完整項(xiàng)目: lambochen/demo/kafka
3.1.1. 引入依賴
Kafka 依賴:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.2. Kafka 配置
3.2.1. producer
1、application.properties
配置
kafka.producer.servers=kafka服務(wù)器IP:服務(wù)器端口號(hào)
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
kafka.topic.default=topic名稱
2挂谍、ProducerFactory
, KafkaTemplate
配置:
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new LinkedHashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG,linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
return props;
}
public ProducerFactory<String, MessageEntity> producerFactory(){
return new DefaultKafkaProducerFactory<>(
producerConfigs(),
new StringSerializer(),
new JsonSerializer<MessageEntity>());
}
@Bean("kafkaTemplate")
public KafkaTemplate<String, MessageEntity> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
3.2.2. consumer
1叔壤、application.properties
配置
kafka.consumer.zookeeper.connect=ZK服務(wù)器端口:ZK端口
kafka.consumer.servers=Kafka服務(wù)器IP:Kafka端口
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=topic名稱
kafka.consumer.group.id=consumerGroup名稱
kafka.consumer.concurrency=10
2、KafkaListenerContainerFactory
配置
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}
private ConsumerFactory<String, MessageEntity> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(MessageEntity.class)
);
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
3.2.3. ProducerCallBack
生產(chǎn)回調(diào)口叙,主要用于生產(chǎn)者發(fā)送消息后的處理炼绘。此 demo 僅作日志記錄。
需要集成 ListenableFutureCallback
庐扫,并指定消息實(shí)體類型饭望。
public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>>
其中, MessageEntity
為消息實(shí)體類型形庭。
3.3. 生產(chǎn)消息
創(chuàng)建 ProducerRecord
消息铅辞,通過 KafkaTemplate
發(fā)送即可。
@Autowired
@Qualifier("kafkaTemplate")
private KafkaTemplate<String, MessageEntity> kafkaTemplate;
public void send(String topic, MessageEntity message) {
kafkaTemplate.send(topic, message);
}
public void send(String topic, String key, MessageEntity message) {
ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(topic, key, message);
long startTime = System.currentTimeMillis();
ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
future.addCallback(new ProducerCallback(startTime, key, message));
}
3.4. 消費(fèi)消息
消費(fèi)消息萨醒,通過 @KafkaConsumer
注解即可實(shí)現(xiàn)斟珊。
@KafkaListener(topics = "${kafka.topic.default}", containerFactory = "kafkaListenerContainerFactory")
public void consumer(MessageEntity message){
log.info("consumer: " + gson.toJson(message));
}
3.5. 測(cè)試
啟動(dòng) SpringBoot 項(xiàng)目,通過提供的 controller 進(jìn)行請(qǐng)求生產(chǎn)消息富纸。
查看日志囤踩,成功記錄消息內(nèi)容,即為生產(chǎn)晓褪、消費(fèi)成功。
到此為止涣仿,Kafka demo 應(yīng)用已完成啦
4. 踩坑記錄
4.1. 打印日志
我在剛開始建好項(xiàng)目勤庐、配置 Kafka 后示惊,啟動(dòng)項(xiàng)目失敗,無日志輸出愉镰,不好排查問題米罚。
設(shè)置日志 level, application.properties
文件配置:
logging.level.root=debug
4.2. kafka 啟動(dòng)內(nèi)存不足
kafka 啟動(dòng) 報(bào)錯(cuò)cannot allocate memory丈探,即內(nèi)存不足
4.3. java client 連接失敗
按照本教程配置录择,已經(jīng)避免了這個(gè)問題。
【kafka】Java連接出現(xiàn)Connection refused: no further information