一孝常、kafka
1吕漂、kafka消息模型總結(jié)
發(fā)送消息到topic状共,每個topic可以分成多個Partition套耕,每個Partition對用一個消費者消費,屬于無狀態(tài)消息口芍,Partition每個消息對應(yīng)唯一的offset箍铲,通過zk保存信息,消費端維護offset鬓椭,持久化屬于日志型持久話默認七天刪除消息颠猴。
2、消費失敗處理方案(個人思考)
業(yè)務(wù)處理異常時小染,暫不提交offset翘瓮,利用數(shù)據(jù)庫(關(guān)系型或非關(guān)系型)保存失敗的消息記錄,根據(jù)失敗策略處理相應(yīng)消息裤翩。保存好記錄之后可以提交offset资盅。
失敗處理可以隔五分鐘再往對應(yīng)的消息隊列發(fā)送該消息(發(fā)送成功就次數(shù)+1,將消息id也傳入消息隊列踊赠,方便記錄失敗次數(shù))復(fù)雜情況可能需要記錄消息失敗的次數(shù)呵扛,到達一定次數(shù)后,改為手工處理
3筐带、保證不丟失消息處理
參考:http://www.reibang.com/p/7a6deaba34d2
一般是要求起碼設(shè)置如下4個參數(shù):
1今穿、給topic設(shè)置replication.factor參數(shù):
這個值必須大于1,要求每個partition必須有至少2個副本在kafka服務(wù)端
2伦籍、設(shè)置min.insync.replicas參數(shù):
這個值必須大于1蓝晒,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯(lián)系腮出,沒掉隊,這樣才能確保leader掛了還有一個follower吧在producer端
3芝薇、設(shè)置acks=all:
這個是要求每條數(shù)據(jù)胚嘲,必須是寫入所有replica之后,才能認為是寫成功了在producer端
4洛二、設(shè)置retries=MAX
(很大很大很大的一個值馋劈,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試灭红,卡在這里了我們生產(chǎn)環(huán)境就是按照上述要求配置的侣滩,這樣配置之后,至少在kafka broker端就可以保證在leader所在broker發(fā)生故障变擒,進行l(wèi)eader切換時君珠,數(shù)據(jù)不會丟失
二、RabbitMQ
1娇斑、消費失敗處理方案
(1)相關(guān)配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
retry: #拋異常會按retry策略重發(fā)(建議不在程序內(nèi)拋異常策添,記錄失敗消息,然后確認)
enabled: true #允許重發(fā)
max-attempts: 5 #重發(fā)次數(shù)
initial-interval: 30000 #重發(fā)間隔時間
acknowledge-mode: manual #不確認宕機重啟時會重新消費毫缆,確認失敗會一直重發(fā)
publisher-confirms: true # 如果消息沒有到exchange,則confirm回調(diào),ack=false唯竹,
# 如果消息到達exchange,則confirm回調(diào),ack=true
publisher-returns: true #exchange到queue成功,則不回調(diào)return
#exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
消息手動確認模式的幾點說明:
1、監(jiān)聽的方法內(nèi)部必須使用channel進行消息確認苦丁,包括消費成功或消費失敗
2浸颓、如果不手動確認,也不拋出異常旺拉,消息不會自動重新推送(包括其他消費者)产上,因為對于rabbitmq來說始終沒有接收到消息消費是否成功的確認,并且Channel是在消費端有緩存的蛾狗,沒有斷開連接
3晋涣、如果rabbitmq斷開,連接后會自動重新推送(不管是網(wǎng)絡(luò)問題還是宕機)
4沉桌、如果消費端應(yīng)用重啟柳骄,消息會自動重新推送
5压彭、如果消費端處理消息的時候宕機碍沐,消息會自動推給其他的消費者
6升略、如果監(jiān)聽消息的方法拋出異常,消息會按照listener.retry的配置進行重發(fā)蔼夜,但是重發(fā)次數(shù)完了之后還拋出異常的話松嘶,消息不會重發(fā)(也不會重發(fā)到其他消費者),只有應(yīng)用重啟后會重新推送挎扰。因為retry是消費端內(nèi)部處理的翠订,包括異常也是內(nèi)部處理,對于rabbitmq是不知道的(此場景解決方案后面有)
7遵倦、spring.rabbitmq.listener.retry配置的重發(fā)是在消費端應(yīng)用內(nèi)處理的尽超,不是rabbitqq重發(fā)
(2)方案描述
參考:https://my.oschina.net/dengfuwei/blog/1595047
消費確認機制改為manual手動確認,在消費方法中try catch中梧躺,記錄消費失敗的消息似谁,然后basicAck確認,通過自定義重試策略取出失敗的消息重新消費掠哥,失敗達到一定次數(shù)手動處理
需要注意的 basicAck 方法需要傳遞兩個參數(shù):
(1)deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后巩踏,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息续搀,這個方法攜帶了一個 delivery tag塞琼, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調(diào)遞增的正整數(shù)禁舷,delivery tag 的范圍僅限于 Channel
(2)multiple:為了減少網(wǎng)絡(luò)流量彪杉,手動確認可以被批處理,當該參數(shù)為 true 時牵咙,則可以一次性確認 delivery_tag 小于等于傳入值的所有消息
2派近、保證消息不丟失
失敗重發(fā)參考:https://www.cnblogs.com/xujishou/p/6288623.html
1、設(shè)置消息持久化
2洁桌、利用confirm模式渴丸,發(fā)送失敗重新放送
(很多帖子說,confirm模式但是confirm回調(diào)測試沒有消息數(shù)據(jù)無法重發(fā)另凌,建議:https://www.cnblogs.com/xujishou/p/6288623.html)
3谱轨、return exchange到隊列失敗回調(diào),可以獲取到消息相關(guān)消息可重發(fā)
confirm模式 重發(fā)消息途茫,生成CorrelationData碟嘴,重新發(fā)送
private CorrelationData getCorrelationData(String exchange, String routeKey, byte[] body) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setReceivedExchange(exchange);
messageProperties.setReceivedRoutingKey(routeKey);
Message message = new Message(body, messageProperties);
CorrelationData correlationData = new CorrelationData();
correlationData.setReturnedMessage(message);
return correlationData;
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("confirm消息發(fā)送成功:" + cause);
} else {
String msg = new String(correlationData.getReturnedMessage().getBody());
System.out.println("confirm消息發(fā)送失敗:" + msg);
MessageProperties messageProperties = correlationData.getReturnedMessage().getMessageProperties();
rabbitTemplate.convertAndSend(messageProperties.getReceivedExchange(),
messageProperties.getReceivedRoutingKey(),
correlationData.getReturnedMessage(),
correlationData);
}
}
publisher-confirms: true # 如果消息沒有到exchange,則confirm回調(diào),ack=false囊卜,
# 如果消息到達exchange,則confirm回調(diào),ack=true
publisher-returns: true #exchange到queue成功,則不回調(diào)return
#exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
三娜扇、疑問
發(fā)送消息時,消息發(fā)送成功栅组,業(yè)務(wù)失敗雀瓢。業(yè)務(wù)成功消息發(fā)送失敗玉掸?
保證業(yè)務(wù)處理成功后發(fā)送消息刃麸,發(fā)送失敗一直重試發(fā)送消息。
四司浪、demo
kafka:https://github.com/huangxiongbiao12/kafka.git
rabbitmq:https://github.com/huangxiongbiao12/rabbitmq-demo.git