RabbitMQ五種消息模型

RabbitMQ支持的消息模型

https://www.rabbitmq.com/getstarted.html

環(huán)境搭建

創(chuàng)建一個Maven項目導入RabbitMQ依賴

<!--引入rabbitmq-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>

創(chuàng)建ems用戶,密碼為123,創(chuàng)建VirtualHost /ems

Hello World 模型

在上圖的模型中晶通,有以下概念:

  • P:生產(chǎn)者璃氢,也就是要發(fā)送消息的程序
  • C:消費者:消息的接受者,會一直等待消息到來狮辽。
  • queue:消息隊列一也,圖中紅色部分巢寡。類似一個郵箱,可以緩存消息椰苟;生產(chǎn)者向其中投遞消息抑月,消費者從其中取出消息。

生產(chǎn)者

創(chuàng)建生產(chǎn)者Provider類舆蝴,通過連接工廠對象獲取連接爪幻,創(chuàng)建通道channel

//生產(chǎn)者
public class Provider {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        //創(chuàng)建連接mq的連接工廠對象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //設置連接rabbitmq的主機
        connectionFactory.setHost("localhost");
        //設置端口號
        connectionFactory.setPort(5672);
        //設置連接哪個虛擬主機
        connectionFactory.setVirtualHost("/ems");
        //設置訪問虛擬主機的用戶名和密碼
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");

        //獲取連接對象
        Connection connection = connectionFactory.newConnection();
        
        //獲取連接中的通道
        Channel channel = connection.createChannel();

        //通道綁定對應的消息隊列
        //參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
        //參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
        //參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
        //參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
        channel.queueDeclare("hello",false,false,false,null);


        //發(fā)布消息
        //參數(shù)1:交換機名稱 參數(shù)2:隊列名稱 參數(shù)3:傳遞消息額外設置 參數(shù)4;消息的具體內容
        channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());

        channel.close();
        connection.close();
    }
}

消費者

channel.queueDeclare();與生產(chǎn)者保持一致须误,使用channel.basicConsume()接收消息,回調接口new DefaultConsumer(channel)重寫handleDelivery()方法

//消費者
public class Customer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //創(chuàng)建連接mq的連接工廠對象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //設置連接rabbitmq的主機
        connectionFactory.setHost("localhost");
        //設置端口號
        connectionFactory.setPort(5672);
        //設置連接哪個虛擬主機
        connectionFactory.setVirtualHost("/ems");
        //設置訪問虛擬主機的用戶名和密碼
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");

        //獲取連接對象
        Connection connection = connectionFactory.newConnection();

        //獲取連接中的通道
        Channel channel = connection.createChannel();

        //通道綁定對應的消息隊列--參數(shù)要和生產(chǎn)者保持對應仇轻,否則可能出現(xiàn)錯誤
        //參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
        //參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
        //參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
        //參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
        channel.queueDeclare("hello", false, false, false, null);

        //消費信息
        //參數(shù)1:消費哪個隊列的信息 隊列名稱
        //參數(shù)2:開啟消息的自動確認機制
        //參數(shù)3:消費時的回調接口
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {

            //最后一個參數(shù):消息隊列中取出的信息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body)=" + new String(body));
            }
        });
    }
}

測速

首先啟動消費者監(jiān)聽京痢,然后在啟動生產(chǎn)者發(fā)送消息。發(fā)送消息后消費者進行消費

參數(shù)說明

channel.queueDeclare("hello",false,false,false,null);

  • 參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
  • 參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
  • 參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
  • 參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除

channel.basicPublish("","aa", null,"hello rabbitmq".getBytes());

  • 參數(shù)1:交換機名稱
  • 參數(shù)2:隊列名稱
  • 參數(shù)3:傳遞消息額外設置
  • 參數(shù)4篷店;消息的具體內容

channel.basicConsume()

  • 參數(shù)1:消費哪個隊列的信息 隊列名稱
  • 參數(shù)2:開啟消息的自動確認機制
  • 參數(shù)3:消費時的回調接口

durable為ture只能實現(xiàn)隊列持久化祭椰。若想要其中未消費的消息持久化channel.basicPublish()中參數(shù)3要設置為MessageProperties.PERSISTENT_TEXT_PLAIN

封裝工具類

通過實現(xiàn)Hello World模型,我們可以發(fā)現(xiàn)存在了許多重復的代碼疲陕。例如獲取連接對象方淤,關閉連接操作。我們將其封裝成一個工具類 RabbitMQUtils,獲取連接對象getConnection(),關閉資源closeConnectionAndChanel()

