MQ(二) - RabbitMQ消息模型

1.消息模型

根據(jù)官方文檔得知,RabbitMQ有七種消息模型:


image-20200420162349911.png

image-20200420162457226.png

image-20200420162430281.png

1.1 Hello World消息模型

1.1.1 介紹

image-20200420162810992.png

翻譯成中文如下:

RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息囤屹。你可以把它想象成一個(gè)郵局:當(dāng)你把你想寄出的郵件放進(jìn)一個(gè)郵箱里時(shí)觅廓,你可以確信郵件的收件人最終會(huì)收到郵件漓柑。在這個(gè)類比中竟宋,RabbitMQ是一個(gè)郵箱膳帕、一個(gè)郵局和一個(gè)郵遞員粘捎。

RabbitMQ與郵局的主要區(qū)別在于,它不處理紙張备闲,而是接受晌端、存儲(chǔ)和轉(zhuǎn)發(fā)二進(jìn)制的數(shù)據(jù)信息塊。

1.1.2 模型圖

image-20200420162548840.png
  • P (Produce) 生產(chǎn)者恬砂,主要是生產(chǎn)消息咧纠,以及發(fā)送消息。說(shuō)白了就是一個(gè)發(fā)送消息的應(yīng)用程序


    image-20200420163700439.png
  • 隊(duì)列 (queue) 隊(duì)列是RabbitMQ中的郵箱的名稱泻骤。盡管消息流經(jīng)RabbitMQ和您的應(yīng)用程序漆羔,但它們只能存儲(chǔ)在隊(duì)列中。隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制的約束狱掂,它本質(zhì)上是一個(gè)大的消息緩沖區(qū)演痒。許多生產(chǎn)者可以將消息發(fā)送到一個(gè)隊(duì)列,而許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)
image-20200420163826815.png
  • C (consumer) 消費(fèi)和接受有著相似的含義趋惨。消費(fèi)者者是一個(gè)主要等待接收消息的程序


    image-20200420163923433.png

    注意:生產(chǎn)者鸟顺、消費(fèi)者和代理不必駐留在同一主機(jī)上;事實(shí)上器虾,在大多數(shù)應(yīng)用程序中讯嫂,它們不必駐留在同一主機(jī)上。應(yīng)用程序也可以同時(shí)是生產(chǎn)者和消費(fèi)者兆沙。

1.1.3 代碼實(shí)現(xiàn)

接下來(lái)我們采用java語(yǔ)言編寫生產(chǎn)者程序欧芽,以及消費(fèi)者程序來(lái)感受一下其魅力。

生者者:Send

消費(fèi)者:Consumer

  • 生產(chǎn)者代碼實(shí)現(xiàn):

    • 模型圖:


      image-20200420164345046.png
