RabbitMQ

1、RabbitMQ作用

1.1、流量消峰(解決高并發(fā))

image.png

1.2、模塊之間的異步通信

image.png

1.3击碗、消息隊(duì)列的中間件有哪些

ActiveMQ---------JMS(SUN公司提供的規(guī)范) Java message Server
RabbitMQ-------在當(dāng)下很多公司都用這一個(gè)
RocketMQ------阿里的
kafka------------用的比較多---最初的設(shè)計(jì) 是用來 完成分布式下日志的收集框架

1.4、RabbitMQ的基本安裝

#安裝之前需要的環(huán)境
yum install epel-release
yum install erlang

#安裝rabbitMQ了
下載rpm文件
 wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
#下載完成需要安裝
 yum install rabbitmq-server-3.6.15-1.el7.noarch.rpm
#設(shè)置開機(jī)啟動(dòng)
 systemctl enable rabbitmq-server.service
#查看服務(wù)的狀態(tài)
 systemctl status rabbitmq-server.service
#啟動(dòng)這個(gè)服務(wù)
 systemctl start rabbitmq-server.service
#停止這個(gè)服務(wù)
 systemctl stop rabbitmq-server.service
#查看當(dāng)前所有的用戶
 rabbitmqctl list_users
#查看guest用戶所有擁有的權(quán)限
  rabbitmqctl list_user_permissions guest
#刪除原來的guest用戶
  rabbitmqctl delete_user guest
#添加一個(gè)新的用戶
  rabbitmqctl add_user xiaobobo 12345678
#給小波波設(shè)置個(gè)角色(tag)
   rabbitmqctl set_user_tags xiaobobo administrator
#給xiaobobo賦予權(quán)限
   rabbitmqctl set_permissions -p / xiaobobo ".*" ".*" ".*"
#查看用戶所擁有的權(quán)限
   rabbitmqctl list_user_permissions xiaobobo
#開啟web的管理端
rabbitmq-plugins enable rabbitmq_management

1.5们拙、RabbitMQ中的五種通信模型

1.5.1、helloworld模型

image.png

意思是:生產(chǎn)者將消息發(fā)送到隊(duì)列 然后隊(duì)列將這個(gè)消息發(fā)送給消費(fèi)者

1.5.2阁吝、測(cè)試用例

(1)導(dǎo)包

<!--導(dǎo)入RabbitMQ的相關(guān)的包-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.5.0</version>
        </dependency>

(2)生產(chǎn)者

//申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-helloword";

    public static void main(String[] args) throws IOException, TimeoutException {

        //第一步:獲取連接
        Connection connection = ConnectionUtils.getConnection();
        //第二步:創(chuàng)建數(shù)據(jù)傳輸?shù)耐ǖ?        Channel channel = connection.createChannel();
        //第三步:申明隊(duì)列
        /**
         * 第一個(gè)參數(shù):隊(duì)列的名字
         * 第二個(gè)參數(shù):是否持久化   比如現(xiàn)在發(fā)送到隊(duì)列里面的消息 如果沒有持久化 重啟這個(gè)隊(duì)列后數(shù)據(jù)會(huì)丟失(false) true:重啟之后數(shù)據(jù)依然在
         * 第三個(gè)參數(shù):是否排外
         *            1:連接關(guān)閉之后 這個(gè)隊(duì)列是否自動(dòng)刪除
         *            2:是否允許其他通道來進(jìn)行訪問這個(gè)數(shù)據(jù)
         * 第四個(gè)參數(shù):是否允許自動(dòng)刪除
         *            就是當(dāng)最后一個(gè)連接斷開的時(shí)候  這個(gè)時(shí)候是否允許自動(dòng)刪除這個(gè)隊(duì)列
         * 第五個(gè)參數(shù):申明隊(duì)列的時(shí)候 要附帶的一些參數(shù)
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //發(fā)送數(shù)據(jù)到隊(duì)列

        /**
         * 第一個(gè)參數(shù):exchange  交換機(jī)  沒有就設(shè)置為"值就可以了"
         * 第二個(gè)參數(shù):原本的意思是路由的key  現(xiàn)在沒有key直接使用隊(duì)列的名字
         * 第三個(gè)參數(shù):發(fā)送數(shù)據(jù)到隊(duì)列的時(shí)候 是否要帶一些參數(shù)  沒有帶任何參數(shù)
         * 第四個(gè)參數(shù):向隊(duì)列中發(fā)送的數(shù)據(jù)
         */
        channel.basicPublish("",QUEUE_NAME,null,"我是小波波1111".getBytes("utf-8"));
        channel.close();
        connection.close();
    }

