C1-RabbitMQ(消息隊(duì)列)--- 分解 B 2021年7月30日 23:54:04

分布式框架中間件總綱

http://www.reibang.com/p/00aa796bb5b8

友情鏈接(消息三解序)

1陋气、RabbitMQ(消息隊(duì)列)--- 分解 A
2劳吠、RabbitMQ(消息隊(duì)列)--- 分解 B
3、RabbitMQ(消息隊(duì)列)--- 分解 C
4巩趁、RabbitMQ(消息隊(duì)列)--- 面試題

本章目錄

一痒玩、發(fā)布確定
???????1、發(fā)布確認(rèn)原理
???????2议慰、發(fā)布確認(rèn)的策略
??????????????開啟發(fā)布確認(rèn)的方法
??????????????單個(gè)確認(rèn)發(fā)布
??????????????批量確認(rèn)發(fā)布
??????????????異步確認(rèn)發(fā)布
??????????????如何處理異步未確認(rèn)消息
??????????????以上 3 種發(fā)布確認(rèn)速度對(duì)比
二蠢古、交換機(jī)
???????1、Exchanges
???????2别凹、臨時(shí)隊(duì)列
???????3草讶、綁定(bindings)
???????4、Fanout
???????5炉菲、Direct exchange
???????6堕战、Topics
三择同、死信隊(duì)列
???????1格二、死信的概念
???????2、死信的來(lái)源
???????3、死信實(shí)戰(zhàn)
四沉御、延遲隊(duì)列
???????1、延遲隊(duì)列概念
???????2昭灵、延遲隊(duì)列運(yùn)用場(chǎng)景
???????3吠裆、RabbitMQ 中的 TTL
???????4伐谈、整合 springboot
???????5、隊(duì)列 TTL
???????6试疙、延時(shí)隊(duì)列優(yōu)化
???????7诵棵、Rabbitmq 插件實(shí)現(xiàn)延遲隊(duì)列

一、發(fā)布確定

1祝旷、發(fā)布確認(rèn)原理

生產(chǎn)者將信道設(shè)置成 confirm 模式履澳,一旦信道進(jìn)入 confirm 模式,所有在該信道上面發(fā)布的消
息都將會(huì)被指派一個(gè)唯一的 ID(從 1 開始)怀跛,一旦消息被投遞到所有匹配的隊(duì)列之后距贷,broker 就會(huì)
發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了吻谋,
如果消息和隊(duì)列是可持久化的忠蝗,那么確認(rèn)消息會(huì)在將消息寫入磁盤之后發(fā)出,broker 回傳給生產(chǎn)
者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號(hào)漓拾,此外 broker 也可以設(shè)置basic.ack 的
multiple 域阁最,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理。

confirm 模式最大的好處在于他是異步的骇两,一旦發(fā)布一條消息速种,生產(chǎn)者應(yīng)用程序就可以在等信道
返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后低千,生產(chǎn)者應(yīng)用便可以通過(guò)回調(diào)方
法來(lái)處理該確認(rèn)消息哟旗,如果 RabbitMQ 因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條 nack 消息栋操,
生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該 nack 消息闸餐。

2、發(fā)布確認(rèn)的策略

2.1矾芙、開啟發(fā)布確認(rèn)的方法

發(fā)布確認(rèn)默認(rèn)是沒(méi)有開啟的舍沙,如果要開啟需要調(diào)用方法 confirmSelect,每當(dāng)你要想使用發(fā)布
確認(rèn)剔宪,都需要在 channel 上調(diào)用該方法

        Channel channel = connection.createChannel ( );
        channel.confirmSelect ();

2.2拂铡、單個(gè)確認(rèn)發(fā)布

這是一種簡(jiǎn)單的確認(rèn)方式,它是一種 同步確認(rèn) 發(fā)布的方式葱绒,也就是發(fā)布一個(gè)消息之后只有它
被確認(rèn)發(fā)布感帅,后續(xù)的消息才能繼續(xù)發(fā)布,waitForConfirmsOrDie(long)這個(gè)方法只有在消息被確認(rèn)
的時(shí)候才返回,如果在指定時(shí)間范圍內(nèi)這個(gè)消息沒(méi)有被確認(rèn)那么它將拋出異常地淀。

這種確認(rèn)方式有一個(gè)最大的缺點(diǎn)就是:發(fā)布速度特別的慢失球,因?yàn)槿绻麤](méi)有確認(rèn)發(fā)布的消息就會(huì)
阻塞所有后續(xù)消息的發(fā)布,這種方式最多提供每秒不超過(guò)數(shù)百條發(fā)布消息的吞吐量。當(dāng)然對(duì)于某
些應(yīng)用程序來(lái)說(shuō)這可能已經(jīng)足夠了实苞。

public class ReleaseConfirmation {
    
        public static void main(String[] args) throws Exception{
            // 1.單個(gè)確認(rèn)發(fā)布
            ReleaseConfirmation.publishMessageIndividually();// 耗時(shí)27289ms
        }
    
        private static final int MESSAGE_COUNT = 999;
    
        public static void publishMessageIndividually() throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel ( );
            String queryName = UUID.randomUUID().toString();
            channel.queueDeclare (queryName, false, false, false, null);
    
            // 開始發(fā)布確認(rèn)
            channel.confirmSelect ( );
            long begin = System.currentTimeMillis ( );
    
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                // 發(fā)送消息
                channel.basicPublish ("", queryName, null, message.getBytes ( ));
                // 單個(gè)消息發(fā)送后豺撑,馬上進(jìn)行消息確認(rèn)
                boolean flag = channel.waitForConfirms ( );
                if (flag) {
                    //System.out.println ("消息發(fā)送成功");
                }
            }
    
            long end = System.currentTimeMillis ( );
            System.out.println ("發(fā)布" + MESSAGE_COUNT + "個(gè)單獨(dú)確認(rèn)消息,耗時(shí)" + (end - begin) +
                    "ms");
        }
    }

2.3、批量確認(rèn)發(fā)布

上面那種方式非常慢黔牵,與單個(gè)等待確認(rèn)消息相比聪轿,先發(fā)布一批消息然后一起確認(rèn)可以極大地
提高吞吐量,當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問(wèn)題時(shí)猾浦,不知道是哪個(gè)消息出現(xiàn)
問(wèn)題了陆错,我們必須將整個(gè)批處理保存在內(nèi)存中,以記錄重要的信息而后重新發(fā)布消息金赦。當(dāng)然這種
方案仍然是同步的音瓷,也一樣阻塞消息的發(fā)布。

public class ReleaseConfirmation {

    public static void main(String[] args) throws Exception {
        // 1.單個(gè)確認(rèn)發(fā)布
        // ReleaseConfirmation.publishMessageIndividually ( );// 耗時(shí)27289ms
        // 2.批量確認(rèn)發(fā)布
        ReleaseConfirmation.publishMessageIndividually2 ( );// 耗時(shí)415ms
        // 
    }

    private static final int MESSAGE_COUNT = 999;

    public static void publishMessageIndividually2() throws Exception {

        Channel channel = RabbitMqUtils.getChannel ( );
        String queryName = UUID.randomUUID ( ).toString ( );
        channel.queueDeclare (queryName, false, false, false, null);

        // 開始發(fā)布確認(rèn)
        channel.confirmSelect ( );
        // 批量確認(rèn)消息大小
        int batchSize = 100;
        long begin = System.currentTimeMillis ( );

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            // 發(fā)送消息
            channel.basicPublish ("", queryName, null, message.getBytes ( ));
            // 判斷達(dá)到 100條消息的時(shí)候素邪,批量確認(rèn)一次
            if (i%batchSize==0) {
                // 發(fā)布確認(rèn)
                channel.waitForConfirms ();
            }
        }

