可靠消息
一殿雪,目的
系統(tǒng)間解耦,異步通知消息锋爪,削峰填谷丙曙。系統(tǒng)柔性可用(分區(qū)容錯)。
1.1其骄,何為可靠消息亏镰?
持久化,不丟失拯爽。努力送達(dá)索抓,冪等消費。CA/CP最終一致性。
1.2纸兔,可靠性責(zé)任鏈(Producer,KAFKA,Consumers)
由于Broker解耦生產(chǎn)者不知道消息是否被消費惰瓜。但能知道的是Broker是否接收了消息,是否把消息安全的存儲起來汉矿。這里存在一條責(zé)任鏈崎坊,開始于生產(chǎn)者,移動到消息系統(tǒng)洲拇,最后到達(dá)消費者奈揍。每個環(huán)節(jié)都要正確執(zhí)行,環(huán)節(jié)間的交接也要正確執(zhí)行赋续。這意味著應(yīng)用開發(fā)者要正確的流程寫程序男翰,防止丟失消息,或者濫用消息API纽乱。
本文從三個角色的職責(zé)劃分來討論如何達(dá)到可靠蛾绎。
生產(chǎn)側(cè):業(yè)務(wù)端進(jìn)行業(yè)務(wù)操作發(fā)KAFKA消息異步投遞解耦,消息發(fā)送失敗投遞補(bǔ)償端鸦列。補(bǔ)償端失敗記錄獨立錯誤日志作為憑證依據(jù)租冠。
消費側(cè):關(guān)于業(yè)務(wù)消費消息后失敗異常。倆種路線第一種捕獲業(yè)務(wù)異常并丟到補(bǔ)償端(較難實現(xiàn)薯嗤,開發(fā)測試遺漏難排查顽爹,對編碼規(guī)范要求極高)。業(yè)務(wù)自己控group id offset.雖然看起來邏輯難度大但SDK可以封裝而且口子比較少骆姐。
補(bǔ)償端:失敗直接丟KAFAK補(bǔ)償消費者刷oracle關(guān)系數(shù)據(jù)庫镜粤。定時跑批任務(wù)刷異常列表嘗試3次。失敗告警人工干預(yù)玻褪,管理后臺可以看到補(bǔ)償失敗的消息列表可以進(jìn)行人工干預(yù)肉渴。
二,KAFKA內(nèi)部的可靠性
2.1 KAFKA基本概念
2.1.1归园,Broker:消息中間件處理節(jié)點黄虱,一個Kafka節(jié)點就是一個broker稚矿,一個或者多個Broker可以組成一個Kafka集群庸诱;
2.1.2,Topic:主題是對一組消息的抽象分類晤揣,比如例如page view日志桥爽、click日志等都可以以topic的形式進(jìn)行抽象劃分類別。在物理上昧识,不同Topic的消息分開存儲钠四,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可使得數(shù)據(jù)的生產(chǎn)者或消費者不必關(guān)心數(shù)據(jù)存于何處;
2.1.3,Partition:每個主題又被分成一個或者若干個分區(qū)(Partition)缀去。每個分區(qū)在本地磁盤上對應(yīng)一個文件夾侣灶,分區(qū)命名規(guī)則為主題名稱后接“—”連接符,之后再接分區(qū)編號缕碎,分區(qū)編號從0開始至分區(qū)總數(shù)減-1褥影;
2.1.4,LogSegment:每個分區(qū)又被劃分為多個日志分段(LogSegment)組成咏雌,日志段是Kafka日志對象分片的最小單位凡怎;LogSegment算是一個邏輯概念,對應(yīng)一個具體的日志文件(“.log”的數(shù)據(jù)文件)和兩個索引文件(“.index”和“.timeindex”赊抖,分別表示偏移量索引文件和消息時間戳索引文件)組成统倒;
2.1.5,Offset:每個partition中都由一系列有序的氛雪、不可變的消息組成房匆,這些消息被順序地追加到partition中。每個消息都有一個連續(xù)的序列號稱之為offset—偏移量报亩,用于在partition內(nèi)唯一標(biāo)識消息(并不表示消息在磁盤上的物理位置)坛缕;
2.1.6,Message:消息是Kafka中存儲的最小最基本的單位捆昏,即為一個commit log赚楚,由一個固定長度的消息頭和一個可變長度的消息體組成;
三個部分保證可靠骗卜,KAFKA內(nèi)部宠页,生產(chǎn),消費
存儲的圖片資源: https://cloud.tencent.com/developer/article/1421267
2.2寇仓,kafka的內(nèi)部存儲機(jī)制
Kafka的高可靠性的保障來源于其健壯的副本(replication)策略举户。通過調(diào)節(jié)其副本相關(guān)參數(shù),可以使得Kafka在性能和可靠性之間運轉(zhuǎn)的游刃有余遍烦。Kafka從0.8.x版本開始提供partition級別的復(fù)制,replication的數(shù)量可$KAFKA_HOME/config/server.properties中配置俭嘁。
Kafka中消息是以topic進(jìn)行分類的,生產(chǎn)者通過topic向Kafka broker發(fā)送消息服猪,消費者通過topic讀取數(shù)據(jù)供填。然而topic在物理層面又能以partition為分組,一個topic可以分成若干個partition罢猪。Kafka中的消息以順序的方式存儲在文件中近她。
Kafka中的topic的partition有N個副本(replicas)。N個replicas中膳帕,其中一個replica為leader粘捎,其他都為follower, leader處理partition的所有讀寫請求,follower定期地去復(fù)制leader上的數(shù)據(jù)。
如果leader發(fā)生故障或掛掉攒磨,一個新leader被選舉并被接受客戶端的消息成功寫入泳桦。Kafka確保從同步副本列表中選舉一個副本為leader,或者說follower追趕leader數(shù)據(jù)娩缰。
TOPIC-PARTITION
? ? kafka通過topic來分主題存放數(shù)據(jù)蓬痒,主題內(nèi)有分區(qū),分區(qū)可以有多個副本漆羔,分區(qū)的內(nèi)部還細(xì)分為若干個segment梧奢。
segment
在磁盤上,一個partition就是一個目錄演痒,然后每個segment由一個index文件和一個log文件組成亲轨。如下:
$ tree kafka | head -n 6
kafka
├── events-1
│ ├── 00000000003064504069.index
│ ├── 00000000003064504069.log
│ ├── 00000000003065011416.index
│ ├── 00000000003065011416.log
Segment下的log文件就是存儲消息的地方
每個消息都會包含消息體、offset鸟顺、timestamp惦蚊、key、size讯嫂、壓縮編碼器蹦锋、校驗和、消息版本號等欧芽。
在磁盤上的數(shù)據(jù)格式和producer發(fā)送到broker的數(shù)據(jù)格式一模一樣莉掂,也和consumer收到的數(shù)據(jù)格式一模一樣。由于磁盤格式與consumer以及producer的數(shù)據(jù)格式一模一樣千扔,這樣就使得Kafka可以通過零拷貝(zero-copy)技術(shù)來提高傳輸效率憎妙。
? ? 所謂的分區(qū)其實就是在kafka對應(yīng)存儲目錄下創(chuàng)建的文件夾,文件夾的名字是主題名加上分區(qū)編號曲楚,編號從0開始厘唾。
1、segment
? ? 所謂的segment其實就是在分區(qū)對應(yīng)的文件夾下產(chǎn)生的文件龙誊。
? ? 一個分區(qū)會被劃分成大小相等的若干segment抚垃,這樣一方面保證了分區(qū)的數(shù)據(jù)被劃分到多個文件中保證不會產(chǎn)生體積過大的文件;另一方面可以基于這些segment文件進(jìn)行歷史數(shù)據(jù)的刪除趟大,提高效率鹤树。
? ? 一個segment又由一個.log和一個.index文件組成。
1..log
? ? .log文件為數(shù)據(jù)文件用來存放數(shù)據(jù)分段數(shù)據(jù)护昧。
2..index
? ? .index為索引文件保存對對應(yīng)的.log文件的索引信息魂迄。
? ? 在.index文件中粗截,保存了對對應(yīng).log文件的索引信息惋耙,通過查找.index文件可以獲知每個存儲在當(dāng)前segment中的offset在.log文件中的開始位置,而每條日志有其固定格式,保存了包括offset編號绽榛、日志長度湿酸、key的長度等相關(guān)信息,通過這個固定格式中的數(shù)據(jù)可以確定出當(dāng)前offset的結(jié)束位置灭美,從而對數(shù)據(jù)進(jìn)行讀取推溃。
3.命名規(guī)則
? ? 這兩個文件的命名規(guī)則為:
? ? partition全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值届腐,數(shù)值大小為64位铁坎,20位數(shù)字字符長度,沒有數(shù)字用0填充犁苏。
2硬萍、讀取數(shù)據(jù)
? ? 開始讀取指定分區(qū)中某個offset對應(yīng)的數(shù)據(jù)時,先根據(jù)offset和當(dāng)前分區(qū)的所有segment的名稱做比較围详,確定出數(shù)據(jù)在哪個segment中朴乖,再查找該segment的索引文件,確定當(dāng)前offset在數(shù)據(jù)文件中的開始位置助赞,最后從該位置開始讀取數(shù)據(jù)文件买羞,在根據(jù)數(shù)據(jù)格式判斷結(jié)果,獲取完整數(shù)據(jù)雹食。
2.3畜普,多分片落盤的可靠性保證
1、AR
? ? 在Kafka中維護(hù)了一個AR列表群叶,包括所有的分區(qū)的副本漠嵌。AR又分為ISR和OSR。
? ? AR = ISR + OSR盖呼。
? ? AR儒鹿、ISR、OSR几晤、LEO约炎、HW這些信息都被保存在Zookeeper中。
1.ISR
? ? ISR中的副本都要同步leader中的數(shù)據(jù)蟹瘾,只有都同步完成了數(shù)據(jù)才認(rèn)為是成功提交了圾浅,成功提交之后才能供外界訪問。
? ? 在這個同步的過程中憾朴,數(shù)據(jù)即使已經(jīng)寫入也不能被外界訪問狸捕,這個過程是通過LEO-HW機(jī)制來實現(xiàn)的。
2.OSR
? ? OSR內(nèi)的副本是否同步了leader的數(shù)據(jù)众雷,不影響數(shù)據(jù)的提交灸拍,OSR內(nèi)的follower盡力的去同步leader做祝,可能數(shù)據(jù)版本會落后。
? ? 最開始所有的副本都在ISR中鸡岗,在kafka工作的過程中混槐,如果某個副本同步速度慢于replica.lag.time.max.ms指定的閾值,則被踢出ISR存入OSR轩性,如果后續(xù)速度恢復(fù)可以回到ISR中声登。
3.LEO
? ? LogEndOffset:分區(qū)的最新的數(shù)據(jù)的offset,當(dāng)數(shù)據(jù)寫入leader后揣苏,LEO就立即執(zhí)行該最新數(shù)據(jù)悯嗓。相當(dāng)于最新數(shù)據(jù)標(biāo)識位。
4.HW
? ? HighWatermark:只有寫入的數(shù)據(jù)被同步到所有的ISR中的副本后卸察,數(shù)據(jù)才認(rèn)為已提交绅作,HW更新到該位置,HW之前的數(shù)據(jù)才可以被消費者訪問蛾派,保證沒有同步完成的數(shù)據(jù)不會被消費者訪問到俄认。相當(dāng)于所有副本同步數(shù)據(jù)標(biāo)識位。
? ? 在leader宕機(jī)后洪乍,只能從ISR列表中選取新的leader眯杏,無論ISR中哪個副本被選為新的leader,它都知道HW之前的數(shù)據(jù),可以保證在切換了leader后,消費者可以繼續(xù)看到HW之前已經(jīng)提交的數(shù)據(jù)浆竭。
? ? 所以LEO代表已經(jīng)寫入的最新數(shù)據(jù)位置穴墅,而HW表示已經(jīng)同步完成的數(shù)據(jù)外臂,只有HW之前的數(shù)據(jù)才能被外界訪問。
5.HW截斷機(jī)制
? ? 如果leader宕機(jī),選出了新的leader,而新的leader并不能保證已經(jīng)完全同步了之前l(fā)eader的所有數(shù)據(jù)锉屈,只能保證HW之前的數(shù)據(jù)是同步過的,此時所有的follower都要將數(shù)據(jù)截斷到HW的位置垮耳,再和新的leader同步數(shù)據(jù)颈渊,來保證數(shù)據(jù)一致。
? ? 當(dāng)宕機(jī)的leader恢復(fù)终佛,發(fā)現(xiàn)新的leader中的數(shù)據(jù)和自己持有的數(shù)據(jù)不一致俊嗽,此時宕機(jī)的leader會將自己的數(shù)據(jù)截斷到宕機(jī)之前的hw位置,然后同步新leader的數(shù)據(jù)铃彰。宕機(jī)的leader活過來也像follower一樣同步數(shù)據(jù)绍豁,來保證數(shù)據(jù)的一致性。
2.4牙捉,Kafka的ack機(jī)制竹揍。
當(dāng)producer向leader發(fā)送數(shù)據(jù)時敬飒,可以通過request.required.acks參數(shù)來設(shè)置數(shù)據(jù)可靠性的級別:
? ?1(默認(rèn)):這意味著producer在ISR中的leader已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條message。如果leader宕機(jī)了鬼佣,則會丟失數(shù)據(jù)驶拱。
? ?0:這意味著producer無需等待來自broker的確認(rèn)而繼續(xù)發(fā)送下一批消息霜浴。這種情況下數(shù)據(jù)傳輸效率最高晶衷,但是數(shù)據(jù)可靠性確是最低的。
? ?-1:producer需要等待ISR中的所有follower都確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成阴孟,可靠性最高晌纫。但是這樣也不能保證數(shù)據(jù)不丟失,比如當(dāng)ISR中只有l(wèi)eader時永丝,這樣就變成了acks=1的情況锹漱。
? ?Kafka中的消息以一下方式存儲到文件中。
HW是HighWatermark的縮寫慕嚷,俗稱高水位哥牍,取一個partition對應(yīng)的ISR中最小的LEO作為HW,consumer最多只能消費到HW所在的位置喝检。另外每個replica都有HW,leader和follower各自負(fù)責(zé)更新自己的HW的狀態(tài)嗅辣。對于leader新寫入的消息,consumer不能立刻消費挠说,leader會等待該消息被所有ISR中的replicas同步后更新HW澡谭,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效损俭,該消息仍然可以從新選舉的leader中獲取蛙奖。對于來自內(nèi)部broKer的讀取請求,沒有HW的限制杆兵。
LEO:LogEndOffset的縮寫雁仲,表示每個partition的log最后一條Message的位置。
當(dāng)leader掛了之后琐脏,現(xiàn)在B成為了leader伯顶,A重新恢復(fù)之后需要進(jìn)行消息的同步,如果使用追加的方式那么就會有冗余消息骆膝,所以A將自己的消息截取到HW的位置在進(jìn)行同步祭衩。
三,生產(chǎn)者
3.1阅签,生產(chǎn)者可靠性級別
在生產(chǎn)者向kafka集群發(fā)送時掐暮,數(shù)據(jù)經(jīng)過網(wǎng)絡(luò)傳輸可能因為網(wǎng)絡(luò)延遲、Program crash等原因造成數(shù)據(jù)的丟失政钟。
????kafka為生產(chǎn)者提供了如下的三種可靠性級別路克,通過不同策略保證不同的可靠性保障樟结。
????其實此策略配置的就是leader將成功接收消息信息響應(yīng)給客戶端的時機(jī)。
????通過request.required.acks參數(shù)配置:
????1:生產(chǎn)者發(fā)送數(shù)據(jù)給leader精算,leader收到數(shù)據(jù)后發(fā)送成功信息瓢宦,生產(chǎn)者收到后認(rèn)為發(fā)送數(shù)據(jù)成功,如果一直收不到成功消息灰羽,則生產(chǎn)者認(rèn)為發(fā)送數(shù)據(jù)失敗會自動重發(fā)數(shù)據(jù)驮履。
????當(dāng)leader宕機(jī)時,可能丟失數(shù)據(jù)廉嚼。
????0:生產(chǎn)者不停向leader發(fā)送數(shù)據(jù)玫镐,而不需要leader反饋成功消息。
????這種模式效率最高怠噪,可靠性最低恐似。可能在發(fā)送過程中丟失數(shù)據(jù)傍念,也可能在leader宕機(jī)時丟失數(shù)據(jù)矫夷。
????-1:生產(chǎn)者發(fā)送數(shù)據(jù)給leader,leader收到數(shù)據(jù)后要等到ISR列表中的所有副本都同步數(shù)據(jù)完成后憋槐,才向生產(chǎn)者發(fā)送成功消息双藕,如果一只收不到成功消息,則認(rèn)為發(fā)送數(shù)據(jù)失敗會自動重發(fā)數(shù)據(jù)秦陋。
????這種模式下可靠性很高蔓彩,但是當(dāng)ISR列表中只剩下leader時,當(dāng)leader宕機(jī)讓然有可能丟數(shù)據(jù)驳概。
????此時可以配置min.insync.replicas指定要求觀察ISR中至少要有指定數(shù)量的副本赤嚼,默認(rèn)該值為1,需要改為大于等于2的值
????這樣當(dāng)生產(chǎn)者發(fā)送數(shù)據(jù)給leader但是發(fā)現(xiàn)ISR中只有l(wèi)eader自己時顺又,會收到異常表明數(shù)據(jù)寫入失敗更卒,此時無法寫入數(shù)據(jù),保證了數(shù)據(jù)絕對不丟稚照。
????雖然不丟但是可能會產(chǎn)生冗余數(shù)據(jù)蹂空,例如生產(chǎn)者發(fā)送數(shù)據(jù)給leader,leader同步數(shù)據(jù)給ISR中的follower果录,同步到一半leader宕機(jī)上枕,此時選出新的leader,可能具有部分此次提交的數(shù)據(jù)弱恒,而生產(chǎn)者收到失敗消息重發(fā)數(shù)據(jù)辨萍,新的leader接受數(shù)據(jù)則數(shù)據(jù)重復(fù)了。
四返弹,消費者
4.1锈玉,最少一次消費爪飘,及應(yīng)用保證冪等性!
至多一次處理(At most once):消息絕對不會被重復(fù)投遞拉背,但是消息可能丟失
至少一次處理(At least once):消息絕對不會被丟失师崎,但是有可能重復(fù)被消費
精確的一次處理(Exactly once):消息系統(tǒng)的圣杯。所有的消息精確的被投遞一次椅棺。
“投遞”貌似不是準(zhǔn)確的語言描述犁罩,“處理”才是。無論怎么描述土陪,我們關(guān)心的是昼汗,消費者能否處理消息肴熏,以及處理的次數(shù)鬼雀。但是使用“處理”會使問題變得復(fù)雜。比如說蛙吏,消息必須投遞兩次才能被處理一次源哩。再比如,如果消費者在處理的過程中宕機(jī)鸦做,消息必須被第二次投遞(給另一個消費者)励烦。
其次,使用“處理”來表達(dá)會使得部分失斊糜铡(partial failure)變得頭疼坛掠。處理消息一般包括多個步驟。處理的開始到結(jié)束包括應(yīng)用的邏輯以及應(yīng)用與消息系統(tǒng)的通信治筒。應(yīng)用邏輯的部分失敗由應(yīng)用來處理屉栓。如果應(yīng)用處理的邏輯是事務(wù)的,結(jié)果是all or nothing, 那么應(yīng)用邏輯可以避免部分失敗耸袜。但是實際上友多,多個步驟往往涉及不同的系統(tǒng),使得事務(wù)性變得不可能堤框。如果我們考慮到通信域滥,應(yīng)用,緩存蜈抓,數(shù)據(jù)庫启绰,我們無法達(dá)到精確的一次處理(exactly-once processing).
所以,精確地一次只出現(xiàn)在如下情況中:消息的處理只包括消息系統(tǒng)本身沟使,并且消息系統(tǒng)本身的處理是事務(wù)的委可。在該限定場景下,我們可以處理消息格带,寫消息撤缴,發(fā)送消息被處理的ACK刹枉, 一切都在事務(wù)中。而這正是Kafka流能提供的屈呕。
但是微宝,如果消息處理是冪等(idempotent)的,我們就可以繞過基于事務(wù)的精確一次保證虎眨。如果消息處理是冪等的蟋软,我們可以安全的處理重復(fù)的消息。當(dāng)然嗽桩,并不是所有的消息處理都是冪等的岳守。
kafka 最多保證至少一次處理(At least once),可以保證不丟碌冶,但是可能會重復(fù)湿痢,為了解決重復(fù)需要引入唯一標(biāo)識和去重機(jī)制,kafka提供了GUID實現(xiàn)了唯一標(biāo)識扑庞,但是并沒有提供自帶的去重機(jī)制譬重,特定應(yīng)用需要基于業(yè)務(wù)規(guī)則自己去重做冪等。
五罐氨,KAFKA可靠消息實現(xiàn)