//工具類
public class RabbitMQUtils {
    //獲取連接
    public static Connection getConnection() {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //設置連接rabbitmq的主機
            connectionFactory.setHost("localhost");
            //設置端口號
            connectionFactory.setPort(5672);
            //設置連接哪個虛擬主機
            connectionFactory.setVirtualHost("/ems");
            //設置訪問虛擬主機的用戶名和密碼
            connectionFactory.setUsername("ems");
            connectionFactory.setPassword("123");
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }
    //關閉連接和通道的方法
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            //關閉通道
            if (channel != null) {
                channel.close();
            }
            //關閉連接
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

不過上述代碼還是可以優(yōu)化的蹄殃,每次獲取連接都需要創(chuàng)建一次工廠對象是沒有必要的携茂。相關認證資源也只需要加載一次即可。于是我們將代碼優(yōu)化

//工具類
public class RabbitMQUtils {
    //創(chuàng)建連接mq的連接工廠對象 屬于重量級資源
    private static ConnectionFactory connectionFactory;

    //靜態(tài)代碼塊 類加載時執(zhí)行 只執(zhí)行一次
    static {
        connectionFactory = new ConnectionFactory();
        //設置連接rabbitmq的主機
        connectionFactory.setHost("localhost");
        //設置端口號
        connectionFactory.setPort(5672);
        //設置連接哪個虛擬主機
        connectionFactory.setVirtualHost("/ems");
        //設置訪問虛擬主機的用戶名和密碼
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");
    }
    //獲取連接
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    //關閉連接和通道的方法
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            //關閉通道
            if (channel != null) {
                channel.close();
            }
            //關閉連接
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

}
//生產(chǎn)者
public class Provider {

    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        //通過工具類獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();

        //獲取連接中的通道
        Channel channel = connection.createChannel();

        //通道綁定對應的消息隊列
        //參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
        //參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
        //參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
        //參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
        channel.queueDeclare("hello",true,false,false,null);

        //發(fā)布消息
        //參數(shù)1:交換機名稱 參數(shù)2:隊列名稱 參數(shù)3:傳遞消息額外設置 參數(shù)4诅岩;消息的具體內容
        //channel.basicPublish("","aa", null,"hello rabbitmq".getBytes());
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());

        //通過工具類關閉
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);

    }
}

消費者同樣使用工具類

//消費者
public class Customer {

    public static void main(String[] args) throws IOException, TimeoutException {

        //通過工具類獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();

        //獲取連接中的通道
        Channel channel = connection.createChannel();

        //通道綁定對應的消息隊列--參數(shù)要和生產(chǎn)者保持對應讳苦,否則可能出現(xiàn)錯誤
        //參數(shù)1:queue 隊列名稱 如果隊列不存在則自動創(chuàng)建
        //參數(shù)2:durable 用來定義隊列特性是否要持久化 true 持久化隊列 false 不持久化
        //參數(shù)3:exclusive 是否獨占隊列 true 獨占隊列 false 不獨占
        //參數(shù)4:autoDelete 是否在消費完成后自動刪除隊列 true 自動刪除 false 不自動刪除
        channel.queueDeclare("hello", true, false, false, null);

        //消費信息
        //參數(shù)1:消費哪個隊列的信息 隊列名稱
        //參數(shù)2:開啟消息的自動確認機制
        //參數(shù)3:消費時的回調接口
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {

            //最后一個參數(shù):消息隊列中取出的信息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body)=" + new String(body));
            }
        });
    }
}

封裝成功!

Work Quene 模型

Work queues吩谦,也被稱為(Task queues)鸳谜,任務模型。當消息處理比較耗時的時候式廷,可能生產(chǎn)消息的速度會遠遠大于消息的消費速度咐扭。長此以往,消息就會堆積越來越多滑废,無法及時處理蝗肪。此時就可以使用work 模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息策严。隊列中的消息一旦消費穗慕,就會消失,因此任務是不會被重復執(zhí)行的妻导。

角色:

  • P:生產(chǎn)者:任務的發(fā)布者
  • C1:消費者-1逛绵,領取任務并且完成任務怀各,假設完成速度較慢
  • C2:消費者-2:領取任務并完成任務,假設完成速度快

平均消費信息

生產(chǎn)者

public class Provider {

