Kafka集群是把狀態(tài)保存在Zookeeper中的,首先要搭建Zookeeper集群。
Zookeeper集群部署請查閱上節(jié)
一、Kafka 集群下載安裝
1趁俊、下載包
$ cd /usr/local/services/src
$ wget http://apache.communilink.net/kafka/2.1.0/kafka_2.12-2.1.0.tgz
$ tar xvf kafka_2.12-2.1.0.tgz -C ../
$ cd ../kafka_2.12-2.1.0/
2、配置
broker配置:
broker配置文件為config/server.properties文件刑然,配置內(nèi)容主要分為以下幾個模塊,
Server Basics
Kafka server 基本配置
broker.id:是kafka集群server的唯一標識暇务。
broker.id=1
Socket Server Settings
Kafka 網(wǎng)絡相關配置
listeners:由用戶配置協(xié)議泼掠,ip,port垦细。
num.network.threads:這個是borker進行網(wǎng)絡處理的線程數(shù)
num.io.threads:這個是borker進行I/O處理的線程數(shù)
socket.send.buffer.bytes: 發(fā)送緩沖區(qū)buffer大小择镇,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲到緩沖區(qū)了到達一定的大小后在發(fā)送括改,能提高性能
socket.receive.buffer.bytes:kafka接收緩沖區(qū)大小腻豌,當數(shù)據(jù)到達一定大小后在序列化到磁盤
socket.request.max.bytes:這個參數(shù)是向kafka請求消息或者向kafka發(fā)送消息的請請求的最大數(shù),這個值不能超過java的堆棧大小
其他配置項,開發(fā)測試環(huán)境可使用默認配置吝梅;生產(chǎn)環(huán)境推薦如下配置虱疏。
listeners=PLAINTEXT://10.4.4.151:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
Log Basics
Kafka log 基本配置
log.dirs:log文件存儲路徑
num.partitions:topic默認的partitions數(shù)量。在創(chuàng)建topic時苏携,一般會指定partitions數(shù)量做瞪,因此該配置項在上述條件下基本無用。為了防止在創(chuàng)建topic時右冻,未指定partitions數(shù)量装蓬,因此推薦使用配置為3。
其他配置推薦使用默認配置
log.dirs=/data/kafka/kafka-sa
num.partitions=3
num.recovery.threads.per.data.dir=1
Internal Topic Settings
Kafka 內(nèi)部topic配置
開發(fā)測試環(huán)境推薦使用默認配置纱扭,均為1
生產(chǎn)環(huán)境推薦如下配置牍帚,replication數(shù)量為3,isr數(shù)量為2乳蛾。
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
Log Flush Policy
Kafka log 刷盤暗赶、落盤機制
log.flush.interval.messages:日志落盤消息條數(shù)間隔,即每接收到一定條數(shù)消息屡久,即進行l(wèi)og落盤忆首。
log.flush.interval.ms:日志落盤時間間隔,單位ms被环,即每隔一定時間糙及,即進行l(wèi)og落盤。
強烈推薦開發(fā)筛欢、測試浸锨、生產(chǎn)環(huán)境均采用默認值,即不配置該配置版姑,交由操作系統(tǒng)自行決定何時落盤柱搜,以提升性能。
若對消息高可靠性要求較高的應用系統(tǒng)剥险,可針對topic級別的配置聪蘸,配置該屬性。
Log Retention Policy
Kafka log保留策略配置
log.retention.hours:日志保留時間表制,單位小時健爬。和log.retention.minutes兩個配置只需配置一項。
message.max.bytes:表示接受消息體的最大大小么介,單位是字節(jié)
default.replication.factor:默認的備份的復制自動創(chuàng)建topics的個數(shù)
replica.fetch.max.bytes:最大備份的拉取數(shù)量
log.retention.bytes:日志保留大小娜遵。一topic的一partition下的所有日志大小總和達到該值,即進行日志清除任務壤短。當日志保留時間或日志保留大小设拟,任一條件滿足即進行日志清除任務,-1表示不限制慨仿。
log.segment.bytes:日志分段大小。即一topic的一partition下的所有日志會進行分段纳胧,達到該大小镰吆,即進行日志分段,滾動出新的日志文件躲雅。
log.retention.check.interval.ms:日志保留策略定期檢查時間間隔鼎姊,單位ms。
日志保留大小相赁,保留時間以及日志分段大小可根據(jù)具體服務器磁盤空間大小相寇,業(yè)務場景自行決定。
log.retention.hours=3
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.retention.bytes=5368709120
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
Zookeeper
Kafka zookeeper 配置
zookeeper.connect:zk連接地址
zookeeper.connection.timeout.ms:zk連接超時時間钮科,默認6s唤衫。可根據(jù)具體的應用場景進行更改绵脯,特可采用如下配置佳励。
zookeeper.connect=10.4.4.151:2181,10.4.4.152:2181,10.4.4.153:2181
zookeeper.connection.timeout.ms=60000
Group Coordinator Settings
Kafka consumer group 協(xié)調(diào)配置
生產(chǎn)環(huán)境推薦配置3000
開發(fā)測試環(huán)境推薦配置0
group.initial.rebalance.delay.ms=3
二、啟動Kafka集群并測試
1蛆挫、系統(tǒng)服務啟動配置
$ cat /lib/systemd/system/kafka-sa.service
[Unit]
Description=Apache kafka-sa-sales
After=network.target
[Service]
Type=simple
Environment=JAVA_HOME=/usr/local/services/jdk1.8.0_91
PIDFile=PIDFile=/usr/local/services/kafka_2.12-2.1.0/bin/kafka-sa.pid
ExecStart=/usr/local/services/kafka_2.12-2.1.0/bin/kafka-server-start.sh /usr/local/services/kafka_2.12-2.1.0/config/server.properties
ExecStop=/bin/kill -TERM ${MAINPID}
User=user_00
Group=users
Restart=always
RestartSec=20
[Install]
WantedBy=multi-user.target
2赃承、啟動服務
從后臺啟動Kafka集群(3臺都需要啟動)
[root@localhost logs]# systemctl start kafka-sa.service
3、檢查服務是否啟動
[root@localhost logs]# jps
32502 Kafka
521 Jps
23151 QuorumPeerMain
4悴侵、創(chuàng)建Topic來驗證是否創(chuàng)建成功
# cd /usr/local/services/kafka_2.12-2.1.0/bin
# ./kafka-topics.sh --create --zookeeper 10.4.4.151:2181 --replication-factor 2 --partitions 1 --topic basketball
Created topic "basketball".
#解釋
--replication-factor 2 #復制兩份
--partitions 1 #創(chuàng)建1個分區(qū)
--topic #主題為basketball
查看所有topic和topic 狀態(tài)
# ./kafka-topics.sh --list --zookeeper 10.4.4.151:2181
basketball
# ./kafka-topics.sh --describe --zookeeper 10.4.4.151:2181 --topic basketball
Topic:basketball PartitionCount:1 ReplicationFactor:2 Configs:
Topic: basketball Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
#分區(qū)為1 復制因子為2 Topic basketball的分區(qū)為0
#Replicas: 3,1 復制的為3瞧剖,1
##創(chuàng)建一個broker,發(fā)布者發(fā)布消息
./kafka-console-producer.sh --broker-list 10.4.4.151:9092 --topic basketball
>NBA
##在到另一臺機器或同一臺一臺機器開一個終端創(chuàng)建一個消費者消費:
./kafka-console-consumer.sh --bootstrap-server 10.4.4.152:9092 --topic basketball --from-beginning
NBA
OKkafka集群搭建完畢
日志說明
server.log #kafka的運行日志
state-change.log #kafka他是用zookeeper來保存狀態(tài)可免,所以他可能會進行切換抓于,切換的日志就保存在這里
controller.log #kafka選擇一個節(jié)點作為“controller”,當發(fā)現(xiàn)有節(jié)點down掉的時候它負責在游泳分區(qū)的所有節(jié)點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區(qū)節(jié)點的主從關系。如果controller down掉了浇借,活著的節(jié)點中的一個會備切換為新的controller.
登錄zk來查看zk的目錄情況
#使用客戶端進入zk
./zkCli.sh -server 10.4.4.152:2181
#查看目錄情況 執(zhí)行“l(fā)s /”
[zk: 10.4.4.152:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: 10.4.4.152:2181(CONNECTED) 1]
#顯示結(jié)果:[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
##上面的顯示結(jié)果中:只有zookeeper是zookeeper原生的捉撮,其他都是Kafka創(chuàng)建的
#標注一個broker
[zk: 10.4.4.152:2181(CONNECTED) 3] get /brokers/ids/3
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.4.4.153:9092"],"jmx_port":-1,"host":"10.4.4.153","timestamp":"1547877474320","port":9092,"version":4}
cZxid = 0x600000182
ctime = Sat Jan 19 13:57:53 CST 2019
mZxid = 0x600000182
mtime = Sat Jan 19 13:57:53 CST 2019
pZxid = 0x600000182
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x300046cf32a0005
dataLength = 190
numChildren = 0
#查看partion
[zk: 10.4.4.152:2181(CONNECTED) 4] get /brokers/topics/basketball/partitions/0
null
cZxid = 0x60000019f
ctime = Sat Jan 19 14:07:05 CST 2019
mZxid = 0x60000019f
mtime = Sat Jan 19 14:07:05 CST 2019
pZxid = 0x6000001a0
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1