kafka cluster

一具温、說明

# 1铣猩、此文章的基礎(chǔ)概念部分參考了如下文章(總結(jié)的比較到位)
https://www.cnblogs.com/yaochunhui/p/15506926.html
# 2茴丰、其余部分參考官網(wǎng)
https://kafka.apache.org/documentation
# 3、kafka資源包
https://archive.apache.org/dist/

二峦椰、kafka是什么

kafka是一個(gè)多分區(qū)汰规、多副本且基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng)。也是一個(gè)分布式流式處理平臺(tái)滔金,它以高吞吐茂嗓、可持久化、可水平擴(kuò)展钟病、支持流數(shù)據(jù)處理等多種特性而被廣泛使用刚梭。

三、kafka作用

# 1屹徘、消息系統(tǒng)
kafka具備系統(tǒng)解耦衅金、冗余存儲(chǔ)、流量削峰鉴吹、緩沖惩琉、異步通信、擴(kuò)展性良蒸、可恢復(fù)性等功能。與此同時(shí)嫩痰,Kafka 還提供了大多數(shù)消息系統(tǒng)難以實(shí)現(xiàn)的消息順序性保障及回溯消費(fèi)的功能串纺。
# 2、存儲(chǔ)系統(tǒng)
Kafka 把消息持久化到磁盤造垛,相比于其他基于內(nèi)存存儲(chǔ)的系統(tǒng)而言五辽,有效地降低了數(shù)據(jù)丟失的風(fēng)險(xiǎn)。也正是得益于Kafka 的消息持久化功能和多副本機(jī)制杆逗,我們可以把Kafka作為長期的數(shù)據(jù)存儲(chǔ)系統(tǒng)來使用罪郊,只需要把對應(yīng)的數(shù)據(jù)保留策略設(shè)置為“永久”或啟用主題的日志壓縮功能即可。
# 3靶累、流式處理平臺(tái)
Kafka 不僅為每個(gè)流行的流式處理框架提供了可靠的數(shù)據(jù)來源癣疟,還提供了一個(gè)完整的流式處理類庫,比如窗口邪蛔、連接扎狱、變換和聚合等各類操作。

四匠抗、kafka結(jié)構(gòu)和術(shù)語