由圖中可知葛圃,生產(chǎn)者不僅要生產(chǎn)消息千扔,還要將消息發(fā)送到指定隊(duì)列:
  • 創(chuàng)建 springboot項(xiàng)目(wangzh-rabbitmq)


    image-20200420164715769.png
  • 導(dǎo)入amqp依賴

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • 編寫工具類,用來(lái)獲取連接

    package com.mq.rabbit.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class ConnectionUtil {
        public static Connection getConnection() throws Exception {
            // 1. 創(chuàng)建連接工廠库正,用來(lái)獲取連接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2. 設(shè)置基本信息
            // 設(shè)置rabbitmq所在地址
            connectionFactory.setHost("192.168.169.130");
            // 設(shè)置用戶名曲楚,我們先前創(chuàng)建了一個(gè)wangzh的用戶
            connectionFactory.setUsername("wangzh");
            // 設(shè)置密碼
            connectionFactory.setPassword("wangzh");
            // 設(shè)置端口 這個(gè)端口是amqp協(xié)議的端口
            connectionFactory.setPort(5672);
            return connectionFactory.newConnection();
        }
    }
    
    
  • 編寫發(fā)送端程序

    package com.mq.rabbit.helloworld;
    
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
        /**
         * 隊(duì)列名字
         */
        private static final String QUEUE_NAME="hello_word";
    
        public static void main(String[] args) {
            try {
                // 1. 獲取連接
                Connection connection = ConnectionUtil.getConnection();
                /*
                 *   2. 創(chuàng)建通道
                 *      生產(chǎn)者發(fā)送消息到隊(duì)列中需要借助通道
                 */
                Channel channel = connection.createChannel();
                /**
                 *  3. (創(chuàng)建)聲明隊(duì)列
                 *    如果名字所對(duì)應(yīng)的隊(duì)列存在,那么就不存創(chuàng)建隊(duì)列褥符,而是去時(shí)使用現(xiàn)成對(duì)的隊(duì)列
                 *    如果名字對(duì)應(yīng)的對(duì)應(yīng)不存在洞渤,那么就去創(chuàng)建隊(duì)列
                 *    第一個(gè)參數(shù): 隊(duì)列的名字
                 *    第二個(gè)參數(shù): 是否聲明一個(gè)持久化隊(duì)列,true表示會(huì)將消息持久化
                 *    第三個(gè)參數(shù): 是否聲明一個(gè)獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列属瓣,斷開后自動(dòng)刪除), true表示聲明成獨(dú)占隊(duì)列
                 *    第四個(gè)參數(shù): 是否聲明一個(gè)刪除隊(duì)列(消費(fèi)者客戶端連接斷開時(shí)是否自動(dòng)刪除隊(duì)列),true表示聲明成刪除隊(duì)列
                 *    第五個(gè)參數(shù): 隊(duì)列其他參數(shù)
                 */
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
                // 消息
                String msg = "I am OK";
                /**
                 * 將消息存入隊(duì)列中
                 *  第一個(gè)參數(shù):使交換機(jī)的名字 我們后面再將交換機(jī)
                 *  第二個(gè)參數(shù):隊(duì)列映射的路由key,我們后面再講
                 *  第三個(gè)參數(shù): 隊(duì)列消息其他屬性
                 *  第四個(gè)參數(shù): 發(fā)送消息的主體
                */
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    
                System.out.println("發(fā)送成功");
                // 關(guān)閉資源
                channel.close();
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    
  • 執(zhí)行程序


    image-20200421093625708.png
  • 查看管理頁(yè)面


    image-20200421093724551.png
image-20200421093801454.png
image-20200421094139819.png

通過(guò)上圖我們可以看到當(dāng)生產(chǎn)者發(fā)送消息到隊(duì)列中時(shí)载迄,管理界面就能看到這個(gè)隊(duì)列,以及隊(duì)列里面的消息數(shù)抡蛙。

注意:我們只是在控制臺(tái)看到消息护昧,并不會(huì)去消費(fèi)這個(gè)消息。

  • 編寫消費(fèi)者

    package com.mq.rabbit.helloworld;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer {
        private static final String QUEUE_NAME="hello_word";
    
        public static void main(String[] args) throws Exception {
            // 1.獲取連接
            Connection connection = ConnectionUtil.getConnection();
            // 2.創(chuàng)建通道粗截,消費(fèi)者從隊(duì)列中獲取消息也是借助通道
            Channel channel = connection.createChannel();
            /*
             *   3.聲明隊(duì)列
             *      如果隊(duì)列不存在就會(huì)創(chuàng)建隊(duì)列
             *      由于我們?cè)谏a(chǎn)者者那邊已經(jīng)創(chuàng)建好了隊(duì)列
             *      那么消費(fèi)者這邊就不會(huì)創(chuàng)建隊(duì)列
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            /*
             *  4. 監(jiān)聽隊(duì)列惋耙,如果隊(duì)列中有消息,就直接拿過(guò)來(lái)
             *      第一個(gè)參數(shù):隊(duì)列名字
             *      第二個(gè)參數(shù):是否進(jìn)行消息自動(dòng)確認(rèn)熊昌,后面我們講ack參數(shù)時(shí)再說(shuō)
             *      第三個(gè)參數(shù):回調(diào)對(duì)象绽榛,從隊(duì)列中主動(dòng)獲取消息
             */
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
                /*
                 * consumerTag:消費(fèi)者標(biāo)簽與消費(fèi)者相關(guān)
                 * envelope:消息的打包數(shù)據(jù)
                 * properties:消息的頭部數(shù)據(jù)
                 * body:消息主體
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(consumerTag);
                    System.out.println(envelope);
                    System.out.println(properties);
                    System.out.println("消費(fèi)的消息:" + new String(body));
                }
            });
        }
    }
    
  • 執(zhí)行結(jié)果


    image-20200421101003782.png

    image-20200421101051070.png

由上圖可知:當(dāng)消息被消費(fèi)后,隊(duì)列里面就沒有這一條消息了婿屹。同時(shí)消費(fèi)者的應(yīng)用程序并沒有停止灭美,而是一致在運(yùn)行著,一致在監(jiān)聽隊(duì)列昂利。

自此我們一個(gè)簡(jiǎn)單的Helloword消息模型就寫完了

1.1.4 ACK 機(jī)制

我們來(lái)思考一下有沒有上述的例子什么問(wèn)題届腐??蜂奸?

1.消費(fèi)者當(dāng)消費(fèi)消息后犁苏,MQ就會(huì)把隊(duì)列中的消息刪除,那么MQ怎么就知道消息被消費(fèi)了呢扩所?

2.當(dāng)消費(fèi)者領(lǐng)取消息后围详,還沒有消費(fèi)就掛掉了,或者是發(fā)生異常祖屏,那么MQ就無(wú)法得知消息有沒有被消費(fèi)掉助赞。

為了解決上述問(wèn)題,RabbitMQ提供了一個(gè)消息確認(rèn)機(jī)制(ACK機(jī)制)赐劣,當(dāng)消費(fèi)者把隊(duì)列中的消息消費(fèi)以后嫉拐,會(huì)向Rabbi發(fā)送一個(gè)ACK,告訴MQ消息已經(jīng)被消費(fèi)了,你可以把消息刪除了魁兼。

不過(guò)這種發(fā)送ACK有兩種方式:

  • 自動(dòng)發(fā)送ACK:消息一旦被接收婉徘,自動(dòng)向MQ發(fā)送ACK

    • 代碼實(shí)現(xiàn):

      image-20200421104826462.png

      如圖,當(dāng)設(shè)置為true時(shí)咐汞,就會(huì)當(dāng)消費(fèi)者消費(fèi)完消息盖呼,自動(dòng)的向發(fā)送ACK

    • 缺陷:

      為了演示我先向MQ中發(fā)送一條消息:

      image-20200421105032433.png

      接下來(lái)修改我們消費(fèi)者的代碼:


      image-20200421105412559.png
    @Override
   public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        int a = 10 / 0;
        System.out.println("消費(fèi)的消息:" + new String(body));
   }

運(yùn)行結(jié)果:


image-20200421105523908.png
image-20200421105543290.png

我們發(fā)現(xiàn)在消費(fèi)消息之前,拋出了異常化撕,也就是說(shuō)我們消息還沒有被消費(fèi)几晤,此時(shí)MQ就把隊(duì)列中的消息給刪除了。說(shuō)明消息丟失了植阴。

  • 手動(dòng)ACK:消息接收后蟹瘾,不會(huì)自動(dòng)發(fā)送ACK圾浅,需要手動(dòng)發(fā)送

    • 準(zhǔn)備工作

      為了演示,我們向MQ中發(fā)送一條消息


      image-20200421110251192.png
    • 修改消費(fèi)者代碼

      package com.mq.rabbit.helloworld;
      
      import com.mq.rabbit.util.ConnectionUtil;
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      
      public class Consumer {
          private static final String QUEUE_NAME = "hello_word";
      
          public static void main(String[] args) throws Exception {
              // 1.獲取連接
              Connection connection = ConnectionUtil.getConnection();
              // 2.創(chuàng)建通道憾朴,消費(fèi)者從隊(duì)列中獲取消息也是借助通道
              Channel channel = connection.createChannel();
              /*
               *   3.聲明隊(duì)列
               *      如果隊(duì)列不存在就會(huì)創(chuàng)建隊(duì)列
               *      由于我們?cè)谏a(chǎn)者者那邊已經(jīng)創(chuàng)建好了隊(duì)列
               *      那么消費(fèi)者這邊就不會(huì)創(chuàng)建隊(duì)列
               */
              channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      
              /*
               *  4. 監(jiān)聽隊(duì)列狸捕,如果隊(duì)列中有消息,就直接拿過(guò)來(lái)
               *      第一個(gè)參數(shù):隊(duì)列名字
               *      第二個(gè)參數(shù):是否進(jìn)行消息自動(dòng)確認(rèn),false代表不再向MQ發(fā)送ACK
               *      第三個(gè)參數(shù):回調(diào)對(duì)象众雷,從隊(duì)列中主動(dòng)獲取消息
               */
              channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
                  /*
                   * consumerTag:消費(fèi)者標(biāo)簽與消費(fèi)者相關(guān)
                   * envelope:消息的打包數(shù)據(jù)
                   * properties:消息的頭部數(shù)據(jù)
                   * body:消息主體
                   */
      
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消費(fèi)的消息:" + new String(body));
                      /*
                       * 1. 第一個(gè)參數(shù)是 傳輸?shù)臉?biāo)簽
                       * 2. 是否要確認(rèn)所有的消息 true:確認(rèn)所有信息灸拍,包括提供的傳輸標(biāo)簽
                       *                         false: 僅確認(rèn)提供的傳輸標(biāo)簽
                       */
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              });
          }
      }
      

      這樣就實(shí)現(xiàn)了手動(dòng)發(fā)送ACK

  • 對(duì)比

    上述可知,兩種發(fā)送ACK的方式砾省。那么我們到底用哪種方式:

    • 如果消息不是特別重要鸡岗,即使丟失了對(duì)系統(tǒng)沒有什么影響,那么采用ACK比較方便
    • 如果消息非常重要编兄,不允許丟失轩性,那么最好選擇手動(dòng)發(fā)送ACK。

