一苏携、環(huán)境準(zhǔn)備:
??首先準(zhǔn)備好三臺CentOS系統(tǒng)的主機做瞪,設(shè)置ip為:172.16.20.220对粪、172.16.20.221、172.16.20.222装蓬。
??Kafka會使用大量文件和網(wǎng)絡(luò)socket著拭,Linux默認(rèn)配置的File descriptors(文件描述符)不能夠滿足Kafka高吞吐量的要求,所以這里需要調(diào)整(更多性能優(yōu)化牍帚,請查看Kafka官方文檔):
vi /etc/security/limits.conf
# 在最后加入儡遮,修改完成后,重啟系統(tǒng)生效暗赶。
* soft nofile 131072
* hard nofile 131072
??新建kafka的日志目錄和zookeeper數(shù)據(jù)目錄鄙币,因為這兩項默認(rèn)放在tmp目錄肃叶,而tmp目錄中內(nèi)容會隨重啟而丟失,所以我們自定義以下目錄:
mkdir /data/zookeeper
mkdir /data/zookeeper/data
mkdir /data/zookeeper/logs
mkdir /data/kafka
mkdir /data/kafka/data
mkdir /data/kafka/logs
二、zookeeper.properties配置
vi /usr/local/kafka/config/zookeeper.properties
修改如下:
# 修改為自定義的zookeeper數(shù)據(jù)目錄
dataDir=/data/zookeeper/data
# 修改為自定義的zookeeper日志目錄
dataLogDir=/data/zookeeper/logs
# 端口
clientPort=2181
# 注釋掉
#maxClientCnxns=0
# 設(shè)置連接參數(shù)十嘿,添加如下配置
# 為zk的基本時間單元因惭,毫秒
tickTime=2000
# Leader-Follower初始通信時限 tickTime*10
initLimit=10
# Leader-Follower同步通信時限 tickTime*5
syncLimit=5
# 設(shè)置broker Id的服務(wù)地址,本機ip一定要用0.0.0.0代替
server.1=0.0.0.0:2888:3888
server.2=172.16.20.221:2888:3888
server.3=172.16.20.222:2888:3888
三绩衷、在各臺服務(wù)器的zookeeper數(shù)據(jù)目錄/data/zookeeper/data添加myid文件蹦魔,寫入服務(wù)broker.id屬性值
在data文件夾中新建myid文件,myid文件的內(nèi)容為1(一句話創(chuàng)建:echo 1 > myid)
cd /data/zookeeper/data
vi myid
#添加內(nèi)容:1 其他兩臺主機分別配置 2和3
1
四咳燕、kafka配置勿决,進(jìn)入config目錄下,修改server.properties文件
vi /usr/local/kafka/config/server.properties
# 每臺服務(wù)器的broker.id都不能相同
broker.id=1
# 是否可以刪除topic
delete.topic.enable=true
# topic 在當(dāng)前broker上的分片個數(shù)招盲,與broker保持一致
num.partitions=3
# 每個主機地址不一樣:
listeners=PLAINTEXT://172.16.20.220:9092
advertised.listeners=PLAINTEXT://172.16.20.220:9092
# 具體一些參數(shù)
log.dirs=/data/kafka/kafka-logs
# 設(shè)置zookeeper集群地址與端口如下:
zookeeper.connect=172.16.20.220:2181,172.16.20.221:2181,172.16.20.222:2181
五低缩、Kafka啟動
kafka啟動時先啟動zookeeper,再啟動kafka宪肖;關(guān)閉時相反表制,先關(guān)閉kafka,再關(guān)閉zookeeper控乾。
1么介、zookeeper啟動命令
./zookeeper-server-start.sh ../config/zookeeper.properties &
后臺運行啟動命令:
nohup ./zookeeper-server-start.sh ../config/zookeeper.properties >/data/zookeeper/logs/zookeeper.log 2>1 &
或者
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties &
查看集群狀態(tài):
./zookeeper-server-start.sh status ../config/zookeeper.properties
2、kafka啟動命令
./kafka-server-start.sh ../config/server.properties &
后臺運行啟動命令:
nohup bin/kafka-server-start.sh ../config/server.properties >/data/kafka/logs/kafka.log 2>1 &
或者
./kafka-server-start.sh -daemon ../config/server.properties &
3蜕衡、創(chuàng)建topic壤短,最新版本已經(jīng)不需要使用zookeeper參數(shù)創(chuàng)建。
./kafka-topics.sh --create --replication-factor 2 --partitions 1 --topic test --bootstrap-server 172.16.20.220:9092
參數(shù)解釋:
復(fù)制兩份
--replication-factor 2
創(chuàng)建1個分區(qū)
--partitions 1
topic 名稱
--topic test
4慨仿、查看已經(jīng)存在的topic(三臺設(shè)備都執(zhí)行時可以看到)
./kafka-topics.sh --list --bootstrap-server 172.16.20.220:9092
5久脯、啟動生產(chǎn)者:
./kafka-console-producer.sh --broker-list 172.16.20.220:9092 --topic test
6、啟動消費者:
./kafka-console-consumer.sh --bootstrap-server 172.16.20.221:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server 172.16.20.222:9092 --topic test
添加參數(shù) --from-beginning 從開始位置消費镰吆,不是從最新消息
./kafka-console-consumer.sh --bootstrap-server 172.16.20.221 --topic test --from-beginning
7帘撰、測試:在生產(chǎn)者輸入test,可以在消費者的兩臺服務(wù)器上看到同樣的字符test万皿,說明Kafka服務(wù)器集群已搭建成功摧找。