消息隊(duì)列(MQ)
相信大家對(duì)MQ
這個(gè)詞都不會(huì)陌生,不管用過(guò)還是沒(méi)用過(guò)的,大多會(huì)對(duì)他有一定的了解,
那么消息隊(duì)列有什么好處呢
- 解耦(接觸服務(wù)之間的耦合度關(guān)系)
- 削峰(例如我某個(gè)促銷活動(dòng)在某個(gè)時(shí)間點(diǎn)有非常大的流量涌入,這個(gè)時(shí)候用Mq做緩存是最好的方式了)
- 異步化(例如有些服務(wù)是我不需要在同步鏈中進(jìn)行調(diào)用的,那么可以用mq來(lái)做一個(gè)異步消費(fèi))
傳統(tǒng)MQ的缺點(diǎn)
MQ基本上和緩存一樣是居家必備之良藥。然而消息隊(duì)列雖然重要,但是同時(shí)其實(shí)是蠻重的一個(gè)組件浙垫。例如我們?cè)谟胷abbitMq的話斩箫,我們需要為它搭建一個(gè)服務(wù)端退腥,如果考慮到可用性,那么我們需要為服務(wù)端建立一個(gè)集群惕稻,同時(shí)蔗衡,我們?nèi)绻€上問(wèn)題可能還需要在Mq中做查找纤虽,那么這些工作就可能加大我們整體的工作量。
利用redis來(lái)實(shí)現(xiàn)MQ
所以就想能不能先簡(jiǎn)單的通過(guò)Redis來(lái)實(shí)現(xiàn)消息隊(duì)列呢绞惦?不考慮PubSub逼纸、分布式、持久化济蝉、事務(wù)等復(fù)雜的情況杰刽。就像JDK的各種Queue一樣。答案當(dāng)然是可以的王滤,因?yàn)镽edis提供的list數(shù)據(jù)結(jié)構(gòu)就非常適合做消息隊(duì)列贺嫂。大家可能會(huì)發(fā)現(xiàn),網(wǎng)上有很多redis的消息隊(duì)列雁乡,但是目前為止第喳,我沒(méi)有發(fā)現(xiàn)一個(gè)消息隊(duì)列是具有ack機(jī)制的。
這里我們會(huì)講述怎么利用list的api中的lpush/brpoplpush來(lái)實(shí)現(xiàn)一個(gè)具有ack機(jī)制的消息隊(duì)列
實(shí)現(xiàn)思路
初步實(shí)現(xiàn)
實(shí)現(xiàn)ack的話蔗怠,(暫時(shí)先不考慮集群版墩弯,只是單機(jī)版本)
- 我可以用lpush做生產(chǎn)者,每次有消息需要生產(chǎn)的時(shí)候寞射,就發(fā)送一個(gè)message到pending隊(duì)列中。
- brpoplpush做消費(fèi)者锌钮,每次取到消息的時(shí)候進(jìn)行業(yè)務(wù)消費(fèi)桥温。在消費(fèi)的同時(shí)吧消息放到另一個(gè)doing的隊(duì)列中
- 每次消費(fèi)者完成任務(wù),從doing隊(duì)列中刪除任務(wù)msg梁丘,用來(lái)告知這個(gè)消息被成功消費(fèi)掉了
- 然后開一個(gè)線程去定時(shí)輪詢查doing中侵浸,如果一定時(shí)間(架設(shè)我們的message實(shí)現(xiàn)了我們的協(xié)議旺韭,message中帶有任務(wù)開始的時(shí)間戳),這個(gè)任務(wù)還沒(méi)被消費(fèi)成功掏觉,那么就把這個(gè)doing隊(duì)列的那個(gè)就重新塞到pending的隊(duì)列里
發(fā)現(xiàn)問(wèn)題
但是這時(shí)候可能會(huì)出現(xiàn)這樣的問(wèn)題区端,我輪詢doing的隊(duì)列在取任務(wù)的時(shí)候可能因?yàn)槲蚁M(fèi)者的任務(wù)因?yàn)槟承┰蜃龅穆诵敲催@時(shí)候就會(huì)被重新塞會(huì)pending隊(duì)列里澳腹,但是過(guò)兩秒我的doing確實(shí)消費(fèi)完了织盼。
那么怎么解決這個(gè)問(wèn)題呢?
解決方式其實(shí)很簡(jiǎn)單,就是上面的進(jìn)行步驟3的時(shí)候,如果從doing隊(duì)列進(jìn)行刪除的時(shí)候酱塔,如果返回值表示刪除失敗的話,那么說(shuō)明我們的任務(wù)被系統(tǒng)認(rèn)為過(guò)期了沥邻,他被賽入pending中了,那么我們只需要在這個(gè)時(shí)候去pending中重新刪除這個(gè)message消息即可
延伸問(wèn)題
ok羊娃,那么大家覺得這時(shí)候已經(jīng)完工了嗎唐全?其實(shí)并沒(méi)有。蕊玷。邮利。為什么呢?
因?yàn)闀?huì)出現(xiàn)如下這樣一種比較極端的情況:
就是任務(wù)完成之后去doing隊(duì)列中刪除message失敗,然后去pending中刪除也失敗,因?yàn)橛锌赡茉谌蝿?wù)掃描的時(shí)候,吧任務(wù)剛放入pending隊(duì)列中垃帅,沒(méi)等doing完成呢延届,pending中重新放入的任務(wù)就被消費(fèi)了。那么這時(shí)候依然是消息出現(xiàn)重復(fù)
這種情況下的最佳解決方案是什么呢挺智?就是消費(fèi)端做好冪等性處理(其實(shí)像阿里的RocketMq)也會(huì)出現(xiàn)消息重復(fù)的情況(雖然極低概率),但是在Mq中祷愉,似乎設(shè)計(jì)一個(gè)精確只發(fā)一次的模型,是一件比較難的事情赦颇。
深層延伸的方案
上面的消息重復(fù)其實(shí)還是有優(yōu)化的余地,具體的實(shí)現(xiàn)思路如下:
- 優(yōu)化掃描的模型,吧掃描doing過(guò)期任務(wù)變成一個(gè)延遲掃描(如用delayedQueue實(shí)現(xiàn)延遲任務(wù)掃描)
- 吧每個(gè)執(zhí)行的任務(wù)模型用ExecutorService來(lái)管理二鳄,存儲(chǔ)正在執(zhí)行的Future
- 每次掃描到超時(shí)的任務(wù)就去內(nèi)存中查找這個(gè)任務(wù)的Future是否存在,如果存在則不需要吧doing的message放到pending中
- 如果需要超時(shí)機(jī)制的話,找到對(duì)應(yīng)的Future并且取消當(dāng)前任務(wù)的執(zhí)行媒怯,并把之前執(zhí)行的操作進(jìn)行業(yè)務(wù)回滾/rollback订讼,把message放到pending中
不過(guò)我并不推薦這一套方案,因?yàn)檫@一套方案過(guò)于復(fù)雜扇苞,本身就是不是我們用redis作為消息隊(duì)列的初衷欺殿。
總結(jié)
redis作為消息隊(duì)列是有很大的局限性的,本身作為一個(gè)以緩存/內(nèi)存存儲(chǔ)為主的東西,只是因?yàn)槟承゛pi上的特性鳖敷,我們得以實(shí)現(xiàn)一個(gè)簡(jiǎn)單的隊(duì)列服務(wù)脖苏,本身我們要選擇好業(yè)務(wù)的取舍,靈活的使用redis的MQ支持定踱,才能實(shí)現(xiàn)一個(gè)好的服務(wù)棍潘。
基于上述思想的代碼實(shí)踐我已經(jīng)放到了github上,部分代碼還在做成中。
github地址 : https://github.com/wgd12389/redisses/