2.work模型

work模型稱為:工作隊(duì)列模式

2.1介紹

2.1.1 模型圖

image-20200421141641967.png

2.1.2 官方介紹

image-20200421141813471.png

大概意思如下:

在第一個(gè)教程中翻诉,我們編寫了從命名隊(duì)列發(fā)送和接收消息的程序炮姨。在本例中,我們將創(chuàng)建一個(gè)工作隊(duì)列碰煌,用于在多個(gè)工人之間分發(fā)耗時(shí)的任務(wù)舒岸。

工作隊(duì)列(也稱為任務(wù)隊(duì)列)背后的主要思想是避免立即執(zhí)行資源密集型任務(wù),而必須等待它完成芦圾。相反蛾派,我們把任務(wù)安排在以后完成。我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列个少。在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)洪乍。當(dāng)您運(yùn)行許多工作人員時(shí),任務(wù)將在他們之間共享夜焦。

這個(gè)概念在web應(yīng)用程序中特別有用壳澳,因?yàn)樵诙痰腍TTP請(qǐng)求窗口中無(wú)法處理復(fù)雜的任務(wù)。

接下來(lái)我們用java代碼去模擬這個(gè)過(guò)程:

P 生產(chǎn)者: 發(fā)布任務(wù)(生產(chǎn)消息)

