一、Kafka中的消息是否會丟失和重復(fù)消費
要確定Kafka的消息是否丟失或重復(fù),從兩個方面分析入手:消息發(fā)送和消息消費
1、消息發(fā)送
kafka消息發(fā)送有同步(sync)逆巍、異步(async)兩種,以及三種消息確認(rèn)方式莽使。
1). sync vs async
Kafka消息發(fā)送有兩種方式:同步(sync)和異步(async)锐极,可通過默認(rèn)是同步方式producer.type屬性進(jìn)行配置。
在官方文檔Producer Configs中有如下:
Property | Default | Description |
---|---|---|
producer.type | sync | This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data. |
翻譯過來就是:
producer.type的默認(rèn)值是sync吮旅,即同步的方式溪烤。這個參數(shù)指定了在后臺線程中消息的發(fā)送方式是同步的還是異步的味咳。如果設(shè)置成異步的模式庇勃,可以運行生產(chǎn)者以batch的形式push數(shù)據(jù),這樣會極大的提高broker的性能槽驶,但是這樣會增加丟失數(shù)據(jù)的風(fēng)險责嚷。
對于異步模式,還有4個配套的參數(shù)掂铐,如下:
Property | Default | Description |
---|---|---|
queue.buffering.max.ms | 5000 | 啟用異步模式時罕拂,producer緩存消息的時間揍异。比如我們設(shè)置成1000時,它會緩存1s的數(shù)據(jù)再一次發(fā)送出去爆班,這樣可以極大的增加broker吞吐量衷掷,但也會造成時效性的降低。 |
queue.buffering.max.messages | 10000 | 啟用異步模式時柿菩,producer緩存隊列里最大緩存的消息數(shù)量戚嗅,如果超過這個值,producer就會阻塞或者丟掉消息枢舶。 |
queue.enqueue.timeout.ms | -1 | 當(dāng)達(dá)到上面參數(shù)時producer會阻塞等待的時間懦胞。如果設(shè)置為0,buffer隊列滿時producer不會阻塞凉泄,消息直接被丟掉躏尉;若設(shè)置為-1,producer會被阻塞后众,不會丟消息胀糜。 |
batch.num.messages | 200 | 啟用異步模式時,一個batch緩存的消息數(shù)量吼具。達(dá)到這個數(shù)值時僚纷,producer才會發(fā)送消息。(每次批量發(fā)送的數(shù)量) |
以batch的方式推送數(shù)據(jù)可以極大的提高處理效率拗盒,kafka producer可以將消息在內(nèi)存中累計到一定數(shù)量后作為一個batch發(fā)送請求怖竭。batch的數(shù)量大小可以通過producer的參數(shù)(batch.num.messages)控制。通過增加batch的大小陡蝇,可以減少網(wǎng)絡(luò)請求和磁盤IO的次數(shù)痊臭,當(dāng)然具體參數(shù)設(shè)置需要在效率和時效性方面做一個權(quán)衡。在比較新的版本中還有batch.size這個參數(shù)登夫。
2). acks
producers可以一步的并行向kafka發(fā)送消息广匙,但是通常producer在發(fā)送完消息之后會得到一個響應(yīng),返回的是offset值或者發(fā)送過程中遇到的錯誤恼策。這其中有個非常重要的參數(shù)“request.required.acks"鸦致,這個參數(shù)決定了producer要求leader partition收到確認(rèn)的副本個數(shù),如果acks設(shè)置為0涣楷,表示producer不會等待broker的相應(yīng)分唾,所以,producer無法知道消息是否發(fā)生成功狮斗,這樣有可能導(dǎo)致數(shù)據(jù)丟失绽乔,但同時,acks值為0會得到最大的系統(tǒng)吞吐量碳褒。若acks設(shè)置為1折砸,表示producer會在leader partition收到消息時得到broker的一個確認(rèn)看疗,這樣會有更好的可靠性,因為客戶端會等待知道broker確認(rèn)收到消息睦授。若設(shè)置為-1两芳,producer會在所有ISR副本完成同步時,得到broker的確認(rèn)去枷,這個設(shè)置可以得到最高的可靠性保證盗扇。
Kafka的消息確認(rèn)方式通過配置request.required.acks屬性配置(僅僅for sync):
Property | Default | Description |
---|---|---|
acks | 1 | 此配置是 Producer 在確認(rèn)一個請求發(fā)送完成之前需要收到的反饋信息的數(shù)量。 這個參數(shù)是為了保證發(fā)送請求的可靠性沉填。以下配置方式是允許的:acks=0 如果設(shè)置為0疗隶,則 producer 不會等待服務(wù)器的反饋。該消息會被立刻添加到 socket buffer 中并認(rèn)為已經(jīng)發(fā)送完成翼闹。在這種情況下斑鼻,服務(wù)器是否收到請求是沒法保證的,并且參數(shù)retries也不會生效(因為客戶端無法獲得失敗信息)猎荠。每個記錄返回的 offset 總是被設(shè)置為-1坚弱。 acks=1 如果設(shè)置為1,leader節(jié)點會將記錄寫入本地日志关摇,并且在所有 follower 節(jié)點反饋之前就先確認(rèn)成功荒叶。在這種情況下,如果 leader 節(jié)點在接收記錄之后输虱,并且在 follower 節(jié)點復(fù)制數(shù)據(jù)完成之前產(chǎn)生錯誤些楣,則這條記錄會丟失。acks=all 如果設(shè)置為all宪睹,這就意味著 leader 節(jié)點會等待所有同步中的副本確認(rèn)之后再確認(rèn)這條記錄是否發(fā)送完成愁茁。只要至少有一個同步副本存在,記錄就不會丟失亭病。這種方式是對請求傳遞的最有效保證鹅很。acks=-1與acks=all是等效的。 |
簡單說:
0---表示不進(jìn)行消息接收是否成功的確認(rèn)罪帖;
1---表示當(dāng)Leader接收成功時確認(rèn)促煮;
-1---表示Leader和Follower都接收成功時確認(rèn);
3)分析
下面分情況來分析消息丟失的場景:
1)在request.required.acks配置為1(只保證寫入leader成功)的話整袁,如果剛好leader partition掛了菠齿,數(shù)據(jù)就會丟失。
2)使用異步模式的時候葬项,當(dāng)緩沖區(qū)滿了泞当,如果阻塞等待的時間配置為0(還沒有收到確認(rèn)的情況下迹蛤,緩沖池一滿民珍,就清除緩沖池里的消息)襟士,數(shù)據(jù)就會被立即丟棄掉。
在數(shù)據(jù)生產(chǎn)時避免數(shù)據(jù)丟失的方法:
只要能避免上述兩種情況嚷量,那么就可以保證消息不會被丟失陋桂。
1)確認(rèn)機(jī)制設(shè)置為-1,也就是讓消息寫入leader和所有的ISR副本蝶溶。
2)還有嗜历,在異步模式下,如果消息發(fā)出去了抖所,但還沒有收到確認(rèn)的時候梨州,緩沖池滿了,在配置文件中設(shè)置成不限制阻塞超時的時間田轧,也就說讓生產(chǎn)端一直阻塞暴匠,這樣也能保證數(shù)據(jù)不會丟失。
2傻粘、消息消費
Kafka消息消費有兩個consumer接口每窖,Low-level API和High-level API:
Low-level API:消費者自己維護(hù)offset等值,可以實現(xiàn)對Kafka的完全控制弦悉;
High-level API:封裝了對parition和offset的管理窒典,使用簡單;
丟失消息的場景:
如果使用高級接口High-level API稽莉,可能存在一個問題就是當(dāng)消息消費者從集群中把消息取出來瀑志、并提交了新的消息offset值后,還沒來得及消費就掛掉了污秆,那么下次再消費時之前沒消費成功的消息就“詭異”的消失了后室;
解決辦法:
enable.auto.commit=false 關(guān)閉自動提交位移,并確認(rèn)數(shù)據(jù)被完成處理之后混狠,再更新offset值岸霹。
如果使用了storm,要開啟storm的ackfail機(jī)制将饺;如果沒有使用storm贡避,低級API中需要手動控制offset值。
3.數(shù)據(jù)重復(fù)消費
(1)去重:將消息的唯一標(biāo)識保存到外部介質(zhì)中予弧,每次消費處理時判斷是否處理過刮吧;
(2)不管:大數(shù)據(jù)場景中,報表系統(tǒng)或者日志信息丟失幾條都無所謂掖蛤,不會影響最終的統(tǒng)計分析結(jié)
二杀捻、Kafka的Leader選舉機(jī)制
Kafka將每個Topic進(jìn)行分區(qū)Patition,以提高消息的并行處理蚓庭,同時為保證高可用性致讥,每個分區(qū)都有一定數(shù)量的副本 Replica仅仆,這樣當(dāng)部分服務(wù)器不可用時副本所在服務(wù)器就可以接替上來,保證系統(tǒng)可用性垢袱。在Leader上負(fù)責(zé)讀寫墓拜,F(xiàn)ollower負(fù)責(zé)數(shù)據(jù)的同步。當(dāng)一個Leader發(fā)生故障如何從Follower中選擇新Leader呢请契?
Kafka在Zookeeper上針對每個Topic都維護(hù)了一個ISR(in-sync replica---已同步的副本)的集合咳榜,集合的增減Kafka都會更新該記錄。如果某分區(qū)的Leader不可用爽锥,Kafka就從ISR集合中選擇一個副本作為新的Leader涌韩。這樣就可以容忍的失敗數(shù)比較高,假如某Topic有N+1個副本氯夷,則可以容忍N個服務(wù)器不可用贸辈。
如果ISR中副本都不可用,有兩種處理方法:
- 等待一個 ISR 的副本重新恢復(fù)正常服務(wù)肠槽,并選擇這個副本作為領(lǐng) leader (它有極大可能擁有全部數(shù)據(jù))擎淤。
- 選擇第一個重新恢復(fù)正常服務(wù)的副本(不一定是 ISR 中的)作為leader。
附:
Unclean leader 選舉: 如果節(jié)點全掛了秸仙?
請注意嘴拢,Kafka 對于數(shù)據(jù)不會丟失的保證,是基于至少一個節(jié)點在保持同步狀態(tài)寂纪,一旦分區(qū)上的所有備份節(jié)點都掛了席吴,就無法保證了。
但是捞蛋,實際在運行的系統(tǒng)需要去考慮假設(shè)一旦所有的備份都掛了孝冒,怎么去保證數(shù)據(jù)不會丟失,這里有兩種實現(xiàn)的方法
- 等待一個 ISR 的副本重新恢復(fù)正常服務(wù)拟杉,并選擇這個副本作為領(lǐng) leader (它有極大可能擁有全部數(shù)據(jù))庄涡。
- 選擇第一個重新恢復(fù)正常服務(wù)的副本(不一定是 ISR 中的)作為leader。
這是可用性和一致性之間的簡單妥協(xié)搬设,如果我只等待 ISR 的備份節(jié)點穴店,那么只要 ISR 備份節(jié)點都掛了,我們的服務(wù)將一直會不可用拿穴,如果它們的數(shù)據(jù)損壞了或者丟失了泣洞,那就會是長久的宕機(jī)。另一方面默色,如果不是 ISR 中的節(jié)點恢復(fù)服務(wù)并且我們允許它成為 leader 球凰, 那么它的數(shù)據(jù)就是可信的來源,即使它不能保證記錄了每一個已經(jīng)提交的消息。 kafka 默認(rèn)選擇第二種策略呕诉,當(dāng)所有的 ISR 副本都掛掉時缘厢,會選擇一個可能不同步的備份作為 leader ,可以配置屬性 unclean.leader.election.enable 禁用此策略义钉,那么就會使用第 一種策略即停機(jī)時間優(yōu)于不同步。
這種困境不只有 Kafka 遇到规肴,它存在于任何 quorum-based 規(guī)則中捶闸。例如,在大多數(shù)投票算法當(dāng)中拖刃,如果大多數(shù)服務(wù)器永久性的掛了删壮,那么您要么選擇丟失100%的數(shù)據(jù),要么違背數(shù)據(jù)的一致性選擇一個存活的服務(wù)器作為數(shù)據(jù)可信的來源兑牡。
可用性和持久性保證
向 Kafka 寫數(shù)據(jù)時央碟,producers 設(shè)置 ack 是否提交完成, 0:不等待broker返回確認(rèn)消息,1: leader保存成功返回或, -1(all): 所有備份都保存成功返回.請注意. 設(shè)置 “ack = all” 并不能保證所有的副本都寫入了消息均函。默認(rèn)情況下亿虽,當(dāng) acks = all 時,只要 ISR 副本同步完成苞也,就會返回消息已經(jīng)寫入洛勉。例如,一個 topic 僅僅設(shè)置了兩個副本如迟,那么只有一個 ISR 副本收毫,那么當(dāng)設(shè)置acks = all時返回寫入成功時,剩下了的那個副本數(shù)據(jù)也可能數(shù)據(jù)沒有寫入殷勘。 盡管這確保了分區(qū)的最大可用性此再,但是對于偏好數(shù)據(jù)持久性而不是可用性的一些用戶,可能不想用這種策略玲销,因此输拇,我們提供了兩個topic 配置,可用于優(yōu)先配置消息數(shù)據(jù)持久性:
- 禁用 unclean leader 選舉機(jī)制 - 如果所有的備份節(jié)點都掛了,分區(qū)數(shù)據(jù)就會不可用贤斜,直到最近的 leader 恢復(fù)正常淳附。這種策略優(yōu)先于數(shù)據(jù)丟失的風(fēng)險, 參看上一節(jié)的 unclean leader 選舉機(jī)制蠢古。
- 指定最小的 ISR 集合大小奴曙,只有當(dāng) ISR 的大小大于最小值,分區(qū)才能接受寫入操作草讶,以防止僅寫入單個備份的消息丟失造成消息不可用的情況洽糟,這個設(shè)置只有在生產(chǎn)者使用 acks = all 的情況下才會生效,這至少保證消息被 ISR 副本寫入。此設(shè)置是一致性和可用性 之間的折衷坤溃,對于設(shè)置更大的最小ISR大小保證了更好的一致性拍霜,因為它保證將消息被寫入了更多的備份,減少了消息丟失的可能性薪介。但是祠饺,這會降低可用性,因為如果 ISR 副本的數(shù)量低于最小閾值汁政,那么分區(qū)將無法寫入道偷。