    public static void main(String[] args) throws IOException {
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        //獲取通道對象
        Channel channel = connection.createChannel();

        //通過通道聲明隊列
        channel.queueDeclare("work",true,false,false,null );

        //生產(chǎn)消息
        for (int i = 1; i <= 20; i++) {
            channel.basicPublish("","work",null,(i+"hello work queue").getBytes());
        }

        //關閉資源
         RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

消費者

消費者1

public class Customer1 {

    public static void main(String[] args) throws IOException {
        //獲取連接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work", true, false, false, null);

        //參數(shù)1:隊列名稱 參數(shù)2:消息自動確認 true 消費者自動向rabbitmq確認消息消費 false 不會自動確認消息
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者-1:" + new String(body));
            }
        });
    }
}

消費者2

public class Customer2 {

    public static void main(String[] args) throws IOException {
        //獲取連接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work", true, false, false, null);

        //參數(shù)1:隊列名稱 參數(shù)2:消息自動確認 true 消費者自動向rabbitmq確認消息消費 false 不會自動確認消息
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者-2:" + new String(body));
            }
        });
    }
}

測速

先分別啟動消費者1和消費者2進行監(jiān)聽术浪,再啟動生產(chǎn)者生產(chǎn)消息

可以看到20條消息被依次平均消費了瓢对,輪詢。

<mark>不過這種消費效果會導致一種問題胰苏。當某一個消費者出現(xiàn)了故障或者消費消息的時間過長硕蛹,則會導致整個隊列的消息都會變慢。類似木桶效應硕并,因此我們需要一種能者多勞多得的消費消息策略法焰。</mark>

消息自動確認機制

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

我們在channel.basicConsume()中配置了消息自動確認。

<mark>不過開啟自動確認機制還是存在隱患倔毙,例如上圖消息消費者1接收到了5個消息后埃仪,自動確認了。則隊列中的消息則會收到確認后刪除陕赃。而消費者1消費3個消息后出現(xiàn)故障宕機卵蛉,則為被消費的剩余兩個消息則丟失了。</mark>

能者多勞多得

  • 關閉自動確認 channel.basicConsume(隊列名, false,回調函數(shù))
  • 道道每一次只消費一條信息 channel.basicQos(1);
  • 消費消息后手動確認 channel.basicAck(envelope.getDeliveryTag(),false);

消費者

public class Customer1 {

    public static void main(String[] args) throws IOException {
        //獲取連接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //每一次只消費一條消息
        channel.basicQos(1);

        channel.queueDeclare("work", true, false, false, null);

        //參數(shù)1:隊列名稱 參數(shù)2:消息自動確認 true 消費者自動向rabbitmq確認消息消費 false 不會自動確認消息
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消費者-1:" + new String(body));
                //手動確認信息
                //參數(shù)1:確認隊列中哪個具體消息 參數(shù)2:是否開啟多個消息同時確認
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消費者1每次消費消息前先睡眠2秒么库。

public class Customer2 {

    public static void main(String[] args) throws IOException {
        //獲取連接
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //每一次只消費一條消息
        channel.basicQos(1);

        channel.queueDeclare("work", true, false, false, null);

        //參數(shù)1:隊列名稱 參數(shù)2:消息自動確認 true 消費者自動向rabbitmq確認消息消費 false 不會自動確認消息
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者-2:" + new String(body));
                //手動確認信息
                //參數(shù)1:確認隊列中哪個具體消息 參數(shù)2:是否開啟多個消息同時確認
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

測試

兩個消費者進行修改后傻丝,消費者1的消費速度遠遠小于消費者2 。此時看消費100條信息的結果诉儒。

消費者1只消費了1條消息葡缰,而消費者2消費了99條消息。實現(xiàn)了能者多勞多得多效果忱反!

Fanout 模型

廣播模型

角色:

  • P 生產(chǎn)者
  • C 消費者
  • X 交換機

在廣播模式下运准,消息發(fā)送流程是這樣的:

  • 可以有多個消費者
  • 每個消費者有自己的queue(隊列)
  • 每個隊列都要綁定到Exchange(交換機)
  • 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機缭受,交換機來決定要發(fā)給哪個隊列胁澳,生產(chǎn)者無法決定。
  • 交換機把消息發(fā)送給綁定過的所有隊列
  • 隊列的消費者都能拿到消息米者。實現(xiàn)一條消息被多個消費者消費

生產(chǎn)者

聲明交換機channel.exchangeDeclare("logs", "fanout");參數(shù)1:交換機名稱 參數(shù)2:交換機類型 fanout 廣播類型

public class Provider {

    public static void main(String[] args) throws IOException {
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //聲明指定的的交換機
        //參數(shù)1:交換機名稱 參數(shù)2:交換機類型 fanout 廣播類型
        channel.exchangeDeclare("logs", "fanout");

        //發(fā)送消息
        channel.basicPublish("logs", "", null, "fanout type message".getBytes());

        //釋放資源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);
    }
}

消費者

消費者1

  • channel.exchangeDeclare("logs","fanout");通道綁定交換機
  • String queue = channel.queueDeclare().getQueue();獲取臨時隊列
  • channel.queueBind(queue,"logs","");綁定交換機和隊列
public class Customer1 {

    public static void main(String[] args) throws IOException {
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道綁定交換機
        channel.exchangeDeclare("logs","fanout");

        //
        String queue = channel.queueDeclare().getQueue();

        //綁定交換機和隊列
        channel.queueBind(queue,"logs","");

        //消費消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
            }
        });
    }
}

消費者2和消費者3復制修改輸入語句即可韭畸。

測試

首先啟動消費者們,在啟動生產(chǎn)者生產(chǎn)消息蔓搞。

測試成功胰丁!

Direct 模型

在Fanout模式中,一條消息喂分,會被所有訂閱的隊列都消費锦庸。但是,在某些場景下蒲祈,我們希望不同的消息被不同的隊列消費甘萧。這時就要用到Direct類型的Exchange萝嘁。

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了扬卷,而是要指定一個RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時牙言,也必須指定消息的 RoutingKey
  • Exchange不再把消息交給每一個綁定的隊列怪得,而是根據(jù)消息的Routing Key進行判斷咱枉,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

圖解:

