基于Rabbitmq實(shí)現(xiàn)延遲隊(duì)列

轉(zhuǎn)自 基于Rabbitmq實(shí)現(xiàn)延遲隊(duì)列

基于Rabbitmq實(shí)現(xiàn)延遲隊(duì)列

延遲隊(duì)列的使用場(chǎng)景

  1. 淘寶訂單業(yè)務(wù):下單后30min之內(nèi)沒(méi)有付款庐完,就自動(dòng)取消訂單勺美。
  2. 餓了嗎訂餐通知:下單成功后60s之后給用戶發(fā)送短信通知撒蟀。
  3. 關(guān)閉空閑連接:服務(wù)器中有很多客戶端的連接忆肾,空閑一段時(shí)間之后需要關(guān)閉之夺衍。
  4. 緩存:緩存中的對(duì)象,超過(guò)了空閑時(shí)間登刺,從緩存中移出。
  5. 任務(wù)超時(shí)處理:在網(wǎng)絡(luò)協(xié)議滑動(dòng)窗口請(qǐng)求應(yīng)答式交互時(shí)嗡呼,處理超時(shí)未響應(yīng)的請(qǐng)求纸俭。
  6. 失敗重試機(jī)制:業(yè)務(wù)操作失敗后,間隔一定的時(shí)間進(jìn)行失敗重試南窗。

這類(lèi)業(yè)務(wù)的特點(diǎn)就是:延遲工作揍很、失敗重試。一種比較笨的方式是使用后臺(tái)線程遍歷所有對(duì)象万伤,挨個(gè)檢查窒悔。這種方法雖然簡(jiǎn)單好用,但是對(duì)象數(shù)量過(guò)多時(shí)敌买,可能存在性能問(wèn)題简珠,檢查間隔時(shí)間不好設(shè)置,間隔時(shí)間過(guò)大虹钮,影響精確度聋庵,過(guò)小則存在效率問(wèn)題,而且做不到按超時(shí)的時(shí)間順序處理芙粱。

本地延遲隊(duì)列 DelayQueue

DelayQueue是一個(gè)無(wú)界的BlockingQueue祭玉,用于放置實(shí)現(xiàn)了Delayed接口的對(duì)象,其中的對(duì)象只能在其到期時(shí)才能從隊(duì)列中取走春畔。這種隊(duì)列是有序的攘宙,即隊(duì)頭對(duì)象的延遲到期時(shí)間最長(zhǎng)。Delayed擴(kuò)展了Comparable接口拐迁,比較的基準(zhǔn)為延時(shí)的時(shí)間值蹭劈,Delayed接口的實(shí)現(xiàn)類(lèi)getDelay的返回值應(yīng)為固定值(final)。DelayQueue內(nèi)部是使用PriorityQueue實(shí)現(xiàn)的线召。

DelayQueue = BlockingQueue + PriorityQueue + Delayed

DelayQueue的關(guān)鍵元素BlockingQueue铺韧、PriorityQueue、Delayed缓淹」颍可以這么說(shuō),DelayQueue是一個(gè)使用優(yōu)先隊(duì)列(PriorityQueue)實(shí)現(xiàn)的BlockingQueue讯壶,優(yōu)先隊(duì)列的比較基準(zhǔn)值是時(shí)間料仗。(注意:不能將null元素放置到這種隊(duì)列)

但是我們知道,利用DelayQueue實(shí)現(xiàn)的是一個(gè)單機(jī)的伏蚊、JVM內(nèi)存中的延遲隊(duì)列立轧,并沒(méi)有集群的支持,而且無(wú)法滿足在對(duì)業(yè)務(wù)系統(tǒng)泵機(jī)的時(shí)、消息消費(fèi)異常的時(shí)候做相應(yīng)的邏輯處理氛改。

基于分布式消息隊(duì)列RabbitMQ實(shí)現(xiàn)延遲隊(duì)列

RabbitMQ本身沒(méi)有直接支持延遲隊(duì)列功能帐萎,但是可以通過(guò)以下特性模擬出延遲隊(duì)列的功能:

Per-Queue Message TTL RabbitMQ可以對(duì)消息和隊(duì)列設(shè)置TTL(過(guò)期時(shí)間)。

