RabbitMQ Exchange類型

image.png

藍(lán)色的框:指的是生產(chǎn)者將消息投遞到EXchange上,然后根據(jù)routingkey路由到指定隊(duì)列上
綠色框:消費(fèi)者監(jiān)聽隊(duì)列竿痰,然后接受消息脆粥。
黃色框:消息到達(dá)了exchange是路由到哪個(gè)隊(duì)列,要根據(jù)routingkey而定菇曲。

下面講解Exchange
Exchange的屬性:(大致有個(gè)印象就OK冠绢,繼續(xù)往下看)
1.Name:交換機(jī)名稱;
2.Type:交換機(jī)類型 direct,topic,fanout,headers;
3.Durability:是否需要持久化,true為持久化,代表交換機(jī)在服務(wù)器重啟后是否還存在常潮;
4.Auto Delete :當(dāng)最后一個(gè)綁定到exchange上的隊(duì)列刪除后弟胀,自動(dòng)刪除該exchange.
5.Internal:當(dāng)前的exchange是否用于rabbitmq內(nèi)部使用,默認(rèn)為false.
6.Arguments:擴(kuò)展參數(shù)喊式,用于擴(kuò)展AMQP協(xié)議自制化孵户。

Exchange類型以及講解

1.Direct exchange

image.png
image.png

生產(chǎn)者代碼:

public class Producer4DirectExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        //5 發(fā)送
        
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());         
        
    }
    
}

消費(fèi)者的代碼:
此時(shí)需要channel發(fā)送消息指定的routingkey和綁定exchange和隊(duì)列時(shí)候的routingkey相同,直接路由到這些隊(duì)列上岔留。
如果不指定exchangeType,那么就是default Exchange,此時(shí)不需要將隊(duì)列綁定到exchange.但是Routekey需要完全匹配夏哭。

public class Consumer4DirectExchange {

    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        
        //表示聲明了一個(gè)交換機(jī)
        //參數(shù)說明:Exchange的屬性:
        //1.Name:交換機(jī)名稱;
        //2.Type:交換機(jī)類型 direct,topic,fanout,headers;
        //3.Durability:是否需要持久化,true為持久化献联;
        //4.Auto Delete :當(dāng)最后一個(gè)綁定到exchange上的隊(duì)列刪除后竖配,自動(dòng)刪除該exchange.
        //5.Internal:當(dāng)前的exchange是否用于rabbitmq內(nèi)部使用,默認(rèn)為false.
        //Arguments:擴(kuò)展參數(shù)里逆,用于擴(kuò)展AMQP協(xié)議自制化
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示聲明了一個(gè)隊(duì)列
        
        //參數(shù)說明:
        //1.消息隊(duì)列:實(shí)際存儲(chǔ)消息數(shù)據(jù)
        //2.Durability:是否持久化进胯,3.auto_delete:如果選yes,代表最后一個(gè)監(jiān)聽被移除之后原押,該隊(duì)列會(huì)自動(dòng)被刪除胁镐。
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一個(gè)綁定關(guān)系:
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱、是否自動(dòng)ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環(huán)獲取消息  
        while(true){  
            //獲取消息盯漂,如果沒有消息颇玷,這一步將會(huì)一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

2.topic exchange

此方式是算是routingkey的通配符匹配模式,兩張圖片說明問題

符號(hào)“#” 匹配一個(gè)或多個(gè)詞
符號(hào)“”匹配不多不少一個(gè)詞
例如:“l(fā)og.#”能夠匹配到“l(fā)og.info.oa”
"log.
"只會(huì)匹配到“l(fā)og.erro”
如下圖

image.png

生產(chǎn)者代碼(用了三種routingkey發(fā)送)

public class Producer4TopicExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 發(fā)送
        
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());    
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        connection.close();  
    }
    
}

消費(fèi)者代碼:

public class Consumer4TopicExchange {

    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        //String routingKey = "user.*";
        String routingKey = "user.*";
        // 1 聲明交換機(jī) 
        channel.exchangeDeclare(exchangeName, exch angeType, true, false, false, null);
        // 2 聲明隊(duì)列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交換機(jī)和隊(duì)列的綁定關(guān)系:
        //綁定關(guān)系中指定routingkey
        channel.queueBind(queueName, exchangeName, routingKey);
         
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱就缆、是否自動(dòng)ACK帖渠、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //循環(huán)獲取消息  
        while(true){  
            //獲取消息,如果沒有消息违崇,這一步將會(huì)一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}

3.Fanout Exchange(不需要routingkey征绎,只需要綁定)

image.png

特點(diǎn)是:
1.不處理路由鍵匿值,只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上
2.發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上
3.Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的

生產(chǎn)者代碼:

public class Producer4FanoutExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 創(chuàng)建Connection
        Connection connection = connectionFactory.newConnection();
        //3 創(chuàng)建Channel
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_fanout_exchange";
        //5 發(fā)送
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());          
        }
        channel.close();  
        connection.close();  
    }
    
}

消費(fèi)者代碼:

public class Consumer4FanoutExchange {

    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        
        Channel channel = connection.createChannel();  
        //4 聲明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = ""; //不設(shè)置路由鍵
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //參數(shù):隊(duì)列名稱、是否自動(dòng)ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //循環(huán)獲取消息  
        while(true){  
            //獲取消息撒妈,如果沒有消息指孤,這一步將會(huì)一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到消息:" + msg);  
        } 
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末孵班,一起剝皮案震驚了整個(gè)濱河市绝葡,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嗤谚,老刑警劉巖棺蛛,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異巩步,居然都是意外死亡旁赊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門椅野,熙熙樓的掌柜王于貴愁眉苦臉地迎上來终畅,“玉大人,你說我怎么就攤上這事竟闪±敫#” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵炼蛤,是天一觀的道長(zhǎng)妖爷。 經(jīng)常有香客問我,道長(zhǎng)理朋,這世上最難降的妖魔是什么絮识? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮嗽上,結(jié)果婚禮上笋除,老公的妹妹穿的比我還像新娘。我一直安慰自己炸裆,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布鲜屏。 她就那樣靜靜地躺著烹看,像睡著了一般国拇。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上惯殊,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天酱吝,我揣著相機(jī)與錄音,去河邊找鬼土思。 笑死务热,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的己儒。 我是一名探鬼主播崎岂,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼闪湾!你這毒婦竟也來了冲甘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤途样,失蹤者是張志新(化名)和其女友劉穎江醇,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體何暇,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡陶夜,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了裆站。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片条辟。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖遏插,靈堂內(nèi)的尸體忽然破棺而出捂贿,到底是詐尸還是另有隱情,我是刑警寧澤胳嘲,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布厂僧,位于F島的核電站,受9級(jí)特大地震影響了牛,放射性物質(zhì)發(fā)生泄漏颜屠。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一鹰祸、第九天 我趴在偏房一處隱蔽的房頂上張望甫窟。 院中可真熱鬧,春花似錦蛙婴、人聲如沸粗井。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽浇衬。三九已至懒构,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間耘擂,已是汗流浹背胆剧。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留醉冤,地道東北人秩霍。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蚁阳,于是被迫代替她去往敵國(guó)和親铃绒。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355