image-20200421142725181.png

C1 消費(fèi)者1: 獲取任務(wù)并完成任務(wù)

C2 消費(fèi)者2: 獲取任務(wù)并完成任務(wù)

2.2 編碼實(shí)現(xiàn)

2.2.1 生產(chǎn)者

  • 代碼

    package com.mq.rabbit.work;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
        private static final String QUEUE_NAME = "hello_work";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            /*
            *  生產(chǎn)者發(fā)布20個(gè)任務(wù)
            */
            for (int i = 1; i <= 20; i++) {
                String msg = "hello work: " + i;
                channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            }
            channel.close();
            connection.close();
        }
    }
    
    

2.2.2 消費(fèi)者1

  • 代碼

    package com.mq.rabbit.work;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer1 {
    
        private static final String QUEUE_NAME = "hello_work";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        System.out.println("消費(fèi)者1:" + new String(body));
                        // 耗時(shí)操作
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    }
    
    

2.2.3 消費(fèi)者2

  • 代碼

    package com.mq.rabbit.work;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer2 {
    
        private static final String QUEUE_NAME = "hello_work";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消費(fèi)者2:" + new String(body));
                        channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    }
    
    

2.2.4 結(jié)果分析

  • 先執(zhí)行消費(fèi)者1和消費(fèi)者2茫经,然后再執(zhí)行生產(chǎn)者


    image-20200421152445061.png
image-20200421152501650.png

生產(chǎn)者總共發(fā)布了20條消息巷波,其中消費(fèi)者1和消費(fèi)者2分別消費(fèi)了10條。這就是工作隊(duì)列機(jī)制卸伞,將消息數(shù)平分給不同的消費(fèi)者去消費(fèi)抹镊。

2.2.5 存在的問(wèn)題

通過(guò)上述例子發(fā)現(xiàn)以下幾個(gè)問(wèn)題:

  • 消費(fèi)者1去處理消息比較耗時(shí),消費(fèi)者2處理的消息比較快荤傲。但是他們處理的消息量是一樣垮耳。

  • 當(dāng)消費(fèi)者2處理完成以后,一直處于空閑狀態(tài),而消息1卻一直在忙碌

這明顯是不合理的终佛。按照正確的做法應(yīng)該是消費(fèi)者2處理消息快俊嗽,多分配一些消息去處理。消費(fèi)者1處理消息慢就少分配一些消息铃彰,能者多勞乌询。那么該怎么去實(shí)現(xiàn)呢?

RabbitMQ中提供了一個(gè)basicQos方法以及 prefetchCount=1設(shè)置豌研。其功能就是告訴MQ一次不要向消費(fèi)者發(fā)送多條消息,等消息者把消息處理并確認(rèn)完成唬党。才會(huì)再次發(fā)送下一條消息鹃共。相反,如果消費(fèi)者還是處于忙率中驶拱,那么MQ就會(huì)把消息分派給不是很忙碌的消費(fèi)者霜浴。

2.2.6 改造消費(fèi)者

  • 代碼


    image-20200421154754149.png
package com.mq.rabbit.work;