(3)消費(fèi)者的寫法

 //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-helloword";

    public static void main(String[] args) throws IOException, TimeoutException {

        //獲取連接
        Connection connection = ConnectionUtils.getConnection();

        //創(chuàng)建通道
        final Channel channel = connection.createChannel();

        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        //消費(fèi)者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag:這個(gè)消息的唯一標(biāo)記
             * @param envelope:信封(請(qǐng)求的消息屬性的一個(gè)封裝)
             * @param properties:前面隊(duì)列帶過來的值
             * @param body  :接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息是:"+new String(body));
                //進(jìn)行手動(dòng)應(yīng)答
                /**
                 * 第一個(gè)參數(shù):自動(dòng)應(yīng)答的這個(gè)消息標(biāo)記
                 * 第二個(gè)參數(shù):false 就相當(dāng)于告訴隊(duì)列受到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //綁定這個(gè)消費(fèi)者
        /**
         * 第一個(gè)參數(shù):隊(duì)列的名字
         * 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
         * 第三個(gè)參數(shù):消費(fèi)者的申明
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}

1.5.3砚婆、work模型的玩法

多個(gè)消費(fèi)者消費(fèi)的數(shù)據(jù)之和才是原來隊(duì)列中的所有數(shù)據(jù) 適用于流量的消峰


image.png

(1)生產(chǎn)者

public class Producer {

    private static final String QUEUE_NAME="nz1904-work";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //下面向隊(duì)列中發(fā)送100條消息
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish("",QUEUE_NAME,null,("我是工作模型:"+i).getBytes());
        }
        channel.close();
        connection.close();
    }

}

(2)消費(fèi)者1寫法

public class Consumer {

    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-work";

    public static void main(String[] args) throws IOException, TimeoutException {

        //獲取連接
        Connection connection = ConnectionUtils.getConnection();

        //創(chuàng)建通道
        final Channel channel = connection.createChannel();

        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        //消費(fèi)者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag:這個(gè)消息的唯一標(biāo)記
             * @param envelope:信封(請(qǐng)求的消息屬性的一個(gè)封裝)
             * @param properties:前面隊(duì)列帶過來的值
             * @param body  :接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者1接受到的消息是:"+new String(body));
                //進(jìn)行手動(dòng)應(yīng)答
                /**
                 * 第一個(gè)參數(shù):自動(dòng)應(yīng)答的這個(gè)消息標(biāo)記
                 * 第二個(gè)參數(shù):false 就相當(dāng)于告訴隊(duì)列受到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //綁定這個(gè)消費(fèi)者
        /**
         * 第一個(gè)參數(shù):隊(duì)列的名字
         * 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
         * 第三個(gè)參數(shù):消費(fèi)者的申明
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }
}

(3)消費(fèi)者2寫法

public class Consumer1 {

    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-work";

    public static void main(String[] args) throws IOException, TimeoutException {

        //獲取連接
        Connection connection = ConnectionUtils.getConnection();

        //創(chuàng)建通道
        final Channel channel = connection.createChannel();

        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        //消費(fèi)者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag:這個(gè)消息的唯一標(biāo)記
             * @param envelope:信封(請(qǐng)求的消息屬性的一個(gè)封裝)
             * @param properties:前面隊(duì)列帶過來的值
             * @param body  :接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者2接受到的消息是:"+new String(body));
                //進(jìn)行手動(dòng)應(yīng)答
                /**
                 * 第一個(gè)參數(shù):自動(dòng)應(yīng)答的這個(gè)消息標(biāo)記
                 * 第二個(gè)參數(shù):false 就相當(dāng)于告訴隊(duì)列受到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //綁定這個(gè)消費(fèi)者
        /**
         * 第一個(gè)參數(shù):隊(duì)列的名字
         * 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
         * 第三個(gè)參數(shù):消費(fèi)者的申明
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }
}

1.5.4、發(fā)布訂閱模型

簡(jiǎn)單的說就是隊(duì)列里面的消息會(huì)被幾個(gè)消費(fèi)者 同時(shí)接受到
模型 適合于做模塊之間的異步通信
例子: 就可以使用這種模型 來發(fā)送日志信息 ------ 立馬就會(huì)被log收集程序
收集到 直接寫到咋們的文件里面
例子:springcloud的config組件里面通知配置自動(dòng)更新
例子:緩存同步也可以使用這一個(gè)
例子:高并發(fā)下實(shí)現(xiàn)下單邏輯
(1)生產(chǎn)者寫法

public class Producer {

    //申明交換機(jī)的名字
    private static final String EXCHANGE_NAME="nz1904-fanout-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明交換機(jī)
        /**
         * 第一個(gè)參數(shù):交換機(jī)的名字
         * 第二個(gè)參數(shù):交換機(jī)的類型
         *    交換機(jī)的類型是不能亂寫的   如果使用的是發(fā)布訂閱模型  只能寫 fanout
         */
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //發(fā)送消息到交換機(jī)了
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish(EXCHANGE_NAME,"",null,("發(fā)布訂閱模型的值:"+i).getBytes());
        }
        //關(guān)閉資源
        channel.close();
        connection.close();
    }

}

