一嚼鹉、組件
介紹一下kafka進(jìn)行數(shù)據(jù)復(fù)制時(shí)會(huì)涉及到的一些組件概念
zookeeper:維護(hù)集群信息贩汉,當(dāng)broker加入或退出時(shí),kafka通過(guò)訂閱zookeeper就能獲得通知
broker:一個(gè)獨(dú)立的kafka服務(wù)器稱為一個(gè)broker锚赤。broker接收來(lái)自生產(chǎn)者的消息匹舞,為消息設(shè)置位移,并將消息刷入到磁盤里线脚。broker并且提供消費(fèi)者服務(wù)赐稽,對(duì)讀取的分區(qū)數(shù)據(jù)提供響應(yīng)。
-
控制器/Controller:除了有一般broker的功能外浑侥,還會(huì)負(fù)責(zé)分區(qū)首領(lǐng)的選舉姊舵,使用epoch來(lái)控制“腦裂”。
集群里第一個(gè)啟動(dòng)的broker通過(guò)在Zookeeper里創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)/controller使自己成為控制器寓落,其他的broker節(jié)點(diǎn)在啟動(dòng)時(shí)也會(huì)嘗試創(chuàng)建這個(gè)節(jié)點(diǎn)括丁,但會(huì)提示失敗,因?yàn)橐呀?jīng)存在了伶选,其它broker節(jié)點(diǎn)會(huì)在Zookeeper創(chuàng)建/watcher節(jié)點(diǎn)去感知控制器的狀態(tài)史飞,當(dāng)控制器被關(guān)閉或者離開(kāi)集群了,他們會(huì)再次嘗試創(chuàng)建/controller節(jié)點(diǎn)重復(fù)同樣的操作仰税。
新選舉出來(lái)的控制器构资,會(huì)得到一個(gè)遞增的controller epoch,其它broker在得知當(dāng)前的controller epoch后陨簇,會(huì)忽略舊控制器發(fā)出的消息蚯窥,避免了腦裂的現(xiàn)象。
控制器可以進(jìn)行broker分區(qū)選舉塞帐。當(dāng)分區(qū)首領(lǐng)所在的broker離開(kāi)集群時(shí)拦赠,控制器遍歷這些分區(qū),并確定哪個(gè)副本會(huì)成為新的分區(qū)首領(lǐng)葵姥,然后向所有broker發(fā)送請(qǐng)求荷鼠,該請(qǐng)求包含誰(shuí)是新leader誰(shuí)是follower,隨后新首領(lǐng)開(kāi)始處理來(lái)自生產(chǎn)者和消費(fèi)者的請(qǐng)求榔幸,而follower開(kāi)始從leader處復(fù)制消息
-
分區(qū):kafka使用主題來(lái)組織數(shù)據(jù)允乐,每個(gè)主題被劃分為若干個(gè)分區(qū)矮嫉,每個(gè)分區(qū)可以有若干個(gè)副本,分區(qū)分配遵循同一分區(qū)副本均勻分布在不同broker上牍疏。
例如有4個(gè)broker蠢笋,創(chuàng)建一個(gè)包含10個(gè)分區(qū)的主題,復(fù)制因子設(shè)置為2鳞陨,那么總共有20個(gè)副本昨寞,可以按照如下方式分配給4個(gè)broker:
1、若未指定機(jī)架信息厦滤,隨機(jī)指定一個(gè)broker0援岩,首領(lǐng)分區(qū)0分配給broker0,首領(lǐng)分區(qū)1分配給broker1掏导,以此類推......隨后從分區(qū)首領(lǐng)后開(kāi)始享怀,依次分配跟隨者副本,例如分區(qū)0的首領(lǐng)在broker0趟咆,那么它的第一個(gè)跟隨者副本會(huì)分配給broker1......
2添瓷、若指定了機(jī)架信息,例如broker0和broker1在機(jī)架1值纱,broker2和broker3分別在不同的機(jī)架仰坦,那么分區(qū)副本需要按照broker0,broker2计雌,broker1,broker3進(jìn)行交替分配
-
副本:分為首領(lǐng)(leader)副本和跟隨者(follower)副本玫霎。
-
leader副本處理所有的寫入和訪問(wèn)請(qǐng)求凿滤,另外會(huì)通過(guò)與follower保持狀態(tài)的交互,維護(hù)一個(gè)isr列表庶近;
broker在處理請(qǐng)求時(shí)翁脆,如果收到一個(gè)包含特定分區(qū)的生產(chǎn)和讀取請(qǐng)求,但是該分區(qū)的leader副本并不在該broker上鼻种,會(huì)導(dǎo)致報(bào)錯(cuò)反番。
客戶端會(huì)采用元數(shù)據(jù)請(qǐng)求方式,服務(wù)器會(huì)給出對(duì)應(yīng)的響應(yīng)叉钥,響應(yīng)的消息會(huì)指明特定的主題罢缸,主題的分區(qū)、分區(qū)的副本以及副本leader信息投队,然后客戶端會(huì)緩存起來(lái)便于下次直接訪問(wèn)枫疆。并會(huì)時(shí)不時(shí)更新元數(shù)據(jù)信息
follower的任務(wù)是復(fù)制leader的消息,保持與leader的一致性
-
ISR機(jī)制:每個(gè)分區(qū)都有一個(gè)ISR列表敷鸦,用于維護(hù)所有的同步副本息楔。leader副本必須是同步的寝贡,follower副本要滿足兩個(gè)條件才算是同步副本:
- 定時(shí)向zk發(fā)送心跳消息,保持與zk的活躍會(huì)話
- 持續(xù)向leader副本請(qǐng)求消息值依,在允許的消息量/時(shí)間延遲范圍內(nèi)保持與leader副本的消息同步(副本LEO落后于leader LEO的時(shí)長(zhǎng)不大于replica.lag.time.max.ms參數(shù)值)
-
LEO:日志末端位移圃泡,記錄每個(gè)副本中下一條消息的偏移量
-
HW:水位值,記錄當(dāng)前topic已提交的偏移量愿险。也即消費(fèi)者能消費(fèi)到的最大偏移量
Leader Epoch
二颇蜡、消息的可靠性傳遞
-
broker有3個(gè)配置可影響消息存儲(chǔ)可靠性
復(fù)制系數(shù):主題級(jí)別的配置參數(shù)是replication.factor,broker級(jí)別可以通過(guò)default.replication.factor來(lái)配置自動(dòng)創(chuàng)建的主題拯啦;更高的復(fù)制系數(shù)可以帶來(lái)更高的可用性澡匪、可靠性,但是也需要消耗更多的存儲(chǔ)空間
-
不完全的首領(lǐng)選舉:unclean.leader.election只能在broker級(jí)別配置褒链,默認(rèn)值是enable唁情。
當(dāng)分區(qū)首領(lǐng)不可用時(shí),一個(gè)同步副本會(huì)被選為新首領(lǐng)甫匹,如果在選舉過(guò)程中沒(méi)有出現(xiàn)數(shù)據(jù)丟失甸鸟,那么這個(gè)選舉就是完全的。如果允許不同步的副本成為分區(qū)首領(lǐng)兵迅,那么需要承擔(dān)丟失數(shù)據(jù)和數(shù)據(jù)不一致的風(fēng)險(xiǎn)抢韭,如果不允許,那么就要接受較低的可靠性
最小同步副本:主題和broker級(jí)別上都可以配置參數(shù)min.insync.replicas參數(shù)恍箭,如果當(dāng)前同步副本的個(gè)數(shù)小于這個(gè)參數(shù)時(shí)刻恭,那么生產(chǎn)者將不能往主題分區(qū)寫入數(shù)據(jù),分區(qū)也變成了只讀狀態(tài)扯夭。
-
生產(chǎn)者配置
- 發(fā)送確認(rèn)配置:acks可配置3中不同的確認(rèn)模式
- acks=0:生產(chǎn)者能夠把消息發(fā)送出去鳍贾,則認(rèn)為消息已成功寫入kafka,這種配置可以得到最大的吞吐量帶寬利用率交洗,但是卻最不穩(wěn)定最有可能丟失數(shù)據(jù)
- acks=1:分區(qū)首領(lǐng)在收到數(shù)據(jù)后寫入分區(qū)數(shù)據(jù)文件時(shí)會(huì)返回確認(rèn)或者失敗的消息骑科,如果生產(chǎn)者能正確處理錯(cuò)誤消息,會(huì)重試嘗試發(fā)送消息构拳,最終消息會(huì)成功寫入到分區(qū)首領(lǐng)咆爽。這種配置方式也有造成丟失數(shù)據(jù)的風(fēng)險(xiǎn),當(dāng)消息寫入分區(qū)leader但是在follower復(fù)制時(shí)leader崩潰了
- acks=all:生產(chǎn)者在消息寫入分區(qū)首領(lǐng)和所有的副本后才確認(rèn)消息被寫入置森,這個(gè)參數(shù)會(huì)配合最小同步副本來(lái)使用斗埂,在確認(rèn)最小寫入副本數(shù)成功后就能返回繼續(xù)處理下一條消息的繼續(xù)寫入。這種配置可靠性最高凫海,但是吞吐率最低
- 配置重試次數(shù):對(duì)于可重試解決錯(cuò)誤的事件蜜笤,生產(chǎn)者可以嘗試重新發(fā)送消息;對(duì)于不可重試解決錯(cuò)誤的事件盐碱,多次重試已失去意義把兔,可以直接丟棄或保存到磁盤再后續(xù)處理沪伙。重試次數(shù)的配置主要看重試的目的是什么。
- 額外的錯(cuò)誤處理:對(duì)于重試機(jī)制不能解決的錯(cuò)誤县好,例如消息序列化失敗围橡,生產(chǎn)者重試次數(shù)達(dá)到上限,需要開(kāi)發(fā)人員自行捕獲異常并處理缕贡。
- 發(fā)送確認(rèn)配置:acks可配置3中不同的確認(rèn)模式
-
消費(fèi)者可靠性配置
-
自動(dòng)提交偏移量
- enable.auto.commit(消費(fèi)者再均衡后會(huì)有消息重復(fù)消費(fèi)的情況)
- auto.commit.interval.ms(自動(dòng)提交開(kāi)啟翁授,默認(rèn)提交間隔是5s)
-
手動(dòng)提交偏移量
enable.auto.commit參數(shù)設(shè)置為false,手動(dòng)提交偏移量分兩類
- 手動(dòng)提交當(dāng)前輪訓(xùn)的最大偏移量
- 手動(dòng)提交固定偏移量
api分同步提交和異步提交兩類
-
同步提交:提交失敗消息后阻塞晾咪,消費(fèi)者進(jìn)行自動(dòng)重試收擦,保證消息能夠最大限度地提交成功,但會(huì)降低吞吐量
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*同步提交*/ consumer.commitSync(); }
-
異步提交:提交失敗后不能自動(dòng)重試谍倦,但是可以通過(guò)一個(gè)Map<TopicPartition, Integer> offsets對(duì)象來(lái)維護(hù)每個(gè)分區(qū)提交的偏移量塞赂,如果失敗的偏移量小于最后一次已提交的偏移量,則不需要重試
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*異步提交并定義回調(diào)*/ consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset())); } } }); }
-