        long end = System.currentTimeMillis ( );
        System.out.println ("發(fā)布" + MESSAGE_COUNT + "個(gè)單獨(dú)確認(rèn)消息,耗時(shí)" + (end - begin) +
                "ms");
    }


2.4外莲、異步確認(rèn)發(fā)布

異步確認(rèn)雖然編程邏輯比上兩個(gè)要復(fù)雜,但是性價(jià)比最高兔朦,無(wú)論是可靠性還是效率都沒(méi)得說(shuō)偷线,
他是利用回調(diào)函數(shù)來(lái)達(dá)到消息可靠性傳遞的,這個(gè)中間件也是通過(guò)函數(shù)回調(diào)來(lái)保證是否投遞成功沽甥,
下面就讓我們來(lái)詳細(xì)講解異步確認(rèn)是怎么實(shí)現(xiàn)的声邦。


image.png
 public static void main(String[] args) throws Exception {
        // 1.單個(gè)確認(rèn)發(fā)布
        // ReleaseConfirmation.publishMessageIndividually ( );// 耗時(shí)27289ms
        // 2.批量確認(rèn)發(fā)布
        // ReleaseConfirmation.publishMessageIndividually2 ( );// 耗時(shí)415ms
        // 3.異步確認(rèn)發(fā)布
        ReleaseConfirmation.publishMessageIndividuallyAsync ();// 耗時(shí)133ms
    }

    private static final int MESSAGE_COUNT = 999;

    public static void publishMessageIndividuallyAsync() throws Exception {

        Channel channel = RabbitMqUtils.getChannel ( );
        String queryName = UUID.randomUUID ( ).toString ( );
        channel.queueDeclare (queryName, false, false, false, null);
        // 開始發(fā)布確認(rèn)
        channel.confirmSelect ( );
        long begin = System.currentTimeMillis ( );


        /**
         * 消息確認(rèn)成功的一個(gè)回調(diào)(函數(shù)式接口)
         */
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            System.out.println ("確認(rèn)的消息:" + deliveryTag);

        };

        /**
         * 消息確認(rèn)失敗的一個(gè)回調(diào)(函數(shù)式接口)
         * 1. 參數(shù)一:消息的標(biāo)識(shí)(標(biāo)記)
         * 2. 參數(shù)二:是否為批量確認(rèn)
         *
         */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            System.out.println ("未確認(rèn)的消息:" + deliveryTag);
        };

        // 消息的監(jiān)聽器:用于監(jiān)聽哪些成功了,哪些失敗了
        channel.addConfirmListener (ackCallback, nackCallback);


        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            // 發(fā)送消息
            channel.basicPublish ("", queryName, null, message.getBytes ( ));
        }

        long end = System.currentTimeMillis ( );
        System.out.println ("發(fā)布" + MESSAGE_COUNT + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) +
                "ms");
    }

2.5摆舟、 如何處理異步未確認(rèn)消息

最好的解決的解決方案就是把未確認(rèn)的消息放到一個(gè)基于內(nèi)存的能被發(fā)布線程訪問(wèn)的隊(duì)列亥曹,
比如說(shuō)用 ConcurrentLinkedQueue 或者 ConcurrentSkipListMap 這個(gè)隊(duì)列在 confirm callbacks 與發(fā)布線程之間進(jìn)行消息的傳遞。

 public static void main(String[] args) throws Exception {
        // 1.單個(gè)確認(rèn)發(fā)布
        // ReleaseConfirmation.publishMessageIndividually ( );// 耗時(shí)27289ms
        // 2.批量確認(rèn)發(fā)布
        // ReleaseConfirmation.publishMessageIndividually2 ( );// 耗時(shí)415ms
        // 3.異步確認(rèn)發(fā)布
        ReleaseConfirmation.publishMessageIndividuallyAsync ();// 耗時(shí)133ms
    }

    private static final int MESSAGE_COUNT = 999;

    public static void publishMessageIndividuallyAsync() throws Exception {

        Channel channel = RabbitMqUtils.getChannel ( );
        String queryName = UUID.randomUUID ( ).toString ( );
        channel.queueDeclare (queryName, false, false, false, null);
        // 開始發(fā)布確認(rèn)
        channel.confirmSelect ( );
        long begin = System.currentTimeMillis ( );

        // 線程安全且有序的的哈希表恨诱,適用于高并發(fā)場(chǎng)景
        ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<> ( );

        /**
         * 消息確認(rèn)成功的一個(gè)回調(diào)(函數(shù)式接口)
         */
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            // 2.處理異步未確認(rèn):刪除掉已經(jīng)確認(rèn)的消息媳瞪,剩下的就是未確認(rèn)的消息了
            if (multiple) {// 消息是批量的
                // headMap(),截取到指定集合
                ConcurrentNavigableMap<Long, String> confirmed = map.headMap (deliveryTag);
                confirmed.clear ();
                System.out.println (map.size ()+":最終:"+map );

            }else {// 單個(gè)消息只刪除對(duì)應(yīng)的序號(hào)鍵
                map.remove (deliveryTag);

            }

            System.out.println ("確認(rèn)的消息:" + deliveryTag);

        };

        /**
         * 消息確認(rèn)失敗的一個(gè)回調(diào)(函數(shù)式接口)
         * 1. 參數(shù)一:消息的標(biāo)識(shí)(標(biāo)記)
         * 2. 參數(shù)二:是否為批量確認(rèn)
         *
         */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            String message = map.get (deliveryTag);
            System.out.println ("未確認(rèn)的消息序號(hào):" + deliveryTag+",未確認(rèn)的消息內(nèi)容:"+message);
            // 再次發(fā)送失敗的(單次),若是批量則拿map操作
            channel.basicPublish ("", queryName, null, message.getBytes ( ));
            
        };

        // 消息的監(jiān)聽器:用于監(jiān)聽哪些成功了照宝,哪些失敗了
        channel.addConfirmListener (ackCallback, nackCallback);


        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            // 發(fā)送消息
            channel.basicPublish ("", queryName, null, message.getBytes ( ));
            // 1.處理異步未確認(rèn):記錄所有要發(fā)送的消息蛇受,Key:序號(hào),value:消息
            map.put (channel.getNextPublishSeqNo (),message);
        }

        long end = System.currentTimeMillis ( );
        System.out.println ("發(fā)布" + MESSAGE_COUNT + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) +
                "ms");

    }

2.6厕鹃、以上 3 種發(fā)布確認(rèn)速度對(duì)比

單獨(dú)發(fā)布消息:同步等待確認(rèn)兢仰,簡(jiǎn)單,但吞吐量非常有限剂碴。
批量發(fā)布消息:批量同步等待確認(rèn)把将,簡(jiǎn)單,合理的吞吐量忆矛,一旦出現(xiàn)問(wèn)題但很難推斷出是哪條消息出現(xiàn)了問(wèn)題察蹲。
異步處理:最佳性能和資源使用,在出現(xiàn)錯(cuò)誤的情況下可以很好地控制,但是實(shí)現(xiàn)起來(lái)稍微難些

二递览、交換機(jī)

假設(shè)工作隊(duì)列背后叼屠,每個(gè)任務(wù)都恰好交付給一個(gè)消費(fèi)者(工作進(jìn)程)瞳腌。在這一部分中绞铃,我們將做一些完全不同的事情-
(我們將消息傳達(dá)給多個(gè)消費(fèi)者。這種模式稱為 ”發(fā)布/訂閱”)嫂侍。
為了說(shuō)明這種模式儿捧,我們將構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它將由兩個(gè)程序組成:第一個(gè)程序?qū)l(fā)出日志消息挑宠,
第二個(gè)程序是消費(fèi)者菲盾。其中我們會(huì)啟動(dòng)兩個(gè)消費(fèi)者,其中一個(gè)消費(fèi)者接收到消息后把日志存儲(chǔ)在磁盤碎浇,