  • P:生產(chǎn)者徒恋,向Exchange發(fā)送消息蚕断,發(fā)送消息時,會指定一個routing key入挣。
  • X:Exchange(交換機)基括,接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊列
  • C1:消費者财岔,其所在隊列指定了需要routing key 為 error 的消息
  • C2:消費者,其所在隊列指定了需要routing key 為 info河爹、error匠璧、warning 的消息

生產(chǎn)者

public class Provider {

    public static void main(String[] args) throws IOException {

        String exchangeName = "logs_direct";
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        //獲取連接通道對象
        Channel channel = connection.createChannel();
        //通過通道聲明交換機
        //參數(shù)1:交換機名稱 參數(shù)2:direct 路由模式
        channel.exchangeDeclare(exchangeName,"direct");
        //發(fā)送消息
        String routingKey = "error";
        channel.basicPublish(exchangeName,routingKey,null,("這是direct模型發(fā)布的基于routingKey:【"+routingKey+"】").getBytes());

        //關閉資源
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);

    }
}

消費者

消費者1

public class Customer1 {

    public static void main(String[] args) throws IOException {
        String exchangeName = "logs_direct";
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道綁定交換機
        channel.exchangeDeclare(exchangeName,"direct");

        //獲取臨時隊列
        String queue = channel.queueDeclare().getQueue();

        //基于routingKey綁定交換機和隊列
        channel.queueBind(queue,exchangeName,"error");

        //消費消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
            }
        });
    }
}

消費者2

public class Customer2 {

    public static void main(String[] args) throws IOException {
        String exchangeName = "logs_direct";
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道綁定交換機
        channel.exchangeDeclare(exchangeName,"direct");

        //獲取臨時隊列
        String queue = channel.queueDeclare().getQueue();

        //基于routingKey綁定交換機和隊列
        channel.queueBind(queue,exchangeName,"info");
        channel.queueBind(queue,exchangeName,"error");
        channel.queueBind(queue,exchangeName,"warning");

        //消費消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
            }
        });
    }
}

測試

生產(chǎn)者發(fā)送消息是的routingKey為error所以消費者1,和消費者2都能消費

當我們被生產(chǎn)者綁定當routingKey改為info時咸这,此時只有消費者2才能消費

測試成功夷恍!

Topic 模型

Topic類型的ExchangeDirect相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊列媳维。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符酿雪!這種模型Routingkey 一般都是由一個或多個單詞組成,多個單詞之間以”.”分割侄刽,例如: item.insert

生產(chǎn)者

public class Provider {

