Rabbitmq業(yè)務(wù)流程包含容錯(cuò)排查

流程是這樣的登夫,訂閱者很钓,發(fā)送消息到test交換機(jī)香府,通過route key 分發(fā)到綁定的隊(duì)列,這里涉及到交換機(jī)的類型码倦,可以看我上一篇文章企孩。如果沒有匹配到這個(gè)routeKey就默認(rèn)發(fā)送到AE交換機(jī)(fanout模式),這個(gè)交換機(jī)要設(shè)置internal:true意為內(nèi)部交換機(jī) 袁稽。AE交換機(jī)再把錯(cuò)誤的消息勿璃,發(fā)送到其綁定的隊(duì)列中,如果test交換機(jī),發(fā)送消息被匹配到的隊(duì)里中补疑,而處理該隊(duì)列的訂閱者闻葵,拒絕了或者超時(shí)了處理,test交換機(jī)就將該消息發(fā)送到就死信交換機(jī)癣丧,然后到死信隊(duì)列中

一槽畔、 進(jìn)入死信隊(duì)列(進(jìn)入死信的三種方式)

  • 1.消息被拒絕(basic.reject or basic.nack)并且requeue=false
  • 2.消息TTL過期
  • 3.隊(duì)列達(dá)到最大長度

代碼演示

- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
    - true   發(fā)送給下一個(gè)消費(fèi)者
    - false  誰都不接受,從隊(duì)列中刪除

Rabbit設(shè)置

  • 1.設(shè)置AE交換機(jī) 設(shè)置為內(nèi)部交換機(jī)胁编,模式為fanout
    當(dāng)發(fā)送到正常交換機(jī)消息厢钧,沒有被匹配到route key的消息對(duì)進(jìn)到改交換機(jī)
 FanoutExchange fanoutExchange=new FanoutExchange("alter");
fanoutExchange.setInternal(true);//設(shè)置為內(nèi)部交換機(jī),作為處理了非法的消息,無法匹配到route key的消息
- 為AE交換機(jī)綁定隊(duì)列 `alter_message`
  • 2.設(shè)置處理正常的交換機(jī) test

綁定參數(shù),設(shè)置沒有匹配 route key 的消息發(fā)送到AE交換機(jī) alternate-exchange

  • 3.添加正常的隊(duì)列

    • hello 測試處理正常邏輯

    • task_queue 模擬被拒絕的消息
      添加超時(shí)時(shí)間和死信交換機(jī)和rk

    x-dead-letter-exchange: dead_letter_exchange

    x-dead-letter-routing-key: task_queue.fail

    x-message-ttl: 600

  • 4.設(shè)置死信交換機(jī) dead_letter_exchange

    • 另外創(chuàng)建死信隊(duì)列 dead
    • 綁定 route key task_queue.fail
死信交換機(jī)
模擬死信隊(duì)列

代碼實(shí)例 Python

import pika

#認(rèn)證嬉橙,生產(chǎn)者
credentials = pika.PlainCredentials('guest', 'guest')


#鏈接rabbit服務(wù)器(localhost是本機(jī)早直,如果是其他服務(wù)器請修改為ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,'/',credentials))

#通過tcp協(xié)議獲取一個(gè)連接
channel = connection.channel()

#聲明一個(gè)對(duì)下列和賈環(huán)加
#channel.queue_declare(queue='hello')


#被hello接受了
channel.basic_publish(exchange='test',
                  routing_key='hello',
                  body='Hello World!')

#發(fā)送了一個(gè)沒有匹配的消息,匹配到了alter_message
channel.basic_publish(exchange='test',
                  routing_key='hello12312',
                  body='Hello World!')

#模擬一條雖然能被匹配到市框,但是無法消費(fèi)的消息霞扬,然后被發(fā)送到死信隊(duì)列消息
channel.basic_publish(exchange='test',
                  routing_key='task_queue',
                  body='Hello World!')


  • 正常隊(duì)列


  • 沒有匹配到的到


  • 被拒絕或者超時(shí)進(jìn)入私信隊(duì)列的