1攒盈、Exchanges

1.1、Exchanges概念

RabbitMQ 消息傳遞模型的核心思想是: 生產(chǎn)者生產(chǎn)的消息從不會(huì)直接發(fā)送到隊(duì)列。實(shí)際上垄提,通常生產(chǎn)者甚至都不知道這些消息傳遞到了哪些隊(duì)列中铡俐。

相反,生產(chǎn)者只能將消息發(fā)送到交換機(jī)(exchange)速妖,交換機(jī)工作的內(nèi)容非常簡(jiǎn)單高蜂,一方面它接收來(lái)
自生產(chǎn)者的消息,另一方面將它們推入隊(duì)列罕容。交換機(jī)必須確切知道如何處理收到的消息备恤。是應(yīng)該把這些消
息放到特定隊(duì)列還是說(shuō)把他們到許多隊(duì)列中還是說(shuō)應(yīng)該丟棄它們。這就的由交換機(jī)的類型來(lái)決定锦秒。


image.png

1.2露泊、Exchanges類型

直接(direct), 主題(topic) ,標(biāo)題(headers) , 扇出(fanout)

1.3、無(wú)名Exchanges

前面部分我們對(duì) exchange 一無(wú)所知旅择,但仍然能夠?qū)⑾l(fā)送到隊(duì)列惭笑,原因是因?yàn)槲覀兪褂玫氖悄J(rèn)交換,我們通過(guò)空字符串("")進(jìn)行標(biāo)識(shí)生真。


image.png

第一個(gè)參數(shù)是交換機(jī)的名稱沉噩。空字符串表示默認(rèn)或無(wú)名稱交換機(jī):
消息能路由發(fā)送到隊(duì)列中其實(shí)是由 routingKey(bindingkey)綁定 key 指定的柱蟀,如果它存在的話

2川蒙、臨時(shí)隊(duì)列(未持久化)

每當(dāng)我們連接到 Rabbit 時(shí),我們都需要一個(gè)全新的空隊(duì)列长已,為此我們可以創(chuàng)建一個(gè)具有隨機(jī)名稱的隊(duì)列畜眨,或者能讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名稱那就更好了昼牛。其次一旦我們斷開了消費(fèi)者的連接,隊(duì)列將被自動(dòng)刪除康聂。

創(chuàng)建臨時(shí)隊(duì)列的方式:

String queueName = channel.queueDeclare().getQueue();

創(chuàng)建出來(lái)之后長(zhǎng)成這樣:


image.png

3贰健、綁定(bindings)

什么是 bingding 呢,binding 其實(shí)是 exchange 和 queue 之間的橋梁恬汁,它告訴我們 exchange 和那個(gè)隊(duì)
列進(jìn)行了綁定關(guān)系伶椿。比如說(shuō)下面這張圖告訴我們的就是 X 與 Q1 和 Q2 進(jìn)行了綁定


image.png
image.png

1、通過(guò)客戶端創(chuàng)建測(cè)試隊(duì)列


image.png

2蕊连、通過(guò)客戶端創(chuàng)建測(cè)試交換機(jī)悬垃、


image.png

3游昼、用創(chuàng)建好的測(cè)試交換機(jī)綁定測(cè)試隊(duì)列
image.png

4甘苍、Fanout(扇出,也稱發(fā)布/訂閱模式)

Fanout 這種類型非常簡(jiǎn)單烘豌。正如從名稱中猜到的那樣载庭,它是將接收到的所有消息廣播到它知道的
所有隊(duì)列中。系統(tǒng)中默認(rèn)有些 exchange 類型


image.png
根據(jù)下圖的關(guān)系編寫一個(gè)實(shí)戰(zhàn) demo:
image.png

Logs 和臨時(shí)隊(duì)列的綁定關(guān)系如下圖:


image.png

ReceiveLogs01:消費(fèi)者01代碼

package com.kk.rabbitmq.fanout;

import com.kk.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.text.SimpleDateFormat;
import java.util.Date;

public class ReceiveLogs01 {


    // 交換機(jī)名稱
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        // 聲明一個(gè)交換機(jī)
        channel.exchangeDeclare (EXCHANGE_NAME,"fanout");

        // 聲明一個(gè)臨時(shí)隊(duì)列,作用
        // 1廊佩、隊(duì)列名隨機(jī)
        // 2囚聚、當(dāng)消費(fèi)者斷開與隊(duì)列的連接時(shí),隊(duì)列就自動(dòng)刪除
        String queueName = channel.queueDeclare ( ).getQueue ( );

        // 綁定交換機(jī)與隊(duì)列标锄,參數(shù)
        // 1顽铸、隊(duì)列
        // 2、交換機(jī)
        // 3料皇、Routing key
        channel.queueBind (queueName,EXCHANGE_NAME,"");
        System.out.println ("等待消息接收谓松,把接收到的消息打印出來(lái).............." );

        DeliverCallback deliverCallback = (consumerTag,message)->{
        // 接收消息的回調(diào)
            System.out.println ("ReceiveLogs01,接收到的消息:"+ new String (message.getBody (),"UTF-8")+",當(dāng)前時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date () ));
        };

        // 當(dāng)消費(fèi)者取消消息的回調(diào)
        CancelCallback cancelCallback=(consumerTag)->{

        };

        // 設(shè)置消息接收等回調(diào)
        // 1、隊(duì)列名
        // 2践剂、是否自動(dòng)接收
        // 3鬼譬、接收消息的回調(diào)
        // 4、當(dāng)消費(fèi)者取消消息的回調(diào)
        channel.basicConsume (queueName,true, deliverCallback,cancelCallback);
    }
}


ReceiveLogs01:消費(fèi)者01代碼逊脯,跟01一樣

EmitLog:生產(chǎn)者代碼

package com.kk.rabbitmq.fanout;
import com.kk.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
public class EmitLog {

    // 交換機(jī)名稱
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel ( );
        // 聲明一個(gè)交換機(jī)(已經(jīng)被消費(fèi)者聲明出來(lái)优质,所以生產(chǎn)者不需要重復(fù)聲明)
        //channel.exchangeDeclare (EXCHANGE_NAME, "fanout");

        Scanner sc = new Scanner(System.in);
        System.out.println("請(qǐng)輸入信息");
        while (sc.hasNext()) {
            String message = sc.nextLine();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息" + message);
        }
    }
}

扇出測(cè)試結(jié)果


image.png
image.png
image.png

5、Direct exchange(直接交換機(jī)军洼,也稱路由模式)

5.1巩螃、簡(jiǎn)單說(shuō)明和扇出的區(qū)別

扇出的 routing key 都是相同的,所以生產(chǎn)者發(fā)出匕争,兩個(gè)消費(fèi)者同時(shí)收到避乏,
而直接路由則是兩個(gè) routing key 不相同 ,可以直接交換機(jī)是扇出的變種

5.2汗捡、介紹

上一節(jié)中的我們的日志系統(tǒng)將所有消息廣播給所有消費(fèi)者淑际,對(duì)此我們想做一些改變畏纲,例如我們希
望將日志消息寫入磁盤的程序僅接收嚴(yán)重錯(cuò)誤(errros),而不存儲(chǔ)哪些警告(warning)或信息(info)日志
消息避免浪費(fèi)磁盤空間春缕。Fanout 這種交換類型并不能給我們帶來(lái)很大的靈活性-它只能進(jìn)行無(wú)意識(shí)的
廣播盗胀,在這里我們將使用 direct 這種類型來(lái)進(jìn)行替換,這種類型的工作方式是锄贼,消息只去到它綁定的
routingKey 隊(duì)列中去票灰。(針對(duì)性)


