SpringBoot整合RabbitMQ,常用操作

本文介紹三種常用操作,基于spring-boot-starter-amqp依賴

  • 手動ack
  • work模式(能者多勞)
  • 消息格式轉(zhuǎn)換

手動ack

消息確認(rèn)模式

在amqp協(xié)議中消息確認(rèn)有兩種模式

  1. 自動確認(rèn)模式(automatic acknowledgement model)當(dāng)消息代理將消息發(fā)送給應(yīng)用后立即刪除

  2. 顯式確認(rèn)模式(explicit acknowledgement model)待應(yīng)用發(fā)送一個確認(rèn)回執(zhí)后再刪除消息

而在spring-boot-starter-amqp,spring定義了三種

  1. NONE 沒有ack的意思,對應(yīng)rabbitMQ的自動確認(rèn)模式

  2. MANUAL 手動模式,對應(yīng)rabbitMQ的顯式確認(rèn)模式

  3. AUTO 自動模式,對應(yīng)rabbitMQ的顯式確認(rèn)模式

首先注意的是spring-amqp中的自動模式與rabbit中的自動模式是不一樣的,其次,在spring-amqp中MANUAL 與 AUTO的關(guān)系有點類似于在spring中手動提交事務(wù)與自動提交事務(wù)的區(qū)別,一個是手動發(fā)送ack一個是在方法執(zhí)行完,沒有異常的情況下自動發(fā)送ack

代碼實現(xiàn)

三個步驟

  1. 設(shè)置消費者的消息確認(rèn)模式

  2. 手動確認(rèn)/拒絕消息

  3. 設(shè)置消息拒絕策略

設(shè)置消費者的消息確認(rèn)模式:

@Configuration
public class ListenerConfig {

   @Bean("myListenerFactory")
    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory containerFactory= 
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        //設(shè)置消費者的消息確認(rèn)模式
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return containerFactory;
    }
}

手動確認(rèn)/拒絕消息:

@Component
@RabbitListener(
        containerFactory = "myListenerFactory",
        bindings = @QueueBinding(
            value = @Queue("myManualAckQueue"),
            exchange = @Exchange(value = "myManualAckExchange", type = ExchangeTypes.DIRECT),
            key = "mine.manual"))
public class MyAckListener {

    @RabbitHandler
    public void onMessage(@Payload String msg, 
                          @Headers Map<String, Object> headers, 
                          Channel channel) throws Exception{
        try {
            System.out.println(msg);
            //消息確認(rèn),(deliveryTag,multiple是否確認(rèn)所有消息)
            channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
        } catch (Exception e) {
            //消息拒絕(deliveryTag,multiple,requeue拒絕后是否重新回到隊列)
            channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
            // 拒絕一條
            // channel.basicReject();
        }
    }
}

設(shè)置消息拒絕策略:

拒絕策略是指,當(dāng)消息被消費者拒絕時該如何處理,丟棄或者是重新回到隊列.

在MANUAL 模式下,在拒絕消息的方法中設(shè)置

//消息拒絕(deliveryTag,multiple,requeue拒絕后是否重新回到隊列)
channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);

在AUTO 模式下可通過RabbitListenerContainerFactory或是ListenerContainer設(shè)置,如

@Bean("myListenerFactory")
    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory containerFactory=
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        //自動ack
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //拒絕策略,true回到隊列 false丟棄
        containerFactory.setDefaultRequeueRejected(false);
        return containerFactory;
    }

需要注意的是,默認(rèn)的拒絕策略是回到隊列,所以,如果隊列只有一個消費者的話就會產(chǎn)生死循環(huán)

work模式-能者多勞

默認(rèn)情況下,如果有多個消費者在一個隊列上,消息是公平的分發(fā)給消費者的,一人一個輪著來,不考慮每個消費者之間的處理能力的差異,這可以通過設(shè)置預(yù)處理消息數(shù)(prefetchCount)緩解,或是使用work-能者多勞模式

work-能者多勞模式: 每個消費者的預(yù)處理消息數(shù)(prefetchCount)都設(shè)置為1,每個消費者消息確認(rèn)都為顯式確認(rèn)模式,即MANUAL,或是AUTO

如下,兩個消費者消費同一個queue上的消息,理論上consumer-one處理能力是consumer-two的兩倍

@Component
public class WorkListener {
    private int one = 1;
    private int two = 1;

    @RabbitListener(containerFactory = "workListenerFactory",
            queuesToDeclare = @Queue("workQueue"))
    public void onMessageOne(String msg) throws InterruptedException {
        Thread.sleep(100);
        System.out.println("consumer-one 第 " + one + " 個消息 :" + msg);
        one++;
    }

