SpringBoot + RabbitMQ 實現(xiàn)TTL轉(zhuǎn)死信隊列

  1. 引入依賴

     <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.66</version>
            </dependency>
    
        </dependencies>
    
  2. 加入配置

    spring.application.name=springboot_rabbitmq
    spring.rabbitmq.host=192.168.233.102
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.username=root
    spring.rabbitmq.password=123456
    spring.rabbitmq.port=5672
    
  3. 隊列配置

    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.amqp.core.*;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 定義訂單的隊列年缎、交換機(jī)荡碾、隊列綁定交換機(jī)庭瑰、設(shè)置路由key
     */
    @Configuration
    public class OrderRabbitConfig {
    
        @Bean
        public Queue orderQueueTTL(){
            Map<String, Object> arguments = new HashMap<>();
            // ttl = 30s
            arguments.put("x-message-ttl", 30 * 1000);
            // 設(shè)置關(guān)聯(lián)的死信隊列
            if (true) {
                arguments.put("x-dead-letter-exchange", "ex.order.dlx");
                arguments.put("x-dead-letter-routing-key", "order.pay.dlx");
            }
    
            return new Queue("q.order.ttl", false, false, false, arguments);
        }
    
        @Bean
        public Exchange orderExchangeTTL() {
            return new DirectExchange("ex.order.ttl", false, false);
        }
        @Bean
        public Binding orderBindingTTL(){
            return BindingBuilder.bind(orderQueueTTL()).to(orderExchangeTTL()).with("order.pay.ttl").noargs();
        }
    }
    
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 定義死信:隊列桂躏、交換機(jī)、隊列綁定交換機(jī)烧董、設(shè)置路由key
     */
    @Configuration
    public class DLXRabbitConfig {
    
        @Bean
        public Queue orderQueueDLX(){
            return new Queue("q.order.dlx", false, false, false, null);
        }
    
        @Bean
        public Exchange orderExchangeDLX() {
            return new DirectExchange("ex.order.dlx", false, false);
        }
    
        @Bean
        public Binding orderBindingDLX(){
            return BindingBuilder.bind(orderQueueDLX()).to(orderExchangeDLX()).with("order.pay.dlx").noargs();
        }
    }
    
  4. 訂單實現(xiàn)

    
    /**
     * 訂單控制器
     */
    @Controller
    @RequestMapping("/order")
    public class OrderController {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
        /**
         * 提交訂單
         * @return
         */
        @RequestMapping("/submit")
        @ResponseBody
        public String submit(){
            String orderNo = UUID.randomUUID().toString();
    
            final Order order = new Order(orderNo, "測試商品",
                    new BigDecimal(1000.00), new Date(), Order.STATUS_WAIT_PAY);
            OrderDB.orderMap.put(orderNo, order);
    
            rabbitTemplate.convertAndSend("ex.order.ttl", "order.pay.ttl", JSONObject.toJSON(order).toString());
    
            return "下單成功待支付:" + orderNo;
        }
    
        /**
         * 訂單列表
         * @return
         */
        @RequestMapping("/list")
        @ResponseBody
        public Object[] list(){
            return OrderDB.orderMap.values().toArray();
        }
    }
    
  1. 支付實現(xiàn)

    
    /**
     * 支付控制器
     */
    @Controller
    @RequestMapping("/pay")
    public class PayController {
    
        @Autowired
        CachingConnectionFactory factory;
    
        @RequestMapping("/{orderNo}")
        @ResponseBody
        public String pay(@PathVariable("orderNo") String orderNo){
    
            final Order order = OrderDB.orderMap.get(orderNo);
            // 從mq中拉取支付信息毁靶,確認(rèn)支付狀態(tài)
            if (order.getOrderStatus().equals(Order.STATUS_WAIT_PAY)) {
                final Connection connection = factory.createConnection();
                try(final Channel channel = connection.createChannel(true)){
                    // 主動拉取消息: 從隊列; 指定是否自動確認(rèn)消息
                    final GetResponse getResponse = channel.basicGet("q.order.ttl", true);
    //                需要注意的 basicAck 方法需要傳遞兩個參數(shù)
    //                deliveryTag(唯一標(biāo)識 ID):當(dāng)一個消費(fèi)者向 RabbitMQ 注冊后逊移,會建立起一個 Channel 预吆,RabbitMQ 會用 basic.deliver 方法向消費(fèi)者推送消息,這個方法攜帶了一個 delivery tag胳泉, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識 ID拐叉,是一個單調(diào)遞增的正整數(shù)岩遗,delivery tag 的范圍僅限于 Channel
    //                multiple:為了減少網(wǎng)絡(luò)流量,手動確認(rèn)可以被批處理凤瘦,當(dāng)該參數(shù)為 true 時宿礁,則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息
                    channel.basicAck(getResponse.getEnvelope().getDeliveryTag(),false);
    
                    // 更新訂單狀態(tài)
                    order.setOrderStatus(Order.STATUS_PAY_SUCCESS);
                    OrderDB.orderMap.put(orderNo, order);
                } catch (Exception e){
                    e.printStackTrace();
                } finally {
                    if (connection.isOpen()){
                        connection.close();
                    }
                }
    
                return "支付成功";
            } else if (order.getOrderStatus().equals(Order.STATUS_PAY_SUCCESS)) {
                return "支付已支付成功,請勿重復(fù)支付";
            } else {
                return "訂單已超時";
            }
    
        }
    
        /**
         * 監(jiān)聽訂單死信隊列消息
         */
        @RabbitListener(queues = "q.order.dlx")
        public void dixLinseed(String message){
            System.out.println(message);
            try {
                final Order order = JSONObject.parseObject(message, Order.class);
                if (order.getOrderStatus().equals(Order.STATUS_WAIT_PAY)) {
                    order.setOrderStatus(Order.STATUS_TIMEOUT);
                    OrderDB.orderMap.put(order.getOrderNo(), order);
                    System.out.println("訂單支付超時:" + order.getOrderNo());
                } else if (order.getOrderStatus().equals(Order.STATUS_PAY_SUCCESS)) {
                    System.out.println("訂單已支付成功廷粒,死信隊列監(jiān)聽處理窘拯。红且。坝茎。跳過");
                }
            } catch (Exception e){
                e.printStackTrace();
            }
        }
    
    }
    

