用rebbitMq來(lái)實(shí)現(xiàn)你的延遲隊(duì)列功能

延遲隊(duì)列

在我們的上一篇文章使用delayedQueue實(shí)現(xiàn)你本地的延遲隊(duì)列
中提到了延遲隊(duì)列的作用.

但是我們知道鹏往,利用delayedQueue實(shí)現(xiàn)的是一個(gè)單機(jī)的,而且是內(nèi)存中的延遲隊(duì)列,他并沒(méi)有一個(gè)集群的支持藏雏,并且需要在對(duì)泵機(jī)的時(shí)候涯塔,消息消費(fèi)異常的時(shí)候做相應(yīng)的邏輯處理。

那么這樣做的話(huà)惯驼,我們需要的工作量還是很大的蹲嚣,有沒(méi)有什么東西是讓我們不做這一部分的工作也能實(shí)現(xiàn)延遲隊(duì)列的功能?

當(dāng)然有了祟牲。答案是:rabbitMq

利用rabbitMq來(lái)實(shí)現(xiàn)延遲隊(duì)列的功能

那么如何利用rabbitMq來(lái)實(shí)現(xiàn)延遲隊(duì)列的功能呢隙畜?

請(qǐng)先注意一點(diǎn),RabbitMQ本身沒(méi)有直接支持延遲隊(duì)列功能说贝,但是可以通過(guò)以下特性模擬出延遲隊(duì)列的功能议惰。那么這是通過(guò)哪些特性呢,那就讓我們來(lái)認(rèn)識(shí)一下這兩個(gè)特性吧.

  • Per-Queue Message TTL

    RabbitMQ可以對(duì)消息和隊(duì)列設(shè)置TTL(過(guò)期時(shí)間)。

    RabbitMQ針對(duì)隊(duì)列中的消息過(guò)期時(shí)間(Time To Live, TTL)有兩種方法可以設(shè)置乡恕。第一種方法是通過(guò)隊(duì)列屬性設(shè)置言询,隊(duì)列中所有消息都有相同的過(guò)期時(shí)間。第二種方法是對(duì)消息進(jìn)行單獨(dú)設(shè)置傲宜,每條消息TTL可以不同运杭。如果上述兩種方法同時(shí)使用,則消息的過(guò)期時(shí)間以?xún)烧咧gTTL較小的那個(gè)數(shù)值為準(zhǔn)函卒。消息在隊(duì)列的生存時(shí)間一旦超過(guò)設(shè)置的TTL值县习,就成為dead message,消費(fèi)者將無(wú)法再收到該消息。

  • Dead Letter Exchanges

    利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信后躁愿,它能被重新publish到另一個(gè)Exchange叛本,這個(gè)Exchange就是DLX。消息變成死信一向有以下幾種情況:

    • 消息被拒絕(basic.reject or basic.nack)并且requeue=false
    • 消息TTL過(guò)期
    • 隊(duì)列達(dá)到最大長(zhǎng)度

    DLX也是一下正常的Exchange同一般的Exchange沒(méi)有區(qū)別彤钟,它能在任何的隊(duì)列上被指定来候,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性,當(dāng)這個(gè)隊(duì)列中有死信時(shí)逸雹,RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange中去营搅,進(jìn)而被路由到另一個(gè)隊(duì)列,publish可以監(jiān)聽(tīng)這個(gè)隊(duì)列中消息做相應(yīng)的處理梆砸,這個(gè)特性可以彌補(bǔ)RabbitMQ 3.0.0以前支持的immediate參數(shù)中的向publish確認(rèn)的功能转质。

結(jié)合以上兩個(gè)特性,就可以模擬出延遲消息的功能.

基于x-dead-letter-routing-key的單條消息延遲隊(duì)列的java代碼實(shí)現(xiàn)

生產(chǎn)者(發(fā)送)端代碼:


import java.util.HashMap;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    //隊(duì)列名稱(chēng)  
    private final static String QUEUE_NAME = "hello";  
  
    public static void main(String[] argv) throws Exception  
    {  
        
        /** 
         * 創(chuàng)建連接連接到MabbitMQ 
         */  
        ConnectionFactory factory = new ConnectionFactory();  
        //設(shè)置MabbitMQ所在主機(jī)ip或者主機(jī)名  
        factory.setHost("localhost");  
        //創(chuàng)建一個(gè)連接  
        Connection connection = factory.newConnection();
        //創(chuàng)建一個(gè)頻道  
        Channel channel = connection.createChannel();  
        
        
 
        
        //指定一個(gè)隊(duì)列  
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
        //發(fā)送的消息  
        String message = "hello world!"+System.currentTimeMillis();;  

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();  
        AMQP.BasicProperties properties = builder.expiration("2000").deliveryMode(2).build();  
        //往隊(duì)列中發(fā)出一條消息     這時(shí)候要發(fā)送的隊(duì)列不應(yīng)該是QUEUE_NAME帖世,這樣才能進(jìn)行轉(zhuǎn)發(fā)的
        channel.basicPublish("", "DELAY_QUEUE", properties, message.getBytes());  
        System.out.println(" [x] Sent '" + message + "'" );  
        //關(guān)閉頻道和連接  
        channel.close();  
        connection.close();  
     }  
}