import com.mq.rabbit.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {

    private static final String QUEUE_NAME = "hello_work";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.basicQos(1);
        
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("消費(fèi)者1:" + new String(body));
                    // 耗時(shí)操作
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

  • 啟動(dòng)測(cè)試


    image-20200421154924090.png
image-20200421154958331.png

這樣我們就實(shí)現(xiàn)了能者多勞

3.發(fā)布/訂閱模型

3.1 思考

通過(guò)上述模型我們可以指導(dǎo),同一條消息只能發(fā)送給一個(gè)消費(fèi)者蓝纲,但如果說(shuō)我想要把一個(gè)消息發(fā)給多個(gè)消費(fèi)者阴孟,這又該怎么做呢?

3.2 介紹

3.2.1 官方介紹

image-20200421161335546.png

大體意思如下:

在之前的模式中税迷,我們創(chuàng)建了一個(gè)工作隊(duì)列永丝。工作隊(duì)列背后的假設(shè)是:每個(gè)任務(wù)都被精確地傳遞給一個(gè)工人。在這一部分中箭养,我們將做一些完全不同的事情——我們將向多個(gè)消費(fèi)者傳遞一條消息慕嚷。這種模式稱為“發(fā)布/訂閱”。

3.2.2 模型圖

image-20200421161949840.png

生產(chǎn)者把消息發(fā)送給交換機(jī)X(圖中藍(lán)紫色部分)毕泌,交換機(jī)X將消息轉(zhuǎn)發(fā)到不同的隊(duì)列中喝检。

  • 1個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
  • 每一個(gè)消費(fèi)者都有自己的隊(duì)列
  • 生產(chǎn)者是將消息發(fā)送到交換機(jī)撼泛,交換機(jī)把消息轉(zhuǎn)發(fā)到了隊(duì)列
  • 每一個(gè)隊(duì)列都需要綁定交換機(jī)
  • 一條消息被多個(gè)消費(fèi)者消費(fèi)

3.2.3 交換機(jī)

  • 介紹


    image-20200421162256433.png

    大體意思如下:

    交換機(jī)

    在之前的模型中挠说,我們直接向隊(duì)列發(fā)送和接收消息。現(xiàn)在是時(shí)候在Rabbit中引入完整的消息傳遞模型了愿题。

    • 生產(chǎn)者是發(fā)送消息的用戶應(yīng)用程序损俭。

    • 隊(duì)列是存儲(chǔ)消息的緩沖區(qū)。

    • 消費(fèi)者是接收消息的用戶應(yīng)用程序抠忘。

    RabbitMQ消息傳遞模型的核心思想是撩炊,生產(chǎn)者從不將任何消息直接發(fā)送到隊(duì)列。實(shí)際上崎脉,生產(chǎn)者常常根本不知道消息是否會(huì)被傳遞到任何隊(duì)列拧咳。

    相反,生產(chǎn)者只能向交換機(jī)發(fā)送消息囚灼。一方面交換機(jī)接收來(lái)自生產(chǎn)者的消息骆膝,另一方面它將它們推送到隊(duì)列中祭衩。

3.2.4 交換機(jī)類型

  • Fanout 廣播,將消息轉(zhuǎn)發(fā)到所有綁定交換機(jī)的隊(duì)列上
  • Direct 定向阅签,將消息轉(zhuǎn)發(fā)到符合指定routing key的隊(duì)列上
  • Topic 通配符掐暮, 把 消息轉(zhuǎn)發(fā)符合routing pattern(路由模式)的隊(duì)列

注意:Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力政钟,因此如果沒有任何隊(duì)列與Exchange綁定路克,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失养交!

3.2.5 發(fā)布/訂閱模型-Fanout

  • 介紹

    Fanout類型也稱為廣播類型精算,這種類型有以下特點(diǎn):

    • 每個(gè)隊(duì)列都要綁定到交換機(jī),且生產(chǎn)者發(fā)送的消息只能發(fā)送到交換機(jī)碎连,由交換機(jī)決定將消息發(fā)送到哪個(gè)隊(duì)列灰羽。生產(chǎn)者無(wú)法決定,甚至生產(chǎn)者都不知道消息被轉(zhuǎn)發(fā)到了哪個(gè)隊(duì)列上


      image-20200421163949519.png
    • 每一個(gè)消費(fèi)者都需要有自己的隊(duì)列鱼辙,可以有多個(gè)消費(fèi)者


      image-20200421164114761.png
    • 交換機(jī)會(huì)把所有消息轉(zhuǎn)發(fā)到每一個(gè)綁定到交換機(jī)上的隊(duì)列


      image-20200421164147200.png
  • 編碼實(shí)現(xiàn)

    • 生產(chǎn)者

      1. 生產(chǎn)者跟隊(duì)列沒有關(guān)系廉嚼,只跟交換機(jī)有關(guān)系
      2. 發(fā)送消息發(fā)送到交換機(jī),不是隊(duì)列上
      package com.mq.rabbit.fanout;
      
      import com.mq.rabbit.util.ConnectionUtil;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import org.springframework.amqp.core.ExchangeTypes;
      
      public class Producer {
          private static final String EXCHANGE_NAME = "amq.fanout";
          public static void main(String[] args) throws Exception {
              // 1.獲取連接
              Connection connection = ConnectionUtil.getConnection();
              // 2. 創(chuàng)建通道
              Channel channel = connection.createChannel();
              /*
               *   3.聲明交換機(jī)
               *     第一個(gè)參數(shù):交換機(jī)名字
               *      第二個(gè)參數(shù): 交換機(jī)類型
               */
              channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
              String msg = "hello exchange";
              channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
              channel.close();
              connection.close();
          }
      }
      
      
    • 消費(fèi)者1

      1.消費(fèi)者需要綁定到隊(duì)列上倒戏,每一個(gè)消費(fèi)者有自己的隊(duì)列

      package com.mq.rabbit.fanout;
      
      import com.mq.rabbit.util.ConnectionUtil;
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      
      public class Consumer1 {
          private static final String QUEUE_NAME = "consumer_queue_1";
          private static final String EXCHANGE_NAME = "amq.fanout";
      
          public static void main(String[] args) throws Exception {
              // 1.獲取連接
              Connection connection = ConnectionUtil.getConnection();
              // 2.創(chuàng)建通道
              Channel channel = connection.createChannel();
              // 3.聲明隊(duì)列
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
              /*
               * 4.將隊(duì)列綁定到交換機(jī)
               *     第一個(gè)參數(shù) 隊(duì)列名字
               *     第二個(gè)參數(shù) 交換機(jī)名字
               *     第三個(gè)參數(shù) 路由key 后面再說(shuō)這個(gè)
               */
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
      
              // 5. 監(jiān)聽隊(duì)列獲取消息
              channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消費(fèi)者1:" + new String(body));
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              });
          }
      }
      
      
    • 消費(fèi)者2

      package com.mq.rabbit.fanout;
      
      import com.mq.rabbit.util.ConnectionUtil;
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      
      public class Consumer2 {
          private static final String QUEUE_NAME = "consumer_queue_2";
          private static final String EXCHANGE_NAME = "amq.fanout";
      
          public static void main(String[] args) throws Exception {
              // 1.獲取連接
              Connection connection = ConnectionUtil.getConnection();
              // 2.創(chuàng)建通道
              Channel channel = connection.createChannel();
              // 3.聲明隊(duì)列
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
              /*
               * 4.將隊(duì)列綁定到交換機(jī)
               *     第一個(gè)參數(shù) 隊(duì)列名字
               *     第二個(gè)參數(shù) 交換機(jī)名字
               *     第三個(gè)參數(shù) 路由key 后面再說(shuō)這個(gè)
               */
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
      
              // 5. 監(jiān)聽隊(duì)列獲取消息
              channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.out.println("消費(fèi)者2:" + new String(body));
                      channel.basicAck(envelope.getDeliveryTag(),false);
                  }
              });
          }
      }
      
      
    • 啟動(dòng)測(cè)試:

      • 如果先啟動(dòng)生產(chǎn)者怠噪,那么就會(huì)創(chuàng)建一個(gè)交換機(jī),并且給交換機(jī)發(fā)送消息峭梳,但是我們此時(shí)還沒有啟動(dòng)消費(fèi)者舰绘,所以交換機(jī)里面的消息也會(huì)丟失

      • 如果先啟動(dòng)消費(fèi)者,那么隊(duì)列綁定的交換機(jī)并不存在葱椭,所以也沒法綁定捂寿,從而拋出異常


        image-20200422092203108.png
        Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'fanout2_exchange' in vhost '/', class-id=50, method-id=20)
          at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
          at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
          at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
          at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
          at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
          at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
          at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
          at java.lang.Thread.run(Thread.java:745)
        
        
      • 解決辦法

        先啟動(dòng)一次生產(chǎn)者,創(chuàng)建交換機(jī)孵运,創(chuàng)建交換機(jī)秦陋,由于交換機(jī)不能存儲(chǔ)消息。所以消息就會(huì)丟失


        image-20200422092430797.png
  再啟動(dòng)消費(fèi)者
