前言
那天我和同事一起吃完晚飯回公司加班,然后就群里就有人@我說xxx商戶說收不到推送剩失,一開始覺得沒啥屈尼。
我第一反應(yīng)是不是沒注冊(cè)上,就讓客服通知商戶拴孤,重新登錄下試試脾歧。這邊打開推送的后臺(tái)進(jìn)行檢查。后面反應(yīng)收不到推送的越來越多演熟,我就知道這事情不簡(jiǎn)單鞭执。
事故經(jīng)過
由于大量商戶反應(yīng)收不到推送司顿,我第一反應(yīng)是不是推送系統(tǒng)掛了,導(dǎo)致沒有進(jìn)行推送兄纺。于是讓運(yùn)維老哥檢查推送系統(tǒng)各節(jié)點(diǎn)的情況大溜,發(fā)現(xiàn)都正常。
于是打開[RabbitMQ]
的管控臺(tái)看了一下估脆,人都蒙了钦奋。已經(jīng)有幾萬條消息處于ready
狀態(tài),還有幾百條unacked
的消息疙赠。
我以為推送服務(wù)和MQ
連接斷開了锨苏,導(dǎo)致無法推送消息,于是讓運(yùn)維重啟推送服務(wù)棺聊,將所有的推送服務(wù)重啟完伞租,發(fā)現(xiàn)unacked
的消息全部變成ready
,但是沒過多久又有幾百條unacked
的消息了限佩,這個(gè)就很明顯了能消費(fèi)葵诈,沒有進(jìn)行ack
呀。
當(dāng)時(shí)我以為是網(wǎng)絡(luò)問題祟同,導(dǎo)致mq無法接收到ack
作喘,讓運(yùn)維老哥檢查了一下,發(fā)現(xiàn)網(wǎng)絡(luò)沒問題≡纬牵現(xiàn)在看是真的是傻泞坦,網(wǎng)絡(luò)有問題連接都連不上。由于確定的是無法ack
造成的砖顷,立馬將ack模式
由原來的manual
改成auto
緊急發(fā)布贰锁。將所有的節(jié)點(diǎn)升級(jí)好以后,發(fā)現(xiàn)推送正常了滤蝠。
你以為這就結(jié)束了其實(shí)并沒有豌熄,沒過多久發(fā)現(xiàn)有一臺(tái)MQ服務(wù)出現(xiàn)異常,由于生產(chǎn)采用了鏡像隊(duì)列
物咳,立即將這臺(tái)有問題的MQ從集群中移除锣险。直接進(jìn)行重置,然后加入回集群览闰。這事情算是告一段落了芯肤。此時(shí)已經(jīng)接近24:00了。
時(shí)間來到第二天上午10:00压鉴,運(yùn)維那邊又出現(xiàn)報(bào)警了崖咨,說推送系統(tǒng)有臺(tái)機(jī)器,磁盤快被寫滿了晴弃,并且占用率很高掩幢。
我的乖乖從昨晚到現(xiàn)在寫了快40G的日志逊拍,一看報(bào)錯(cuò)信息瞬間就明白問題出在哪里了。麻溜的把bug
修了緊急發(fā)布际邻。
“
吐槽一波公司的ELK芯丧,壓根就沒有收集到這個(gè)報(bào)錯(cuò)信息,導(dǎo)致我沒有及時(shí)發(fā)現(xiàn)世曾。
事故重現(xiàn)-隊(duì)列阻塞
MQ配置
spring:
# 消息隊(duì)列
rabbitmq:
host: 10.0.0.53
username: guest
password: guest
virtual-host: local
port: 5672
# 消息發(fā)送確認(rèn)
publisher-confirm-type: correlated
# 開啟發(fā)送失敗退回
publisher-returns: true
listener:
simple:
# 消費(fèi)端最小并發(fā)數(shù)
concurrency: 1
# 消費(fèi)端最大并發(fā)數(shù)
max-concurrency: 5
# 一次請(qǐng)求中預(yù)處理的消息數(shù)量
prefetch: 2
# 手動(dòng)應(yīng)答
acknowledge-mode: manual
問題代碼
[@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯(cuò)誤信息:{},消息內(nèi)容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
}finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
} ](http://mp.weixin.qq.com/s?__biz=MzI3ODcxMzQzMw==&mid=2247493133&idx=2&sn=19a6cbb8eff82a9bdfe00a7868723f7e&chksm=eb50633bdc27ea2d964b921b9a9bcc3ddab97cebc6ed53211cdc3b0aa15fc09902e6071fac8d&scene=21#wechat_redirect)
看起來好像沒啥問題轮听。由于和交易系統(tǒng)約定好骗露,訂單數(shù)據(jù)需要先轉(zhuǎn)換json串,然后再使用AES進(jìn)行加密血巍,所以這邊需要萧锉,先進(jìn)行解密然后在進(jìn)行解析。才能得到訂單數(shù)據(jù)述寡。
為了防止消息丟失柿隙,交易系統(tǒng)做了失敗重發(fā)機(jī)制,防止消息丟失鲫凶,不巧的是重發(fā)的時(shí)候沒有對(duì)訂單數(shù)據(jù)進(jìn)行加密禀崖。這就導(dǎo)致推送系統(tǒng),在解密的時(shí)候出異常螟炫,從而無法進(jìn)行ack波附。
“默默的吐槽一句:人在家中坐,鍋從天上來昼钻。
模擬推送
推送代碼
發(fā)送3條正常的消息
curl http://localhost:8080/sendMsg/3
發(fā)送1條錯(cuò)誤的消息
curl http://localhost:8080/sendErrorMsg/1
再發(fā)送3條正常的消息
curl http://localhost:8080/sendMsg/3
觀察日志發(fā)下掸屡,雖然有報(bào)錯(cuò),但是還能正常進(jìn)行推送换吧。但是[RabbitMQ]已經(jīng)出現(xiàn)了一條
unacked
的消息折晦。繼續(xù)發(fā)送1條錯(cuò)誤的消息
curl http://localhost:8080/sendErrorMsg/1
再發(fā)送3條正常的消息
curl http://localhost:8080/sendMsg/3
這個(gè)時(shí)候你會(huì)發(fā)現(xiàn)控制臺(tái)報(bào)錯(cuò)钥星,當(dāng)然錯(cuò)誤信息是解密失敗沾瓦,但是正常的消息卻沒有被消費(fèi),這個(gè)時(shí)候其實(shí)隊(duì)列已經(jīng)阻塞了谦炒。從[RabbitMQ] 管控臺(tái)也可以看到贯莺,剛剛發(fā)送的的3條消息處于ready
狀態(tài)。這個(gè)時(shí)候就如果一直有消息進(jìn)入宁改,都會(huì)堆積在隊(duì)里里面無法被消費(fèi)缕探。
再發(fā)送3條正常的消息
curl http://localhost:8080/sendMsg/3
分析原因
上面說了是由于沒有進(jìn)行ack
導(dǎo)致隊(duì)里阻塞。那么問題來了还蹲,這是為什么呢爹耗?其實(shí)這是[RabbitMQ]
的一種保護(hù)機(jī)制耙考。防止當(dāng)消息激增的時(shí)候,海量的消息進(jìn)入consumer
而引發(fā)consumer
宕機(jī)潭兽。
RabbitMQ提供了一種QOS(服務(wù)質(zhì)量保證)功能倦始,即在非自動(dòng)確認(rèn)的消息的前提下,限制信道上的消費(fèi)者所能保持的最大未確認(rèn)的數(shù)量山卦⌒兀可以通過設(shè)置PrefetchCount
實(shí)現(xiàn)。
舉例說明:可以理解為在consumer
前面加了一個(gè)緩沖容器账蓉,容器能容納最大的消息數(shù)量就是PrefetchCount
枚碗。
如果容器沒有滿[RabbitMQ]
就會(huì)將消息投遞到容器內(nèi),如果滿了就不投遞了铸本。當(dāng)consumer
對(duì)消息進(jìn)行ack
以后就會(huì)將此消息移除肮雨,從而放入新的消息。
listener:
simple:
# 消費(fèi)端最小并發(fā)數(shù)
concurrency: 1
# 消費(fèi)端最大并發(fā)數(shù)
max-concurrency: 5
# 一次處理的消息數(shù)量
prefetch: 2
# 手動(dòng)應(yīng)答
acknowledge-mode: manual
“
prefetch參數(shù)就是PrefetchCount
通過上面的配置發(fā)現(xiàn)prefetch
我只配置了2箱玷,并且concurrency
配置的只有1酷含,所以當(dāng)我發(fā)送了2條錯(cuò)誤消息以后,由于解密失敗這2條消息一直沒有被ack
汪茧。將緩沖區(qū)沾滿了椅亚,這個(gè)時(shí)候[RabbitMQ]
認(rèn)為這個(gè)consumer
已經(jīng)沒有消費(fèi)能力了就不繼續(xù)給它推送消息了,所以就造成了隊(duì)列阻塞舱污。
判斷隊(duì)列是否有阻塞的風(fēng)險(xiǎn)呀舔。
當(dāng)ack
模式為manual
,并且線上出現(xiàn)了unacked
消息扩灯,這個(gè)時(shí)候不用慌媚赖。由于QOS是限制信道channel
上的消費(fèi)者所能保持的最大未確認(rèn)的數(shù)量。所以允許出現(xiàn)unacked
的數(shù)量可以通過channelCount * prefetchCount * 節(jié)點(diǎn)數(shù)量
得出珠插。
“
channlCount
就是由concurrency
,max-concurrency
決定的惧磺。
-
min
=concurrency * prefetch * 節(jié)點(diǎn)數(shù)量
-
max
=max-concurrency * prefetch * 節(jié)點(diǎn)數(shù)量
由此可以的出結(jié)論
-
unacked_msg_count < min
隊(duì)列不會(huì)阻塞。但需要及時(shí)處理unacked的
消息捻撑。 -
unacked_msg_count >= min
可能會(huì)出現(xiàn)堵塞磨隘。 -
unacked_msg_count >= max
隊(duì)列一定阻塞。
這里需要好好理解一下顾患。
處理方法
其實(shí)處理的方法很簡(jiǎn)單番捂,將解密和解析的方法放入[try catch
]中就解決了這樣不管解密正常與否,消息都會(huì)被簽收江解。如果出錯(cuò)將會(huì)輸出錯(cuò)誤日志设预,讓開發(fā)人員進(jìn)行處理了。
“對(duì)于這個(gè)就需要有日志監(jiān)控系統(tǒng)犁河,來及時(shí)告警了鳖枕。
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
try {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯(cuò)誤信息:{},消息內(nèi)容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
注意的點(diǎn)
unacked
的消息在consumer
切斷連接后(重啟)魄梯,會(huì)自動(dòng)回到隊(duì)頭。
事故重現(xiàn)-磁盤占用飆升
一開始我不知道代碼有問題宾符,就是以為單純的沒有進(jìn)行ack
所以將ack
模式改成auto
自動(dòng)画恰,緊急升級(jí)了,這樣不管正常與否吸奴,消息都會(huì)被簽收允扇,所以在當(dāng)時(shí)確實(shí)是解決了問題。
其實(shí)現(xiàn)在回想起來是非常危險(xiǎn)的操作的则奥,將ack
模式改成auto
自動(dòng)考润,這樣會(huì)使QOS
不生效。會(huì)出現(xiàn)大量消息涌入consumer
從而造成consumer
宕機(jī)读处,可以是因?yàn)楫?dāng)時(shí)在晚上糊治,交易比較少,并且推送系統(tǒng)有多個(gè)節(jié)點(diǎn)罚舱,才沒出現(xiàn)問題井辜。
問題代碼
[@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯(cuò)誤信息:{},消息內(nèi)容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
} ](http://mp.weixin.qq.com/s?__biz=MzI3ODcxMzQzMw==&mid=2247494139&idx=2&sn=a33f1781bf8ee8094827118a4ffe394d&chksm=eb506ccddc27e5db0191cce12d753b683de89f2cf285410b514fed9bfa46f5a9b1f4b817968f&scene=21#wechat_redirect)
配置文件
listener:
simple:
# 消費(fèi)端最小并發(fā)數(shù)
concurrency: 1
# 消費(fèi)端最大并發(fā)數(shù)
max-concurrency: 5
# 一次處理的消息數(shù)量
prefetch: 2
# 手動(dòng)應(yīng)答
acknowledge-mode: auto
由于當(dāng)時(shí)不知道交易系統(tǒng)的重發(fā)機(jī)制,重發(fā)時(shí)沒有對(duì)訂單數(shù)據(jù)加密的bug管闷,所以還是會(huì)發(fā)出少量有誤的消息粥脚。
發(fā)送1條錯(cuò)誤的消息
curl http://localhost:8080/sendErrorMsg/1
原因
[RabbitMQ]
消息監(jiān)聽程序異常時(shí),consumer
會(huì)向rabbitmq server
發(fā)送Basic.Reject
包个,表示消息拒絕接受刷允,由于Spring
默認(rèn)requeue-rejected
配置為true
,消息會(huì)重新入隊(duì)碧囊,然后rabbitmq server
重新投遞树灶。就相當(dāng)于死循環(huán)了,所以控制臺(tái)在瘋狂刷錯(cuò)誤日志造成磁盤利用率飆升的原因糯而。
解決方法
將default-requeue-rejected: false
即可天通。
總結(jié)
個(gè)人建議,生產(chǎn)環(huán)境不建議使用自動(dòng)ack熄驼,這樣會(huì)QOS無法生效像寒。
在使用手動(dòng)ack的時(shí)候,需要非常注意消息簽收谜洽。
其實(shí)在將有問題的MQ重置時(shí)萝映,是將錯(cuò)誤的消息給清除才沒有問題了,相當(dāng)于是消息丟失了阐虚。
try {
// 業(yè)務(wù)邏輯。
}catch (Exception e){
// 輸出錯(cuò)誤日志蚌卤。
}finally {
// 消息簽收实束。
}
結(jié)尾
如果有人告訴你遇到線上事故不要慌奥秆,除非是超級(jí)大佬久經(jīng)沙場(chǎng)。否則就是瞎扯淡咸灿,你讓他來試試构订,看看他會(huì)不會(huì)大腦一片空白,直冒汗避矢。
如果覺得對(duì)你有幫助悼瘾,可以多多評(píng)論,多多點(diǎn)贊哦审胸,也可以隨手點(diǎn)個(gè)關(guān)注哦亥宿,謝謝。