(2)消費(fèi)者1寫法

public class Consumer {
    //申明交換機(jī)的名字
    private static final String EXCHANGE_NAME="nz1904-fanout-01";
    //隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-fanout-queue1";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //將隊(duì)列綁定到交換機(jī)
        /**
         * 第一個(gè)參數(shù):隊(duì)列的名字
         * 第二個(gè)參數(shù):交換機(jī)的名字
         * 第三個(gè)參數(shù):路由的key(現(xiàn)在沒有用到這個(gè)路由的key)
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        //申明消費(fèi)者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("隊(duì)列1接受到的數(shù)據(jù)是:"+new String(body));
            }
        };

        //就進(jìn)行消費(fèi)者的綁定
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

(3)消費(fèi)者2寫法

public class Consumer1 {

    //申明交換機(jī)的名字
    private static final String EXCHANGE_NAME="nz1904-fanout-01";
    //隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-fanout-queue2";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //將隊(duì)列綁定到交換機(jī)
        /**
         * 第一個(gè)參數(shù):隊(duì)列的名字
         * 第二個(gè)參數(shù):交換機(jī)的名字
         * 第三個(gè)參數(shù):路由的key(現(xiàn)在沒有用到這個(gè)路由的key)
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        //申明消費(fèi)者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("隊(duì)列2接受到的數(shù)據(jù)是:"+new String(body));
            }
        };

        //就進(jìn)行消費(fèi)者的綁定
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

1.5.5突勇、路由模型

路由模式相當(dāng)于是分布訂閱模式的升級(jí)版


image.png

(1)生產(chǎn)者寫法

public class Producer {
    private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 如果玩的是路由模型 交換機(jī)的類型只能是  direct
         */
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //發(fā)送信息到交換機(jī)
        for (int i = 0; i <100 ; i++) {
            if(i%2==0){
                //這個(gè)路由的key是可以隨便設(shè)置的
                channel.basicPublish(EXCHANGE_NAME,"xiaowangzi",null,("路由模型的值:"+i).getBytes());
            }else{
                //這個(gè)路由的key是可以隨便設(shè)置的
                channel.basicPublish(EXCHANGE_NAME,"xiaobobo",null,("路由模型的值:"+i).getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}

(2)消費(fèi)者1寫法

public class Cosnumer1 {

    private static final String QUEUE_NAME="nz1904-direct-queue-01";
    private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //綁定隊(duì)列到交換機(jī)
        //第三個(gè)參數(shù):表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaobobo");
        //申明消費(fèi)者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               //這里就是接受消息的地方
                System.out.println("路由key是xiaobobo的這個(gè)隊(duì)列接受到數(shù)據(jù):"+new String(body));
            }
        };
        //綁定消費(fèi)者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

(3)消費(fèi)者2寫法

public class Consumer2 {
    private static final String QUEUE_NAME="nz1904-direct-queue-02";
    private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明隊(duì)列
      channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //綁定隊(duì)列到交換機(jī)
        //第三個(gè)參數(shù):表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaowangzi");
        //申明消費(fèi)者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //這里就是接受消息的地方
                System.out.println("路由key是xiaowangzi的這個(gè)隊(duì)列接受到數(shù)據(jù):"+new String(body));
            }
        };
        //綁定消費(fèi)者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

1.5.6装盯、topic模式

說明:topic模式相當(dāng)于是對(duì) 路由模式的一個(gè)升級(jí) topic模式主要就是在匹配的規(guī)則上可以實(shí)現(xiàn)模糊匹配


image.png

(1)生產(chǎn)者的寫法

public class Producer {
    private static final String EXCHANGE_NAME = "nz1904-exchange-topic-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 如果玩的是路由模型 交換機(jī)的類型只能是  direct
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //發(fā)送信息到交換機(jī)
        for (int i = 0; i < 100; i++) {
            //這個(gè)路由的key是可以隨便設(shè)置的
            //topic在路由基礎(chǔ)上只有 路由的key發(fā)生改變  其余的都不變
            channel.basicPublish(EXCHANGE_NAME, "xiaowangzi.xiaowangzi.xiaowangzi", null, ("路由模型的值:" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

(2)消費(fèi)者1寫法

public class Cosnumer1 {

    private static final String QUEUE_NAME="nz1904-topic-queue-01";
    private static final String EXCHANGE_NAME="nz1904-exchange-topic-01";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //綁定隊(duì)列到交換機(jī)
        //第三個(gè)參數(shù):表示的是路由key
        /**
         *    注意  * :只是代表一個(gè)單詞
         *         # :這個(gè)才代表  一個(gè)或者多個(gè)單詞
         *         記住如果有多個(gè)單詞組成的路由key  那么多個(gè)單詞之間使用 . 好連接
         *
         *
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaobobo.*");
        //申明消費(fèi)者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               //這里就是接受消息的地方
                System.out.println("路由key是xiaobobo的這個(gè)隊(duì)列接受到數(shù)據(jù):"+new String(body));
            }
        };
        //綁定消費(fèi)者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

(3)消費(fèi)者2寫法

public class Consumer2 {
    private static final String QUEUE_NAME="nz1904-topic-queue-02";
    private static final String EXCHANGE_NAME="nz1904-exchange-topic-01";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //綁定隊(duì)列到交換機(jī)
        //第三個(gè)參數(shù):表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaowangzi.#");
        //申明消費(fèi)者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //這里就是接受消息的地方
                System.out.println("路由key是xiaowangzi的這個(gè)隊(duì)列接受到數(shù)據(jù):"+new String(body));
            }
        };
        //綁定消費(fèi)者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

備注:使用了交換機(jī)發(fā)送了數(shù)據(jù) 如果沒有消費(fèi)者的話那么這個(gè)數(shù)據(jù)會(huì)發(fā)生丟失 通過設(shè)置這樣的屬性來解決這個(gè)問題
channel.basicPublish(EXCHANGE_NAME, "xiaowangzi.xiaowangzi.xiaowangzi", MessageProperties.PERSISTENT_TEXT_PLAIN, ("路由模型的值:" + i).getBytes());

備注2:通道的問題
原本沒有通道我們也可以完成這個(gè)請(qǐng)求 RabbitMQ官方考慮到一個(gè)問題生產(chǎn)者 和 消費(fèi)者 實(shí)際上 Connection 引入這個(gè)通道這個(gè)概念 是為了降低TCP連接的這樣一個(gè)消耗 相當(dāng)于是為了 TCP的復(fù)用 還有一個(gè)目的 就是為了線程隱私 相當(dāng)于每一個(gè)線程都給你創(chuàng)建了一個(gè)通道

2、RabbitMQ中的一些高級(jí)屬性

2.1甲馋、參數(shù)的含義

channel.queueDeclare(QUEUE_NAME,true,false,true,null);
第二個(gè)參數(shù):如果是false   重啟之后 隊(duì)列都沒有了 數(shù)據(jù)也會(huì)丟失
第三個(gè)參數(shù):true:連接一旦關(guān)閉  那么就會(huì)刪除這個(gè)隊(duì)列
第四個(gè)參數(shù):就是最后一個(gè)消費(fèi)者 退出去之后 那么這個(gè)隊(duì)列是否自動(dòng)刪除
第五個(gè)參數(shù):講ttl隊(duì)列的時(shí)候要專門講

channel.basicPublish("",QUEUE_NAME,null,"我是小波波1111".getBytes());

第一個(gè)參數(shù) :交換機(jī)
第二個(gè)參數(shù):路由key
第三個(gè)參數(shù):設(shè)置的隊(duì)列的屬性
第四個(gè)參數(shù):值

2.2埂奈、confirm機(jī)制

問題:就是放到隊(duì)列中的消息 怎么保證一定就是成功的放入了隊(duì)列

引入了 confirm機(jī)制:這個(gè)機(jī)制的意思是 只要放消息到 queue是成功的那么隊(duì)列就一定會(huì)給咋們進(jìn)行反饋

public class Producer {

    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-confirm-01";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //第一步:開啟confirm消息確認(rèn)機(jī)制
        channel.confirmSelect();
        //我們就要對(duì)消息的可達(dá)性實(shí)施監(jiān)聽
        //下面就是對(duì)消息的簽收情況進(jìn)行確認(rèn)
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                System.out.println("發(fā)送成功的監(jiān)聽.....");
            }

            @Override
            public void handleNack(long l, boolean b) throws IOException {
                System.out.println("發(fā)送失敗的監(jiān)聽.....");
            }
        });

        channel.queueDeclare(QUEUE_NAME,false,false,true,null);
        channel.basicPublish("",QUEUE_NAME,null,"我是小波波1111".getBytes());

    }
}

2.3、return機(jī)制

場(chǎng)景:我們?cè)诎l(fā)送消息的是時(shí)候定躏、我們指定的交換機(jī)不存在 或者 指定的路由key不存在 這種時(shí)候我們就需要監(jiān)聽這種不可達(dá)的消息 我們的return機(jī)制就產(chǎn)生了

前提:當(dāng)前的隊(duì)列必須要有消費(fèi)者存在

//有一個(gè)參數(shù)需要設(shè)置

mandatory 如果設(shè)置為ture:就表示的是要監(jiān)聽不可達(dá)的消息 進(jìn)行處理
如果設(shè)置為false 那么隊(duì)列端會(huì)直接刪除這個(gè)消息

(1)生產(chǎn)者的編寫

public class Producer {

    private static final String EXCHANGE_NAME="test_return_exchange1";
    //是能路由的key
    private static final String ROUTING_KEY="return.save";
    //是不能路由的key
    private static final String ROUTING_ERROR_KEY="abc.save";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //添加監(jiān)聽
        channel.addReturnListener(new ReturnListener() {
            /**
             *
             * @param i:隊(duì)列響應(yīng)給瀏覽器的狀態(tài)碼
             * @param s:表示的是狀態(tài)碼對(duì)應(yīng)的文本信息
             * @param s1:交換機(jī)的名字
             * @param s2:表示的是路由的key
             * @param basicProperties:表示的是消息的屬性
             * @param bytes:消息體的內(nèi)容
             * @throws IOException
             */
            @Override
            public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                System.out.println("監(jiān)聽到不可達(dá)的消息");
                System.out.println("狀態(tài)碼:"+i+"---文本信息:"+s+"---交換機(jī)名字:"+s1+"----路由的key:s2");
                System.out.println("監(jiān)聽到不可達(dá)的消息");
                System.out.println("監(jiān)聽到不可達(dá)的消息");
                System.out.println("監(jiān)聽到不可達(dá)的消息");
            }
        });
        channel.basicPublish(EXCHANGE_NAME,ROUTING_ERROR_KEY,true,null,"這里是測(cè)試Return機(jī)制".getBytes());

    }

(2)消費(fèi)者寫法

public class Consumer {