image.png

在上面這張圖中,我們可以看到 X 綁定了兩個(gè)隊(duì)列宅荤,綁定類型是 direct屑迂。隊(duì)列Q1 綁定鍵為 orange,
隊(duì)列 Q2 綁定鍵有兩個(gè):一個(gè)綁定鍵為 black冯键,另一個(gè)綁定鍵為 green.

在這種綁定情況下惹盼,生產(chǎn)者發(fā)布消息到 exchange 上,綁定鍵為 orange 的消息會(huì)被發(fā)布到隊(duì)列
Q1惫确。綁定鍵為 black 和 green 的消息會(huì)被發(fā)布到隊(duì)列 Q2手报,其他消息類型的消息將被丟棄。

5.3改化、多重綁定
image.png

當(dāng)然如果 exchange 的綁定類型是direct掩蛤,但是它綁定的多個(gè)隊(duì)列的 key 如果都相同,在這種情
況下雖然綁定類型是 direct 但是它表現(xiàn)的就和 fanout 有點(diǎn)類似了陈肛,就跟廣播差不多揍鸟。

5.4、實(shí)戰(zhàn):根據(jù)下圖寫出代碼
image.png

image.png

生產(chǎn)者:需要注意句旱,這次寫法和扇出不同阳藻,在生產(chǎn)者創(chuàng)建交換機(jī),所以先啟動(dòng)生產(chǎn)者

package com.kk.rabbitmq.exchange.direct;

import com.kk.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
public class DirectEmitLog {

    // 交換機(jī)名稱
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel ( );
        // 聲明一個(gè)交換機(jī)(已經(jīng)被消費(fèi)者聲明出來(lái)前翎,所以生產(chǎn)者不需要重復(fù)聲明)
        channel.exchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        Scanner sc = new Scanner(System.in);
        System.out.println("請(qǐng)輸入要發(fā)送的 routing key值");
        String message2 = null;
        while (sc.hasNext()) {
            System.out.println("請(qǐng)輸入要發(fā)送的 routing key值");

            String message = sc.nextLine();

            if ("info".equals (message)){
                System.out.println ("請(qǐng)輸入要發(fā)送的內(nèi)容" );
                message2 = sc.nextLine();
            }else if ("warning".equals (message)){
                System.out.println ("請(qǐng)輸入要發(fā)送的內(nèi)容" );
                message2 = sc.nextLine();
            }
            // 參數(shù)二:發(fā)送時(shí)被綁定的 routing key
            channel.basicPublish(EXCHANGE_NAME, message, null, message2.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息:" + message2+",給 "+message);
            continue;
        }
    }
}

消費(fèi)者01

package com.kk.rabbitmq.exchange.direct;

import com.kk.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DirectReceiveLogs01 {
    
    // 交換機(jī)名稱
    public static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        // 聲明一個(gè)交換機(jī)(交給生產(chǎn)者聲明)
        //channel.exchangeDeclare (EXCHANGE_NAME,"fanout");

        // 聲明一個(gè)臨時(shí)隊(duì)列,作用
        // 1稚配、隊(duì)列名隨機(jī)
        // 2、當(dāng)消費(fèi)者斷開與隊(duì)列的連接時(shí)港华,隊(duì)列就自動(dòng)刪除
        String queueName = channel.queueDeclare ( ).getQueue ( );

        // 綁定交換機(jī)與隊(duì)列道川,參數(shù)
        // 1、隊(duì)列
        // 2立宜、交換機(jī)
        // 3冒萄、Routing key
        channel.queueBind (queueName,EXCHANGE_NAME,"info");
        System.out.println ("等待消息接收,把接收到的消息打印出來(lái).............." );

        DeliverCallback deliverCallback = (consumerTag,message)->{
        // 接收消息的回調(diào)
            System.out.println ("ReceiveLogs01,接收到的消息:"+ new String (message.getBody (),"UTF-8")+",當(dāng)前時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date () ));
        };

        // 當(dāng)消費(fèi)者取消消息的回調(diào)
        CancelCallback cancelCallback=(consumerTag)->{

        };

        // 設(shè)置消息接收等回調(diào)
        // 1橙数、隊(duì)列名
        // 2尊流、是否自動(dòng)接收
        // 3、接收消息的回調(diào)
        // 4灯帮、當(dāng)消費(fèi)者取消消息的回調(diào)
        channel.basicConsume (queueName,true, deliverCallback,cancelCallback);
    }
}

消費(fèi)者02 同上

測(cè)試結(jié)果


image.png

image.png

image.png

6崖技、Topics(主題交換機(jī)=扇出+直接)

6.1逻住、主題交換機(jī)=扇出+直接

簡(jiǎn)單來(lái)說(shuō)就是,扇出只能廣播全部迎献,直接直接有選擇的單個(gè)瞎访,
主題交換機(jī)則把他們的有點(diǎn)融合了

6.2、Topic 的規(guī)范

1吁恍、routing_key 不能隨意寫扒秸,必須是一個(gè)單詞列表,以點(diǎn)號(hào)分隔開
eg:"stock.usd.nyse"冀瓦, "nyse.vmw"伴奥,"quick.orange.rabbit"

2、單詞列表最多不能超過(guò) 255 個(gè)字節(jié)

3翼闽、
*(星號(hào))可以代替一個(gè)單詞
#(井號(hào))可以替代零個(gè)或多個(gè)單詞

6.3拾徙、Topic 的規(guī)范案例

(1)綁定的是中間帶 orange 帶 3 個(gè)單詞的字符串
(\*.orange.\*)
(2)綁定的是 最后一個(gè)單詞是 rabbit 的 3 個(gè)單詞
(*.*.rabbit)
第一個(gè)單詞是 lazy 的多個(gè)單詞
(lazy.#)

(3)下圖是一個(gè)隊(duì)列綁定關(guān)系圖,我們來(lái)看看他們之間數(shù)據(jù)接收情況是怎么樣的


image.png

quick.orange.rabbit ???------》 ???被隊(duì)列 Q1Q2 接收到
lazy.orange.elephant ???------》 ???被隊(duì)列 Q1Q2 接收到
quick.orange.fox ???------》 ???被隊(duì)列 Q1 接收到
lazy.brown.fox ???------》 ???被隊(duì)列 Q2 接收到
lazy.pink.rabbit ???------》 ???雖然滿足兩個(gè)綁定但只被隊(duì)列 Q2 接收一次
quick.brown.fox ???------》 ???不匹配任何綁定不會(huì)被任何隊(duì)列接收到會(huì)被丟棄
quick.orange.male.rabbit ???------》 ???是四個(gè)單詞不匹配任何綁定會(huì)被丟棄
lazy.orange.male.rabbit ???------》 ???是四個(gè)單詞但匹配 Q2

注意:

當(dāng)一個(gè)隊(duì)列綁定鍵是#,那么這個(gè)隊(duì)列將接收所有數(shù)據(jù)肄程,就有點(diǎn)像 fanout 了
如果隊(duì)列綁定鍵當(dāng)中沒(méi)有#和*出現(xiàn)锣吼,那么該隊(duì)列綁定類型就是 direct 了

6.4、Topic 實(shí)戰(zhàn)

生產(chǎn)者:

package com.kk.rabbitmq.exchange.topic;

import com.kk.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

public class TopicEmitLog {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel ( );

        // 聲明交換機(jī)類型
        channel.exchangeDeclare (EXCHANGE_NAME,"topic");

        /**
         * Q1-->綁定的是
         * 中間帶 orange 帶 3 個(gè)單詞的字符串(*.orange.*)
         * Q2-->綁定的是
         * 最后一個(gè)單詞是 rabbit 的 3 個(gè)單詞(*.*.rabbit)
         * 第一個(gè)單詞是 lazy 的多個(gè)單詞(lazy.#)
         *
         */
        Map<String, String> bindingKeyMap = new HashMap<> ();
        bindingKeyMap.put("quick.orange.rabbit","被隊(duì)列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant","被隊(duì)列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox","被隊(duì)列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox","被隊(duì)列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個(gè)綁定但只被隊(duì)列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox","不匹配任何綁定不會(huì)被任何隊(duì)列接收到會(huì)被丟棄");
        bindingKeyMap.put("quick.orange.male.rabbit","是四個(gè)單詞不匹配任何綁定會(huì)被丟棄");
        bindingKeyMap.put("lazy.orange.male.rabbit","是四個(gè)單詞但匹配 Q2");

        // 等待消費(fèi)者啟動(dòng)
        Thread.sleep (10000);

        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet ( )) {
            String bindingKey = bindingKeyEntry.getKey ( );
            String message = bindingKeyEntry.getValue ( );
            channel.basicPublish (EXCHANGE_NAME,bindingKey,null,message.getBytes ("utf-8"));
            System.out.println("生產(chǎn)者發(fā)出消息" + message);
        }
    }
}

