Rabbitmq打怪升級之路(十二)Headers-頭交換機模式

簡書:亞武de小文 【原創(chuàng):轉載請注明出處】

頭交換機模式(Headers)

LengToo上學.png
RabbitMQ有以下幾種工作模式 :
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • Headers
  • RPC

Header
模型圖
[亞武de小文]Headers模型圖.png

消息header數據里有一個特殊的鍵x-match榨惰,它有兩個值:

  • all: 默認值蚓聘。一個傳送消息的header里的鍵值對和交換機的header鍵值對全部匹配赖钞,才可以路由到對應交換機
  • any: 一個傳送消息的header里的鍵值對和交換機的header鍵值對任意一個匹配,就可以路由到對應交換機

頭交換機和主題交換機類似,區(qū)別在于:Topic路由值是基于路由鍵搀别,Headers的路由值基于消息的header數據退个。 主題交換機路由鍵只有是字符串,而頭交換機可以是整型和哈希值。

參考代碼
生產者
  • Producer.java
    package com.yawu.xiaowen.header;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    
    /**
     * Header交換機
     * 生產者
     *
     * @author yawu
     * @date 2019.07.01
     */
    public class Producer {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_header_exchange";
    
        public static void execute(Map<String, Object> headerMap) {
            try {
                // RabbitMQ建立連接的管理器
                ConnectionFactory factory = new ConnectionFactory();
                // 設置服務器地址
                factory.setHost("127.0.0.1");
                factory.setUsername("guest");
                factory.setPassword("guest");
    
                // 創(chuàng)建一個連接
                Connection connection = factory.newConnection();
                // 創(chuàng)建一個信道
                Channel channel = connection.createChannel();
    
                String message = "發(fā)送信息-headers交換機";
    
                //聲明一個Header類型的交換機
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
    
                // 生成發(fā)送消息的屬性
                AMQP.BasicProperties props = new AMQP.BasicProperties
                        .Builder()
                        .headers(headerMap)
                        .build();
    
                // 向交換機發(fā)送消息
                channel.basicPublish(EXCHANGE_NAME, "like.orange.color", null, message.getBytes("UTF-8"));
    
                LOGGER.info("消息發(fā)送成功:{}", message);
                channel.close();
                connection.close();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
消費者
  • Consumer01.java
    package com.yawu.xiaowen.header;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Map;
    
    /**
     * Header交換機
     * 消費者
     *
     * @author yawu
     * @date 2019.07.01
     */
    public class Consumer01 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_header_exchange";
    
        public static void execute(Map<String, Object> myHeaderMap) {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("127.0.0.1");
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                //聲明一個Headers類型的交換機
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
    
                // 聲明一個臨時隊列
                String queue_name = channel.queueDeclare().getQueue();
    
                // 隊列綁定時需要指定參數,注意雖然不需要路由鍵但仍舊不能寫成null绽昼,需要寫成空字符串""
                channel.queueBind(queue_name, EXCHANGE_NAME, "", myHeaderMap);
    
                LOGGER.info("【Consumer01:" + myHeaderMap + "】 等待消息...");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        final String message = new String(body, "UTF-8");
                        LOGGER.info("【Consumer01:" + myHeaderMap + "】接收到的消息 '" + properties.getHeaders() + "':'" + message + "'");
                    }
                };
    
                // 隊列一確認消息
                channel.basicConsume(queue_name, true, consumer);
    
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            }
        }
    }
    
    
測試工具
  • HearTest.java

    package com.yawu.xiaowen.header;
    
    
    import org.junit.Test;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class HeaderTest {
    
        private ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        @Test
        public void header() throws InterruptedException {
    
            // 消費者1:綁定 health=Nice,mentality=Good
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Good");
                headerMap.put("x-match", "all");
                Producer.execute(headerMap);
            });
    
            // 消費者2:綁定  health=Nice,mentality=Bad
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Bad");
                headerMap.put("x-match", "any");
                Producer.execute(headerMap);
            });
    
            // 消費者3:綁定  health=Terrible,mentality=Good
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Terrible");
                headerMap.put("mentality", "Good");
                headerMap.put("x-match", "all");
    //            headerMap.put("x-match","any");
                Producer.execute(headerMap);
            });
    
            Thread.sleep(2 * 1000);
            System.out.println("=============消息01===================");
            // 生產者1 : health=Nice,mentality=Good,x-match=all
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Good");
    //            headerMap.put("x-match","all");
                Consumer01.execute(headerMap);
            });
    
            Thread.sleep(5 * 100);
            System.out.println("=============消息02===================");
            // 生產者2 : health=Nice,x-match=any
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
    //            headerMap.put("x-match","any");
                Consumer01.execute(headerMap);
            });
    
            Thread.sleep(5 * 100);
            System.out.println("=============消息03===================");
            // 生產者1 : health=Terrible,mentality=Bad,x-match=all
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Terrible");
                headerMap.put("mentality", "Bad");
    //            headerMap.put("x-match","all");
                Consumer01.execute(headerMap);
            });
    
            // sleep 10s
            Thread.sleep(10 * 1000);
        }
    }
    
  • 運行HeaderTest測試工具,結果如圖:


    Headers模式測試結果.png
  • 至此须蜗,Headers交換機模式學習完畢

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
禁止轉載硅确,如需轉載請通過簡信或評論聯(lián)系作者。
  • 序言:七十年代末唠粥,一起剝皮案震驚了整個濱河市疏魏,隨后出現的幾起案子,更是在濱河造成了極大的恐慌晤愧,老刑警劉巖大莫,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異官份,居然都是意外死亡只厘,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進店門舅巷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來羔味,“玉大人,你說我怎么就攤上這事钠右「吃” “怎么了?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵飒房,是天一觀的道長搁凸。 經常有香客問我,道長狠毯,這世上最難降的妖魔是什么护糖? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮嚼松,結果婚禮上嫡良,老公的妹妹穿的比我還像新娘。我一直安慰自己献酗,他們只是感情好寝受,可當我...
    茶點故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著凌摄,像睡著了一般羡蛾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上锨亏,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天痴怨,我揣著相機與錄音,去河邊找鬼器予。 笑死浪藻,一個胖子當著我的面吹牛,可吹牛的內容都是我干的乾翔。 我是一名探鬼主播爱葵,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼反浓!你這毒婦竟也來了萌丈?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤雷则,失蹤者是張志新(化名)和其女友劉穎辆雾,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體月劈,經...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡度迂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了猜揪。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惭墓。...
    茶點故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖而姐,靈堂內的尸體忽然破棺而出腊凶,到底是詐尸還是另有隱情,我是刑警寧澤拴念,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布钧萍,位于F島的核電站,受9級特大地震影響丈莺,放射性物質發(fā)生泄漏划煮。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一缔俄、第九天 我趴在偏房一處隱蔽的房頂上張望弛秋。 院中可真熱鬧,春花似錦俐载、人聲如沸蟹略。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽挖炬。三九已至,卻和暖如春状婶,著一層夾襖步出監(jiān)牢的瞬間意敛,已是汗流浹背馅巷。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留草姻,地道東北人钓猬。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像撩独,于是被迫代替她去往敵國和親敞曹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,060評論 2 355