RabbitMQ延遲隊列及消息延遲推送實現(xiàn)詳解

這篇文章主要介紹了RabbitMQ延遲隊列及消息延遲推送實現(xiàn)詳解,文中通過示例代碼介紹的非常詳細(xì)伴网,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
應(yīng)用場景
目前常見的應(yīng)用軟件都有消息的延遲推送的影子洁段,應(yīng)用也極為廣泛阁谆,例如:
?淘寶七天自動確認(rèn)收貨。在我們簽收商品后漾肮,物流系統(tǒng)會在七天后延時發(fā)送一個消息給支付系統(tǒng)冗茸,通知支付系統(tǒng)將款打給商家,這個過程持續(xù)七天评腺,就是使用了消息中間件的延遲推送功能。
?12306 購票支付確認(rèn)頁面淑掌。我們在選好票點擊確定跳轉(zhuǎn)的頁面中往往都會有倒計時蒿讥,代表著 30 分鐘內(nèi)訂單不確認(rèn)的話將會自動取消訂單。其實在下訂單那一刻開始購票業(yè)務(wù)系統(tǒng)就會發(fā)送一個延時消息給訂單系統(tǒng),延時30分鐘芋绸,告訴訂單系統(tǒng)訂單未完成媒殉,如果我們在30分鐘內(nèi)完成了訂單,則可以通過邏輯代碼判斷來忽略掉收到的消息侥钳。
在上面兩種場景中,如果我們使用下面兩種傳統(tǒng)解決方案無疑大大降低了系統(tǒng)的整體性能和吞吐量:
?使用 redis 給訂單設(shè)置過期時間柄错,最后通過判斷 redis 中是否還有該訂單來決定訂單是否已經(jīng)完成舷夺。這種解決方案相較于消息的延遲推送性能較低,因為我們知道 redis 都是存儲于內(nèi)存中售貌,我們遇到惡意下單或者刷單的將會給內(nèi)存帶來巨大壓力给猾。
?使用傳統(tǒng)的數(shù)據(jù)庫輪詢來判斷數(shù)據(jù)庫表中訂單的狀態(tài),這無疑增加了IO次數(shù)颂跨,性能極低敢伸。
?使用 jvm 原生的 DelayQueue ,也是大量占用內(nèi)存恒削,而且沒有持久化策略池颈,系統(tǒng)宕機(jī)或者重啟都會丟失訂單信息。
消息延遲推送的實現(xiàn)
在 RabbitMQ 3.6.x 之前我們一般采用死信隊列+TTL過期時間來實現(xiàn)延遲隊列钓丰,我們這里不做過多介紹躯砰,可以參考之前文章來了解:TTL、死信隊列
在 RabbitMQ 3.6.x 開始携丁,RabbitMQ 官方提供了延遲隊列的插件琢歇,可以下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊列插件下載



首先我們創(chuàng)建交換機(jī)和消息隊列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
public class MQConfig {
 
  public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
  public static final String LAZY_QUEUE = "MQ.LazyQueue";
  public static final String LAZY_KEY = "lazy.#";
 
  @Bean
  public TopicExchange lazyExchange(){
    //Map<String, Object> pros = new HashMap<>();
    //設(shè)置交換機(jī)支持延遲消息推送
    //pros.put("x-delayed-message", "topic");
    TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
    exchange.setDelayed(true);
    return exchange;
  }
 
  @Bean
  public Queue lazyQueue(){
    return new Queue(LAZY_QUEUE, true);
  }
 
  @Bean
  public Binding lazyBinding(){
    return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
  }
}

我們在 Exchange 的聲明中可以設(shè)置exchange.setDelayed(true)來開啟延遲隊列梦鉴,也可以設(shè)置為以下內(nèi)容傳入交換機(jī)聲明的方法中李茫,因為第一種方式的底層就是通過這種方式來實現(xiàn)的。

//Map<String, Object> pros = new HashMap<>();
//設(shè)置交換機(jī)支持延遲消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

發(fā)送消息時我們需要指定延遲推送的時間肥橙,我們這里在發(fā)送消息的方法中傳入?yún)?shù) new MessagePostProcessor() 是為了獲得 Message對象魄宏,因為需要借助 Message對象的api 來設(shè)置延遲時間。

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
@Component
public class MQSender {
 
  @Autowired
  private RabbitTemplate rabbitTemplate;
 
  //confirmCallback returnCallback 代碼省略存筏,請參照上一篇
  