消費(fèi)者01:

package com.kk.rabbitmq.exchange.topic;

import com.kk.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.text.SimpleDateFormat;
import java.util.Date;

public class TopicReceiveLogs01 {

    // 交換機(jī)名稱
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        // 聲明一個(gè)交換機(jī)(交給生產(chǎn)者聲明)
        //channel.exchangeDeclare (EXCHANGE_NAME,"fanout");

        //聲明 Q1 隊(duì)列與綁定關(guān)系
        String queueName="Q1";
        channel.queueDeclare (queueName,false,false,false,null);
        channel.queueBind (queueName,EXCHANGE_NAME,"*.orange.*");

        System.out.println ("等待消息接收蓝厌,把接收到的消息打印出來(lái).............." );

        DeliverCallback deliverCallback = (consumerTag,message)->{
        // 接收消息的回調(diào)
            System.out.println ("ReceiveLogs01,接收到的消息:"+ new String (message.getBody (),"UTF-8")+",當(dāng)前時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date () ));
        };

        // 當(dāng)消費(fèi)者取消消息的回調(diào)
        CancelCallback cancelCallback=(consumerTag)->{

        };

        // 設(shè)置消息接收等回調(diào)
        // 1、隊(duì)列名
        // 2古徒、是否自動(dòng)接收
        // 3拓提、接收消息的回調(diào)
        // 4、當(dāng)消費(fèi)者取消消息的回調(diào)
        channel.basicConsume (queueName,true, deliverCallback,cancelCallback);
    }
}

消費(fèi)者02:

package com.kk.rabbitmq.exchange.topic;

import com.kk.rabbitmq.util.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.text.SimpleDateFormat;
import java.util.Date;

public class TopicReceiveLogs02 {

    // 交換機(jī)名稱
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        // 聲明一個(gè)交換機(jī)(交給生產(chǎn)者聲明)
        //channel.exchangeDeclare (EXCHANGE_NAME,"fanout");

        //聲明 Q1 隊(duì)列與綁定關(guān)系
        String queueName="Q2";
        channel.queueDeclare (queueName,false,false,false,null);
        channel.queueBind (queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind (queueName,EXCHANGE_NAME,"lazy.#");

        System.out.println ("等待消息接收隧膘,把接收到的消息打印出來(lái).............." );

        DeliverCallback deliverCallback = (consumerTag,message)->{
        // 接收消息的回調(diào)
            System.out.println ("ReceiveLogs02,接收到的消息:"+ new String (message.getBody (),"UTF-8")+",當(dāng)前時(shí)間:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date () ));
        };

        // 當(dāng)消費(fèi)者取消消息的回調(diào)
        CancelCallback cancelCallback=(consumerTag)->{

        };

        // 設(shè)置消息接收等回調(diào)
        // 1代态、隊(duì)列名
        // 2、是否自動(dòng)接收
        // 3疹吃、接收消息的回調(diào)
        // 4蹦疑、當(dāng)消費(fèi)者取消消息的回調(diào)
        channel.basicConsume (queueName,true, deliverCallback,cancelCallback);
    }
}

測(cè)試結(jié)果:


image.png
image.png
image.png

三、死信隊(duì)列

1萨驶、死信隊(duì)列概念

先從概念解釋上搞清楚這個(gè)定義歉摧,死信,顧名思義就是無(wú)法被消費(fèi)的消息腔呜,字面意思可以這樣理
解叁温,一般來(lái)說(shuō),producer 將消息投遞到 broker 或者直接到queue 里了核畴,consumer 從 queue 取出消息
進(jìn)行消費(fèi)膝但,但某些時(shí)候由于特定的原因 導(dǎo)致 queue 中的某些消息無(wú)法被消費(fèi),這樣的消息如果沒(méi)有
后續(xù)的處理谤草,就變成了死信跟束,有死信自然就有了死信隊(duì)列莺奸。

應(yīng)用場(chǎng)景:為了保證訂單業(yè)務(wù)的消息數(shù)據(jù)不丟失,需要使用到 RabbitMQ 的死信隊(duì)列機(jī)制冀宴,當(dāng)消息
消費(fèi)發(fā)生異常時(shí)憾筏,將消息投入死信隊(duì)列中.還有比如說(shuō): 用戶在商城下單成功并點(diǎn)擊去支付后在指定時(shí)
間未支付時(shí)自動(dòng)失效

2、死信隊(duì)列原由

1花鹅、消息 TTL 過(guò)期
2氧腰、隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了,無(wú)法再添加數(shù)據(jù)到 mq 中)
3刨肃、消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false

3古拴、死信隊(duì)列實(shí)戰(zhàn)

3.1、代碼架構(gòu)圖

image.png

normal-queue(是普通隊(duì)列)真友,會(huì)因?yàn)檫@三種情況轉(zhuǎn)發(fā)到dead_exchange(死信交換機(jī)),
并且設(shè)置 routingKey(lisi(李四))黄痪,成為死信隊(duì)列(dead-queue)

3.2、消息TTL 過(guò)期

消費(fèi)者C1 代碼盔然,啟動(dòng)之后關(guān)閉該消費(fèi)者 模擬其接收不到消息(第一個(gè)啟動(dòng))

// 消費(fèi)者 1
public class Consumer1 {