    private static final String EXCHANGE_NAME="test_return_exchange1";

    //是能路由的key
    private static final String ROUTING_KEY="return.#";
    //制定綁定的隊(duì)列
    private static final String QUEUE_NAME="test_return_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //申明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //綁定
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //申明消費(fèi)者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到這個(gè)消息了....");
            }
        };
        //進(jìn)行消費(fèi)的綁定
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }

}

2.3账磺、消費(fèi)端的限流問題

場(chǎng)景:消費(fèi)者死了 隊(duì)列里面一瞬間就就積累了上萬條數(shù)據(jù)、這個(gè)時(shí)候當(dāng)我們打開客戶端的時(shí)候痊远、瞬間就有巨量的信息給推送過來垮抗、但是我們的客戶端是沒有辦法同時(shí)處理這么多數(shù)據(jù)的 結(jié)果就是消費(fèi)者死了....

這種場(chǎng)景下我們就需要對(duì)消費(fèi)端進(jìn)行限流

(1)生產(chǎn)者的編寫

public class Producer {

    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-limit-01";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        for (int i = 0; i <100 ; i++) {
            channel.basicPublish("",QUEUE_NAME, null,("我是小波波"+i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

(2)消費(fèi)者1的編寫

public class Consumer {

    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-limit-01";

    public static void main(String[] args) throws IOException, TimeoutException {

        //獲取連接
        Connection connection = ConnectionUtils.getConnection();

        //創(chuàng)建通道
        final Channel channel = connection.createChannel();

        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //消費(fèi)者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者1接受到的消息是:"+new String(body));
                //進(jìn)行手動(dòng)應(yīng)答
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //綁定這個(gè)消費(fèi)者
        /**
         * 第一個(gè)參數(shù):隊(duì)列的名字
         * 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
         * 第三個(gè)參數(shù):消費(fèi)者的申明
         */     channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }
}

(3)消費(fèi)者2寫法

public class Consumer1 {

    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-limit-01";

    public static void main(String[] args) throws IOException, TimeoutException {

        //獲取連接
        Connection connection = ConnectionUtils.getConnection();

        //創(chuàng)建通道
        final Channel channel = connection.createChannel();

        //申明隊(duì)列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //設(shè)置限流機(jī)制
        /**
         * 第一個(gè)參數(shù):消息本身的大小 如果設(shè)置為0  那么表示對(duì)消息本身的大小不限制
         * 第二個(gè)參數(shù):告訴rabbitmq不要一次性給消費(fèi)者推送大于N個(gè)消息 你要推送的前提是
         * 現(xiàn)在這N個(gè)消息 已經(jīng)手動(dòng)被確認(rèn) 已經(jīng)完成
         * 第三個(gè)參數(shù):true/false :是否將上面的設(shè)置應(yīng)用于整個(gè)通道  true :表示整個(gè)         通道的消費(fèi)者都是這個(gè)策略  如果是false表示的是 只有當(dāng)前的consumer 是這個(gè)策略
         */
        channel.basicQos(0,5,false);
        //結(jié)論:實(shí)際上如果不設(shè)置的話 分配任務(wù)的事 一開始就分配好了
        //必須手動(dòng)簽收

        //消費(fèi)者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費(fèi)者2接受到的消息是:"+new String(body));
                try {
                  Thread.sleep(200);
                }catch (Exception err){

                }
                //進(jìn)行手動(dòng)應(yīng)答
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //綁定這個(gè)消費(fèi)者
        /**
         * 第一個(gè)參數(shù):隊(duì)列的名字
         * 第二個(gè)參數(shù):是否自動(dòng)應(yīng)答
         * 第三個(gè)參數(shù):消費(fèi)者的申明
         */
       channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }
}

2.4、TTL隊(duì)列(Time To Live)

場(chǎng)景:我要下單 下單之后 在一定的時(shí)間內(nèi)碧聪、我的訂單如果沒有被處理 那么自動(dòng)失效

備注:簡(jiǎn)單的說就是咋們的隊(duì)列中的消息是有時(shí)間限制的冒版、如果超時(shí)那么這個(gè)消息將會(huì)被隊(duì)列給刪除

public class Producer {
    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-ttl-01";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //我們只需要給下面的隊(duì)列設(shè)置好屬性 那么這個(gè)隊(duì)列 就自動(dòng)變成 ttl隊(duì)列了
        Map<String,Object> map=new HashMap<>();
        map.put("x-message-ttl",5000);
        channel.queueDeclare(QUEUE_NAME,false,false,false,map);

        channel.basicPublish("",QUEUE_NAME, null,("我是小波波").getBytes());
        channel.close();
        connection.close();
    }
}

2.5、死信隊(duì)列

什么是死信隊(duì)列

當(dāng)消息在隊(duì)列中編程死信之后逞姿、可以定義它重新push 到另外一個(gè)交換機(jī)上辞嗡、這個(gè)交換機(jī) 也有自己對(duì)應(yīng)的隊(duì)列 這個(gè)隊(duì)列就稱為死信隊(duì)列

死信:
發(fā)送到隊(duì)列中的消息被拒絕了
消息的ttl時(shí)間過期
隊(duì)列達(dá)到了最大長(zhǎng)度 再往里面放信息

在滿足上面死信的前提下 捆等、現(xiàn)在我們可以定義一個(gè)隊(duì)列 這個(gè)隊(duì)列專門用來
死信隊(duì)列也是一個(gè)正常的交換機(jī)、和一般的交換機(jī)沒有什么區(qū)別
當(dāng)這個(gè)隊(duì)列中如果有這個(gè)死信的時(shí)候续室、rabbitmq就會(huì)將這個(gè)消息自動(dòng)發(fā)送到我們提前定義好的死信隊(duì)列中去(簡(jiǎn)單的說就是路由到另外一個(gè)隊(duì)列)


image.png

(1)生產(chǎn)者的編寫

public class Producer {
    //定義的是隊(duì)列(正常的交換機(jī))  這里發(fā)送消息是在交換機(jī)上面
    private static final String EXCHANGE_NAME="ttl-dlx-bobo-exchange";
    //定義一個(gè)路由的key
    private static final String  ROUTING_KEY="dlx.#";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        for (int i = 0; i <5 ; i++) {
             channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,false,null,("我是小波波"+i).getBytes());
        }
    }
}