    public static void main(String[] args) throws IOException {
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //聲明交換機以及交換機類型 topic
        channel.exchangeDeclare("topics","topic");

        String routingKey = "user.save.find";

        //發(fā)布消息
        channel.basicPublish("topics",routingKey,null,("這里是topic動態(tài)路由模型,routingKey:【"+routingKey+"】").getBytes());

        //釋放資源
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);

    }
}

消費者

消費者1

public class Customer1 {

    public static void main(String[] args) throws IOException {
        String exchangeName = "topics";
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道綁定交換機
        channel.exchangeDeclare(exchangeName,"topic");

        //獲取臨時隊列
        String queue = channel.queueDeclare().getQueue();

        //基于routingKey綁定交換機和隊列
        channel.queueBind(queue,exchangeName,"user.*");

        //消費消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
            }
        });
    }
}

消費者2

public class Customer2 {

    public static void main(String[] args) throws IOException {
        String exchangeName = "topics";
        //獲取連接對象
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道綁定交換機
        channel.exchangeDeclare(exchangeName,"topic");

        //獲取臨時隊列
        String queue = channel.queueDeclare().getQueue();

        //基于routingKey綁定交換機和隊列
        channel.queueBind(queue,exchangeName,"user.#");

        //消費消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
            }
        });
    }
}

測試

生產(chǎn)者綁定的routingKey為user.save.find指黎,消費者1的routingKey為user.*,消費者2的routingKey為user.#。所以只符合消費者2州丹。

測試成功醋安!

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市墓毒,隨后出現(xiàn)的幾起案子吓揪,更是在濱河造成了極大的恐慌,老刑警劉巖所计,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件柠辞,死亡現(xiàn)場離奇詭異,居然都是意外死亡主胧,警方通過查閱死者的電腦和手機叭首,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門习勤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人放棒,你說我怎么就攤上這事姻报。” “怎么了间螟?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵吴旋,是天一觀的道長。 經(jīng)常有香客問我厢破,道長荣瑟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任摩泪,我火速辦了婚禮笆焰,結果婚禮上,老公的妹妹穿的比我還像新娘见坑。我一直安慰自己嚷掠,他們只是感情好,可當我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布荞驴。 她就那樣靜靜地躺著不皆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪熊楼。 梳的紋絲不亂的頭發(fā)上霹娄,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機與錄音鲫骗,去河邊找鬼犬耻。 笑死,一個胖子當著我的面吹牛执泰,可吹牛的內容都是我干的枕磁。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼术吝,長吁一口氣:“原來是場噩夢啊……” “哼透典!你這毒婦竟也來了?” 一聲冷哼從身側響起顿苇,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤峭咒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后纪岁,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凑队,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了漩氨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片西壮。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖叫惊,靈堂內的尸體忽然破棺而出款青,到底是詐尸還是另有隱情,我是刑警寧澤霍狰,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布抡草,位于F島的核電站,受9級特大地震影響蔗坯,放射性物質發(fā)生泄漏康震。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一宾濒、第九天 我趴在偏房一處隱蔽的房頂上張望腿短。 院中可真熱鬧,春花似錦绘梦、人聲如沸橘忱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽钝诚。三九已至,卻和暖如春择卦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背郎嫁。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工秉继, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人泽铛。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓尚辑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親盔腔。 傳聞我的和親對象是個殘疾皇子杠茬,可洞房花燭夜當晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內容

  • RabbitMQ基本概念 我們先看一下RabbitMQ模型結構圖,這樣會方便們更好地去理解RabbitMQ的基本原...
    GeekerLou閱讀 3,850評論 4 47
  • RabbitMQ RabbitMQ是流行的開源消息隊列系統(tǒng)弛随,用erlang語言開發(fā)瓢喉。RabbitMQ是AMQP(高...
    霍運浩閱讀 775評論 0 1
  • 生產(chǎn)者: 復制代碼 1 //direct類型 路由模式 1對1匹配 2 //生產(chǎn)...
    金色888閱讀 544評論 0 0
  • 一、利用docker安裝rabbitmq https://www.cnblogs.com/yufeng218/p/...
    Junma_c631閱讀 347評論 0 1
  • 久違的晴天舀透,家長會栓票。 家長大會開好到教室時,離放學已經(jīng)沒多少時間了愕够。班主任說已經(jīng)安排了三個家長分享經(jīng)驗走贪。 放學鈴聲...
    飄雪兒5閱讀 7,515評論 16 22