一具温、說明
# 1铣猩、此文章的基礎(chǔ)概念部分參考了如下文章(總結(jié)的比較到位)
https://www.cnblogs.com/yaochunhui/p/15506926.html
# 2茴丰、其余部分參考官網(wǎng)
https://kafka.apache.org/documentation
# 3、kafka資源包
https://archive.apache.org/dist/
二峦椰、kafka是什么
kafka是一個(gè)多分區(qū)汰规、多副本且基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng)。也是一個(gè)分布式流式處理平臺(tái)滔金,它以高吞吐茂嗓、可持久化、可水平擴(kuò)展钟病、支持流數(shù)據(jù)處理等多種特性而被廣泛使用刚梭。
三、kafka作用
# 1屹徘、消息系統(tǒng)
kafka具備系統(tǒng)解耦衅金、冗余存儲(chǔ)、流量削峰鉴吹、緩沖惩琉、異步通信、擴(kuò)展性良蒸、可恢復(fù)性等功能。與此同時(shí)嫩痰,Kafka 還提供了大多數(shù)消息系統(tǒng)難以實(shí)現(xiàn)的消息順序性保障及回溯消費(fèi)的功能串纺。
# 2、存儲(chǔ)系統(tǒng)
Kafka 把消息持久化到磁盤造垛,相比于其他基于內(nèi)存存儲(chǔ)的系統(tǒng)而言五辽,有效地降低了數(shù)據(jù)丟失的風(fēng)險(xiǎn)。也正是得益于Kafka 的消息持久化功能和多副本機(jī)制杆逗,我們可以把Kafka作為長期的數(shù)據(jù)存儲(chǔ)系統(tǒng)來使用罪郊,只需要把對應(yīng)的數(shù)據(jù)保留策略設(shè)置為“永久”或啟用主題的日志壓縮功能即可。
# 3靶累、流式處理平臺(tái)
Kafka 不僅為每個(gè)流行的流式處理框架提供了可靠的數(shù)據(jù)來源癣疟,還提供了一個(gè)完整的流式處理類庫,比如窗口邪蛔、連接扎狱、變換和聚合等各類操作。
四匠抗、kafka結(jié)構(gòu)和術(shù)語
一個(gè)典型的 Kafka 體系架構(gòu)包括若干 Producer污抬、若干 Broker、若干 Consumer著蛙,以及一個(gè)ZooKeeper集群耳贬,如下圖所示。其中ZooKeeper是Kafka用來負(fù)責(zé)集群元數(shù)據(jù)的管理顷蟆、控制器的選舉等操作的腐魂。Producer將消息發(fā)送到Broker,Broker負(fù)責(zé)將收到的消息存儲(chǔ)到磁盤中削樊,而Consumer負(fù)責(zé)從Broker訂閱并消費(fèi)消息兔毒。
kafka結(jié)構(gòu)圖
# 1育叁、Producer
生產(chǎn)者,也就是發(fā)送消息的一方谴蔑。生產(chǎn)者負(fù)責(zé)創(chuàng)建消息龟梦,然后將其投遞到Kafka中。
# 2成榜、Consumer
消費(fèi)者蹦玫,也就是接收消息的一方。消費(fèi)者連接到Kafka上并接收消息挣输,進(jìn)而進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理福贞。
# 3、Consumer Group (CG)
消費(fèi)者組完丽,由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù)蜻底,一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi)聘鳞;消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組站楚,即消費(fèi)者組是邏輯上的一個(gè)訂閱者搏嗡。
# 4采盒、Broker
服務(wù)代理節(jié)點(diǎn)。對于Kafka而言纽甘,Broker可以簡單地看作一個(gè)獨(dú)立的Kafka服務(wù)節(jié)點(diǎn)或Kafka服務(wù)實(shí)例悍赢。一個(gè)kafka集群由多個(gè) broker 組成。一個(gè) broker可以容納多個(gè) topic皮胡。
# 5赏迟、Topic
Kafka中的消息以主題為單位進(jìn)行歸類,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到特定的主題(發(fā)送到Kafka集群中的每一條消息都要指定一個(gè)主題)甩栈,而消費(fèi)者負(fù)責(zé)訂閱主題并進(jìn)行消費(fèi)
# 6糕再、Partition
主題是一個(gè)邏輯上的概念,它還可以細(xì)分為多個(gè)分區(qū)殴蹄,一個(gè)分區(qū)只屬于單個(gè)主題,很多時(shí)候也會(huì)把分區(qū)稱為主題分區(qū)(Topic-Partition)刺下。同一主題下的不同分區(qū)包含的消息是不同的稽荧,分區(qū)在存儲(chǔ)層面可以看作一個(gè)可追加的日志(Log)文件蛤克,消息在被追加到分區(qū)日志文件的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)夷蚊。offset是消息在分區(qū)中的唯一標(biāo)識,Kafka通過它來保證消息在分區(qū)內(nèi)的順序性筋现,不過offset并不跨越分區(qū)箱歧,也就是說,Kafka保證的是分區(qū)有序而不是主題有序洒沦。
Kafka中的分區(qū)可以分布在不同的服務(wù)器(broker)上价淌,也就是說,一個(gè)主題可以橫跨多個(gè)broker括尸,以此來提供比單個(gè)broker更強(qiáng)大的性能病毡。
每一條消息被發(fā)送到broker之前啦膜,會(huì)根據(jù)分區(qū)規(guī)則選擇存儲(chǔ)到哪個(gè)具體的分區(qū)。如果分區(qū)規(guī)則設(shè)定得合理娶眷,所有的消息都可以均勻地分配到不同的分區(qū)中啸臀。如果一個(gè)主題只對應(yīng)一個(gè)文件烁落,那么這個(gè)文件所在的機(jī)器 I/O 將會(huì)成為這個(gè)主題的性能瓶頸豌注,而分區(qū)解決了這個(gè)問題轧铁。 (7)Replica:Kafka 為分區(qū)引入了多副本(Replica)機(jī)制,通過增加副本數(shù)量可以提升容災(zāi)能力药薯。同一分區(qū)的不同副本中保存的是相同的消息(在同一時(shí)刻救斑,副本之間并非完全一樣),副本之間是“一主多從”的關(guān)系穷娱,其中l(wèi)eader副本負(fù)責(zé)處理讀寫請求运沦,follower副本只負(fù)責(zé)與leader副本的消息同步。副本處于不同的broker中嫁盲,當(dāng)leader副本出現(xiàn)故障時(shí)烈掠,從follower副本中重新選舉新的leader副本對外提供服務(wù)向叉。Kafka通過多副本機(jī)制實(shí)現(xiàn)了故障的自動(dòng)轉(zhuǎn)移,當(dāng)Kafka集群中某個(gè)broker失效時(shí)仍然能保證服務(wù)可用瘦黑。
Kafka 消費(fèi)端也具備一定的容災(zāi)能力奇唤。Consumer 使用拉(Pull)模式從服務(wù)端拉取消息,并且保存消費(fèi)的具體位置甲葬,當(dāng)消費(fèi)者宕機(jī)后恢復(fù)上線時(shí)可以根據(jù)之前保存的消費(fèi)位置重新拉取需要的消息進(jìn)行消費(fèi)懈贺,這樣就不會(huì)造成消息丟失。
五画侣、kafka 集群搭建
5.1配乱、環(huán)境
ip | hostname | 備注 |
---|---|---|
192.168.13.210 | kafka-01 | kafka broker、zookeeper |
192.168.13.213 | kafka-02 | kafka broker桑寨、zookeeper |
192.168.13.223 | kafka-03 | kafka broker忿檩、zookeeper |
5.2休溶、獲取資源包
# 如下步驟需要在所有節(jié)點(diǎn)上操作
cd ~ && mkdir efk && yum install wget -y
wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -xf kafka_2.12-2.6.2.tgz
cp -r kafka_2.12-2.6.2 /opt/kafka
mkdir /data/{kafka,zookeeper,kafka-logs,zkdatalog} -p
5.3扰她、配置文件
5.3.1 192.168.13.210(kafka-01)
[root@kafka-01 bin]# cat /data/zookeeper/myid
1
[root@kafka-01 bin]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-01 bin]# cat /opt/kafka/config/server.properties
# broker.id徒役,同一個(gè)集群里面,每一個(gè)服務(wù)器都需要唯一的一個(gè)ID杉女,非負(fù)整數(shù),kafka節(jié)點(diǎn)通過id來識別broker節(jié)點(diǎn)鸳吸。當(dāng)該節(jié)點(diǎn)的ip地址發(fā)生變化,broker.id沒有變化坎拐,則不會(huì)影響consumers的消息情況养匈。
broker.id=0
# listeners呕乎,監(jiān)聽客戶端請求的IP和端口,默認(rèn)都是9092
listeners=PLAINTEXT://192.168.13.210:9092
host.name=192.168.13.210
port=9092
# broker處理網(wǎng)絡(luò)(io)的線程數(shù)帝璧,一般情況下不需要去修改
num.network.threads=3
# broker處理磁盤io的線程數(shù)
num.io.threads=8
# socket收發(fā)的緩沖區(qū)大小
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# socket請求的最大數(shù)值
socket.request.max.bytes=104857600
# kafka數(shù)據(jù)存放地址,多個(gè)地址的話用逗號分割
log.dirs=/data/kafka-logs
# 默認(rèn)的partition數(shù)目
num.partitions=3
num.recovery.threads.per.data.dir=1
# 數(shù)據(jù)保存時(shí)間
log.retention.hours=168
# 文件分段的大小
# topic分區(qū)是以一堆segment文件存儲(chǔ)的谆膳,這個(gè)參數(shù)用來控制每個(gè)segment的大小
log.segment.bytes=1073741824
# 文件大小檢查的周期時(shí)間
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
# zk連接超時(shí)
zookeeper.connection.timeout.ms=6000
# 我的kafka集群有三個(gè)broker漱病,當(dāng)其中一個(gè)broker宕機(jī)后把曼,會(huì)影響消費(fèi),報(bào)錯(cuò)信息如下:
# 【Number of alive brokers '2' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)】
# 這個(gè)值默認(rèn)是3(3個(gè)拷貝注盈、replication)叙赚,當(dāng)發(fā)生宕機(jī)后震叮,就無法滿足3個(gè)cp,會(huì)影響消費(fèi)尉间,在這我設(shè)置為2击罪,在有broker宕機(jī)的情況下不影響使用。
offsets.topic.replication.factor=2
5.3.2 192.168.13.213(kafka-02)
[root@kafka-02 ~]# cat /data/zookeeper/myid
2
[root@kafka-02 ~]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-02 ~]# cat /opt/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.13.213:9092
host.name=192.168.13.213
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
zookeeper.connection.timeout.ms=6000
offsets.topic.replication.factor=2
5.3.3 192.168.13.223(kafka-03)
[root@kafka-03 bin]# cat /data/zookeeper/myid
3
[root@kafka-03 bin]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-03 bin]# cat /opt/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.13.223:9092
host.name=192.168.13.223
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
zookeeper.connection.timeout.ms=6000
offsets.topic.replication.factor=2
六、集群測試
6.1侦啸、創(chuàng)建topic
rootkafka-topics.sh --create --zookeeper 192.168.13.210:2181 --replication-factor 3 --partitions 1 --topic fzh
6.2丧枪、發(fā)送消息
./kafka-console-producer.sh --topic fzh --bootstrap-server 192.168.13.210:9092
發(fā)送消息
6.3拧烦、打開消息監(jiān)聽
./kafka-console-consumer.sh --topic fzh --from-beginning --bootstrap-server 192.168.13.223:9092
消費(fèi)消息