一個(gè)典型的 Kafka 體系架構(gòu)包括若干 Producer污抬、若干 Broker、若干 Consumer著蛙,以及一個(gè)ZooKeeper集群耳贬,如下圖所示。其中ZooKeeper是Kafka用來負(fù)責(zé)集群元數(shù)據(jù)的管理顷蟆、控制器的選舉等操作的腐魂。Producer將消息發(fā)送到Broker,Broker負(fù)責(zé)將收到的消息存儲(chǔ)到磁盤中削樊,而Consumer負(fù)責(zé)從Broker訂閱并消費(fèi)消息兔毒。
kafka結(jié)構(gòu)圖
# 1育叁、Producer
生產(chǎn)者,也就是發(fā)送消息的一方谴蔑。生產(chǎn)者負(fù)責(zé)創(chuàng)建消息龟梦,然后將其投遞到Kafka中。
# 2成榜、Consumer
消費(fèi)者蹦玫,也就是接收消息的一方。消費(fèi)者連接到Kafka上并接收消息挣输,進(jìn)而進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理福贞。
# 3、Consumer Group (CG)
消費(fèi)者組完丽,由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù)蜻底,一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi)聘鳞;消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組站楚,即消費(fèi)者組是邏輯上的一個(gè)訂閱者搏嗡。
# 4采盒、Broker
服務(wù)代理節(jié)點(diǎn)。對于Kafka而言纽甘,Broker可以簡單地看作一個(gè)獨(dú)立的Kafka服務(wù)節(jié)點(diǎn)或Kafka服務(wù)實(shí)例悍赢。一個(gè)kafka集群由多個(gè) broker 組成。一個(gè) broker可以容納多個(gè) topic皮胡。
# 5赏迟、Topic
Kafka中的消息以主題為單位進(jìn)行歸類,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到特定的主題(發(fā)送到Kafka集群中的每一條消息都要指定一個(gè)主題)甩栈,而消費(fèi)者負(fù)責(zé)訂閱主題并進(jìn)行消費(fèi)
# 6糕再、Partition
主題是一個(gè)邏輯上的概念,它還可以細(xì)分為多個(gè)分區(qū)殴蹄,一個(gè)分區(qū)只屬于單個(gè)主題,很多時(shí)候也會(huì)把分區(qū)稱為主題分區(qū)(Topic-Partition)刺下。同一主題下的不同分區(qū)包含的消息是不同的稽荧,分區(qū)在存儲(chǔ)層面可以看作一個(gè)可追加的日志(Log)文件蛤克,消息在被追加到分區(qū)日志文件的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)夷蚊。offset是消息在分區(qū)中的唯一標(biāo)識,Kafka通過它來保證消息在分區(qū)內(nèi)的順序性筋现,不過offset并不跨越分區(qū)箱歧,也就是說,Kafka保證的是分區(qū)有序而不是主題有序洒沦。
Kafka中的分區(qū)可以分布在不同的服務(wù)器(broker)上价淌,也就是說,一個(gè)主題可以橫跨多個(gè)broker括尸,以此來提供比單個(gè)broker更強(qiáng)大的性能病毡。
每一條消息被發(fā)送到broker之前啦膜,會(huì)根據(jù)分區(qū)規(guī)則選擇存儲(chǔ)到哪個(gè)具體的分區(qū)。如果分區(qū)規(guī)則設(shè)定得合理娶眷,所有的消息都可以均勻地分配到不同的分區(qū)中啸臀。如果一個(gè)主題只對應(yīng)一個(gè)文件烁落,那么這個(gè)文件所在的機(jī)器 I/O 將會(huì)成為這個(gè)主題的性能瓶頸豌注,而分區(qū)解決了這個(gè)問題轧铁。 (7)Replica:Kafka 為分區(qū)引入了多副本(Replica)機(jī)制,通過增加副本數(shù)量可以提升容災(zāi)能力药薯。同一分區(qū)的不同副本中保存的是相同的消息(在同一時(shí)刻救斑,副本之間并非完全一樣),副本之間是“一主多從”的關(guān)系穷娱,其中l(wèi)eader副本負(fù)責(zé)處理讀寫請求运沦,follower副本只負(fù)責(zé)與leader副本的消息同步。副本處于不同的broker中嫁盲,當(dāng)leader副本出現(xiàn)故障時(shí)烈掠,從follower副本中重新選舉新的leader副本對外提供服務(wù)向叉。Kafka通過多副本機(jī)制實(shí)現(xiàn)了故障的自動(dòng)轉(zhuǎn)移,當(dāng)Kafka集群中某個(gè)broker失效時(shí)仍然能保證服務(wù)可用瘦黑。
Kafka 消費(fèi)端也具備一定的容災(zāi)能力奇唤。Consumer 使用拉(Pull)模式從服務(wù)端拉取消息,并且保存消費(fèi)的具體位置甲葬,當(dāng)消費(fèi)者宕機(jī)后恢復(fù)上線時(shí)可以根據(jù)之前保存的消費(fèi)位置重新拉取需要的消息進(jìn)行消費(fèi)懈贺,這樣就不會(huì)造成消息丟失。

五画侣、kafka 集群搭建

5.1配乱、環(huán)境

ip hostname 備注
192.168.13.210 kafka-01 kafka broker、zookeeper
192.168.13.213 kafka-02 kafka broker桑寨、zookeeper
192.168.13.223 kafka-03 kafka broker忿檩、zookeeper

5.2休溶、獲取資源包

# 如下步驟需要在所有節(jié)點(diǎn)上操作
cd ~ &&  mkdir efk && yum install wget -y
wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -xf kafka_2.12-2.6.2.tgz
cp -r kafka_2.12-2.6.2 /opt/kafka
mkdir /data/{kafka,zookeeper,kafka-logs,zkdatalog} -p

5.3扰她、配置文件

5.3.1 192.168.13.210(kafka-01)

[root@kafka-01 bin]# cat /data/zookeeper/myid 
1
[root@kafka-01 bin]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-01 bin]# cat /opt/kafka/config/server.properties
# broker.id徒役,同一個(gè)集群里面,每一個(gè)服務(wù)器都需要唯一的一個(gè)ID杉女,非負(fù)整數(shù),kafka節(jié)點(diǎn)通過id來識別broker節(jié)點(diǎn)鸳吸。當(dāng)該節(jié)點(diǎn)的ip地址發(fā)生變化,broker.id沒有變化坎拐,則不會(huì)影響consumers的消息情況养匈。
broker.id=0
# listeners呕乎,監(jiān)聽客戶端請求的IP和端口,默認(rèn)都是9092
listeners=PLAINTEXT://192.168.13.210:9092
host.name=192.168.13.210
port=9092
# broker處理網(wǎng)絡(luò)(io)的線程數(shù)帝璧,一般情況下不需要去修改
num.network.threads=3
# broker處理磁盤io的線程數(shù)
num.io.threads=8
# socket收發(fā)的緩沖區(qū)大小
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# socket請求的最大數(shù)值
socket.request.max.bytes=104857600
# kafka數(shù)據(jù)存放地址,多個(gè)地址的話用逗號分割
log.dirs=/data/kafka-logs
# 默認(rèn)的partition數(shù)目
num.partitions=3
num.recovery.threads.per.data.dir=1
# 數(shù)據(jù)保存時(shí)間
log.retention.hours=168
# 文件分段的大小
# topic分區(qū)是以一堆segment文件存儲(chǔ)的谆膳,這個(gè)參數(shù)用來控制每個(gè)segment的大小
log.segment.bytes=1073741824
# 文件大小檢查的周期時(shí)間
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
# zk連接超時(shí)
zookeeper.connection.timeout.ms=6000
# 我的kafka集群有三個(gè)broker漱病,當(dāng)其中一個(gè)broker宕機(jī)后把曼,會(huì)影響消費(fèi),報(bào)錯(cuò)信息如下:
# 【Number of alive brokers '2' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)】
# 這個(gè)值默認(rèn)是3(3個(gè)拷貝注盈、replication)叙赚,當(dāng)發(fā)生宕機(jī)后震叮,就無法滿足3個(gè)cp,會(huì)影響消費(fèi)尉间,在這我設(shè)置為2击罪,在有broker宕機(jī)的情況下不影響使用。
offsets.topic.replication.factor=2