    // 普通交換機(jī)名稱
    private final static String NORMAL_EXCHANGE = "normal_exchange";
    // 死信交換機(jī)名稱
    private static final String DEAD_EXCHANGE = "dead_exchange";
    // 聲明普通隊(duì)列
    private final static String NORMAL_QUERY = "normal_query";
    // 聲明死信隊(duì)列
    private static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        // 聲明死信交換機(jī)和普通交換機(jī)桅打,類型為 direct
        channel.exchangeDeclare (NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare (DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // 普通隊(duì)列綁定 死信交換機(jī)信息
        Map<String, Object> params = new HashMap<> ( );
        // 普通隊(duì)列設(shè)置 死信交換機(jī)
        params.put ("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 普通隊(duì)列設(shè)置 死信的 routingkey
        params.put ("x-dead-letter-routing-key", "lisi");
        // 聲明普通隊(duì)列 并且根據(jù)參數(shù)信息綁定 死信交換機(jī)
        channel.queueDeclare (NORMAL_QUERY,false,false,false,params);
        channel.queueBind (NORMAL_QUERY, NORMAL_EXCHANGE, "zhangsan");


        // 聲明死信隊(duì)列 并 綁定 死信交換機(jī)與 routingKey
        channel.queueDeclare (DEAD_QUEUE, false, false, false, null);
        channel.queueBind (DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        System.out.println ("C1等待接受消息..." );

        channel.basicConsume (NORMAL_QUERY,true,(consumerTag,message)->{
            System.out.println (new java.lang.String (message.getBody (),"UTF-8"));
        },consumerTag -> {});

    }
}

啟動(dòng)C1后創(chuàng)建,普通和死信交換機(jī)已經(jīng)對(duì)應(yīng)的隊(duì)列和綁定


image.png

生產(chǎn)者(先關(guān)閉已經(jīng)啟動(dòng)好的C1愈案,在啟動(dòng)生產(chǎn)者挺尾,模擬TTL過(guò)期)

// 生產(chǎn)者
public class Productr {

    // 普通交換機(jī)名稱
    private final static String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel ( );

        // 死信消息 設(shè)置TTL值
        // 設(shè)置 10000ms 為 10s

        AMQP.BasicProperties preperties =
                new AMQP.BasicProperties ( ).builder ( ).expiration ("10000").build ( );

        for (int i = 0; i < 11; i++) {
            String message = "info:"+i;
            channel.basicPublish (NORMAL_EXCHANGE,"zhangsan",preperties,message.getBytes ());
        }
    }
}

啟動(dòng)生產(chǎn)者之后,沒(méi)有消費(fèi)者消費(fèi)站绪,過(guò)期10s后進(jìn)入了死信隊(duì)列


image.png

消費(fèi)者C2 代碼(啟動(dòng)C2遭铺,消費(fèi)死信隊(duì)列的消息)

// 消費(fèi)者 2 負(fù)責(zé)接收死信隊(duì)列的消息
public class Consumer2 {
    // 聲明死信隊(duì)列
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        System.out.println ("C2等待接受消息..." );

        // 接收死信隊(duì)列的消息
        channel.basicConsume (DEAD_QUEUE,true,(consumerTag,message)->{
            System.out.println (new String (message.getBody (),"UTF-8"));
        },consumerTag -> {});
    }
}

3.3、隊(duì)列達(dá)到最大長(zhǎng)度(在TTL案例的代碼上更改)

改動(dòng)消費(fèi)者C1代碼:增加隊(duì)列(操作的第二步)


image.png

生產(chǎn)者:去掉TTL(啟動(dòng)生產(chǎn)者之前恢准,先關(guān)閉已經(jīng)啟動(dòng)的消費(fèi)者1)(操作的第三步)


image.png

C2代碼不變

注意:?jiǎn)⒂们跋葎h除原本普通隊(duì)列(因?yàn)閰?shù)被修改了)魂挂,多了LIM標(biāo)簽既隊(duì)列長(zhǎng)度限制(操作的第一步)
image.png

3.4、消息被拒(在TTL案例的代碼上更改)

改動(dòng)消費(fèi)者C1代碼:拒絕和接收區(qū)分


image.png

生產(chǎn)者(沒(méi)有TTL的設(shè)置)和C2不變

先啟動(dòng)C1馁筐,然后生產(chǎn)者


image.png

四涂召、延遲隊(duì)列(死信隊(duì)列中的一種)

1、延遲隊(duì)列概念

延時(shí)隊(duì)列,隊(duì)列內(nèi)部是有序的敏沉,最重要的特性就體現(xiàn)在它的延時(shí)屬性上果正,延時(shí)隊(duì)列中的元素是希望
在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō)赦抖,延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的
元素的隊(duì)列舱卡。

2、延遲隊(duì)列運(yùn)用場(chǎng)景

1.訂單在十分鐘之內(nèi)未支付則自動(dòng)取消
2.新創(chuàng)建的店鋪队萤,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品轮锥,則自動(dòng)發(fā)送消息提醒。
3.用戶注冊(cè)成功后要尔,如果三天內(nèi)沒(méi)有登陸則進(jìn)行短信提醒舍杜。
4.用戶發(fā)起退款新娜,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。
5.預(yù)定會(huì)議后既绩,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議

這些場(chǎng)景都有一個(gè)特點(diǎn)概龄,需要在某個(gè)事件發(fā)生之后或者之前的指定時(shí)間點(diǎn)完成某一項(xiàng)任務(wù),如:
發(fā)生訂單生成事件饲握,在十分鐘之后檢查該訂單支付狀態(tài)私杜,然后將未支付的訂單進(jìn)行關(guān)閉;看起來(lái)似乎
使用定時(shí)任務(wù)救欧,一直輪詢數(shù)據(jù)衰粹,每秒查一次,取出需要被處理的數(shù)據(jù)笆怠,然后處理不就完事了嗎铝耻?如果
數(shù)據(jù)量比較少,確實(shí)可以這樣做蹬刷,比如:對(duì)于“如果賬單一周內(nèi)未支付則進(jìn)行自動(dòng)結(jié)算”這樣的需求瓢捉,
如果對(duì)于時(shí)間不是嚴(yán)格限制,而是寬松意義上的一周办成,那么每天晚上跑個(gè)定時(shí)任務(wù)檢查一下所有未支
付的賬單泡态,確實(shí)也是一個(gè)可行的方案。但對(duì)于數(shù)據(jù)量比較大诈火,并且時(shí)效性較強(qiáng)的場(chǎng)景兽赁,如:“訂單十
分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會(huì)有很多冷守,活動(dòng)期間甚至?xí)_(dá)到百萬(wàn)甚至千萬(wàn)
級(jí)別,對(duì)這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的惊科,很可能在一秒內(nèi)無(wú)法完成所有訂單
的檢查拍摇,同時(shí)會(huì)給數(shù)據(jù)庫(kù)帶來(lái)很大壓力,無(wú)法滿足業(yè)務(wù)要求而且性能低下馆截。


image.png

3后德、RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一個(gè)消息或者隊(duì)列的屬性尸曼,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,
單位是毫秒。換句話說(shuō)树瞭,如果一條消息設(shè)置了 TTL 屬性或者進(jìn)入了設(shè)置TTL 屬性的隊(duì)列,那么這
條消息如果在TTL 設(shè)置的時(shí)間內(nèi)沒(méi)有被消費(fèi)拯勉,則會(huì)成為"死信"豹障。如果同時(shí)配置了隊(duì)列的TTL 和消息的
TTL,那么較小的那個(gè)值將會(huì)被使用宿接,有兩種方式設(shè)置 TTL赘淮。

方式一:消息設(shè)置TTL辕录,針對(duì)每條消息設(shè)置TTL


image.png

方式二:隊(duì)列設(shè)置TTL


image.png

4、整合 springboot

1梢卸、引入坐標(biāo)

    <!--指定 jdk 編譯版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!--RabbitMQ 依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 測(cè)試依賴-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2走诞、修改配置文件

spring.rabbitmq.host=106.52.23.202
spring.rabbitmq.port=5672
spring.rabbitmq.username=mykk
spring.rabbitmq.password=aaa666

3、添加Swagger 配置類

package com.kk.rabbit.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket webApiConfig() {
        return new Docket (DocumentationType.SWAGGER_2)
                .groupName ("webApi")
                .apiInfo (webinfo ( ))
                .select ( ).build ( );
    }

    
    // 文檔注釋信息蛤高,沒(méi)什么用
    private ApiInfo webinfo() {
        return new ApiInfoBuilder ( )
                .title ("rabbitmq 接口文檔")
                .description ("本文檔描述了 rabbitmq 微服務(wù)接口定義")
                .version ("1.0")
                .contact (new Contact ("mykk", "http://jianshu.com","763856958@qq.com"))
                .build ( );
    }
}

5蚣旱、隊(duì)列 TTL

