前言
??那天我和同事一起吃完晚飯回公司加班诈豌,然后就群里就有人@我說xxx商戶說收不到推送仆救,一開始覺得沒啥。我第一反應(yīng)是不是極光沒注冊(cè)上矫渔,就讓客服通知商戶彤蔽,重新登錄下試試。這邊打開極光推送的后臺(tái)進(jìn)行檢查庙洼。后面反應(yīng)收不到推送的越來越多顿痪,我就知道這事情不簡(jiǎn)單。
事故經(jīng)過
??由于大量商戶反應(yīng)收不到推送油够,我第一反應(yīng)是不是推送系統(tǒng)掛了蚁袭,導(dǎo)致沒有進(jìn)行推送。于是讓運(yùn)維老哥檢查推送系統(tǒng)各節(jié)點(diǎn)的情況叠聋,發(fā)現(xiàn)都正常撕阎。于是打開RabbitMQ的管控臺(tái)看了一下,人都蒙了碌补。已經(jīng)有幾萬條消息處于ready
狀態(tài)虏束,還有幾百條unacked
的消息。
??我以為推送服務(wù)和MQ連接斷開了厦章,導(dǎo)致無法推送消息镇匀,于是讓運(yùn)維重啟推送服務(wù),將所有的推送服務(wù)重啟完袜啃,發(fā)現(xiàn)unacked
的消息全部變成ready
汗侵,但是沒過多久又有幾百條unacked
的消息了,這個(gè)就很明顯了能消費(fèi)群发,沒有進(jìn)行ack
呀晰韵。
??當(dāng)時(shí)我以為是網(wǎng)絡(luò)問題,導(dǎo)致mq無法接收到ack
熟妓,讓運(yùn)維老哥檢查了一下雪猪,發(fā)現(xiàn)網(wǎng)絡(luò)沒問題。現(xiàn)在看是真的是傻起愈,網(wǎng)絡(luò)有問題連接都連不上只恨。由于確定的是無法ack
造成的,立馬將ack模式
由原來的manual
改成auto
緊急發(fā)布抬虽。將所有的節(jié)點(diǎn)升級(jí)好以后官觅,發(fā)現(xiàn)推送正常了。
??你以為這就結(jié)束了其實(shí)并沒有阐污,沒過多久發(fā)現(xiàn)有一臺(tái)MQ服務(wù)出現(xiàn)異常休涤,由于生產(chǎn)采用了鏡像隊(duì)列
,立即將這臺(tái)有問題的MQ從集群中移除笛辟。直接進(jìn)行重置功氨,然后加入回集群。這事情算是告一段落了隘膘。此時(shí)已經(jīng)接近24:00了疑故。
??時(shí)間來到第二天上午10:00,運(yùn)維那邊又出現(xiàn)報(bào)警了弯菊,說推送系統(tǒng)有臺(tái)機(jī)器纵势,磁盤快被寫滿了,并且占用率很高管钳。我的乖乖從昨晚到現(xiàn)在寫了快40G的日志钦铁,一看報(bào)錯(cuò)信息瞬間就明白問題出在哪里了。麻溜的把bug
修了緊急發(fā)布才漆。
吐槽一波公司的ELK牛曹,壓根就沒有收集到這個(gè)報(bào)錯(cuò)信息,導(dǎo)致我沒有及時(shí)發(fā)現(xiàn)醇滥。
事故重現(xiàn)-隊(duì)列阻塞
MQ配置
spring:
# 消息隊(duì)列
rabbitmq:
host: 10.0.0.53
username: guest
password: guest
virtual-host: local
port: 5672
# 消息發(fā)送確認(rèn)
publisher-confirm-type: correlated
# 開啟發(fā)送失敗退回
publisher-returns: true
listener:
simple:
# 消費(fèi)端最小并發(fā)數(shù)
concurrency: 1
# 消費(fèi)端最大并發(fā)數(shù)
max-concurrency: 5
# 一次請(qǐng)求中預(yù)處理的消息數(shù)量
prefetch: 2
# 手動(dòng)應(yīng)答
acknowledge-mode: manual
問題代碼
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯(cuò)誤信息:{},消息內(nèi)容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
}finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
??看起來好像沒啥問題黎比。由于和交易系統(tǒng)約定好超营,訂單數(shù)據(jù)需要先轉(zhuǎn)換json
串,然后再使用AES
進(jìn)行加密阅虫,所以這邊需要演闭,先進(jìn)行解密然后在進(jìn)行解析。才能得到訂單數(shù)據(jù)颓帝。
??為了防止消息丟失米碰,交易系統(tǒng)做了失敗重發(fā)
機(jī)制,防止消息丟失购城,不巧的是重發(fā)的時(shí)候沒有對(duì)訂單數(shù)據(jù)進(jìn)行加密吕座。這就導(dǎo)致推送系統(tǒng),在解密的時(shí)候出異常瘪板,從而無法進(jìn)行ack
吴趴。
默默的吐槽一句:人在家中坐,鍋從天上來篷帅。
模擬推送
發(fā)送3條正常的消息
curl http://localhost:8080/sendMsg/3
發(fā)送1條錯(cuò)誤的消息
curl http://localhost:8080/sendErrorMsg/1
再發(fā)送3條正常的消息
curl http://localhost:8080/sendMsg/3
??觀察日志發(fā)下史侣,雖然有報(bào)錯(cuò),但是還能正常進(jìn)行推送魏身。但是RabbitMQ已經(jīng)出現(xiàn)了一條unacked
的消息惊橱。
繼續(xù)發(fā)送1條錯(cuò)誤的消息
curl http://localhost:8080/sendErrorMsg/1
再發(fā)送3條正常的消息
curl http://localhost:8080/sendMsg/3
??這個(gè)時(shí)候你會(huì)發(fā)現(xiàn)控制臺(tái)報(bào)錯(cuò),當(dāng)然錯(cuò)誤信息是解密失敗箭昵,但是正常的消息卻沒有被消費(fèi)税朴,這個(gè)時(shí)候其實(shí)隊(duì)列已經(jīng)阻塞了。
??從RabbitMQ
管控臺(tái)也可以看到家制,剛剛發(fā)送的的3條消息處于ready
狀態(tài)正林。這個(gè)時(shí)候就如果一直有消息進(jìn)入,都會(huì)堆積在隊(duì)里里面無法被消費(fèi)颤殴。
再發(fā)送3條正常的消息
curl http://localhost:8080/sendMsg/3
分析原因
??上面說了是由于沒有進(jìn)行ack
導(dǎo)致隊(duì)里阻塞觅廓。那么問題來了,這是為什么呢涵但?其實(shí)這是RabbitMQ
的一種保護(hù)機(jī)制杈绸。防止當(dāng)消息激增的時(shí)候,海量的消息進(jìn)入consumer
而引發(fā)consumer
宕機(jī)矮瘟。
??RabbitMQ提供了一種QOS(服務(wù)質(zhì)量保證)功能瞳脓,即在非自動(dòng)確認(rèn)的消息的前提下,限制信道上的消費(fèi)者所能保持的最大未確認(rèn)的數(shù)量澈侠〗俨啵可以通過設(shè)置PrefetchCount
實(shí)現(xiàn)。
??舉例說明:可以理解為在consumer
前面加了一個(gè)緩沖容器,容器能容納最大的消息數(shù)量就是PrefetchCount
烧栋。如果容器沒有滿RabbitMQ
就會(huì)將消息投遞到容器內(nèi)写妥,如果滿了就不投遞了。當(dāng)consumer
對(duì)消息進(jìn)行ack
以后就會(huì)將此消息移除劲弦,從而放入新的消息耳标。
listener:
simple:
# 消費(fèi)端最小并發(fā)數(shù)
concurrency: 1
# 消費(fèi)端最大并發(fā)數(shù)
max-concurrency: 5
# 一次處理的消息數(shù)量
prefetch: 2
# 手動(dòng)應(yīng)答
acknowledge-mode: manual
prefetch參數(shù)就是PrefetchCount
??通過上面的配置發(fā)現(xiàn)prefetch
我只配置了2醇坝,并且concurrency
配置的只有1邑跪,所以當(dāng)我發(fā)送了2條錯(cuò)誤消息以后,由于解密失敗這2條消息一直沒有被ack
呼猪。將緩沖區(qū)沾滿了画畅,這個(gè)時(shí)候RabbitMQ
認(rèn)為這個(gè)consumer
已經(jīng)沒有消費(fèi)能力了就不繼續(xù)給它推送消息了,所以就造成了隊(duì)列阻塞宋距。
判斷隊(duì)列是否有阻塞的風(fēng)險(xiǎn)轴踱。
??當(dāng)ack
模式為manual
,并且線上出現(xiàn)了unacked
消息谚赎,這個(gè)時(shí)候不用慌淫僻。由于QOS是限制信道channel
上的消費(fèi)者所能保持的最大未確認(rèn)的數(shù)量。所以允許出現(xiàn)unacked
的數(shù)量可以通過channelCount * prefetchCount * 節(jié)點(diǎn)數(shù)量
得出壶唤。
channlCount
就是由concurrency
,max-concurrency
決定的雳灵。
-
min
=concurrency * prefetch * 節(jié)點(diǎn)數(shù)量
-
max
=max-concurrency * prefetch * 節(jié)點(diǎn)數(shù)量
由此可以的出結(jié)論
-
unacked_msg_count
<min
隊(duì)列不會(huì)阻塞。但需要及時(shí)處理unacked
的消息闸盔。 -
unacked_msg_count
>=min
可能會(huì)出現(xiàn)堵塞悯辙。 -
unacked_msg_count
>=max
隊(duì)列一定阻塞。
這里需要好好理解一下迎吵。
處理方法
其實(shí)處理的方法很簡(jiǎn)單躲撰,將解密和解析的方法放入try catch
中就解決了這樣不管解密正常與否,消息都會(huì)被簽收击费。如果出錯(cuò)將會(huì)輸出錯(cuò)誤日志拢蛋,讓開發(fā)人員進(jìn)行處理了。
對(duì)于這個(gè)就需要有日志監(jiān)控系統(tǒng)蔫巩,來及時(shí)告警了谆棱。
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
try {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯(cuò)誤信息:{},消息內(nèi)容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
注意的點(diǎn)
??unacked
的消息在consumer
切斷連接后(重啟),會(huì)自動(dòng)回到隊(duì)頭批幌。
事故重現(xiàn)-磁盤占用飆升
??一開始我不知道代碼有問題础锐,就是以為單純的沒有進(jìn)行ack
所以將ack
模式改成auto
自動(dòng),緊急升級(jí)了荧缘,這樣不管正常與否皆警,消息都會(huì)被簽收,所以在當(dāng)時(shí)確實(shí)是解決了問題截粗。
??其實(shí)現(xiàn)在回想起來是非常危險(xiǎn)的操作的信姓,將ack
模式改成auto
自動(dòng)鸵隧,這樣會(huì)使QOS不生效。會(huì)出現(xiàn)大量消息涌入consumer
從而造成consumer
宕機(jī)意推,可以是因?yàn)楫?dāng)時(shí)在晚上豆瘫,交易比較少,并且推送系統(tǒng)有多個(gè)節(jié)點(diǎn)菊值,才沒出現(xiàn)問題外驱。
問題代碼
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
@Headers Map<String,Object> headers,
Channel channel) throws Exception {
// 解密和解析
String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
try {
// 模擬推送
pushMsg(orderDto);
}catch (Exception e){
log.error("推送失敗-錯(cuò)誤信息:{},消息內(nèi)容:{}", e.getLocalizedMessage(), encryptOrderDto);
}finally {
// 消息簽收
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
配置文件
listener:
simple:
# 消費(fèi)端最小并發(fā)數(shù)
concurrency: 1
# 消費(fèi)端最大并發(fā)數(shù)
max-concurrency: 5
# 一次處理的消息數(shù)量
prefetch: 2
# 手動(dòng)應(yīng)答
acknowledge-mode: auto
??由于當(dāng)時(shí)不知道交易系統(tǒng)的重發(fā)機(jī)制,重發(fā)時(shí)沒有對(duì)訂單數(shù)據(jù)加密的bug腻窒,所以還是會(huì)發(fā)出少量有誤的消息昵宇。
發(fā)送1條錯(cuò)誤的消息
curl http://localhost:8080/sendErrorMsg/1
原因
??RabbitMQ
消息監(jiān)聽程序異常時(shí),consumer
會(huì)向rabbitmq server
發(fā)送Basic.Reject
儿子,表示消息拒絕接受瓦哎,由于Spring
默認(rèn)requeue-rejected
配置為true
,消息會(huì)重新入隊(duì)柔逼,然后rabbitmq server
重新投遞蒋譬。就相當(dāng)于死循環(huán)了,所以控制臺(tái)在瘋狂刷錯(cuò)誤日志造成磁盤利用率飆升的原因愉适。
解決方法
??將default-requeue-rejected: false
即可犯助。
總結(jié)
- 個(gè)人建議,生產(chǎn)環(huán)境不建議使用自動(dòng)ack儡毕,這樣會(huì)QOS無法生效也切。
- 在使用手動(dòng)ack的時(shí)候,需要非常注意消息簽收腰湾。
- 其實(shí)在將有問題的MQ重置時(shí)雷恃,是將錯(cuò)誤的消息給清除才沒有問題了,相當(dāng)于是消息丟失了费坊。
try {
// 業(yè)務(wù)邏輯倒槐。
}catch (Exception e){
// 輸出錯(cuò)誤日志。
}finally {
// 消息簽收附井。
}
參考資料
代碼地址
結(jié)尾
??如果有人告訴你遇到線上事故不要慌讨越,除非是超級(jí)大佬久經(jīng)沙場(chǎng)。否則就是瞎扯淡永毅,你讓他來試試把跨,看看他會(huì)不會(huì)大腦一片空白,直冒汗沼死。
??如果覺得對(duì)你有幫助着逐,可以多多評(píng)論,多多點(diǎn)贊哦,也可以到我的主頁看看耸别,說不定有你喜歡的文章健芭,也可以隨手點(diǎn)個(gè)關(guān)注哦,謝謝秀姐。
??我是不一樣的科技宅慈迈,每天進(jìn)步一點(diǎn)點(diǎn),體驗(yàn)不一樣的生活省有。我們下期見痒留!