image-20200422092556522.png

我們可以看到消費(fèi)者并沒有消費(fèi)消息治笨,因?yàn)榻粨Q機(jī)里面已經(jīng)沒有消息了驳概。


image-20200422092643891.png

交換機(jī)也也綁定了隊(duì)列。此時(shí)我們?cè)賳?dòng)一次生產(chǎn)者旷赖,由于交換機(jī)已經(jīng)存在顺又,所以就會(huì)往交換機(jī)里發(fā)送消息


image-20200422092758096.png

image-20200422092829295.png

當(dāng)然如果不想這么麻煩,也可以使用MQ提供的交換機(jī)等孵。如下:
image-20200422092917259.png

4.Routing模型

4.1 介紹

Routing模型(路由模型)其實(shí)也是屬于發(fā)布/訂閱模型稚照。只不過(guò)是交換機(jī)類型不一樣,這里我們將學(xué)習(xí)Direct交換機(jī)模型。這個(gè)類型于Fanout類型不同的是果录,F(xiàn)anout類型是給每一個(gè)綁定到交換機(jī)上的隊(duì)列發(fā)消息上枕,而Direct則是可以向指定的隊(duì)列發(fā)送消息,通過(guò)RoutingKey(路由key)

官方介紹如下:


image-20200422094942316.png

大體意思如下:

在Fanout類型中弱恒,生產(chǎn)者發(fā)布消息辨萍,所有消費(fèi)者都可以獲取所有消息。

在路由模型中返弹,我們將添加一個(gè)功能 - 我們將只能訂閱一部分消息锈玉。 例如,我們只能將重要的錯(cuò)誤消息引導(dǎo)到日志文件(以節(jié)省磁盤空間)义起,同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息嘲玫。

但是,在某些場(chǎng)景下并扇,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange抡诞。在Direct模型下穷蛹,隊(duì)列與交換機(jī)的綁定,不能是任意綁定了昼汗,而是要指定一個(gè)RoutingKey(路由key)肴熏,生產(chǎn)者在向Exchange發(fā)送消息時(shí),也必須指定消息的routing key顷窒。

簡(jiǎn)而言之就是生產(chǎn)者需要告訴交換機(jī)要將消息發(fā)送到指定的隊(duì)列中蛙吏,怎么告訴就是通過(guò)RoutingKey(路由key)

4.2 模型圖

image-20200422095256784.png

從圖中我們可以看出:

  • P (生產(chǎn)者) 向 X(交換機(jī))發(fā)送消息時(shí)會(huì)指定 路由key,
  • 由于交換機(jī)類型為direct,該交換機(jī)就根據(jù)不同的路由key將orange消息轉(zhuǎn)發(fā)到了Q1隊(duì)列,將消息black,green消息轉(zhuǎn)發(fā)到了Q2隊(duì)列鞋吉,然后被彼此綁定的消費(fèi)者所消費(fèi)鸦做。