(2)消費(fèi)者的編寫

public class Consumer {

    //定義的是交換機(jī)
    private static final String EXCHANGE_NAME="ttl-dlx-bobo-exchange";
    //正常情況下的隊(duì)列
    private static final String QUEUE_NAME="ttl-dlx-bobo-queue";

    //定義死信隊(duì)列的交換機(jī)的名字
    private static final  String DLX_EXCHANGE_NAME="dlx-bobo-exchange";
    //死信隊(duì)列的定義
    private static final  String DLX_QUEUE_NAME="dlx-bobo-queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //創(chuàng)建交換機(jī)和隊(duì)列進(jìn)行綁定
        channel.exchangeDeclare(EXCHANGE_NAME,"topic",true);
        //隊(duì)列的申明
        //我們要申明成死信隊(duì)列
        Map<String,Object> map=new HashMap<>();
        map.put("x-message-ttl",5000);
        //添加一個(gè)死信的屬性  //后面這個(gè)名字就是死信隊(duì)列交換機(jī)的名字
        map.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);

        channel.queueDeclare(QUEUE_NAME,true,false,false,map);

        //進(jìn)行隊(duì)列和交換機(jī)進(jìn)行綁定
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"dlx.#");

        //上面是正常的隊(duì)列的申明
        //下面就是死信隊(duì)列的申明
        channel.exchangeDeclare(DLX_EXCHANGE_NAME,"topic");
        //申明隊(duì)列
        channel.queueDeclare(DLX_QUEUE_NAME,true,false,false,null);
        //綁定這個(gè)死信隊(duì)列
        channel.queueBind(DLX_QUEUE_NAME,DLX_EXCHANGE_NAME,"#");


        //直接性的來調(diào)用這個(gè)
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("獲取到數(shù)據(jù)了:"+new String(body));

            }
        };
        //綁定消費(fèi)者
        channel.basicConsume(DLX_QUEUE_NAME,true,defaultConsumer);
        //現(xiàn)在有這個(gè)問題呀?
    } 
}

