一比然、概述
(一)、kafka的定義
1周循、定義
1)kafka傳統(tǒng)的定義:kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列强法,主要用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域
2)kafka最新的定義:kafka是一個(gè)開源的分布式事件流平臺(tái)(event stream platform),主要用高性能數(shù)據(jù)管道湾笛,流分析饮怯,數(shù)據(jù)集成和關(guān)鍵任務(wù)等領(lǐng)域
2、消息隊(duì)列
目前市面上大部分公司采用的消息隊(duì)列主要有kafka嚎研,activeMQ蓖墅,rabbitMQ,rocketMQ等。kafka作為消息隊(duì)列论矾,主要應(yīng)用于大數(shù)據(jù)場(chǎng)景下教翩,而在Javaee開發(fā)中更多采用的是activeMQ,rabbitMQ,rockectMQ等贪壳。
3饱亿、消息隊(duì)列的應(yīng)用場(chǎng)景
傳統(tǒng)的消息隊(duì)列的主要應(yīng)用場(chǎng)景包括:緩沖/削峰,解耦和異步通信
緩沖/削峰:在實(shí)際的應(yīng)用系統(tǒng)中寥袭,如果數(shù)據(jù)生產(chǎn)端(比如其前端)的數(shù)據(jù)產(chǎn)生的速率與數(shù)據(jù)處理端(服務(wù)端)的數(shù)據(jù)處理速率相當(dāng)或小于時(shí)路捧,整合系統(tǒng)運(yùn)行就不會(huì)有很大的壓力。但是當(dāng)系統(tǒng)上了個(gè)秒殺活動(dòng)或者雙11活動(dòng)到來(lái)传黄,前端用戶猛增杰扫,數(shù)據(jù)率隨之也會(huì)增加數(shù)倍,甚至是數(shù)十倍膘掰。但是服務(wù)端需要對(duì)數(shù)據(jù)進(jìn)行處理章姓,持久化等操作,處理速率必然跟不上數(shù)據(jù)產(chǎn)生的速度识埋,久而久之系統(tǒng)就會(huì)產(chǎn)生數(shù)據(jù)積壓凡伊,最終就有可能導(dǎo)致系統(tǒng)的崩潰。在數(shù)據(jù)生產(chǎn)端和處理端之間使用消息隊(duì)列就可以解決這種問(wèn)題窒舟。此時(shí)系忙,消息隊(duì)列就發(fā)揮了不同系統(tǒng)之間數(shù)據(jù)的緩沖和削峰的作用。數(shù)據(jù)生產(chǎn)端將數(shù)據(jù)發(fā)送到消息隊(duì)列惠豺,然后隨即返回響應(yīng)银还,這個(gè)過(guò)程相對(duì)來(lái)說(shuō)是非常快的洁墙。數(shù)據(jù)處理端則根據(jù)自己的處理速度從消息隊(duì)列中拉取數(shù)據(jù)蛹疯。示意圖如下:
不使用消息隊(duì)列的情況
使用消息隊(duì)列的情況
解耦:允許獨(dú)立的拓展和修改兩邊的處理過(guò)程,但兩邊需要確保使用相同的接口約束热监。
異步通信:將處理的用戶數(shù)據(jù)寫入到消息隊(duì)列中捺弦,并立即返回處理結(jié)果,隊(duì)列中數(shù)據(jù)由另一個(gè)線程拉取出來(lái)做響應(yīng)的處理孝扛。下面是用戶注冊(cè)列吼,并把注冊(cè)成功的消息發(fā)送到用戶手機(jī)上的同步處理和異步處理的流程。
(二)苦始、kafka基礎(chǔ)架構(gòu)
1冈欢、消息隊(duì)列的兩種模式
1)點(diǎn)對(duì)點(diǎn)模式
消費(fèi)者主動(dòng)拉取數(shù)據(jù),數(shù)據(jù)消費(fèi)完后就會(huì)在隊(duì)列中刪除
2)發(fā)布/訂閱模式
可以有有多個(gè)主題(topic)
消費(fèi)者拉取數(shù)據(jù)消費(fèi)完后盈简,不刪除數(shù)據(jù)
每個(gè)消費(fèi)者相互獨(dú)立,都可以消費(fèi)到數(shù)據(jù)
2、基礎(chǔ)架構(gòu)
1)producer:消息生產(chǎn)者柠贤,就是向broker發(fā)送消息的客戶端
2)consumer:消息消費(fèi)者香浩,就是從broker拉取數(shù)據(jù)的客戶端
3)consumer group:消費(fèi)者組,由多個(gè)消費(fèi)者consumer組成臼勉。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同的分區(qū)邻吭,一個(gè)分區(qū)只能由同一個(gè)消費(fèi)者組內(nèi)的一個(gè)消費(fèi)者消費(fèi);消費(fèi)者組之間相互獨(dú)立宴霸,互不影響囱晴。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是一個(gè)邏輯上的訂閱者瓢谢。
4)broker:一臺(tái)服務(wù)器就是一個(gè)broker畸写,一個(gè)集群由多個(gè)broker組成,一個(gè)broker可以有多個(gè)topic氓扛。
5)topic:可以理解為一個(gè)隊(duì)列枯芬,所有的生產(chǎn)者和消費(fèi)者都是面向topic的。
6)partition:分區(qū)采郎,kafka中的topic為了提高拓展性和實(shí)現(xiàn)高可用而將它分布到不同的broker中千所,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition都是有序的蒜埋,即消息發(fā)送到隊(duì)列的順序跟消費(fèi)時(shí)拉取到的順序是一致的淫痰。
7)replication:副本。一個(gè)topic對(duì)應(yīng)的分區(qū)partition可以有多個(gè)副本整份,多個(gè)副本中只有一個(gè)為leader待错,其余的為follower。為了保證數(shù)據(jù)的高可用性皂林,leader和follower會(huì)盡量均勻的分布在各個(gè)broker中朗鸠,避免了leader所在的服務(wù)器宕機(jī)而導(dǎo)致topic不可用的問(wèn)題。
8)leader:多個(gè)副本的主副本础倍,生產(chǎn)者發(fā)送的數(shù)據(jù)和消費(fèi)者消費(fèi)的數(shù)據(jù)都是通過(guò)leader進(jìn)行處理的掏击。
9)follower:多個(gè)副本中除了leader副本均唉,其余的均為follower副本,也即從副本。從副本不會(huì)和生產(chǎn)者和消費(fèi)者提供服務(wù)替梨,而是實(shí)時(shí)同步主副本的數(shù)據(jù)。當(dāng)主副本宕機(jī)后雀监,通過(guò)一定算法選舉出新的從副本成為主副本寺枉,繼續(xù)為生產(chǎn)者和消費(fèi)者提供服務(wù)。
(三)胳搞、kafka常用命令行操作
1卸例、主題相關(guān)
| 參數(shù) | 描述 |
| --bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)称杨。 |
| --topic <String: topic> | 操作的 topic 名稱。 |
| --create | 創(chuàng)建主題筷转。 |
| --delete | 刪除主題姑原。 |
| --alter | 修改主題。 |
| --list | 查看所有主題呜舒。 |
| --describe | 查看主題詳細(xì)描述锭汛。 |
| --partitions <Integer: # of partitions> | 設(shè)置分區(qū)數(shù)。 |
| --replication-factor<Integer: replication factor> | 設(shè)置分區(qū)副本袭蝗。 |
| --config <String: name=value> | 更新系統(tǒng)默認(rèn)的配置唤殴。 |
2、生產(chǎn)者相關(guān)
| **參數(shù) ** | 描述 |
| --bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)到腥。 |
| --topic <String: topic> | 操作的 topic 名稱朵逝。 |
3、消費(fèi)者相關(guān)
| **參數(shù) ** | 描述 |
| --bootstrap-server <String: server toconnect to> | 連接的 Kafka Broker 主機(jī)名稱和端口號(hào)左电。 |
| --topic <String: topic> | 操作的 topic 名稱廉侧。 |
| --from-beginning | 從頭開始消費(fèi)。 |
| --group <String: consumer group id> | 指定消費(fèi)者組名稱篓足。 |
二段誊、生產(chǎn)者
(一)、重要參數(shù)列表
|
參數(shù)名稱
| 描述 |
|
bootstrap.servers
| 生產(chǎn)者連接集群所需的 broker 地 址 清 單 栈拖×幔可以設(shè)置 1 個(gè)或者多個(gè),中間用逗號(hào)隔開涩哟。注意這里并非需要所有的 broker 地址索赏,因?yàn)樯a(chǎn)者從給定的 broker里查找到其他 broker 信息。 |
|
key.serializer 和 value.serializer
| 指定發(fā)送消息的 key 和 value 的序列化類型贴彼。一定要寫全類名潜腻。 |
|
buffer.memory
| RecordAccumulator 緩沖區(qū)總大小,默認(rèn) 32m器仗。 |
|
batch.size
| 緩沖區(qū)一批數(shù)據(jù)最大值融涣,默認(rèn) 16k。適當(dāng)增加該值精钮,可以提高吞吐量威鹿,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加轨香。 |
|
linger.ms
| 如果數(shù)據(jù)遲遲未達(dá)到 batch.size忽你,sender 等待 linger.time之后就會(huì)發(fā)送數(shù)據(jù)。單位 ms臂容,默認(rèn)值是 0ms科雳,表示沒(méi)有延遲根蟹。生產(chǎn)環(huán)境建議該值大小為 5-100ms 之間。 |
|
acks
|
- 0:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù)炸渡,不需要等數(shù)據(jù)落盤應(yīng)答娜亿。
- 1:生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader 收到數(shù)據(jù)后應(yīng)答蚌堵。
- -1(all):生產(chǎn)者發(fā)送過(guò)來(lái)的數(shù)據(jù),Leader+和 isr 隊(duì)列里面的所有節(jié)點(diǎn)收齊數(shù)據(jù)后應(yīng)答沛婴。默認(rèn)值是-1吼畏,-1 和all 是等價(jià)的。
|
| max.in.flight.requests.per.connection |
允許最多沒(méi)有返回 ack 的次數(shù)嘁灯,默認(rèn)為 5泻蚊,開啟冪等性要保證該值是 1-5 的數(shù)字。
|
|
retries
|
當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候丑婿,系統(tǒng)會(huì)重發(fā)消息性雄。retries表示重試次數(shù)。默認(rèn)是 int 最大值羹奉,2147483647秒旋。如果設(shè)置了重試,還想保證消息的有序性诀拭,需要設(shè)置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否則在重試此失敗消息的時(shí)候迁筛,其他的消息可能發(fā)送成功了。
|
|
retry.backoff.ms
|
兩次重試之間的時(shí)間間隔耕挨,默認(rèn)是 100ms细卧。
|
|
enable.idempotence
|
- 是否開啟冪等性,默認(rèn) true筒占,開啟冪等性贪庙。*
|
|
compression.type
|
- 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是 none翰苫,也就是不壓縮止邮。支持壓縮類型:none、gzip革骨、snappy农尖、lz4 和 zstd。*
|
(二)良哲、發(fā)送流程以及發(fā)送API
1盛卡、發(fā)送流程
(三)、分區(qū)
1筑凫、分區(qū)的好處
- 便于合理使用存儲(chǔ)資源滑沧。每一個(gè)partition存儲(chǔ)在不同的broker上面并村,可以把海量的數(shù)據(jù)切割成更小數(shù)據(jù)存儲(chǔ)在不同的broker上。合理是分配分區(qū)的任務(wù)滓技,可以實(shí)現(xiàn)負(fù)載均衡效果哩牍,避免單機(jī)數(shù)據(jù)量太大而導(dǎo)致的壓力。
-
提高并行度令漂。生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù)膝昆;消費(fèi)者可以以分區(qū)為單位消費(fèi)數(shù)據(jù)。
2叠必、分區(qū)策略
1)荚孵、默認(rèn)分區(qū)策略:DefaultPartitioner
- 若生產(chǎn)者發(fā)送數(shù)據(jù)時(shí)指明partition,那么就會(huì)將該數(shù)據(jù)發(fā)送指定的分區(qū)纬朝,比如partition=0收叶,則數(shù)據(jù)發(fā)送到第零個(gè)分區(qū)上
- 若沒(méi)有指明partition,但是有key的情況下共苛,用key的hashcode值與該topic對(duì)應(yīng)的partition的數(shù)量進(jìn)行取余得到partition判没,然后將數(shù)據(jù)發(fā)送到該分區(qū)上。比如:key的hashcode值為5隅茎,topic對(duì)應(yīng)的分區(qū)數(shù)為3澄峰,那么 5 % 3 = 2,所以該數(shù)據(jù)應(yīng)該發(fā)送到第二個(gè)分區(qū)患膛。
- 既沒(méi)有指定partition和key摊阀,kafka采用粘性分區(qū)器,會(huì)隨機(jī)選擇一個(gè)分區(qū)踪蹬,然后盡可能使用該分區(qū)胞此,指導(dǎo)該分區(qū)batch-size滿了或已完成,然后再隨機(jī)尋找另一個(gè)分區(qū)(與當(dāng)前的分區(qū)不一樣)跃捣。比如:這次隨機(jī)到分區(qū)0漱牵,等當(dāng)前批次滿了或者linger.ms時(shí)間已到,kafka再隨機(jī)一個(gè)分區(qū)疚漆,如果仍然隨機(jī)到分區(qū)0則繼續(xù)隨機(jī)酣胀。
2)自定義分區(qū)器
在實(shí)際的企業(yè)應(yīng)用中,可能會(huì)有不用的場(chǎng)景娶聘,默認(rèn)的分區(qū)器無(wú)法滿足需求闻镶,那么就需要自定也分區(qū)器來(lái)滿足需求,比如某個(gè)分區(qū)的服務(wù)器性能比較好丸升,另一個(gè)是比較久的服務(wù)器铆农,性能相對(duì)差一點(diǎn),那么就需要通過(guò)自定義分區(qū)器讓更多的數(shù)據(jù)向性能更好的分區(qū)傾斜狡耻。
自定義分區(qū)器的實(shí)現(xiàn):第一墩剖,定義類實(shí)現(xiàn) Partitioner 接口猴凹。第二,重寫partition()方法岭皂。第三郊霎,在生產(chǎn)者配置中指定自定義的分區(qū)器。
(四)爷绘、提高生產(chǎn)者的吞吐量
要想更好的提高生產(chǎn)者的吞吐量书劝,則必須先了解生產(chǎn)者發(fā)送數(shù)據(jù)的流程,具體流程可以看靠(一)的介紹揉阎。以下是提高生產(chǎn)者的吞吐量的建議庄撮。
修改batch.size的大小。batch.size默認(rèn)大小為16k毙籽,提高批次大小可以一定程度提高吞吐量。這就好比用小卡車?yán)浳锖陀么罂ㄜ嚴(yán)浳镎鼻欤】ㄜ噯挝粫r(shí)間內(nèi)不能一次拉完貨物坑赡,那么來(lái)回就需要消耗額外時(shí)間,而大卡車一次性把貨物拉走么抗,那么就節(jié)省了來(lái)回的時(shí)間毅否。
修改linger.ms的時(shí)間,默認(rèn)為0蝇刀,即直接發(fā)送數(shù)據(jù)螟加,一般設(shè)置為5-100ms,通過(guò)設(shè)置延遲時(shí)間一次性可以發(fā)送更多的數(shù)據(jù)吞琐。
修改compression.type的數(shù)據(jù)壓縮類型捆探,默認(rèn)snappy。根據(jù)不同的業(yè)務(wù)場(chǎng)景選擇不同的數(shù)據(jù)壓縮方法站粟,提高數(shù)據(jù)壓縮率黍图。kafka提供的壓縮類型有:gzip,snappy,lz4,zstd。
修改RecordAccumulator緩沖區(qū)的大小奴烙。默認(rèn)32m助被,增加緩沖區(qū)大小可以那么每個(gè)batch.size的值就更大,每次發(fā)送的數(shù)據(jù)更多切诀。
(五)揩环、數(shù)據(jù)可靠性
數(shù)據(jù)的可靠性是指producer發(fā)送數(shù)據(jù)到kafka收到應(yīng)答后,該數(shù)據(jù)都能成功落盤幅虑,那么這次發(fā)送是可靠的丰滑。但是,為了適應(yīng)不用的應(yīng)用場(chǎng)景以及實(shí)現(xiàn)高可用翘单,kafka會(huì)將數(shù)據(jù)備份到不同的副本當(dāng)中吨枉,在數(shù)據(jù)同步的過(guò)程中如果出現(xiàn)的故障蹦渣,那么就有可能出現(xiàn)數(shù)據(jù)丟失,重復(fù)情況貌亭。想要弄清楚kafka數(shù)據(jù)可靠性柬唯,就必須先要了解kafka中ACK的應(yīng)答原理。
1圃庭、ACK應(yīng)答原理
ACK應(yīng)答是指在leader分區(qū)接收到生產(chǎn)者的數(shù)據(jù)后锄奢,何時(shí)對(duì)生產(chǎn)者做出應(yīng)答的策略。ACK可選的值有0剧腻,1拘央,-1三個(gè),可以在生產(chǎn)者的配置項(xiàng) acks 中設(shè)置书在,ACK設(shè)置不同灰伟,對(duì)生產(chǎn)者做出應(yīng)答的時(shí)機(jī)也不同。
ACK=0:可靠性級(jí)別最低儒旬。leader收到生產(chǎn)者數(shù)據(jù)后不需要等數(shù)據(jù)落盤栏账,立即對(duì)生產(chǎn)者做出應(yīng)答。生產(chǎn)者收到應(yīng)答后認(rèn)為leader已成功接收數(shù)據(jù)栈源,因此不需要再發(fā)當(dāng)前數(shù)據(jù)了挡爵。但是,如果leader在將內(nèi)存中的數(shù)據(jù)落盤時(shí)突然出現(xiàn)故障甚垦,那么這條數(shù)據(jù)因?yàn)闆](méi)有保存到磁盤中而導(dǎo)致數(shù)據(jù)的丟失茶鹃。
ACK=1:可靠性級(jí)別較高。leader收到生產(chǎn)者的數(shù)據(jù)并將數(shù)據(jù)落盤后艰亮,對(duì)生產(chǎn)者做出應(yīng)答闭翩。生產(chǎn)者收到應(yīng)答后繼續(xù)發(fā)送其他數(shù)據(jù)。如果leader做出應(yīng)答并且follower未同步到該數(shù)據(jù)時(shí)垃杖,leader出現(xiàn)故障男杈。kafka會(huì)重新在follower中選出新的leader,而新的leader心有同步到數(shù)據(jù)调俘,生產(chǎn)者也不會(huì)再發(fā)該數(shù)據(jù)伶棒,因此導(dǎo)致該數(shù)據(jù)的丟失。
ACK=-1(all):可靠性級(jí)別最高彩库,kafka的acks默認(rèn)值肤无。leader收到數(shù)據(jù)并落盤,并且確認(rèn)所有follower收到數(shù)據(jù)后再給生產(chǎn)者應(yīng)答骇钦。此時(shí)宛渐,所有分區(qū)副本都有該數(shù)據(jù)了,即使任意分區(qū)出現(xiàn)故障數(shù)據(jù)仍然是完整的。
(六)窥翩、數(shù)據(jù)去重
1业岁、數(shù)據(jù)重復(fù)原因
在使用kafka時(shí)為了保證數(shù)據(jù)的高可靠性,我們一般都會(huì)將應(yīng)答級(jí)別設(shè)置為-1(all)寇蚊,即leader的ISR列表的follower均收到數(shù)據(jù)后再應(yīng)答笔时。非常不幸的是,leader在收到所有的follower的確認(rèn)后發(fā)生故障仗岸,所有的分區(qū)均已保存到磁盤中允耿,但是生產(chǎn)者沒(méi)有收到應(yīng)答,認(rèn)為leader沒(méi)有收到生產(chǎn)者發(fā)送請(qǐng)求扒怖,于是嘗試重新發(fā)送請(qǐng)求较锡。由于leader發(fā)生故障,kafka重新選舉leader盗痒,生產(chǎn)者將數(shù)據(jù)再一次發(fā)送到新的leader上蚂蕴,所以造成的數(shù)據(jù)重復(fù)。
2俯邓、數(shù)據(jù)去重
kafka 0.11之后引入了冪等性和事務(wù)兩大特性掂墓。利用這兩個(gè)特性可解決數(shù)據(jù)重復(fù)的問(wèn)題。
1)數(shù)據(jù)傳遞語(yǔ)義
- 至少一次(At Least Once):ACK級(jí)別為-1 + 分區(qū)副本大于等于2 + ISR中應(yīng)答的最小副本數(shù)量大于等于2看成;
- 至多一次(At Most Once):ACK應(yīng)答級(jí)別為 0;
At Least Once可以保證數(shù)據(jù)不丟失跨嘉,但不能保證數(shù)據(jù)重復(fù)川慌。
At Most Once 可以保證數(shù)據(jù)不重復(fù),但不能保證數(shù)據(jù)不丟失祠乃。
2)冪等性的原理
冪等性:是指無(wú)論producer發(fā)送多少條重復(fù)的數(shù)據(jù)梦重,broker端都只會(huì)持久化一條數(shù)據(jù),保證了數(shù)據(jù)不重復(fù)亮瓷。
數(shù)據(jù)重復(fù)的判斷依據(jù):具有<PID, Partition, SeqNumber>相同主鍵的消息提交時(shí)琴拧,Broker只會(huì)持久化一條。其中PID是Kafka每次重啟都會(huì)分配一個(gè)新的嘱支;Partition 表示分區(qū)號(hào)蚓胸;Sequence Number是單調(diào)自增的。
從數(shù)據(jù)重復(fù)判斷依據(jù)來(lái)看除师,冪等性只能保證單分區(qū)會(huì)話內(nèi)數(shù)據(jù)不重復(fù)沛膳。
開啟冪等性:enable.idempotence = true,默認(rèn)為true。
3)事務(wù)
kafka的事務(wù)需要跟冪等性配合起來(lái)使用汛聚。開啟事務(wù)就必須開始冪等性锹安。
(七)、數(shù)據(jù)亂序和有序
1、數(shù)據(jù)亂序
kafka的producer客戶端在向broker發(fā)送數(shù)據(jù)時(shí)并不是直接將數(shù)據(jù)發(fā)送出去叹哭,而是將數(shù)據(jù)先緩存到本地的雙端緩存隊(duì)列中忍宋,sender線程會(huì)不斷地檢測(cè)緩存隊(duì)列中地?cái)?shù)據(jù),若隊(duì)列中地?cái)?shù)據(jù)達(dá)到了設(shè)置的(batch.size)值地容量或者達(dá)到一定的時(shí)間(linger.ms)风罩,sender就會(huì)創(chuàng)建一個(gè)InFlightRequests線程糠排,該線程負(fù)責(zé)將數(shù)據(jù)發(fā)送到broker上。kafka 默認(rèn)的InFlightRequests是5個(gè)泊交,InFlightRequests線程發(fā)送數(shù)據(jù)后不需要broker的應(yīng)答就可以發(fā)下一個(gè)數(shù)據(jù)乳讥,因此最多可以一起發(fā)5個(gè)請(qǐng)求。比如廓俭,需要發(fā)送0云石,1,2研乒,3汹忠,4這五個(gè)數(shù)據(jù)依次被發(fā)送出去,但是2這個(gè)數(shù)據(jù)沒(méi)有接收成功雹熬,客戶端則重新2的數(shù)據(jù)宽菜,此時(shí)broker接收到的數(shù)據(jù)的順序就變?yōu)榱?1342,而非01234竿报。
2铅乡、數(shù)據(jù)有序
1)kafka在1.x之前保證數(shù)據(jù)單分區(qū)有序,需要將InFlightRequests線程數(shù)設(shè)置為1個(gè)(****max.in.flight.requests.per.connection=1)烈菌。當(dāng)線程數(shù)為1時(shí)就能保證broker收到數(shù)據(jù)確認(rèn)后再發(fā)下一條數(shù)據(jù)阵幸。**
***** 2)kafka在1.x以后在未開啟冪等性的情況下,處理流程跟1.x以前的版本一樣芽世。*****
***** 3)1.x以后在開啟冪等性的情況下挚赊,可以將max.in.flight.requests.per.connection設(shè)置為小于等于5。原因是济瓢,Kafka服務(wù)端會(huì)緩存最近發(fā)過(guò)來(lái)的元數(shù)據(jù)荠割,等緩存滿了5個(gè)后就會(huì)對(duì)這些元數(shù)據(jù)進(jìn)行排序,這樣就可以保證數(shù)據(jù)有序了旺矾。*****
三蔑鹦、broker
(一)、工作流程
1宠漩、重要參數(shù)
| **參數(shù)名稱 ** | 描述 |
| replica.lag.time.max.ms | ISR 中举反,如果 Follower 長(zhǎng)時(shí)間未向 Leader 發(fā)送通信請(qǐng)求或同步數(shù)據(jù),則該 Follower 將被踢出 ISR扒吁。該時(shí)間閾值火鼻,默認(rèn) 30s室囊。 |
| auto.leader.rebalance.enable | 默認(rèn)是 true。 自動(dòng) Leader Partition 平衡魁索。 |
| leader.imbalance.per.broker.percentage | 默認(rèn)是 10%融撞。每個(gè) broker 允許的不平衡的 leader的比率。如果每個(gè) broker 超過(guò)了這個(gè)值粗蔚,控制器會(huì)觸發(fā) leader 的平衡尝偎。 |
| leader.imbalance.check.interval.seconds | 默認(rèn)值 300 秒。檢查 leader 負(fù)載是否平衡的間隔時(shí)間鹏控。 |
| log.segment.bytes | Kafka 中 log 日志是分成一塊塊存儲(chǔ)的致扯,此配置是指 log 日志劃分 成塊的大小,默認(rèn)值 1G当辐。 |
| log.index.interval.bytes | 默認(rèn) 4kb抖僵,kafka 里面每當(dāng)寫入了 4kb 大小的日志(.log),然后就往 index 文件里面記錄一個(gè)索引缘揪。 |
2耍群、zookeeper存儲(chǔ)的kafka信息
3、工作的總流程
① broker啟動(dòng)找筝,向zookeeper注冊(cè)broker節(jié)點(diǎn)信息
② 向zookeeper注冊(cè)controller蹈垢,若注冊(cè)成功則該controller負(fù)責(zé)選舉leader,監(jiān)聽broker節(jié)點(diǎn)等工作
③ 注冊(cè)成功的controller開始監(jiān)聽brokers節(jié)點(diǎn)變化
④ 注冊(cè)成功的controller舉行l(wèi)eader選舉袖裕。按照AR列表優(yōu)先選取排在前面的節(jié)點(diǎn)曹抬,若該節(jié)點(diǎn)在ISR中存活則成為leader,否則繼續(xù)輪詢急鳄。例如:AR[0,1,2],ISR[1,2]沐祷,controller會(huì)從broker 0 開始輪詢,發(fā)現(xiàn)ISR中并沒(méi)有0攒岛,那么繼續(xù)輪詢到1,發(fā)現(xiàn)ISR有1則1成功leader胞锰。
⑤ 注冊(cè)成功的controller將選舉結(jié)果寫入到zookeeper中灾锯,同時(shí)其他controller也監(jiān)聽該信息,等leader故障后方便立即上位成為新的leader嗅榕。
⑥ 加入leader掛掉了顺饮,controller監(jiān)聽到節(jié)點(diǎn)發(fā)送變化,開始拉取ISR信息凌那,然后開始重新選擇leader兼雄。
(二)、副本
1帽蝶、基本信息
- 作用:提高數(shù)據(jù)可用性
- Kafka默認(rèn)的副本數(shù)為1赦肋,生產(chǎn)環(huán)境可設(shè)置為2,保證數(shù)據(jù)可靠性。不建議設(shè)置太多的副本佃乘,一者增加磁盤存儲(chǔ)空間囱井,二者增加副本同步時(shí)的網(wǎng)絡(luò)開銷,影響效率趣避。
- 副本分為 leader 和follower庞呕。leader只有一個(gè),其余的為follower程帕。生產(chǎn)者的數(shù)據(jù)發(fā)給的是leader住练,消費(fèi)者消費(fèi)的消息也是來(lái)自leader的。follower同步leader的消息愁拭。
- AR:分區(qū)中所有副本的總和讲逛。
- ISR:分區(qū)中能夠和leader保持同步的副本,包括leader本身敛苇。如果follower長(zhǎng)時(shí)間未向leader發(fā)送通信信息或同步請(qǐng)求妆绞,那么該follower就會(huì)從ISR中移除。leader掛了后ISR中的follower重新選舉leader枫攀。
- OSR:分區(qū)中跟leader同步信息差距較大的副本或者是掛了的副本括饶。
- AR = ISR + OSR
2、leader選舉流程
Kafka集群中有一個(gè)會(huì)被選舉為controller leader来涨,選舉方式搶占式图焰,誰(shuí)先搶占到zookeeper的節(jié)點(diǎn),誰(shuí)就能成為leader蹦掐。該controller負(fù)責(zé)Kafka集群broker的上下線技羔,topic分區(qū)副本的分配和leader選舉等工作。
3卧抗、leader和follower故障處理
LEO(log end offset):每個(gè)副本的最后一個(gè)offset藤滥,即最新offset + 1。
HW(high watermark):所有副本中最小的LEO社裆。
1)leader發(fā)生故障
leader發(fā)生故障后拙绊,kafka會(huì)從ISR中重新選舉出新的leader
為保證多副本之前的數(shù)據(jù)一致性,其余的follower會(huì)先將自己高于HW的數(shù)據(jù)截掉泳秀,然后同步新leader的數(shù)據(jù)标沪。
2) follower發(fā)生故障
follower發(fā)生故障后會(huì)被臨時(shí)踢出ISR。
此時(shí)leader和follower繼續(xù)接收數(shù)據(jù)嗜傅。
等f(wàn)ollower恢復(fù)后讀取保存在磁盤的HW金句,并將log文件中高于HW部分的數(shù)據(jù)截取丟棄,從HW位置開始同步leader的數(shù)據(jù)吕嘀。
follower同步到LEO大于等于分區(qū)的HW時(shí)违寞,kafka就將該副本重新加入到ISR中贞瞒。
4、副本分配
1)盡量將所有副本平均地分配到所有地broker上坞靶。
2)每個(gè)broker分配到地leader盡可能一樣多憔狞。
3)leader和follower盡可能地分配到不同地broker上。
5彰阴、leader分區(qū)自平衡
正常情況下瘾敢,Kafka本身會(huì)盡可能地將leader分區(qū)均勻地分布到各個(gè)機(jī)器上,這樣使得每個(gè)機(jī)器的讀寫吞吐量比較均勻尿这。但是簇抵,如果當(dāng)leader分區(qū)故障后,另一個(gè)follower就會(huì)迅速上位并且承擔(dān)之前的leader的工作射众,讀寫請(qǐng)求的壓力就會(huì)上升碟摆,造成集群負(fù)載不平衡。kafka允許的每個(gè)broker不平衡的比率默認(rèn)為10%叨橱,如果超過(guò)了這個(gè)值就會(huì)觸發(fā)leader分區(qū)平衡典蜕。
相關(guān)參數(shù):
| 參數(shù)名稱 | 描述 |
| auto.leader.rebalance.enable | 默認(rèn)是 true。 自動(dòng) Leader Partition 平衡罗洗。生產(chǎn)環(huán)境中愉舔,leader 重選舉的代價(jià)比較大,可能會(huì)帶來(lái)性能影響伙菜,建議設(shè)置為 false 關(guān)閉轩缤。 |
| leader.imbalance.per.broker.percentage | 默認(rèn)是 10%。每個(gè) broker 允許的不平衡的 leader的比率贩绕。如果每個(gè) broker 超過(guò)了這個(gè)值火的,控制器會(huì)觸發(fā) leader 的平衡。 |
| leader.imbalance.check.interval.seconds | 默認(rèn)值 300 秒淑倾。檢查 leader 負(fù)載是否平衡的間隔時(shí)間馏鹤。 |
(三)、數(shù)據(jù)存儲(chǔ)
1娇哆、存儲(chǔ)機(jī)制
Kafka中topic是邏輯概念假瞬,但是分區(qū)partition是物理概念,生產(chǎn)者發(fā)送的數(shù)據(jù)都是存儲(chǔ)在partition中的迂尝,每個(gè)partition中都有一個(gè)log文件,數(shù)據(jù)實(shí)際都是存儲(chǔ)在log文件中剪芥,生產(chǎn)者生產(chǎn)的數(shù)據(jù)都會(huì)最佳到該log文件的末端垄开。為防止log文件過(guò)大導(dǎo)致數(shù)據(jù)檢索過(guò)慢,kafka將log文件進(jìn)行切片税肪,每個(gè)片成為segment溉躲,每個(gè)segment中包括“.index”文件榜田,“.log”文件和“.timestamp”等文件。
log文件:保存實(shí)際數(shù)據(jù)的文件锻梳。
index文件:偏移量索引文件箭券,文件為segment第一個(gè)數(shù)據(jù)offset值。index文件是一個(gè)稀疏索引疑枯,每往log文件中寫入4kb的數(shù)據(jù)才向index文件中寫入一條索引辩块。
timestamp文件:時(shí)間戳索引文件,文件名為segment第一條數(shù)據(jù)的offset值荆永。
2废亭、數(shù)據(jù)文件清楚策略
Kafka提供的文件刪除策略有delete和compact兩種。
delete策略:將過(guò)期數(shù)據(jù)刪除
log.cleanup.plocy = delete 所有數(shù)據(jù)啟用刪除策略
- 基于時(shí)間刪除具钥,默認(rèn)打開豆村。以segment中所有記錄中最后記錄的時(shí)間戳作為該文件的時(shí)間戳。
- 基于大小刪除骂删,默認(rèn)關(guān)閉掌动。超過(guò)設(shè)置的所有日志總大小,刪除最早的segment宁玫。
compact日志壓縮:對(duì)于相同key的不同value值粗恢,只保留最后一個(gè)value值。壓縮后的offset可能是不連續(xù)的撬统。這種策略只適合某些特殊的場(chǎng)景适滓,比如key保存的是用戶id,value保存的用戶信息恋追,通過(guò)壓縮策略就能將舊的用戶數(shù)據(jù)刪除凭迹,只保留新的用戶信息。
(四)苦囱、高效讀寫數(shù)據(jù)
1嗅绸、Kafka是基于分布式集群,采用分區(qū)技術(shù)撕彤,數(shù)據(jù)讀寫并行度高鱼鸠。
2、讀數(shù)據(jù)采用稀疏索引羹铅,消費(fèi)者能夠快速定位消費(fèi)的數(shù)據(jù)蚀狰。
3、順序?qū)懘疟P职员,producer生產(chǎn)數(shù)據(jù)只是在log文件的末端追加數(shù)據(jù)麻蹋,因此寫磁盤的速度很快。
4焊切、Kafka以來(lái)系統(tǒng)底層的pagecache技術(shù)和零拷貝技術(shù)扮授,能夠提高磁盤數(shù)據(jù)讀寫速度芳室。
四、消費(fèi)者
(一)刹勃、重要參數(shù)
| 參數(shù)名稱 | 描述 |
| bootstrap.servers | 向 Kafka 集群建立初始連接用到的 host/port 列表堪侯。 |
| key.deserializer 和 value.deserializer | 指定接收消息的 key 和 value 的反序列化類型。一定要寫全
類名荔仁。 |
| group.id | 標(biāo)記消費(fèi)者所屬的消費(fèi)者組伍宦。 |
| enable.auto.commit | 默認(rèn)值為 true,消費(fèi)者會(huì)自動(dòng)周期性地向服務(wù)器提交偏移量咕晋。 |
| auto.commit.interval.ms | 如果設(shè)置了 enable.auto.commit 的值為 true雹拄, 則該值定義了消費(fèi)者偏移量向 Kafka 提交的頻率,默認(rèn) 5s掌呜。 |
| auto.offset.reset |
當(dāng) Kafka 中沒(méi)有初始偏移量或當(dāng)前偏移量在服務(wù)器中不存在如滓玖,數(shù)據(jù)被刪除了),該如何處理质蕉?
earliest:自動(dòng)重置偏移量到最早的偏移量势篡。
latest:默認(rèn),自動(dòng)重置偏移量為最新的偏移量模暗。
none:如果消費(fèi)組原來(lái)的(previous)偏移量存在禁悠,則向消費(fèi)者拋異常。
anything:向消費(fèi)者拋異常兑宇。
|
| offsets.topic.num.partitions | __consumer_offsets 的分區(qū)數(shù)碍侦,默認(rèn)是 50 個(gè)分區(qū)。 |
| heartbeat.interval.ms | Kafka 消費(fèi)者和 coordinator 之間的心跳時(shí)間隶糕,默認(rèn) 3s瓷产。該條目的值必須小于 session.timeout.ms ,也不應(yīng)該高于session.timeout.ms 的 1/3枚驻。 |
| session.timeout.ms | Kafka 消費(fèi)者和 coordinator 之間連接超時(shí)時(shí)間濒旦,默認(rèn) 45s池磁。超過(guò)該值穆咐,該消費(fèi)者被移除,消費(fèi)者組執(zhí)行再平衡兰珍。 |
(二)锉矢、消費(fèi)方式
1梯嗽、pull(拉)模式
consumer主動(dòng)拉取服務(wù)端的數(shù)據(jù)。Kafka采用這種方式沽损。
優(yōu)點(diǎn):consumer根據(jù)自己的處理能力拉取服務(wù)端的數(shù)據(jù)灯节。
缺點(diǎn):當(dāng)服務(wù)端沒(méi)有數(shù)據(jù)時(shí),consumer仍然不斷從服務(wù)端拉取數(shù)據(jù),會(huì)消耗consumer一定的資源显晶。
2、push(推)模式
服務(wù)端主動(dòng)推消息給consumer壹士。
優(yōu)點(diǎn):服務(wù)端只要有數(shù)據(jù)是才會(huì)推數(shù)據(jù)給consumer磷雇。
缺點(diǎn):服務(wù)端發(fā)送消息的速率很難適應(yīng)所有consumer處理消息的速率。
(三)躏救、消費(fèi)者消費(fèi)流程
1唯笙、總體消費(fèi)流程
1)消費(fèi)者通過(guò)offset拉取broker中指定位置的消息。offset則保存在系統(tǒng)主題中盒使,系統(tǒng)主題保存在磁盤中崩掘,所以即使服務(wù)端出現(xiàn)故障或重啟等能夠按上次消費(fèi)的位置開始消費(fèi)。
2)一個(gè)消費(fèi)者可以消費(fèi)多個(gè)分區(qū)的數(shù)據(jù)少办。
3)每個(gè)分區(qū)的數(shù)據(jù)只能由消費(fèi)者組的一個(gè)消費(fèi)者消費(fèi)苞慢。
2、消費(fèi)者組
1)消費(fèi)者在消費(fèi)消息時(shí)必須指定消費(fèi)者組id(group.id)英妓,具有相同的group.id組成一個(gè)消費(fèi)者組挽放。
2)消費(fèi)者組的消費(fèi)者消費(fèi)不同分區(qū)的數(shù)據(jù)。一個(gè)分區(qū)只能由一個(gè)消費(fèi)者消費(fèi)蔓纠。
3)消費(fèi)者組之間相互獨(dú)立辑畦,互不影響。
4)消費(fèi)者組的消費(fèi)者數(shù)量應(yīng)小于等于分區(qū)數(shù)量腿倚,若消費(fèi)者數(shù)量大于分區(qū)數(shù)纯出,多余的消費(fèi)者則不會(huì)消費(fèi)消息。
5)消費(fèi)者消費(fèi)消息后發(fā)送offset給服務(wù)端敷燎。
3暂筝、消費(fèi)者初始化流程
(四)、分區(qū)的分配策略以及再平衡
1懈叹、Range分區(qū)策略
首先對(duì)同一個(gè)topic的分區(qū)對(duì)分區(qū)序號(hào)進(jìn)行排序乖杠,然后消費(fèi)者按照字母排序,利用分區(qū)數(shù)除以消費(fèi)者數(shù)量得出每個(gè)消費(fèi)者平均消費(fèi)分區(qū)數(shù)澄成,若剩余多的則按順序消費(fèi)胧洒。
注意:當(dāng)topic數(shù)量比較多時(shí),使用Range分區(qū)策略會(huì)導(dǎo)致排在前面的消費(fèi)總是分配更多的消費(fèi)者墨状,造成數(shù)據(jù)傾斜卫漫。
再平衡策略:當(dāng)某個(gè)消費(fèi)者掛了,Kafka會(huì)將該消費(fèi)者的任務(wù)全部交給正常消費(fèi)的消費(fèi)者肾砂。
2列赎、RoundRobin分區(qū)策略
先進(jìn)行排序,然后消費(fèi)依次輪詢消費(fèi)分區(qū)數(shù)據(jù)镐确。
再平衡:觸發(fā)再平衡后包吝,將故障的消費(fèi)者的任務(wù)輪詢交給正常消費(fèi)的消費(fèi)者饼煞。
3、Sticky分區(qū)策略
粘性分區(qū)是 Kafka 從 0.11.x 版本開始引入這種分配策略诗越,首先會(huì)盡量均衡的放置分區(qū)到消費(fèi)者上面砖瞧,在出現(xiàn)同一消費(fèi)者組內(nèi)消費(fèi)者出現(xiàn)問(wèn)題的時(shí)候,會(huì)盡量保持原有分配的分****區(qū)不變化嚷狞。**
再平衡*****:****盡可能均衡的隨機(jī)分成 0 和 1 號(hào)分區(qū)數(shù)據(jù)块促,分別由 1 號(hào)消費(fèi)者或者 2 號(hào)消費(fèi)者消費(fèi)。*
(五)床未、offset位置
1竭翠、offset的維護(hù)
Kafka0.9版本之前,consumer默認(rèn)將offset*****保存在Zookeeper中薇搁。*
從0.9版本開始斋扰,consumer默認(rèn)將offset保存在Kafka一個(gè)內(nèi)置的topic中,該topic為__consumer_offsets只酥。
2褥实、offset提交
分為自動(dòng)提交和手動(dòng)提交,默認(rèn)為自動(dòng)提交
1)自動(dòng)提交
enable.auto.commit:設(shè)置為true
auto.commit.interval.ms:自動(dòng)提交offset的間隔時(shí)間裂允,默認(rèn)為5s
2)手動(dòng)提交
enable.auto.commit :設(shè)置為false损离。用戶可以根據(jù)具體場(chǎng)景進(jìn)行提交,更加靈活绝编。手動(dòng)提交可分為同步提交和異步提交兩張方式僻澎。
3、指定offset消費(fèi)
當(dāng)消費(fèi)者第一次消費(fèi)或者offset信息丟失十饥,這是消費(fèi)者該如何進(jìn)行消費(fèi)呢窟勃?Kafka提供了三種方式:earliest,latest和none
earliest:自動(dòng)將offset設(shè)置為最開始的位置逗堵。
latest:自動(dòng)將offset設(shè)置為最新的offset位置秉氧。
none:若沒(méi)有找到之前的offset信息,則會(huì)拋出異常蜒秤。
4汁咏、指定時(shí)間消費(fèi)
消費(fèi)者根據(jù)指定時(shí)間(比如1天前)獲取數(shù)據(jù),通過(guò)broker的時(shí)間戳索引文件查到該時(shí)間對(duì)應(yīng)的offset作媚,然后用該offset獲取對(duì)應(yīng)的數(shù)據(jù)攘滩。
(六)、漏消費(fèi)和重復(fù)消費(fèi)
1纸泡、漏消費(fèi)
先提交offset后處理數(shù)據(jù)漂问,可能會(huì)造成漏消費(fèi)消息。一般用戶設(shè)置為手動(dòng)異步提交offset會(huì)引起這種問(wèn)題。
場(chǎng)景:消費(fèi)者offset提交方式設(shè)置為手動(dòng)異步提交蚤假,消費(fèi)者把offset提交出去栏饮,但是數(shù)據(jù)還未落盤,此時(shí)消費(fèi)者掛掉磷仰,分區(qū)已經(jīng)將最新的offset保存起來(lái)了抡爹。消費(fèi)者重啟后獲取的offset則是最新提交的offset了,造成數(shù)據(jù)遺漏處理芒划。
2、重復(fù)消費(fèi)
已經(jīng)消費(fèi)了消息欧穴,但是沒(méi)提交offset民逼。一般消費(fèi)者設(shè)置為自動(dòng)提交會(huì)有這種問(wèn)題。
場(chǎng)景:消費(fèi)者設(shè)置offset自動(dòng)提交時(shí)間為5s涮帘,在第2s時(shí)消費(fèi)者掛掉拼苍,在掛掉前已經(jīng)處理部分?jǐn)?shù)據(jù),但offset沒(méi)有提交到broker调缨。消費(fèi)者重啟后拿到的offset則為舊的offset疮鲫,之前處理過(guò)的數(shù)據(jù)又被拉取一遍,造成重復(fù)消費(fèi)數(shù)據(jù)弦叶。
(七)俊犯、消費(fèi)者事務(wù)
若要保證消費(fèi)者消費(fèi)數(shù)據(jù)時(shí)不漏消費(fèi),不重復(fù)消費(fèi)伤哺,則需要將消費(fèi)者端在消費(fèi)消息和提交offset的操作看作是依次原子操作燕侠。
(八)、數(shù)據(jù)積壓
所謂數(shù)據(jù)積壓是指broker中保存大量未消費(fèi)的消息立莉。若長(zhǎng)時(shí)間未消費(fèi)且觸發(fā)了刪除策略绢彤,那么這部分?jǐn)?shù)據(jù)就會(huì)丟失◎殉埽可能造成數(shù)據(jù)積壓的原因有消費(fèi)者的處理能力不足茫舶,分區(qū)數(shù)較少,可適當(dāng)增加分區(qū)數(shù)量刹淌,使分?jǐn)?shù)量等于消費(fèi)者數(shù)量饶氏。也可以把消費(fèi)者每次拉取數(shù)據(jù)的大小往上調(diào)。
五芦鳍、總結(jié)
Kafka憑借自身優(yōu)秀的性能和海量數(shù)據(jù)的處理能力以及數(shù)據(jù)的可靠性保證嚷往,組件在大數(shù)據(jù)領(lǐng)域占據(jù)主流地位。