接下來(lái)我們將使用Java代碼模擬圖中過(guò)程。

4.3 代碼

  • 生產(chǎn)者

    package com.mq.rabbit.routting;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.springframework.amqp.core.ExchangeTypes;
    
    public class Producer {
        public static final String EXCHANGE_NAME = "hello_exchange_direct";
    
        public static void main(String[] args) throws Exception {
            // 1.創(chuàng)建連接
            Connection connection = ConnectionUtil.getConnection();
            // 2.創(chuàng)建通道
            Channel channel = connection.createChannel();
            //3. 創(chuàng)建交換機(jī)
            channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
            // 4. 發(fā)送消息
            String orange = "hello orange";
            /*
             *   5.發(fā)送消息
             *      第一個(gè)參數(shù):交換機(jī)名字
             *      第二個(gè)參數(shù):路由key
             *      第三個(gè)參數(shù):消息其他參數(shù)
             *      第四個(gè)參數(shù): 消息
             */
            channel.basicPublish(EXCHANGE_NAME,"q1",null,orange.getBytes());
    
            String black = "hello black";
            channel.basicPublish(EXCHANGE_NAME,"q2",null,black.getBytes());
    
            String green = "hello green";
            channel.basicPublish(EXCHANGE_NAME,"q2",null,green.getBytes());
    
            channel.close();
            connection.close();
        }
    }
    
    

    上述代碼可知:

    • 生產(chǎn)者發(fā)送了三條消息hello orange,hello black,hello green
    • orange消息的路由key為q1,到時(shí)候發(fā)送到q1隊(duì)列上谓着,并消費(fèi)者1消費(fèi)
    • black,green消息的路由可以為q2,到時(shí)候發(fā)送到q2隊(duì)列上泼诱,并被消費(fèi)者2消費(fèi)
  • 消費(fèi)者1

    package com.mq.rabbit.routting;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer1 {
        public static final  String QUEUE_NAME = "consumer1";
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 聲明隊(duì)列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            /*
            *   綁定交換機(jī)
            *       第一個(gè)參數(shù)為隊(duì)列名字
            *       第二個(gè)參數(shù)為交換機(jī)名字
            *       第三個(gè)參數(shù)為路由key
             */
            channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q1");
    
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel)  {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });
        }
    }
    
    
  • 消費(fèi)者2

    package com.mq.rabbit.routting;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer2 {
        public static final  String QUEUE_NAME = "consumer2";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 聲明隊(duì)列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q1");
            channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q2");
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel)  {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });
        }
    }
    
    

4.4 測(cè)試

  • 生產(chǎn)者測(cè)試


    image-20200422104639726.png
  • 消費(fèi)者測(cè)試


    image-20200422104948958.png

    image-20200422105006683.png

    效果滿足我們想要的,這就是MQ中的路由模型赊锚。

5.Topics模型

MQ中的Top模型其實(shí)也是屬于發(fā)布/訂閱中模型的一種治筒,只不過(guò)交換機(jī)模型換成了 Topic

5.1 介紹

image-20200422111140615.png

大體意思如下:

  • 路由key由一個(gè)或者多個(gè)參數(shù)組成,如果是多個(gè)單詞必須以 . 號(hào)隔開 例如:category.update

  • Topic類型的交換機(jī)與Direct相比舷蒲,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列耸袜。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符

    • * 只能匹配一個(gè)單詞
    • # 匹配一個(gè)或者多個(gè)單詞
    • 例如:
      • product.* product.insert 能夠匹配到,product.insert.dd就匹配不到
      • product.# product.insert ,product.insert.dd都能匹配到

5.2 模型圖

image-20200422112203396.png

我們將發(fā)送所有描述動(dòng)物的消息。消息將用路由key發(fā)送牲平,路由key由三個(gè)字(兩個(gè)點(diǎn))組成堤框。路由key中的第一個(gè)詞將描述一種快速性、第二種顏色和第三種a物種:“<celerity><colour><species>”。

我們創(chuàng)建了三個(gè)綁定:Q1用綁定鍵*.orange.*綁定胰锌,Q2用*.rabbitlazy.#綁定骗绕。

這些綁定可以概括為:

  • Q1匹配的橙色動(dòng)物

  • Q2匹配兔子和懶惰動(dòng)物

例如:

? quick.orange.rabbit 就會(huì) 被 Q2隊(duì)列匹配到

? lazy.orange.elephant就會(huì)被 Q1 Q2隊(duì)列匹配到

? lazy.pink.rabbit 就會(huì)被Q2隊(duì)列匹配到

quick.brown.fox 不會(huì)被任何隊(duì)列匹配到

