? ? 本文主要介紹kafka producer的可靠性,包括ack碾盟、batch沽翔、重試機(jī)制等
消息發(fā)布
KafkaProducer的send是異步發(fā)送方法,一旦消息存儲(chǔ)到發(fā)送隊(duì)列緩沖區(qū)浇坐,方法就會(huì)立即返回睬捶,實(shí)際處理消息發(fā)送的是一個(gè)后臺(tái)Sender線程。send方法返回一個(gè)Future對(duì)象(包含消息的元數(shù)據(jù)信息如分區(qū)近刘、消息存儲(chǔ)的offset)擒贸,并接收回調(diào)函數(shù)參數(shù)臀晃,當(dāng)消息ack后會(huì)調(diào)用回調(diào)函數(shù);調(diào)用Future的get方法酗宋,將阻塞直到相關(guān)聯(lián)的請(qǐng)求完成并返回該消息的元數(shù)據(jù)或在消息發(fā)送過(guò)程中拋出異常积仗。
? ? 如果是簡(jiǎn)單的阻塞調(diào)用,可以使用get方法:
????完全非阻塞的用法是使用回調(diào)參數(shù)蜕猫,在消息發(fā)送請(qǐng)求完成后調(diào)用該函數(shù):
忽略返回參數(shù)寂曹,客戶端感知不到消息發(fā)送狀態(tài):
????如果應(yīng)用和Kafka集群間的網(wǎng)絡(luò)質(zhì)量太差,那么阻塞的方式發(fā)送每條消息后需要等待較長(zhǎng)時(shí)間才能收到應(yīng)答回右。這對(duì)高并發(fā)隆圆、海量消息發(fā)送簡(jiǎn)直就是災(zāi)難,因?yàn)榈却龖?yīng)答的時(shí)間遠(yuǎn)超過(guò)消息發(fā)送時(shí)間翔烁。如果一些消息不太注重可靠性渺氧,發(fā)送失敗了只需要記錄下日志,可以用回調(diào)函數(shù)方式蹬屹。
Acker
? ? Kafka使用push模式把消息發(fā)布到broker侣背,消息后發(fā)布后,producer又是怎么確定消息已經(jīng)成功持久化慨默?這是通過(guò)acker機(jī)制實(shí)現(xiàn)的贩耐,broker反饋給客戶端消息已經(jīng)收到并寫入到日志文件(基于性能考慮,broker并沒(méi)有把數(shù)據(jù)落盤而是放到內(nèi)存)厦取。通過(guò)配置不同acks值潮太,對(duì)應(yīng)不同級(jí)別:
acks=0:消息會(huì)立即添加到socket緩沖區(qū),producer不會(huì)等待broker的任何確認(rèn)消息虾攻,就認(rèn)為消息已經(jīng)發(fā)送了铡买。這種模式下數(shù)據(jù)傳輸效率是最高的,但不能保證broker已經(jīng)收到了消息霎箍,所以數(shù)據(jù)可靠性也是最低的奇钞。
? ? acks=1:意味著leader成功把消息寫到本地日志后就反饋給producer,而不關(guān)心同步節(jié)點(diǎn)上該消息是否已寫到本地漂坏。如果leader宕機(jī)但同步節(jié)點(diǎn)還沒(méi)有及時(shí)拉取到該消息蛇券,則數(shù)據(jù)就丟失了。
? ? acks=all/-1:leader會(huì)等待ISR中所有的同步節(jié)點(diǎn)都確認(rèn)接收到了消息樊拓,才反饋給客戶端纠亚。這種可靠性是最高的,保證了只要至少有一個(gè)in-sync副本還活著筋夏,消息就不會(huì)丟失蒂胞,acks=all需要leader等待所有同步節(jié)點(diǎn)ack,這種延遲取決于最慢節(jié)點(diǎn)条篷。但是這樣也不能保證數(shù)據(jù)不丟失骗随,比如當(dāng)ISR中只有l(wèi)eader時(shí)蛤织,這樣就變成了acks=1的情況。
? ? 當(dāng)acks=all/-1時(shí)鸿染,min.insync.replicas這個(gè)參數(shù)指定了ISR中的最少副本數(shù)指蚜,默認(rèn)值為1。如果ISR中的副本數(shù)小于min.insync.replicas時(shí)涨椒,客戶端會(huì)返回異常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required摊鸡。通過(guò)這兩個(gè)參數(shù)配合使用,能保證消息發(fā)送的可靠性蚕冬。
重試機(jī)制
? ? Kafka Producer一般有兩類錯(cuò)誤免猾,可重試錯(cuò)誤會(huì)通過(guò)重試發(fā)送消息解決,比如連接重連可解決連接錯(cuò)誤囤热、partition重新選舉leader可解決“NotLeaderForPartitionException”錯(cuò)誤猎提。Kafka Producer能配置重試次數(shù),超過(guò)重試次數(shù)還不能解決的會(huì)拋出錯(cuò)誤旁蔼。另外一類就是不能通過(guò)重試處理的錯(cuò)誤锨苏,比如,消息大小太大棺聊,這種情況下Kafka Producer會(huì)立即報(bào)錯(cuò)伞租。
? ? 如果producer發(fā)送數(shù)據(jù)給broker后,遇到的網(wǎng)絡(luò)問(wèn)題而造成通信中斷躺屁,那producer就無(wú)法判斷該條消息是否已經(jīng)提交(commit)肯夏。雖然Kafka無(wú)法確定網(wǎng)絡(luò)故障期間發(fā)生了什么经宏,但是producer可以retry多次犀暑,確保消息已經(jīng)正確傳輸?shù)絙roker中
Batch
? ? 當(dāng)多個(gè)消息被發(fā)送到同一分區(qū)時(shí),producer會(huì)嘗試將多個(gè)消息合并到一個(gè)批次烁兰,就是把多個(gè)消息打包在一起發(fā)送到 Broker耐亏。在一次請(qǐng)求中發(fā)送大批量數(shù)據(jù),提高producer和broker性能沪斟,廣泛用于大數(shù)據(jù)場(chǎng)景广辰。
batch.size:配置批量發(fā)送的最大字節(jié)數(shù),如果batch.size=0主之,會(huì)禁用batch择吊。producer是在內(nèi)存中積累數(shù)據(jù),batch size越大占用內(nèi)存越多槽奕,因會(huì)始終分配指定大小的緩沖區(qū)几睛。
linger.ms:通常是在消息的到達(dá)速度比發(fā)送速度快,producer才會(huì)把多個(gè)消息打包成一個(gè)批次粤攒,然而所森,在某些場(chǎng)景下囱持,客戶端希望降低請(qǐng)求次數(shù),這可以通過(guò)增加延遲發(fā)送功能來(lái)實(shí)現(xiàn):producer不是立即發(fā)送消息焕济,而是等待給定的延遲時(shí)間纷妆,以積累更多的消息批量發(fā)送,達(dá)到節(jié)省網(wǎng)絡(luò)資源的目的晴弃。linger.ms配置項(xiàng)就是讓Producer在發(fā)送消息前等待一定時(shí)間掩幢,以積累更多的消息打包發(fā)送,默認(rèn)配置為0
消息重復(fù)性
? ? acks=-1的情況下肝匆,消息發(fā)送到leader后 粒蜈,當(dāng)只有部分ISR副本完成了消息同步,leader此時(shí)掛掉旗国,客戶端會(huì)認(rèn)為消息發(fā)送失敗枯怖,就會(huì)重新發(fā)送數(shù)據(jù)(設(shè)置了retries),數(shù)據(jù)就可能會(huì)重復(fù)能曾。比如follower1同步了leader的消息度硝,follower2沒(méi)有同步到,leader掛掉后寿冕,producer會(huì)得到異常蕊程,認(rèn)為消息發(fā)送失敗了,而follower1被選舉為leader驼唱,producer又重新發(fā)送消息藻茂,這樣消息就重復(fù)了。
本文首發(fā)于公眾號(hào):data之道