RabbitMQ實現(xiàn)延遲任務

摘自:https://www.cnblogs.com/haoxinyue/p/6613706.html

使用RabbitMQ實現(xiàn)延遲任務

場景一:物聯(lián)網(wǎng)系統(tǒng)經常會遇到向終端下發(fā)命令领突,如果命令一段時間沒有應答,就需要設置成超時。

場景二:訂單下單之后30分鐘后俏讹,如果用戶沒有付錢靠柑,則系統(tǒng)自動取消訂單具壮。

上述類似的需求是我們經常會遇見的問題梯轻。最常用的方法是定期輪訓數(shù)據(jù)庫,設置狀態(tài)输莺。在數(shù)據(jù)量小的時候并沒有什么大的問題,但是數(shù)據(jù)量一大輪訓數(shù)據(jù)庫的方式就會變得特別耗資源漾稀。當面對千萬級模闲、上億級數(shù)據(jù)量時,本身寫入的IO就比較高崭捍,導致長時間查詢或者根本就查不出來尸折,更別說分庫分表以后了。除此之外殷蛇,還有優(yōu)先級隊列实夹,基于優(yōu)先級隊列的JDK延遲隊列,時間輪等方式粒梦。但如果系統(tǒng)的架構中本身就有RabbitMQ的話亮航,那么選擇RabbitMQ來實現(xiàn)類似的功能也是一種選擇。

使用RabbitMQ來實現(xiàn)延遲任務必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange匀们,通過這兩者的組合來實現(xiàn)上述需求缴淋。

消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL泄朴。對隊列設置就是隊列沒有消費者連著的保留時間重抖,也可以對每一個單獨的消息做單獨的設置。超過了這個時間祖灰,我們認為這個消息就死了钟沛,稱之為死信。如果隊列設置了局扶,消息也設置了恨统,那么會取小的。所以一個消息如果被路由到不同的隊列中三妈,這個消息死亡的時間有可能不一樣(不同的隊列設置)畜埋。這里單講單個消息的TTL,因為它才是實現(xiàn)延遲任務的關鍵畴蒲。

可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間由捎,兩者是一樣的效果。只是expiration字段是字符串參數(shù)饿凛,所以要寫個int類型的字符串:

byte[] messageBodyBytes = "Hello, world!".getBytes();

AMQP.BasicProperties properties =new AMQP.BasicProperties();

properties.setExpiration("60000");

channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

當上面的消息扔到隊列中后狞玛,過了60秒软驰,如果沒有被消費,它就死了心肪。不會被消費者消費到锭亏。這個消息后面的,沒有“死掉”的消息對頂上來硬鞍,被消費者消費慧瘤。死信在隊列中并不會被刪除和釋放,它會被統(tǒng)計到隊列的消息數(shù)中去固该。單靠死信還不能實現(xiàn)延遲任務锅减,還要靠Dead Letter Exchange。

Dead Letter Exchanges

Exchage的概念在這里就不在贅述伐坏,可以從這里進行了解怔匣。一個消息在滿足如下條件下,會進死信路由桦沉,記住這里是路由而不是隊列每瞒,一個路由可以對應很多隊列。

1. 一個消息被Consumer拒收了纯露,并且reject方法的參數(shù)里requeue是false剿骨。也就是說不會被再次放在隊列里,被其他消費者使用埠褪。

2. 上面的消息的TTL到了浓利,消息過期了。

3. 隊列的長度限制滿了钞速。排在前面的消息會被丟棄或者扔到死信路由上贷掖。

Dead Letter Exchange其實就是一種普通的exchange,和創(chuàng)建其他exchange沒有兩樣玉工。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了羽资,會自動觸發(fā)消息的轉發(fā)淘菩,發(fā)送到Dead Letter Exchange中去遵班。

實現(xiàn)延遲隊列

延遲任務通過消息的TTL和Dead Letter Exchange來實現(xiàn)。我們需要建立2個隊列潮改,一個用于發(fā)送消息狭郑,一個用于消息過期后的轉發(fā)目標隊列。


生產者輸出消息到Queue1汇在,并且這個消息是設置有有效時間的翰萨,比如60s。消息會在Queue1中等待60s糕殉,如果沒有消費者收掉的話亩鬼,它就是被轉發(fā)到Queue2殖告,Queue2有消費者,收到雳锋,處理延遲任務黄绩。

具體實現(xiàn)步驟如下:

第一步, 首先需要創(chuàng)建2個隊列玷过。Queue1和Queue2爽丹。Queue1是一個消息緩沖隊列,在這個隊列里面實現(xiàn)消息的過期轉發(fā)辛蚊。如下圖粤蝎,設置Dead letter exchange和Dead letter routing key。設置這兩個屬性就是當消息在這個隊列中expire后袋马,采用哪個路由發(fā)送初澎。這個dlx的exchange需要事先創(chuàng)建好,就是一個普通的exchange飞蛹。由于我們還需要向Queue1發(fā)送消息谤狡,那么還需要創(chuàng)建一個exchange,并且和Queue1綁定卧檐。例子中墓懂,exchange同樣取名:queue1。

我們還需要建一個Queue2霉囚,這個隊列用于消息在Queue1中過期后轉發(fā)的目標隊列捕仔。所以這個Queue2隊列建好以后,需要綁定Queue1設置的死信路由:dlx盈罐。完成Queue2的綁定以后榜跌,環(huán)境就搭建完成了。