5.3 代碼實(shí)現(xiàn)

  • 生產(chǎn)者

    使用topic 類型交換機(jī),路由key為:lazy.pink.rabbit lazy.orange.elephantquick.orange.rabbit``

    package com.mq.rabbit.topic;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import org.springframework.amqp.core.ExchangeTypes;
    
    public class Producer {
        public static final String EXCHNAGE_NAME = "hello_exchange_topic";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHNAGE_NAME, ExchangeTypes.TOPIC);
            String msg = "hello lazy.pink.rabbit";
            channel.basicPublish(EXCHNAGE_NAME,"lazy.pink.rabbit",null,msg.getBytes());
    
            msg = "hello lazy.orange.elephant";
            channel.basicPublish(EXCHNAGE_NAME,"lazy.orange.elephant",null,msg.getBytes());
    
            msg = "hello quick.orange.rabbit";
            channel.basicPublish(EXCHNAGE_NAME,"quick.orange.rabbit",null,msg.getBytes());
    
            channel.close();
            connection.close();
        }
    
    }
    
    
  • 消費(fèi)者1

    package com.mq.rabbit.topic;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer1 {
        public static final String  QUEUE_NAME = "Q1";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"*.orange.*");
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消費(fèi)者1:" + new String(body));
                }
            });
        }
    }
    
    
  • 消費(fèi)者2

    package com.mq.rabbit.topic;
    
    import com.mq.rabbit.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Consumer2 {
        public static final String  QUEUE_NAME = "Q2";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"*.*.rabbit");
            channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"lazy.#");
            channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消費(fèi)者2:" + new String(body));
                }
            });
        }
    }
    
    
  • 測(cè)試

    • 測(cè)試生產(chǎn)者


      image-20200422153000256.png
    • 測(cè)試消費(fèi)者


      image-20200422153240392.png

      image-20200422153256104.png

      以上就是我們的topic類型

6.消息堆積&丟失問(wèn)題

6.1 堆積

如何避免消息對(duì)接問(wèn)題:

  • 在消費(fèi)者一方啟用多線程去消費(fèi)
  • 使用work模型去分擔(dān)消息资昧,注意酬土,發(fā)布/訂閱模型可以和work模型結(jié)合使用

6.2 丟失

如何避免消息丟失

  • 消費(fèi)端使用手動(dòng)ACK機(jī)制(如何消費(fèi)者在消費(fèi)消息之前,MQ就掛掉格带,那么這個(gè)操作無(wú)用)
  • 將消息持久化

消息要想持久化撤缴,那么前提條件就是 交換機(jī),隊(duì)列都需要持久化

6.2.1 交換機(jī)持久化

image-20200422154334785.png

6.2.2 隊(duì)列持久化

image-20200422154438511.png

7. RPC模型

rpc 模型其實(shí)是屬于遠(yuǎn)程調(diào)用叽唱,不屬于消息模型屈呕,所以這里不說(shuō)明,如果對(duì)rpc感興趣棺亭,可以去了解一下dubbo

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載虎眨,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者。
  • 序言:七十年代末镶摘,一起剝皮案震驚了整個(gè)濱河市嗽桩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌凄敢,老刑警劉巖碌冶,帶你破解...
    沈念sama閱讀 206,723評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異涝缝,居然都是意外死亡扑庞,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門拒逮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)罐氨,“玉大人,你說(shuō)我怎么就攤上這事滩援∑裾眩” “怎么了?”我有些...
    開封第一講書人閱讀 152,998評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵狠怨,是天一觀的道長(zhǎng)约啊。 經(jīng)常有香客問(wèn)我,道長(zhǎng)佣赖,這世上最難降的妖魔是什么恰矩? 我笑而不...
    開封第一講書人閱讀 55,323評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮憎蛤,結(jié)果婚禮上外傅,老公的妹妹穿的比我還像新娘纪吮。我一直安慰自己,他們只是感情好萎胰,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評(píng)論 5 374
  • 文/花漫 我一把揭開白布碾盟。 她就那樣靜靜地躺著,像睡著了一般技竟。 火紅的嫁衣襯著肌膚如雪冰肴。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,079評(píng)論 1 285
  • 那天榔组,我揣著相機(jī)與錄音熙尉,去河邊找鬼。 笑死搓扯,一個(gè)胖子當(dāng)著我的面吹牛检痰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播锨推,決...
    沈念sama閱讀 38,389評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼铅歼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了换可?” 一聲冷哼從身側(cè)響起谭贪,我...
    開封第一講書人閱讀 37,019評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎锦担,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體慨削,經(jīng)...
    沈念sama閱讀 43,519評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡洞渔,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了缚态。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片磁椒。...
    茶點(diǎn)故事閱讀 38,100評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖凉当,靈堂內(nèi)的尸體忽然破棺而出洽损,到底是詐尸還是另有隱情厦取,我是刑警寧澤,帶...
    沈念sama閱讀 33,738評(píng)論 4 324
  • 正文 年R本政府宣布医增,位于F島的核電站,受9級(jí)特大地震影響老虫,放射性物質(zhì)發(fā)生泄漏叶骨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評(píng)論 3 307
  • 文/蒙蒙 一祈匙、第九天 我趴在偏房一處隱蔽的房頂上張望忽刽。 院中可真熱鬧天揖,春花似錦、人聲如沸跪帝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)伞剑。三九已至斑唬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間纸泄,已是汗流浹背赖钞。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留聘裁,地道東北人雪营。 一個(gè)月前我還...
    沈念sama閱讀 45,547評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像衡便,于是被迫代替她去往敵國(guó)和親献起。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容