1、代碼架構(gòu)圖(P是生產(chǎn)者戴陡,C是消費(fèi)者塞绿,XA,XB,YD均為routingKey )
創(chuàng)建兩個(gè)隊(duì)列 QA 和 QB,兩者隊(duì)列 TTL 分別設(shè)置為 10S 和 40S猜欺,然后在創(chuàng)建一個(gè)交換機(jī) X 和死信交
換機(jī) Y位隶,它們的類型都是direct,創(chuàng)建一個(gè)死信隊(duì)列 QD开皿,它們的綁定關(guān)系如下:


image.png

2涧黄、配置文件類代碼

package com.kk.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {

    // 普通交換機(jī)
    private final static String X_EXCHANGE = "X";
    // 死信交換機(jī)
    private final static String Y_DEAD_LETTER_EXCHANGE = "Y";
    // 隊(duì)列 A
    private final static String QUERY_A = "QA";
    // 隊(duì)列 B
    private final static String QUERY_B = "QB";
    // 死信隊(duì)列
    private final static String QUERY_DEAD_LETTER = "QD";

    // 聲明 xExchange 用于后面注入
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange (X_EXCHANGE);
    }

    // 聲明 yExchange 用于后面注入
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange (Y_DEAD_LETTER_EXCHANGE);
    }

    // 聲明隊(duì)列 A ttl 為 10s 并綁定到對(duì)應(yīng)的死信交換
    @Bean("queryA")
    public Queue queueA(){
        Map<String, Object> args = new HashMap<> ( );
        // 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
        args.put ("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        // 聲明當(dāng)前隊(duì)列綁定的死信 routingKey
        args.put ("x-dead-letter-routing-key","YD");
        // 聲明當(dāng)前隊(duì)列的 TTL
        args.put ("x-message-ttl",10000);

        return QueueBuilder.durable (QUERY_A).withArguments (args).build ();
    }

    // 聲明隊(duì)列 B ttl 為 40s 并綁定到對(duì)應(yīng)的死信交換
    @Bean("queryB")
    public Queue queueB(){
        Map<String, Object> args = new HashMap<> ( );
        // 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
        args.put ("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        // 聲明當(dāng)前隊(duì)列綁定的死信 routingKey
        args.put ("x-dead-letter-routing-key","YD");
        // 聲明當(dāng)前隊(duì)列的 TTL
        args.put ("x-message-ttl",40000);

        return QueueBuilder.durable (QUERY_B).withArguments (args).build ();
    }


    // 聲明隊(duì)列 A 綁定 X 交換機(jī)
    @Bean
    public Binding queueaBindingX(@Qualifier("queryA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind (queueA).to (xExchange).with ("XA");
    }
    // 聲明隊(duì)列 B 綁定 X 交換機(jī)
    @Bean
    public Binding queuebBindingX(@Qualifier("queryB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind (queueB).to (xExchange).with ("XB");
    }

    // 聲明死信隊(duì)列 QD
    @Bean("queueD")
    public Queue queueD(){
        return new Queue (QUERY_DEAD_LETTER);
    }

    // 聲明死信隊(duì)列 QD 的綁定關(guān)系
    @Bean
    public Binding deadLetterBindingQD(@Qualifier("queueD")Queue queueD,
                                       @Qualifier("yExchange")DirectExchange yExchange){
        return BindingBuilder.bind (queueD).to (yExchange).with ("YD");
    }
}

3、消息生產(chǎn)者

package com.kk.rabbit.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("當(dāng)前時(shí)間:{},發(fā)送一條信息給兩個(gè) TTL 隊(duì)列:{}", new Date (), message);
        rabbitTemplate.convertAndSend ("X","XA","消息來(lái)自 ttl 為 10S 的隊(duì)列: "+message);
        rabbitTemplate.convertAndSend ("X","XB","消息來(lái)自 ttl 為 40S 的隊(duì)列: "+message);
    }

}

4赋荆、消息消費(fèi)者

package com.kk.rabbit.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{
        String msg = new String (message.getBody (), "utf-8");
        log.info("當(dāng)前時(shí)間:{},收到死信隊(duì)列信息{}", new Date ().toString(), msg);
    }
}

5笋妥、測(cè)試
url輸入內(nèi)容:http://127.0.0.1:8080/ttl/sendMsg/阿k,你好呀

輸出結(jié)果:
當(dāng)前時(shí)間:Thu Jul 29 00:01:37 CST 2021,發(fā)送一條信息給兩個(gè) TTL 隊(duì)列:阿k,你好呀
當(dāng)前時(shí)間:Thu Jul 29 00:01:47 CST 2021,收到死信隊(duì)列信息消息來(lái)自 ttl 為 10S 的隊(duì)列: 阿k,你好呀
當(dāng)前時(shí)間:Thu Jul 29 00:02:17 CST 2021,收到死信隊(duì)列信息消息來(lái)自 ttl 為 40S 的隊(duì)列: 阿k,你好呀

6、總結(jié)和分析
第一條消息在 10S 后變成了死信消息窄潭,然后被消費(fèi)者消費(fèi)掉春宣,第二條消息在 40S 之后變成了死信消息,
然后被消費(fèi)掉嫉你,這樣一個(gè)延時(shí)隊(duì)列就打造完成了

不過(guò)月帝,如果這樣使用的話,豈不是每增加一個(gè)新的時(shí)間需求幽污,就要新增一個(gè)隊(duì)列嚷辅,這里只有 10S 和 40S
兩個(gè)時(shí)間選項(xiàng),如果需要一個(gè)小時(shí)后處理距误,那么就需要增加TTL 為一個(gè)小時(shí)的隊(duì)列簸搞,如果是預(yù)定會(huì)議室然
后提前通知這樣的場(chǎng)景,豈不是要增加無(wú)數(shù)個(gè)隊(duì)列才能滿足需求准潭?

此時(shí)延遲隊(duì)列的優(yōu)化來(lái)了趁俊,寶

6、延時(shí)隊(duì)列優(yōu)化(基于死信隊(duì)列優(yōu)化)

1刑然、代碼架構(gòu)圖:新增了一個(gè)隊(duì)列 QC,綁定關(guān)系如下,該隊(duì)列不設(shè)置TTL 時(shí)間


image.png

2寺擂、配置文件類,擴(kuò)展代碼