第二步盅粪,實現(xiàn)消息的Producer钓葫。由于我們的目的是讓進入Queue1的消息過期,然后自動轉送到Queue2中票顾,所以發(fā)送的時候础浮,需要設置過期時間。

ConnectionFactory factory =new ConnectionFactory();

? ? ? ? ? ? factory.setUsername("bsp");

? ? ? ? ? ? factory.setPassword("123456");

? ? ? ? ? ? factory.setVirtualHost("/");

? ? ? ? ? ? factory.setHost("10.23.22.42");

? ? ? ? ? ? factory.setPort(5672);

? ? ? ? ? ? conn = factory.newConnection();

? ? ? ? ? ? channel = conn.createChannel();

? ? ? ? ? ? byte[] messageBodyBytes = "Hello, world!".getBytes();

? ? ? ? ? ? bytei = 10;

? ? ? ? ? ? while(i-- > 0) {? ? ? ? ? ? ? ?

? ? ? ? ? ? ? ? channel.basicPublish("queue1", "queue1",newAMQP.BasicProperties.Builder().expiration(String.valueOf(i * 1000)).build(),

? ? ? ? ? ? ? ? ? ? ? ? newbyte[] { i });

? ? ? ? ? ? }

上面的代碼我模擬了1-10號消息奠骄,消息的內容里面是1-10豆同。過期的時間是10-1秒。這里要注意含鳞,雖然10是第一個發(fā)送影锈,但是它過期的時間最長。


第三步,實現(xiàn)消息的Consumer鸭廷。Consumer就是延遲任務的具體實施者枣抱。由于具體的任務往往是一個比較耗時的任務,所以一般來說辆床,任務一般在異步線程中執(zhí)行沃但。

ConnectionFactory factory =new ConnectionFactory();

factory.setUsername("bsp");

factory.setPassword("123456");

factory.setVirtualHost("/");

factory.setHost("10.23.22.42");

factory.setPort(5672);

conn = factory.newConnection();

channel = conn.createChannel();

channel.basicConsume("queue2",true, "consumer",new DefaultConsumer(channel) {

@OverridepublicvoidhandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body)throws IOException {longdeliveryTag = envelope.getDeliveryTag();

? ? ? ? ? ? ? ? ? ? //do some work asyncSystem.out.println(body[0]);}

});


運行后如上面的程序,過了10s以后佛吓,消費者開始收到數(shù)據(jù)宵晚,但是它是一次性收到如下結果:

10、9 维雇、8 淤刃、7 、6吱型、5 逸贾、4 、3 津滞、2 铝侵、1

Consumer第一個收到的還是10。雖然10是第一個放進隊列触徐,但是它的過期時間最長咪鲜。所以由此可見,即使一個消息比在同一隊列中的其他消息提前過期撞鹉,提前過期的也不會優(yōu)先進入死信隊列疟丙,它們還是按照入庫的順序讓消費者消費。如果第一進去的消息過期時間是1小時鸟雏,那么死信隊列的消費者也許等1小時才能收到第一個消息享郊。參考官方文檔發(fā)現(xiàn)“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有當過期的消息到了隊列的頂端(隊首),才會被真正的丟棄或者進入死信隊列孝鹊。

所以在考慮使用RabbitMQ來實現(xiàn)延遲任務隊列的時候炊琉,需要確保業(yè)務上每個任務的延遲時間是一致的。如果遇到不同的任務類型需要不同的延時的話又活,需要為每一種不同延遲時間的消息建立單獨的消息隊列苔咪。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市皇钞,隨后出現(xiàn)的幾起案子悼泌,更是在濱河造成了極大的恐慌松捉,老刑警劉巖夹界,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡可柿,警方通過查閱死者的電腦和手機鸠踪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來复斥,“玉大人营密,你說我怎么就攤上這事∧慷В” “怎么了评汰?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長痢虹。 經常有香客問我被去,道長,這世上最難降的妖魔是什么奖唯? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任惨缆,我火速辦了婚禮,結果婚禮上丰捷,老公的妹妹穿的比我還像新娘坯墨。我一直安慰自己,他們只是感情好病往,可當我...
    茶點故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布捣染。 她就那樣靜靜地躺著,像睡著了一般停巷。 火紅的嫁衣襯著肌膚如雪液斜。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天叠穆,我揣著相機與錄音少漆,去河邊找鬼。 笑死硼被,一個胖子當著我的面吹牛示损,可吹牛的內容都是我干的。 我是一名探鬼主播嚷硫,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼检访,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了仔掸?” 一聲冷哼從身側響起脆贵,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎起暮,沒想到半個月后卖氨,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年筒捺,在試婚紗的時候發(fā)現(xiàn)自己被綠了柏腻。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,731評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡系吭,死狀恐怖五嫂,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情肯尺,我是刑警寧澤沃缘,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站则吟,受9級特大地震影響孩灯,放射性物質發(fā)生泄漏。R本人自食惡果不足惜逾滥,卻給世界環(huán)境...
    茶點故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一峰档、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧寨昙,春花似錦讥巡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至捉蚤,卻和暖如春抬驴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背缆巧。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工布持, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人陕悬。 一個月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓题暖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親捉超。 傳聞我的和親對象是個殘疾皇子胧卤,可洞房花燭夜當晚...
    茶點故事閱讀 44,629評論 2 354

推薦閱讀更多精彩內容