消費(fèi)者(接受)端代碼:


import java.util.HashMap;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {
    // 隊(duì)列名稱(chēng)
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
        // channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);

        HashMap<String, Object> arguments = new HashMap<String, Object>();  
        arguments.put("x-dead-letter-exchange", "amq.direct");  
        arguments.put("x-dead-letter-routing-key", QUEUE_NAME);  
        channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments); 
        
        //聲明隊(duì)列休蟹,主要為了防止消息接收者先運(yùn)行此程序,隊(duì)列還不存在時(shí)創(chuàng)建隊(duì)列日矫。  
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 
      //創(chuàng)建隊(duì)列消費(fèi)者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        //指定消費(fèi)隊(duì)列  
        channel.basicConsume(QUEUE_NAME, true, consumer);  
        while (true)  
        {  
            //nextDelivery是一個(gè)阻塞方法(內(nèi)部實(shí)現(xiàn)其實(shí)是阻塞隊(duì)列的take方法)  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println(" [x] Received '" + message + "'"+ "'   [當(dāng)前系統(tǒng)時(shí)間戳]" +System.currentTimeMillis());  
        }  
    }
}

參考資料

http://www.rabbitmq.com/ttl.html
http://www.rabbitmq.com/dlx.html
https://www.cloudamqp.com/docs/delayed-messages.html
http://www.netfoucs.com/article/xtjsxtj/73636.html#

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末赂弓,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子哪轿,更是在濱河造成了極大的恐慌盈魁,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,590評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件窃诉,死亡現(xiàn)場(chǎng)離奇詭異杨耙,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)飘痛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)珊膜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人敦冬,你說(shuō)我怎么就攤上這事∥ň冢” “怎么了脖旱?”我有些...
    開(kāi)封第一講書(shū)人閱讀 169,301評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)介蛉。 經(jīng)常有香客問(wèn)我萌庆,道長(zhǎng),這世上最難降的妖魔是什么币旧? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 60,078評(píng)論 1 300
  • 正文 為了忘掉前任践险,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘巍虫。我一直安慰自己彭则,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,082評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布占遥。 她就那樣靜靜地躺著俯抖,像睡著了一般。 火紅的嫁衣襯著肌膚如雪瓦胎。 梳的紋絲不亂的頭發(fā)上芬萍,一...
    開(kāi)封第一講書(shū)人閱讀 52,682評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音搔啊,去河邊找鬼柬祠。 笑死,一個(gè)胖子當(dāng)著我的面吹牛负芋,可吹牛的內(nèi)容都是我干的漫蛔。 我是一名探鬼主播,決...
    沈念sama閱讀 41,155評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼示罗,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼惩猫!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起蚜点,我...
    開(kāi)封第一講書(shū)人閱讀 40,098評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤轧房,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后绍绘,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體奶镶,經(jīng)...
    沈念sama閱讀 46,638評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,701評(píng)論 3 342
  • 正文 我和宋清朗相戀三年陪拘,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了厂镇。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,852評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡左刽,死狀恐怖捺信,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情欠痴,我是刑警寧澤迄靠,帶...
    沈念sama閱讀 36,520評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站喇辽,受9級(jí)特大地震影響掌挚,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜菩咨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,181評(píng)論 3 335
  • 文/蒙蒙 一吠式、第九天 我趴在偏房一處隱蔽的房頂上張望陡厘。 院中可真熱鬧,春花似錦特占、人聲如沸糙置。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,674評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)罢低。三九已至,卻和暖如春胖笛,著一層夾襖步出監(jiān)牢的瞬間网持,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,788評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工长踊, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留功舀,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,279評(píng)論 3 379
  • 正文 我出身青樓身弊,卻偏偏與公主長(zhǎng)得像辟汰,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子阱佛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,851評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • 顧名思義帖汞,延遲隊(duì)列就是進(jìn)入該隊(duì)列的消息會(huì)被延遲消費(fèi)的隊(duì)列。而一般的隊(duì)列凑术,消息一旦入隊(duì)了之后就會(huì)被消費(fèi)者馬上消費(fèi)翩蘸。 ...
    Java架構(gòu)閱讀 2,191評(píng)論 3 24
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)淮逊,斷路器催首,智...
    卡卡羅2017閱讀 134,715評(píng)論 18 139
  • 背景 何為延遲隊(duì)列? 顧名思義泄鹏,延遲隊(duì)列就是進(jìn)入該隊(duì)列的消息會(huì)被延遲消費(fèi)的隊(duì)列郎任。而一般的隊(duì)列,消息一旦入隊(duì)了之后就...
    wooyoo閱讀 3,454評(píng)論 0 17
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器备籽。支持消息的持久化舶治、事務(wù)、擁塞控...
    jiangmo閱讀 10,369評(píng)論 2 34
  • 1.什么是消息隊(duì)列 消息隊(duì)列允許應(yīng)用間通過(guò)消息的發(fā)送與接收的方式進(jìn)行通信车猬,當(dāng)消息接收方服務(wù)忙或不可用時(shí)霉猛,其提供了一...
    zhuke閱讀 4,476評(píng)論 0 12