使用代碼去創(chuàng)建隊(duì)列和交換機(jī) Java

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
        connectionFactory.setUsername("liuxin");
        connectionFactory.setPassword("930914lx");
        connectionFactory.setVirtualHost("az");
        connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置回調(diào)

        Channel channel = connectionFactory.createConnection().createChannel(false);

        //String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("internal",true);
        //String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        //設(shè)置AE交換機(jī)
        channel.exchangeDeclare("alter", "fanout", false, false, false, arguments);
        channel.queueDeclare("alter_message", false, false, false, null);
        channel.queueBind("alter_message", "alter", "");
        
        //聲明死信交換機(jī)并綁定
        channel.exchangeDeclare("dead_letter_exchange", "direct", false, false, null);
        channel.queueDeclare("dead", false, false, false, null);
        channel.queueBind("dead", "dead_letter_exchange", "task_queue.fail");


        arguments = new HashMap<>();
        arguments.put("alternate-exchange", "alter");//指定AE交換機(jī)
        channel.exchangeDeclare("test", "direct", false, false, arguments);
        //聲明接受正式的隊(duì)列,不需要參數(shù)
        channel.queueDeclare("hello", false, false, false, null);
        channel.queueBind("hello", "test", "hello");

        arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
        arguments.put("x-dead-letter-routing-key", "task_queue.fail");
        arguments.put("x-message-ttl",6000);//6s沒有被處理枫振,就死了
        //設(shè)置測試死信隊(duì)列的task_queue喻圃,推送該隊(duì)列里面,被拒絕會(huì)到dead_letter_exchange粪滤,并最終到dead斧拍,routeKey,task_queue.fail 為并設(shè)置死信隊(duì)列參數(shù)
        channel.queueDeclare("task_queue", false, false, false, arguments);
        channel.queueBind("task_queue", "test", "task_queue");

        return connectionFactory;
    }
    
    
    
    
      /**
     * 接受消息的監(jiān)聽杖小,這個(gè)監(jiān)聽客戶交易流水的消息
     * 針對(duì)消費(fèi)者配置
     *
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, PayMentConsumeImpl transactionConsume) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.addQueueNames("hello");
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(8);
        container.setConcurrentConsumers(4);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn),當(dāng)設(shè)置了此模式肆汹,必須返回ACK,否則會(huì)進(jìn)入死信隊(duì)列
        container.setMessageListener(transactionConsume);
        container.setPrefetchCount(1000);
        return container;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末予权,一起剝皮案震驚了整個(gè)濱河市昂勉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌扫腺,老刑警劉巖岗照,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異斧账,居然都是意外死亡谴返,警方通過查閱死者的電腦和手機(jī)煞肾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門咧织,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人籍救,你說我怎么就攤上這事习绢。” “怎么了?”我有些...
    開封第一講書人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵闪萄,是天一觀的道長梧却。 經(jīng)常有香客問我,道長败去,這世上最難降的妖魔是什么放航? 我笑而不...
    開封第一講書人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮圆裕,結(jié)果婚禮上广鳍,老公的妹妹穿的比我還像新娘。我一直安慰自己吓妆,他們只是感情好赊时,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著行拢,像睡著了一般祖秒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上舟奠,一...
    開封第一講書人閱讀 51,488評(píng)論 1 302
  • 那天竭缝,我揣著相機(jī)與錄音,去河邊找鬼沼瘫。 笑死歌馍,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的晕鹊。 我是一名探鬼主播松却,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼溅话!你這毒婦竟也來了晓锻?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤飞几,失蹤者是張志新(化名)和其女友劉穎砚哆,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屑墨,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡躁锁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了卵史。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片战转。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖以躯,靈堂內(nèi)的尸體忽然破棺而出槐秧,到底是詐尸還是另有隱情啄踊,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布刁标,位于F島的核電站颠通,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏膀懈。R本人自食惡果不足惜顿锰,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望启搂。 院中可真熱鬧撵儿,春花似錦、人聲如沸狐血。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽匈织。三九已至浪默,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缀匕,已是汗流浹背纳决。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留乡小,地道東北人阔加。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像满钟,于是被迫代替她去往敵國和親胜榔。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理湃番,服務(wù)發(fā)現(xiàn)夭织,斷路器,智...
    卡卡羅2017閱讀 134,656評(píng)論 18 139
  • 為了一些初學(xué)習(xí)者更好理解我就從簡單的解釋一下Rabbitmq的原理吧?吠撮,首先你可以這樣想RabbitMq就是一個(gè)隊(duì)...
    螃蟹和駱駝先生Yvan閱讀 7,403評(píng)論 6 4
  • 1.什么是消息隊(duì)列 消息隊(duì)列允許應(yīng)用間通過消息的發(fā)送與接收的方式進(jìn)行通信尊惰,當(dāng)消息接收方服務(wù)忙或不可用時(shí),其提供了一...
    zhuke閱讀 4,467評(píng)論 0 12
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器泥兰。支持消息的持久化弄屡、事務(wù)、擁塞控...
    jiangmo閱讀 10,359評(píng)論 2 34
  • 背景 何為延遲隊(duì)列鞋诗? 顧名思義膀捷,延遲隊(duì)列就是進(jìn)入該隊(duì)列的消息會(huì)被延遲消費(fèi)的隊(duì)列。而一般的隊(duì)列师脂,消息一旦入隊(duì)了之后就...
    wooyoo閱讀 3,453評(píng)論 0 17