    @RabbitListener(containerFactory = "workListenerFactory",
            queuesToDeclare = @Queue("workQueue"))
    public void onMessageTwo(String msg) throws InterruptedException {
        Thread.sleep(200);
        System.out.println("consumer-two 第 " + two + " 個消息 :" + msg);
        two++;
    }
}

生產(chǎn)者,使用了上一篇中介紹的默認(rèn)交換機

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private void send() {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("workQueue", "this is a message");
        }
    }

執(zhí)行結(jié)果如下,符合預(yù)期,兩個消費者幾乎同時消費完畢,且one消費的消息數(shù)是two的兩倍

......
consumer-two 第 31 個消息 :this is a message
consumer-one 第 62 個消息 :this is a message
consumer-one 第 63 個消息 :this is a message
consumer-two 第 32 個消息 :this is a message
consumer-one 第 64 個消息 :this is a message
consumer-one 第 65 個消息 :this is a message
consumer-two 第 33 個消息 :this is a message
consumer-one 第 66 個消息 :this is a message
consumer-two 第 34 個消息 :this is a message

消息格式轉(zhuǎn)換

rabbirMQ中的消息對應(yīng)到j(luò)ava中對應(yīng)的實體類是 org.springframework.amqp.core.Message,所以消息轉(zhuǎn)換接口MessageConverter 有兩個主要方法 toMessage 和 fromMessage 顧名思義,即將發(fā)送的內(nèi)容與Message的互轉(zhuǎn)

SimpleMessageConverter

spring中默認(rèn)使用的是 SimpleMessageConverter 它的兩個轉(zhuǎn)化方法如下

toMessage,根據(jù) object instanceof xxx 轉(zhuǎn)化


toMessage.png

fromMessage,根據(jù)MessageProperties的ContentType轉(zhuǎn)換

fromMessage.png

所以你大可以自己實現(xiàn)MessageConverter 接口自己轉(zhuǎn)換,當(dāng)然spring也提供了常用的轉(zhuǎn)化,如轉(zhuǎn)json,xml

Jackson2JsonMessageConverter

常用的將object與json互轉(zhuǎn)

生產(chǎn)者

@Autowired
private RabbitTemplate rabbitTemplate;

private void send() {
    //實際項目不建議這么干,spring單例模式,
    // 所以最好自己構(gòu)建一個"jasonRabbitTemplate",用的使用注入
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.convertAndSend("jsonQueue", new Student("zhangSan",15,"男"));
}

消費者

@Bean("jasonTemplate")
public RabbitTemplate jasonRabbitTemplate(ConnectionFactory connectionFactory) {
    Jackson2JsonMessageConverter messageConverter = 
        new Jackson2JsonMessageConverter();
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //設(shè)置轉(zhuǎn)化類
    rabbitTemplate.setMessageConverter(messageConverter);
    return rabbitTemplate;
}

...

@Component
@RabbitListener(containerFactory = "jsonListenerFactory",
                queuesToDeclare = @Queue("jsonQueue"))
public class JasonListener {

    @RabbitHandler
    public void onMessage(Student student) {
        System.out.println(student);
    }
}

消息內(nèi)容:

json.png

轉(zhuǎn)載請注明出處
系列文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末郊尝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子相嵌,更是在濱河造成了極大的恐慌的畴,老刑警劉巖镊尺,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡播赁,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門吼渡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來容为,“玉大人,你說我怎么就攤上這事】脖常” “怎么了替劈?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長得滤。 經(jīng)常有香客問我陨献,道長,這世上最難降的妖魔是什么懂更? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任眨业,我火速辦了婚禮,結(jié)果婚禮上沮协,老公的妹妹穿的比我還像新娘龄捡。我一直安慰自己,他們只是感情好皂股,可當(dāng)我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布墅茉。 她就那樣靜靜地躺著,像睡著了一般呜呐。 火紅的嫁衣襯著肌膚如雪就斤。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天蘑辑,我揣著相機與錄音洋机,去河邊找鬼洋魂。 笑死,一個胖子當(dāng)著我的面吹牛副砍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播豁翎,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼角骤,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了心剥?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤优烧,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后畦娄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體又沾,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡弊仪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了捍掺。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡曲横,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出禾嫉,到底是詐尸還是另有隱情,我是刑警寧澤熙参,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布麦备,位于F島的核電站孽椰,受9級特大地震影響凛篙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜呛梆,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纹腌。 院中可真熱鬧,春花似錦滞磺、人聲如沸升薯。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽责语。三九已至目派,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間企蹭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工谅摄, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留徒河,地道東北人送漠。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像代兵,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子植影,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,044評論 2 355