  public void sendLazy(Object message){
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    //id + 時間戳 全局唯一
    CorrelationData correlationData = new CorrelationData("12345678909"+new Date());
 
    //發(fā)送消息時指定 header 延遲時間
    rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
        new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        //設(shè)置消息持久化
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        //message.getMessageProperties().setHeader("x-delay", "6000");
        message.getMessageProperties().setDelay(6000);
        return message;
      }
    }, correlationData);
  }
}

我們可以觀察 setDelay(Integer i)底層代碼娜庇,也是在 header 中設(shè)置 x-delay。等同于我們手動設(shè)置 header
message.getMessageProperties().setHeader("x-delay", "6000");

/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
  if (delay == null || delay < 0) {
    this.headers.remove(X_DELAY);
  }
  else {
    this.headers.put(X_DELAY, delay);
  }
}

消費(fèi)端進(jìn)行消費(fèi)

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;
 
import java.io.IOException;
import java.util.Map;
 
@Component
public class MQReceiver {
 
  @RabbitListener(queues = "MQ.LazyQueue")
  @RabbitHandler
  public void onLazyMessage(Message msg, Channel channel) throws IOException{
    long deliveryTag = msg.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, true);
    System.out.println("lazy receive " + new String(msg.getBody()));
 
  }
測試結(jié)果
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
 
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {
 
  @Autowired
  private MQSender mqSender;
 
  @Test
  public void sendLazy() throws Exception {
    String msg = "hello spring boot";
 
    mqSender.sendLazy(msg + ":");
  }
}

果然在 6 秒后收到了消息 lazy receive hello spring boot方篮。
文章來源于網(wǎng)絡(luò)名秀。
感謝大家閱讀,歡迎大家私信討論藕溅。給大家推薦一個Java技術(shù)交流群:473984645里面會分享一些資深架構(gòu)師錄制的視頻資料:有Spring匕得,MyBatis,Netty源碼分析,高并發(fā)汁掠、高性能略吨、分布式、微服務(wù)架構(gòu)的原理考阱,JVM性能優(yōu)化翠忠、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。還能領(lǐng)取免費(fèi)的學(xué)習(xí)資源乞榨,目前受益良多秽之!
推薦大家閱讀:
Java高級架構(gòu)學(xué)習(xí)資料分享+架構(gòu)師成長之路?
個人整理了更多資料以PDF文件的形式分享給大家,需要查閱的程序員朋友可以來免費(fèi)領(lǐng)取吃既。還有我的學(xué)習(xí)筆記PDF文件也免費(fèi)分享給有需要朋友考榨!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市鹦倚,隨后出現(xiàn)的幾起案子河质,更是在濱河造成了極大的恐慌,老刑警劉巖震叙,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掀鹅,死亡現(xiàn)場離奇詭異,居然都是意外死亡媒楼,警方通過查閱死者的電腦和手機(jī)淫半,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來匣砖,“玉大人科吭,你說我怎么就攤上這事『秭辏” “怎么了对人?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長拂共。 經(jīng)常有香客問我牺弄,道長,這世上最難降的妖魔是什么宜狐? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任势告,我火速辦了婚禮,結(jié)果婚禮上抚恒,老公的妹妹穿的比我還像新娘咱台。我一直安慰自己,他們只是感情好俭驮,可當(dāng)我...
    茶點故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布回溺。 她就那樣靜靜地躺著春贸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪遗遵。 梳的紋絲不亂的頭發(fā)上萍恕,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天,我揣著相機(jī)與錄音车要,去河邊找鬼允粤。 笑死,一個胖子當(dāng)著我的面吹牛翼岁,可吹牛的內(nèi)容都是我干的类垫。 我是一名探鬼主播,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼登澜,長吁一口氣:“原來是場噩夢啊……” “哼阔挠!你這毒婦竟也來了飘庄?” 一聲冷哼從身側(cè)響起脑蠕,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎跪削,沒想到半個月后谴仙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡碾盐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年晃跺,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毫玖。...
    茶點故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡掀虎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出付枫,到底是詐尸還是另有隱情烹玉,我是刑警寧澤,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布阐滩,位于F島的核電站二打,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏掂榔。R本人自食惡果不足惜继效,卻給世界環(huán)境...
    茶點故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望装获。 院中可真熱鬧瑞信,春花似錦、人聲如沸穴豫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至潘鲫,卻和暖如春翁逞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背溉仑。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工挖函, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人浊竟。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓怨喘,卻偏偏與公主長得像,于是被迫代替她去往敵國和親振定。 傳聞我的和親對象是個殘疾皇子必怜,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,630評論 2 359

推薦閱讀更多精彩內(nèi)容