2.6栋烤、消費(fèi)者端手動(dòng)簽收和消息的重回隊(duì)列

場(chǎng)景:消費(fèi)者端接受到了咋們的隊(duì)列中的數(shù)據(jù),但是在進(jìn)行業(yè)務(wù)邏輯處理的時(shí)候猎贴、發(fā)現(xiàn)一個(gè)問題班缎、業(yè)務(wù)邏輯處理失敗了? 怎么辦?
手動(dòng)簽收應(yīng)答、我們也可以手動(dòng)拒絕她渴、然后讓消息重回隊(duì)列
(1)生產(chǎn)者的編寫

public class Producer {
    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-ack-02";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
   channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.basicPublish("",QUEUE_NAME, null,"我是小波波1111".getBytes());
        channel.close();
        connection.close();
    }
}

(2)消費(fèi)者的編寫

public class Consumer {

    //申明隊(duì)列的名字
    private static final String QUEUE_NAME="nz1904-ack-02";
    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();

        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息是:"+new String(body));
                /**第一個(gè)參數(shù):當(dāng)前消息的標(biāo)記
                 * 第二個(gè)參數(shù):是否批量進(jìn)行應(yīng)答
                 * 下面是簽收
                 */
                //channel.basicAck(envelope.getDeliveryTag(),false);
                //下面也可以拒絕簽收
                /**
                 * 第三個(gè)參數(shù):表示決絕簽收之后這個(gè)消息是否要重回隊(duì)列?
                 */
                channel.basicNack(envelope.getDeliveryTag(),false,true);
            }
        };     channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
    }
}