操作地址

提交訂單:
http://localhost:8080/order/submit
訂單列表:
http://localhost:8080/order/list
訂單支付:
http://localhost:8080/pay/{orderNo}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市暇番,隨后出現(xiàn)的幾起案子嗤放,更是在濱河造成了極大的恐慌,老刑警劉巖壁酬,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件次酌,死亡現(xiàn)場離奇詭異,居然都是意外死亡舆乔,警方通過查閱死者的電腦和手機(jī)岳服,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來希俩,“玉大人吊宋,你說我怎么就攤上這事⊙瘴洌” “怎么了璃搜?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鳞上。 經(jīng)常有香客問我这吻,道長,這世上最難降的妖魔是什么篙议? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任唾糯,我火速辦了婚禮,結(jié)果婚禮上鬼贱,老公的妹妹穿的比我還像新娘移怯。我一直安慰自己,他們只是感情好吩愧,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布芋酌。 她就那樣靜靜地躺著,像睡著了一般雁佳。 火紅的嫁衣襯著肌膚如雪脐帝。 梳的紋絲不亂的頭發(fā)上同云,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天,我揣著相機(jī)與錄音堵腹,去河邊找鬼炸站。 笑死,一個胖子當(dāng)著我的面吹牛疚顷,可吹牛的內(nèi)容都是我干的旱易。 我是一名探鬼主播,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼腿堤,長吁一口氣:“原來是場噩夢啊……” “哼阀坏!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起笆檀,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤忌堂,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后酗洒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體士修,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年樱衷,在試婚紗的時候發(fā)現(xiàn)自己被綠了棋嘲。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,745評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡矩桂,死狀恐怖沸移,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情耍鬓,我是刑警寧澤阔籽,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站牲蜀,受9級特大地震影響笆制,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜涣达,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一在辆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧度苔,春花似錦匆篓、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至甩骏,卻和暖如春窗市,著一層夾襖步出監(jiān)牢的瞬間先慷,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工咨察, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留论熙,地道東北人。 一個月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓摄狱,卻偏偏與公主長得像脓诡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子媒役,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,652評論 2 354