5.3.2 192.168.13.213(kafka-02)

[root@kafka-02 ~]# cat /data/zookeeper/myid
2
[root@kafka-02 ~]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-02 ~]# cat /opt/kafka/config/server.properties 
broker.id=1
listeners=PLAINTEXT://192.168.13.213:9092
host.name=192.168.13.213
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
zookeeper.connection.timeout.ms=6000
offsets.topic.replication.factor=2

5.3.3 192.168.13.223(kafka-03)

[root@kafka-03 bin]# cat /data/zookeeper/myid
3
[root@kafka-03 bin]# cat /opt/kafka/config/zookeeper.properties
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/
dataLogDir=/data/zkdatalog/
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=192.168.13.210:2888:3888
server.2=192.168.13.213:2888:3888
server.3=192.168.13.223:2888:3888
[root@kafka-03 bin]# cat /opt/kafka/config/server.properties 
broker.id=2
listeners=PLAINTEXT://192.168.13.223:9092
host.name=192.168.13.223
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.13.210:2181,192.168.13.213:2181,192.168.13.223:2181
zookeeper.connection.timeout.ms=6000
offsets.topic.replication.factor=2

六、集群測試

6.1侦啸、創(chuàng)建topic

rootkafka-topics.sh --create --zookeeper 192.168.13.210:2181 --replication-factor 3 --partitions 1 --topic fzh

6.2丧枪、發(fā)送消息

./kafka-console-producer.sh  --topic fzh --bootstrap-server 192.168.13.210:9092
發(fā)送消息

6.3拧烦、打開消息監(jiān)聽

./kafka-console-consumer.sh --topic fzh --from-beginning --bootstrap-server 192.168.13.223:9092
消費(fèi)消息
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末婚脱,一起剝皮案震驚了整個(gè)濱河市填硕,隨后出現(xiàn)的幾起案子炼吴,更是在濱河造成了極大的恐慌硅蹦,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件童芹,死亡現(xiàn)場離奇詭異涮瞻,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)假褪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進(jìn)店門署咽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人生音,你說我怎么就攤上這事宁否。” “怎么了缀遍?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵慕匠,是天一觀的道長瑟由。 經(jīng)常有香客問我絮重,道長冤寿,這世上最難降的妖魔是什么歹苦? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮督怜,結(jié)果婚禮上殴瘦,老公的妹妹穿的比我還像新娘。我一直安慰自己号杠,他們只是感情好蚪腋,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著姨蟋,像睡著了一般屉凯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上眼溶,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天悠砚,我揣著相機(jī)與錄音,去河邊找鬼堂飞。 笑死灌旧,一個(gè)胖子當(dāng)著我的面吹牛绑咱,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播枢泰,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼描融,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了衡蚂?” 一聲冷哼從身側(cè)響起窿克,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎讳窟,沒想到半個(gè)月后让歼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡丽啡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年谋右,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片补箍。...
    茶點(diǎn)故事閱讀 40,137評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡改执,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出坑雅,到底是詐尸還是另有隱情辈挂,我是刑警寧澤,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布裹粤,位于F島的核電站终蒂,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏遥诉。R本人自食惡果不足惜拇泣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望矮锈。 院中可真熱鬧霉翔,春花似錦、人聲如沸苞笨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瀑凝。三九已至序芦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間粤咪,已是汗流浹背谚中。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人藏杖。 一個(gè)月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓将塑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蝌麸。 傳聞我的和親對象是個(gè)殘疾皇子点寥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內(nèi)容