RabbitMQ針對(duì)隊(duì)列中的消息過(guò)期時(shí)間(Time To Live, TTL)有兩種方法可以設(shè)置胜卤。第一種方法是通過(guò)隊(duì)列屬性設(shè)置疆导,隊(duì)列中所有消息都有相同的過(guò)期時(shí)間。第二種方法是對(duì)消息進(jìn)行單獨(dú)設(shè)置葛躏,每條消息TTL可以不同澈段。如果上述兩種方法同時(shí)使用,則消息的過(guò)期時(shí)間以兩者之間TTL較小的那個(gè)數(shù)值為準(zhǔn)舰攒。消息在隊(duì)列的生存時(shí)間一旦超過(guò)設(shè)置的TTL值败富,就成為dead message,消費(fèi)者將無(wú)法再收到該消息。

Dead Letter Exchanges 死信消息

利用DLX芒率,當(dāng)消息在一個(gè)隊(duì)列中變成死信后囤耳,它能被重新publish到另一個(gè)Exchange篙顺,這個(gè)Exchange就是DLX偶芍。消息變成死信有以下幾種情況:

  1. 消息被拒絕(basic.reject or basic.nack)并且requeue=false
  2. 消息TTL過(guò)期
  3. 隊(duì)列達(dá)到最大長(zhǎng)度

DLX同一般的Exchange沒(méi)有區(qū)別,它能在任何的隊(duì)列上被指定德玫,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性匪蟀。當(dāng)隊(duì)列中有死信消息時(shí),RabbitMQ就會(huì)自動(dòng)的將死信消息重新發(fā)布到設(shè)置的Exchange中去宰僧,進(jìn)而被路由到另一個(gè)隊(duì)列材彪,publish可以監(jiān)聽(tīng)這個(gè)隊(duì)列中消息做相應(yīng)的處理,這個(gè)特性可以彌補(bǔ)RabbitMQ 3.0.0以前支持的immediate參數(shù)中的向publish確認(rèn)的功能琴儿。

結(jié)合以上兩個(gè)特性段化,就可以模擬出延遲消息的功能.

流程圖

image.png

源代碼

package hbec.app.stock.rabbitmq.utils;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;

/**
 * @Description <strong>基于RabbitMQ實(shí)現(xiàn)的分布式延遲重試隊(duì)列</strong>
 *
 * <ul>
 *      <li>delayExchangeName : 交換器名稱(chēng)</li>
 *      <li>delayQueueName : 延遲隊(duì)列名稱(chēng)</li>
 *      <li>delayRoutingKeyName : 路由器名稱(chēng)</li>
 *      <li>perDelayQueueMessageTTL : 延遲隊(duì)列中message的默認(rèn)ttl</li>
 * </ul>
 * 通過(guò){@link RabbitMQDelayQueue#put(byte[], long, TimeUnit)}首次進(jìn)入延遲隊(duì)列的消息,
 * 其ttl = min(message ttl, per queue message ttl)造成,
 * 消息被Reject/nack之后變成死信消息显熏,其自帶message ttl失效,
 * 以后將按照{(diào)@link #perDelayQueueMessageTTL}指定的延遲時(shí)間投遞給經(jīng)由{@link RabbitMQDelayQueue#consumerRegister}注冊(cè)的消費(fèi)者晒屎,直到消息被Ack.
 *
 * @author roc roc.fly@qq.com
 * @date Dec 9, 2016 3:29:39 PM
 */
public class RabbitMQDelayQueue {

    private static Logger LOGGER = LoggerFactory.getLogger(RabbitMQDelayQueue.class);

    private static final String POSTFIX_TASK = "_task";
    // direct類(lèi)型 交換器
    public static final String EXCHANGE_TYPE_DIRECT = "direct";

    private Connection connection;
    //注冊(cè)消費(fèi)者
    private ConsumerRegister consumerRegister;

    //任務(wù)隊(duì)列配置
    private String taskExchangeName;
    private String taskQueueName;
    private String taskRoutingKeyName;

    //延遲隊(duì)列配置
    private String delayExchangeName;
    private String delayQueueName;
    private String delayRoutingKeyName;

    //延遲隊(duì)列中的消息ttl
    private long perDelayQueueMessageTTL;