2.7达址、怎么保證消息的投遞一定是成功的(難)

(1)消息的延遲投遞來解決傳遞的可靠性


image.png

(2)日志消息表實(shí)現(xiàn)可靠消息的傳輸


image.png

image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市趁耗,隨后出現(xiàn)的幾起案子沉唠,更是在濱河造成了極大的恐慌,老刑警劉巖苛败,帶你破解...
    沈念sama閱讀 219,270評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件满葛,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡罢屈,警方通過查閱死者的電腦和手機(jī)嘀韧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缠捌,“玉大人锄贷,你說我怎么就攤上這事÷拢” “怎么了谊却?”我有些...
    開封第一講書人閱讀 165,630評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)哑芹。 經(jīng)常有香客問我炎辨,道長(zhǎng),這世上最難降的妖魔是什么聪姿? 我笑而不...
    開封第一講書人閱讀 58,906評(píng)論 1 295
  • 正文 為了忘掉前任碴萧,我火速辦了婚禮,結(jié)果婚禮上末购,老公的妹妹穿的比我還像新娘勿决。我一直安慰自己,他們只是感情好招盲,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評(píng)論 6 392
  • 文/花漫 我一把揭開白布低缩。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪咆繁。 梳的紋絲不亂的頭發(fā)上讳推,一...
    開封第一講書人閱讀 51,718評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音玩般,去河邊找鬼银觅。 笑死,一個(gè)胖子當(dāng)著我的面吹牛坏为,可吹牛的內(nèi)容都是我干的究驴。 我是一名探鬼主播,決...
    沈念sama閱讀 40,442評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼匀伏,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼洒忧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起够颠,我...
    開封第一講書人閱讀 39,345評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤熙侍,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后履磨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛉抓,經(jīng)...
    沈念sama閱讀 45,802評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評(píng)論 3 337
  • 正文 我和宋清朗相戀三年剃诅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了巷送。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,117評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡矛辕,死狀恐怖惩系,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情如筛,我是刑警寧澤,帶...
    沈念sama閱讀 35,810評(píng)論 5 346
  • 正文 年R本政府宣布抒抬,位于F島的核電站杨刨,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏擦剑。R本人自食惡果不足惜妖胀,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望惠勒。 院中可真熱鬧赚抡,春花似錦、人聲如沸纠屋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至赁遗,卻和暖如春署辉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背岩四。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工哭尝, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人剖煌。 一個(gè)月前我還...
    沈念sama閱讀 48,377評(píng)論 3 373
  • 正文 我出身青樓材鹦,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親耕姊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子桶唐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評(píng)論 2 355