一、前言
1结窘、Kafka簡介
Kafka是一個開源的分布式消息引擎/消息中間件如孝,同時Kafka也是一個流處理平臺。Kakfa支持以發(fā)布/訂閱的方式在應(yīng)用間傳遞消息常熙,同時并基于消息功能添加了Kafka Connect纬乍、Kafka Streams以支持連接其他系統(tǒng)的數(shù)據(jù)(Elasticsearch、Hadoop等)
Kafka最核心的最成熟的還是他的消息引擎症概,所以Kafka大部分應(yīng)用場景還是用來作為消息隊列削峰平谷蕾额。另外,Kafka也是目前性能最好的消息中間件彼城。
2诅蝶、Kafka架構(gòu)
在Kafka集群(Cluster)中,一個Kafka節(jié)點就是一個Broker,消息由Topic來承載,可以存儲在1個或多個Partition中派草。發(fā)布消息的應(yīng)用為Producer、消費消息的應(yīng)用為Consumer缰泡,多個Consumer可以促成Consumer Group共同消費一個Topic中的消息。
概念/對象 | 簡單說明 |
---|---|
Broker | Kafka節(jié)點 |
Topic | 主題,用來承載消息 |
Partition | 分區(qū)棘钞,用于主題分片存儲 |
Producer | 生產(chǎn)者缠借,向主題發(fā)布消息的應(yīng)用 |
Consumer | 消費者,從主題訂閱消息的應(yīng)用 |
Consumer Group | 消費者組宜猜,由多個消費者組成 |
3泼返、準(zhǔn)備工作
1、Kafka服務(wù)器
準(zhǔn)備3臺CentOS服務(wù)器姨拥,并配置好靜態(tài)IP绅喉、主機(jī)名
服務(wù)器名 | IP | 說明 |
---|---|---|
kafka01 | 192.168.88.51 | Kafka節(jié)點1 |
kafka02 | 192.168.88.52 | Kafka節(jié)點2 |
kafka03 | 192.168.88.53 | Kafka節(jié)點3 |
軟件版本說明
項 | 說明 |
---|---|
Linux Server | CentOS 7 |
Kafka | 2.3.0 |
2、ZooKeeper集群
Kakfa集群需要依賴ZooKeeper存儲Broker叫乌、Topic等信息柴罐,這里我們部署三臺ZK
服務(wù)器名 | IP | 說明 |
---|---|---|
zk01 | 192.168.88.21 | ZooKeeper節(jié)點 |
zk02 | 192.168.88.22 | ZooKeeper節(jié)點 |
zk03 | 192.168.88.23 | ZooKeeper節(jié)點 |
部署過程參考:https://ken.io/note/zookeeper-cluster-deploy-guide
二、部署過程
1憨奸、應(yīng)用&數(shù)據(jù)目錄
#創(chuàng)建應(yīng)用目錄
mkdir /usr/kafka
#創(chuàng)建Kafka數(shù)據(jù)目錄
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka
2革屠、下載&解壓
Kafka官方下載地址:https://kafka.apache.org/downloads
這次我下載的是2.3.0版本
#創(chuàng)建并進(jìn)入下載目錄
mkdir /home/downloads
cd /home/downloads
#下載安裝包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#解壓到應(yīng)用目錄
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
kafka_2.12-2.3.0.tgz 其中2.12是Scala編譯器的版本,2.3.0才是Kafka的版本
3膀藐、Kafka節(jié)點配置
#進(jìn)入應(yīng)用目錄
cd /usr/kafka/kafka_2.12-2.3.0/
#修改配置文件
vi config/server.properties
通用配置
配置日志目錄屠阻、指定ZooKeeper服務(wù)器
# A comma separated list of directories under which to store log files
log.dirs=/kafka/logs
# root directory for all kafka znodes.
zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181
分節(jié)點配置
- Kafka01
broker.id=0
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.51:9092
- Kafka02
broker.id=1
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.52:9092
- Kafka03
broker.id=2
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.53:9092
4、防火墻配置
#開放端口
firewall-cmd --add-port=9092/tcp --permanent
#重新加載防火墻配置
firewall-cmd --reload
5额各、啟動Kafka
#進(jìn)入kafka根目錄
cd /usr/kafka/kafka_2.12-2.3.0/
#啟動
/bin/kafka-server-start.sh config/server.properties &
#啟動成功輸出示例(最后幾行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
三、Kafka測試
1吧恃、創(chuàng)建Topic
在kafka01(Broker)上創(chuàng)建測試Tpoic:test-ken-io虾啦,這里我們指定了3個副本、1個分區(qū)
bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io
Topic在kafka01上創(chuàng)建后也會同步到集群中另外兩個Broker:kafka02痕寓、kafka03
2傲醉、查看Topic
我們可以通過命令列出指定Broker的
bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092
3、發(fā)送消息
這里我們向Broker(id=0)的Topic=test-ken-io發(fā)送消息
bin/kafka-console-producer.sh --broker-list 192.168.88.51:9092 --topic test-ken-io
#消息內(nèi)容
> test by ken.io
4呻率、消費消息
在Kafka02上消費Broker03的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning
在Kafka03上消費Broker02的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning
然后均能收到消息
test by ken.io
這是因為這兩個消費消息的命令是建立了兩個不同的Consumer
如果我們啟動Consumer指定Consumer Group Id就可以作為一個消費組協(xié)同工硬毕,1個消息同時只會被一個Consumer消費到
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken
四、備注
1礼仗、Kafka常用配置項說明
Kafka常用Broker配置說明:
配置項 | 默認(rèn)值/示例值 | 說明 |
---|---|---|
broker.id | 0 | Broker唯一標(biāo)識 |
listeners | PLAINTEXT://192.168.88.53:9092 | 監(jiān)聽信息吐咳,PLAINTEXT表示明文傳輸 |
log.dirs | kafka/logs | kafka數(shù)據(jù)存放地址,可以填寫多個元践。用","間隔 |
message.max.bytes | message.max.bytes | 單個消息長度限制韭脊,單位是字節(jié) |
num.partitions | 1 | 默認(rèn)分區(qū)數(shù) |
log.flush.interval.messages | Long.MaxValue | 在數(shù)據(jù)被寫入到硬盤和消費者可用前最大累積的消息的數(shù)量 |
log.flush.interval.ms | Long.MaxValue | 在數(shù)據(jù)被寫入到硬盤前的最大時間 |
log.flush.scheduler.interval.ms | Long.MaxValue | 檢查數(shù)據(jù)是否要寫入到硬盤的時間間隔。 |
log.retention.hours | 24 | 控制一個log保留時間单旁,單位:小時 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper服務(wù)器地址沪羔,多臺用","間隔 |
2、附錄
—-
本文首發(fā)于我的獨立博客:https://ken.io/note/kafka-cluster-deploy-guide