    public RabbitMQDelayQueue(Connection connection, ConsumerRegister consumerRegister, String delayExchangeName, String delayQueueName, String delayRoutingKeyName, long perDelayQueueMessageTTL) throws IOException {
        this.connection = connection;
        this.consumerRegister = consumerRegister;
        this.delayExchangeName = delayExchangeName;
        this.delayQueueName = delayQueueName;
        this.delayRoutingKeyName = delayRoutingKeyName;
        this.perDelayQueueMessageTTL = perDelayQueueMessageTTL;
        this.taskExchangeName = delayExchangeName + POSTFIX_TASK;
        this.taskQueueName = delayQueueName + POSTFIX_TASK;
        this.taskRoutingKeyName = delayRoutingKeyName + POSTFIX_TASK;
        init();
        registerConsumer();
    }


    /**
     *
     * @Description 注冊(cè)消費(fèi)者
     * @author roc roc.fly@qq.com
     * @date Dec 29, 2016 1:36:25 PM
     */
    public interface ConsumerRegister {
        public Consumer register(Channel channel) throws IOException;
    }

    /**
     * 注冊(cè)帶有ttl的queue和對(duì)應(yīng)的任務(wù)隊(duì)列
     *
     * @throws IOException
     * @author roc
     */
    private void init() throws IOException {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(taskExchangeName, EXCHANGE_TYPE_DIRECT, true);
        channel.exchangeDeclare(delayExchangeName, EXCHANGE_TYPE_DIRECT, true);

        // 任務(wù)隊(duì)列 B
        HashMap<String, Object> argumentsTask = Maps.newHashMap();
        argumentsTask.put("x-dead-letter-exchange", delayExchangeName);
        argumentsTask.put("x-dead-letter-routing-key", delayRoutingKeyName);
        channel.queueDeclare(taskQueueName, true, false, false, argumentsTask);
        channel.queueBind(taskQueueName, taskExchangeName, taskRoutingKeyName);

        // 延遲隊(duì)列 A
        HashMap<String, Object> argumentsDelay = Maps.newHashMap();
        argumentsDelay.put("x-dead-letter-exchange", taskExchangeName);
        argumentsDelay.put("x-dead-letter-routing-key", taskRoutingKeyName);
        argumentsDelay.put("x-message-ttl", perDelayQueueMessageTTL);
        channel.queueDeclare(delayQueueName, true, false, false, argumentsDelay);
        channel.queueBind(delayQueueName, delayExchangeName, delayRoutingKeyName);

        channel.close();
    }

    /**
     * 注冊(cè)消費(fèi)者
     * @throws IOException
     * @author roc
     */
    private void registerConsumer() throws IOException {
        LOGGER.info("register consumer ->{}", this);
        Channel channel = connection.createChannel();
        Consumer consumer = consumerRegister.register(channel);
        channel.basicConsume(taskQueueName, false, consumer);
        LOGGER.info("register consumer ->{} success", this);
    }

    /**
     * 消息入隊(duì)
     *
     * @param body 消息內(nèi)容
     * @param timeout 超時(shí)時(shí)間
     * @param unit 超時(shí)時(shí)間單位
     * @throws IOException
     * @author roc
     */
    public void put(byte[] body, long timeout, TimeUnit unit) throws IOException {

        Preconditions.checkNotNull(body);
        Preconditions.checkArgument(timeout >= 0);
        Preconditions.checkNotNull(unit);

        LOGGER.info("put element to delay queue ->{}", body.hashCode());
        Channel channel = null;
        try {
            channel = connection.createChannel();
            // deliveryMode=2 標(biāo)識(shí)任務(wù)的持久性
            long millis = unit.toMillis(timeout);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(millis)).deliveryMode(2).build();
            channel.basicPublish(delayExchangeName, delayRoutingKeyName, properties, body);
            LOGGER.info("put element to delay queue success");
        } finally {
            if (null != channel)
                channel.close();
        }
    }

    public static class Builder {

        private Connection connection;
        private ConsumerRegister consumerRegister;

        private String delayExchangeName;
        private String delayQueueName;
        private String delayRoutingKeyName;

        private long perDelayQueueMessageTTL;

        public Builder setConnection(Connection connection) {
            this.connection = connection;
            return this;
        }

        public Builder setDelayExchangeName(String delayExchangeName) {
            this.delayExchangeName = delayExchangeName;
            return this;
        }

        public Builder setDelayQueueName(String delayQueueName) {
            this.delayQueueName = delayQueueName;
            return this;
        }

        public Builder setDelayRoutingKeyName(String delayRoutingKeyName) {
            this.delayRoutingKeyName = delayRoutingKeyName;
            return this;
        }

        public Builder setConsumerRegister(ConsumerRegister consumerRegister) {
            this.consumerRegister = consumerRegister;
            return this;
        }

        public Builder setPerDelayQueueMessageTTL(long timeout, TimeUnit unit) {
            this.perDelayQueueMessageTTL = unit.toMillis(timeout);;
            return this;
        }

        public RabbitMQDelayQueue build() throws IOException {
            Preconditions.checkNotNull(connection);
            Preconditions.checkNotNull(delayExchangeName);
            Preconditions.checkNotNull(delayQueueName);
            Preconditions.checkNotNull(delayRoutingKeyName);
            Preconditions.checkNotNull(consumerRegister);
            return new RabbitMQDelayQueue(connection, consumerRegister, delayExchangeName, delayQueueName, delayRoutingKeyName, perDelayQueueMessageTTL);
        }

    }

}