//------------------------------基于死信隊(duì)列的延遲優(yōu)化擴(kuò)展---start-------------------
    // 普通隊(duì)列(擴(kuò)展)
    private final static String QUERY_C = "QC";

    // 聲明隊(duì)列 C ,綁定死信交換機(jī)(擴(kuò)展)
    @Bean("queryC")
    public Queue queueC(){
        Map<String, Object> args = new HashMap<> ( );
        // 聲明當(dāng)前隊(duì)列綁定的死信交換機(jī)
        args.put ("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        // 聲明當(dāng)前隊(duì)列綁定的死信 routingKey
        args.put ("x-dead-letter-routing-key","YD");
        // 無(wú)聲明TTL
        return QueueBuilder.durable (QUERY_C).withArguments (args).build ();
    }
    // 聲明隊(duì)列 C 綁定 X 交換機(jī)(擴(kuò)展)
    @Bean
    public Binding queueaCindingX(@Qualifier("queryC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind (queueC).to (xExchange).with ("XC");
    }
//------------------------------基于死信隊(duì)列的延遲優(yōu)化擴(kuò)展---end---------------------

3沽讹、消息生產(chǎn)者般卑,擴(kuò)展代碼


    // 基于死信隊(duì)列的 延遲優(yōu)化測(cè)試
    @GetMapping("sendMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,
                        @PathVariable String ttlTime) {
        log.info("當(dāng)前時(shí)間:{},發(fā)送一條時(shí)長(zhǎng){}毫秒 TTL 信息給隊(duì)列 C:{}", new Date(),ttlTime, message);
        rabbitTemplate.convertAndSend ("X", "XC", message, msg -> {
            msg.getMessageProperties ( ).setExpiration (ttlTime);
            return msg;
        });
    }

4、測(cè)試
發(fā)送請(qǐng)求:

  1. http://127.0.0.1:8080/ttl/sendMsg/測(cè)試基于死信隊(duì)列的延遲優(yōu)化---1/20000
  2. http://127.0.0.1:8080/ttl/sendMsg/測(cè)試基于死信隊(duì)列的延遲優(yōu)化---2/2000

輸出結(jié)果:


image.png

5爽雄、測(cè)試后的分析:
看起來(lái)似乎沒(méi)什么問(wèn)題蝠检,但是在最開始的時(shí)候,就介紹過(guò)如果使用在消息屬性上設(shè)置 TTL 的方式挚瘟,消
息可能并不會(huì)按時(shí)“死亡“叹谁,因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列乘盖,
如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng)焰檩,而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行订框。

此刻這個(gè)問(wèn)題就需要插件了析苫,寶

7、Rabbitmq 插件實(shí)現(xiàn)延遲隊(duì)列

概述:
如果不能實(shí)現(xiàn)在消息粒度上的 TTL穿扳,并使其在設(shè)置的TTL 時(shí)間及時(shí)死亡衩侥,就無(wú)法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列。
然后插件對(duì)交換機(jī)進(jìn)行擴(kuò)展矛物,即可解決茫死。

1、安裝延時(shí)隊(duì)列插件

第一步:官網(wǎng)上下載 https://www.rabbitmq.com/community-plugins.html履羞,下載
rabbitmq_delayed_message_exchange 插件峦萎,然后解壓放置到 RabbitMQ 的插件目錄。
或者去QQ群:62263397獲取

第二步:將插件拷貝到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins 下
命令: cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

image.png

第三步:安裝插件忆首,然后再重啟 rabbitMQ
安裝插件命令(無(wú)需指定版本號(hào)):rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重啟命令:systemctl restart rabbitmq-server

此時(shí)客戶端就多了一個(gè)交換機(jī)類型


image.png

2爱榔、代碼架構(gòu)圖
在這里新增了一個(gè)隊(duì)列delayed.queue,一個(gè)自定義交換機(jī) delayed.exchange,綁定關(guān)系如下:


image.png

3糙及、配置文件類代碼
在我們自定義的交換機(jī)中搓蚪,這是一種新的交換類型,該類型消息支持延遲投遞機(jī)制 消息傳遞后并
不會(huì)立即投遞到目標(biāo)隊(duì)列中丁鹉,而是存儲(chǔ)在 mnesia(一個(gè)分布式數(shù)據(jù)系統(tǒng))表中,當(dāng)達(dá)到投遞時(shí)間時(shí)悴能,
才投遞到目標(biāo)隊(duì)列中揣钦。

package com.kk.rabbit.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig {

    // 隊(duì)列
    private final static String DELAYED_QUEUE_NAME = "delayed.queue";
    // 交換機(jī)
    private final static String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    // routingKey
    private final static String DELAYED_ROUTING_KEY = "delayed.routingkey";

    // bean若沒(méi)有指定名字,方法名默認(rèn)就是

    // 聲明延遲隊(duì)列
    @Bean
    public Queue delayedQueue(){
        return new Queue (DELAYED_QUEUE_NAME);
    }

    // 聲明交換機(jī)漠酿,基于插件的自定義交換機(jī)
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<> ( );
        // 設(shè)置延遲類型為直接類型
        arguments.put ("x-delayed-type","direct");

        /**
         * 1. 交換機(jī)名稱
         * 2. 交換機(jī)類型
         * 3. 是否需要持久化
         * 4. 是否需要自動(dòng)刪除
         * 5. 其他參數(shù)
         */
        return new CustomExchange (DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
    }

    // 延遲隊(duì)列綁定延遲交換機(jī)
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue")Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange customExchange){
        // noargs 構(gòu)建
        return BindingBuilder.bind (queue).to (customExchange).with (DELAYED_ROUTING_KEY).noargs ();
    }
}

4冯凹、生產(chǎn)者代碼

    // 基于延遲交換機(jī)
    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,
                        @PathVariable Integer delayTime) {

        rabbitTemplate.convertAndSend ("delayed.exchange", "delayed.routingkey", message, correlationData -> {
            correlationData.getMessageProperties ().setDelay (delayTime);
            return correlationData;
        });

        log.info (" 當(dāng) 前 時(shí) 間 : {}, 發(fā) 送 一 條 延 遲 {} 毫秒的信息給隊(duì)列 delayed.queue:{}", new
                Date ( ), delayTime, message);
    }

5、消費(fèi)者代碼

    public static final String DELAYED_QUEUE_NAME = "delayed.queue";

    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) throws Exception {
        String msg = new String (message.getBody (), "utf-8");
        log.info("當(dāng)前時(shí)間:{},收到延時(shí)隊(duì)列的消息:{}", new Date().toString(), msg);
    }

6、測(cè)試
發(fā)送請(qǐng)求:

  1. http://127.0.0.1:8080/ttl/sendDelayMsg/測(cè)試基于插件的延遲交換機(jī)---1/20000
  2. http://127.0.0.1:8080/ttl/sendDelayMsg/測(cè)試基于插件的延遲交換機(jī)---2/2000

輸出結(jié)果:第二個(gè)消息被先消費(fèi)掉了宇姚,符合預(yù)期


image.png

7匈庭、結(jié)論

延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用 RabbitMQ 來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用
RabbitMQ 的特性浑劳,如:消息可靠發(fā)送阱持、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正
確處理的消息不會(huì)被丟棄魔熏。另外衷咽,通過(guò) RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題蒜绽,不會(huì)因?yàn)?br> 單個(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失镶骗。
當(dāng)然,延時(shí)隊(duì)列還有很多其它選擇躲雅,比如利用 Java 的 DelayQueue鼎姊,利用 Redis 的 zset,利用 Quartz
或者利用 kafka 的時(shí)間輪相赁,這些方式各有特點(diǎn),看需要適用的場(chǎng)景

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末相寇,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子噪生,更是在濱河造成了極大的恐慌裆赵,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跺嗽,死亡現(xiàn)場(chǎng)離奇詭異战授,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)桨嫁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門植兰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)璃吧,“玉大人楣导,你說(shuō)我怎么就攤上這事⌒蟀ぃ” “怎么了筒繁?”我有些...
    開封第一講書人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)巴元。 經(jīng)常有香客問(wèn)我毡咏,道長(zhǎng),這世上最難降的妖魔是什么逮刨? 我笑而不...
    開封第一講書人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任呕缭,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘恢总。我一直安慰自己迎罗,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開白布片仿。 她就那樣靜靜地躺著纹安,像睡著了一般。 火紅的嫁衣襯著肌膚如雪滋戳。 梳的紋絲不亂的頭發(fā)上钻蔑,一...
    開封第一講書人閱讀 52,158評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音奸鸯,去河邊找鬼咪笑。 笑死,一個(gè)胖子當(dāng)著我的面吹牛娄涩,可吹牛的內(nèi)容都是我干的窗怒。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼蓄拣,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼扬虚!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起球恤,我...
    開封第一講書人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤辜昵,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后咽斧,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體堪置,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年张惹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了舀锨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宛逗,死狀恐怖坎匿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情雷激,我是刑警寧澤替蔬,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站屎暇,受9級(jí)特大地震影響进栽,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜恭垦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧番挺,春花似錦唠帝、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至粪摘,卻和暖如春瀑晒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背徘意。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工苔悦, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人椎咧。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓玖详,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親勤讽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蟋座,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359