1.mq原則
數(shù)據(jù)不能多抑诸,也不能少,不能多是說(shuō)消息不能重復(fù)消費(fèi)爹殊,這個(gè)我們上一節(jié)已解決蜕乡;不能少,就是說(shuō)不能丟失數(shù)據(jù)梗夸。如果mq傳遞的是非常核心的消息异希,支撐核心的業(yè)務(wù),那么這種場(chǎng)景是一定不能丟失數(shù)據(jù)的绒瘦。
2.丟失數(shù)據(jù)場(chǎng)景
丟數(shù)據(jù)一般分為兩種称簿,一種是mq把消息丟了,一種就是消費(fèi)時(shí)將消息丟了惰帽。下面從rabbitmq和kafka分別說(shuō)一下憨降,丟失數(shù)據(jù)的場(chǎng)景,
(1)rabbitmq
A:生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到rabbitmq的時(shí)候该酗,可能在傳輸過(guò)程中因?yàn)榫W(wǎng)絡(luò)等問(wèn)題而將數(shù)據(jù)弄丟了授药。
B:rabbitmq自己丟了數(shù)據(jù)
如果沒(méi)有開(kāi)啟rabbitmq的持久化士嚎,那么rabbitmq一旦重啟,那么數(shù)據(jù)就丟了悔叽。所依必須開(kāi)啟持久化將消息持久化到磁盤莱衩,這樣就算rabbitmq掛了,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù)娇澎,一般數(shù)據(jù)不會(huì)丟失笨蚁。除非極其罕見(jiàn)的情況,rabbitmq還沒(méi)來(lái)得及持久化自己就掛了趟庄,這樣可能導(dǎo)致一部分?jǐn)?shù)據(jù)丟失括细。
C:消費(fèi)端弄丟了數(shù)據(jù)
主要是因?yàn)橄M(fèi)者消費(fèi)時(shí),剛消費(fèi)到戚啥,還沒(méi)有處理奋单,結(jié)果消費(fèi)者就掛了,這樣你重啟之后猫十,rabbitmq就認(rèn)為你已經(jīng)消費(fèi)過(guò)了览濒,然后就丟了數(shù)據(jù)。
(2)kafka
A:生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者沒(méi)有設(shè)置相應(yīng)的策略拖云,發(fā)送過(guò)程中丟失數(shù)據(jù)匾七。
B:kafka弄丟了數(shù)據(jù)
比較常見(jiàn)的一個(gè)場(chǎng)景,就是kafka的某個(gè)broker宕機(jī)了江兢,然后重新選舉partition的leader時(shí)昨忆。如果此時(shí)follower還沒(méi)來(lái)得及同步數(shù)據(jù),leader就掛了杉允,然后某個(gè)follower成為了leader邑贴,他就少了一部分?jǐn)?shù)據(jù)。
C:消費(fèi)者弄丟了數(shù)據(jù)
消費(fèi)者消費(fèi)到了這個(gè)數(shù)據(jù)叔磷,然后消費(fèi)之自動(dòng)提交了offset拢驾,讓kafka知道你已經(jīng)消費(fèi)了這個(gè)消息,當(dāng)你準(zhǔn)備處理這個(gè)消息時(shí)改基,自己掛掉了繁疤,那么這條消息就丟了。
3.如何防止消息丟失
(1)rabbitmq
A:生產(chǎn)者丟失消息
①:可以選擇使用rabbitmq提供是事物功能稠腊,就是生產(chǎn)者在發(fā)送數(shù)據(jù)之前開(kāi)啟事物架忌,然后發(fā)送消息叹放,如果消息沒(méi)有成功被rabbitmq接收到井仰,那么生產(chǎn)者會(huì)受到異常報(bào)錯(cuò)俱恶,這時(shí)就可以回滾事物,然后嘗試重新發(fā)送俐银;如果收到了消息,那么就可以提交事物荔烧。
channel.txSelect();//開(kāi)啟事物
try{
//發(fā)送消息
}catch(Exection e){
channel.txRollback()鹤竭;//回滾事物
//重新提交
}
缺點(diǎn):rabbitmq事物已開(kāi)啟景醇,就會(huì)變?yōu)橥阶枞僮鳎a(chǎn)者會(huì)阻塞等待是否發(fā)送成功吧寺,太耗性能會(huì)造成吞吐量的下降散劫。
②:可以開(kāi)啟confirm模式获搏。在生產(chǎn)者哪里設(shè)置開(kāi)啟了confirm模式之后,每次寫的消息都會(huì)分配一個(gè)唯一的id纬乍,然后如何寫入了rabbitmq之中蕾额,rabbitmq會(huì)給你回傳一個(gè)ack消息诅蝶,告訴你這個(gè)消息發(fā)送OK了调炬;如果rabbitmq沒(méi)能處理這個(gè)消息缰泡,會(huì)回調(diào)你一個(gè)nack接口棘钞,告訴你這個(gè)消息失敗了宜猜,你可以進(jìn)行重試。而且你可以結(jié)合這個(gè)機(jī)制知道自己在內(nèi)存里維護(hù)每個(gè)消息的id绅喉,如果超過(guò)一定時(shí)間還沒(méi)接收到這個(gè)消息的回調(diào),那么你可以進(jìn)行重發(fā)叫乌。
//開(kāi)啟confirm
channel.confirm();
//發(fā)送成功回調(diào)
public void ack(String messageId){
}
// 發(fā)送失敗回調(diào)
public void nack(String messageId){
//重發(fā)該消息
}
二者不同
事務(wù)機(jī)制是同步的,你提交了一個(gè)事物之后會(huì)阻塞住憨奸,但是confirm機(jī)制是異步的,發(fā)送消息之后可以接著發(fā)送下一個(gè)消息排宰,然后rabbitmq會(huì)回調(diào)告知成功與否。
一般在生產(chǎn)者這塊避免丟失额各,都是用confirm機(jī)制国觉。
B:rabbitmq自己弄丟了數(shù)據(jù)
設(shè)置消息持久化到磁盤。設(shè)置持久化有兩個(gè)步驟:
①創(chuàng)建queue的時(shí)候?qū)⑵湓O(shè)置為持久化的麻诀,這樣就可以保證rabbitmq持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里面的數(shù)據(jù)。
②發(fā)送消息的時(shí)候講消息的deliveryMode設(shè)置為2硬毕,這樣消息就會(huì)被設(shè)為持久化方式元践,此時(shí)rabbitmq就會(huì)將消息持久化到磁盤上。
必須要同時(shí)開(kāi)啟這兩個(gè)才可以童谒。
而且持久化可以跟生產(chǎn)的confirm機(jī)制配合起來(lái)象浑,只有消息持久化到了磁盤之后,才會(huì)通知生產(chǎn)者ack琅豆,這樣就算是在持久化之前rabbitmq掛了愉豺,數(shù)據(jù)丟了,生產(chǎn)者收不到ack回調(diào)也會(huì)進(jìn)行消息重發(fā)茫因。
C:消費(fèi)者弄丟了數(shù)據(jù)
使用rabbitmq提供的ack機(jī)制蚪拦,首先關(guān)閉rabbitmq的自動(dòng)ack,然后每次在確保處理完這個(gè)消息之后节腐,在代碼里手動(dòng)調(diào)用ack外盯。這樣就可以避免消息還沒(méi)有處理完就ack摘盆。
(2)kafka
A:消費(fèi)端弄丟了數(shù)據(jù)
關(guān)閉自動(dòng)提交offset翼雀,在自己處理完畢之后手動(dòng)提交offset,這樣就不會(huì)丟失數(shù)據(jù)孩擂。
B:kafka弄丟了數(shù)據(jù)
一般要求設(shè)置4個(gè)參數(shù)來(lái)保證消息不丟失:
①給topic設(shè)置 replication.factor參數(shù):這個(gè)值必須大于1狼渊,表示要求每個(gè)partition必須至少有2個(gè)副本。
②在kafka服務(wù)端設(shè)置min.isync.replicas參數(shù):這個(gè)值必須大于1类垦,表示 要求一個(gè)leader至少感知到有至少一個(gè)follower在跟自己保持聯(lián)系正常同步數(shù)據(jù)狈邑,這樣才能保證leader掛了之后還有一個(gè)follower。
③在生產(chǎn)者端設(shè)置acks=all:表示 要求每條每條數(shù)據(jù)蚤认,必須是寫入所有replica副本之后米苹,才能認(rèn)為是寫入成功了
④在生產(chǎn)者端設(shè)置retries=MAX(很大的一個(gè)值,表示無(wú)限重試):表示 這個(gè)是要求一旦寫入事變砰琢,就無(wú)限重試
C:生產(chǎn)者弄丟了數(shù)據(jù)
如果按照上面設(shè)置了ack=all蘸嘶,則一定不會(huì)丟失數(shù)據(jù),要求是陪汽,你的leader接收到消息训唱,所有的follower都同步到了消息之后,才認(rèn)為本次寫成功了挚冤。如果沒(méi)滿足這個(gè)條件况增,生產(chǎn)者會(huì)自動(dòng)不斷的重試,重試無(wú)限次训挡。
上一篇《如何保證消息不重復(fù)消費(fèi)》
下一篇《如何保證消息按順序執(zhí)行》