測(cè)試代碼

package hbec.app.stock.rabbitmq.utils;

import hbec.app.stock.rabbitmq.utils.RabbitMQDelayQueue.ConsumerRegister;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 測(cè)試demo
 *
 */
public class RabbitMQDelayQueueTest {

    public static void main(String[] args) throws IOException {
        delayQueue();
    }

    public static void delayQueue() throws IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        Address address = new Address("10.0.30.67", 56720);
        Connection connection = factory.newConnection(new Address[] { address });

        RabbitMQDelayQueue delayQueue = new RabbitMQDelayQueue.Builder().setConnection(connection).setPerDelayQueueMessageTTL(15, TimeUnit.SECONDS).setDelayExchangeName("delay_exchange_roc").setDelayQueueName("delay_queue_roc").setDelayRoutingKeyName("delay_routing_key_roc").setConsumerRegister(new ConsumerRegister() {
            @Override
            public Consumer register(Channel channel) throws IOException {
                return new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                        long deliveryTag = envelope.getDeliveryTag();
                        String exchange = envelope.getExchange();
                        String routingKey = envelope.getRoutingKey();
                        // TODO do something
                        String content = new String(body, Charset.forName("utf-8"));
                        System.out.println("receive message --- > " + content);

                        Map<String, Object> headers = properties.getHeaders();
                        if (headers != null) {
                            List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
                            System.out.println("xDeath--- > " + xDeath);
                            if (xDeath != null && !xDeath.isEmpty()) {
                                Map<String, Object> entrys = xDeath.get(0);
                            }
                        }
                        // 消息拒收
                        // if(do something) 消息重新入隊(duì)
                            getChannel().basicReject(deliveryTag, false);
                        // else 消息應(yīng)答
                        // getChannel().basicAck(deliveryTag, false);
                    }
                };
            }
        }).build();

        delayQueue.put("{\"name\" : \"i am roc!!\"}\"".getBytes("UTF-8"), 3, TimeUnit.SECONDS);

    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末喘蟆,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鼓鲁,更是在濱河造成了極大的恐慌蕴轨,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件骇吭,死亡現(xiàn)場(chǎng)離奇詭異橙弱,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)膘螟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)成福,“玉大人,你說(shuō)我怎么就攤上這事荆残∨” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵内斯,是天一觀的道長(zhǎng)蕴潦。 經(jīng)常有香客問(wèn)我,道長(zhǎng)俘闯,這世上最難降的妖魔是什么潭苞? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮真朗,結(jié)果婚禮上此疹,老公的妹妹穿的比我還像新娘。我一直安慰自己遮婶,他們只是感情好蝗碎,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著旗扑,像睡著了一般蹦骑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上臀防,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天眠菇,我揣著相機(jī)與錄音,去河邊找鬼袱衷。 笑死捎废,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的致燥。 我是一名探鬼主播登疗,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼篡悟!你這毒婦竟也來(lái)了谜叹?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤搬葬,失蹤者是張志新(化名)和其女友劉穎荷腊,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體急凰,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡女仰,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年猜年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片疾忍。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡乔外,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出一罩,到底是詐尸還是另有隱情杨幼,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布聂渊,位于F島的核電站差购,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏汉嗽。R本人自食惡果不足惜欲逃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望饼暑。 院中可真熱鬧稳析,春花似錦、人聲如沸弓叛。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)邪码。三九已至裕菠,卻和暖如春咬清,著一層夾襖步出監(jiān)牢的瞬間闭专,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工旧烧, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留影钉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓掘剪,卻偏偏與公主長(zhǎng)得像平委,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子夺谁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容