Spring Boot RabbitMQ快速入門 (2)

Prefetch設(shè)置

當(dāng)我們進(jìn)入RabbitMQ的GUI管理界面, 點(diǎn)入某個(gè)隊(duì)列查看消費(fèi)者的屬性時(shí), 有記錄如下

Channel Consumer tag Ack required Exclusive Prefetch count Arguments
172.22.0.1:57382 amq.ctag-Gsix2DEjaFI9zVlsJJZp3Q 1
172.22.0.1:57378 amq.ctag-_FIcIOpflMXXaBQN7xLYcA 1

上面的表格說(shuō)明消息的消費(fèi)需要手工ack, 且是公平分發(fā)的. 設(shè)置prefetch的方式有兩種

  • 全局式設(shè)定

    在application.yml文件中設(shè)定spring.rabbitmq.listener.prefetch即可, 這會(huì)影響到本Spring Boot應(yīng)用中所有使用默認(rèn)SimpleRabbitListenerContainerFactory的消費(fèi)者

    spring:
      rabbitmq:
        host: localhost
        username: chris
        password: 123123
        virtual-host: prontera
        listener:
          prefetch: 100
    
  • 特定消費(fèi)者設(shè)置

    在消費(fèi)者的配置中自定義一個(gè)SimpleRabbitListenerContainerFactory

    @Bean
    public SimpleRabbitListenerContainerFactory myContainerFactory(
      SimpleRabbitListenerContainerFactoryConfigurer configurer, 
      ConnectionFactory connectionFactory) {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setPrefetchCount(100);
      configurer.configure(factory, connectionFactory);
      return factory;
    }
    
    

    然后在消費(fèi)者上聲明使用該ContainerFactory即可達(dá)到對(duì)特定消費(fèi)者配置prefetch的作用

    @RabbitListener(queues = "#{rabbitConfiguration.TOPIC_QUEUE}", containerFactory = "myContainerFactory")
    public void processBootTask2(WorkUnit content) {
      System.out.println(content);
    }
    
    

Ack機(jī)制

Spring Boot Rabbit使用手工應(yīng)答機(jī)制, 當(dāng)@RabbitListener修飾的方法被調(diào)用且沒(méi)有拋出異常時(shí), Spring Boot會(huì)為我們自動(dòng)應(yīng)答. 否則會(huì)根據(jù)設(shè)定的重試機(jī)制而作出nack或reject等行為.

重試機(jī)制

重試分兩種, template的重試與listener的重試, 分別代表生產(chǎn)者與消費(fèi)者

生產(chǎn)者端的重試

spring:
  rabbitmq:
    template:
      retry:
        enabled: true

通過(guò)以上配置可以啟動(dòng)AmqpTemplate的重試機(jī)制, 例如與RabbitMQ連接丟失的時(shí)候?qū)?huì)自動(dòng)重試事件的發(fā)布, 這個(gè)特性默認(rèn)是關(guān)閉的

消費(fèi)者端的重試

消費(fèi)者一端, 即@RabbitListener也有像AmqpTemplate一樣的重試機(jī)制, 當(dāng)重試次數(shù)(默認(rèn)是3)耗盡的時(shí)候, 該特性同樣也是默認(rèn)關(guān)閉的, 可以通過(guò)以下配置打開(kāi)

spring:
  rabbitmq:
    host: localhost
    username: chris
    password: 123123
    virtual-host: prontera
    listener:
      retry:
        enabled: true

如果消費(fèi)一端的重試機(jī)制沒(méi)有被啟動(dòng), 而且Listener拋出異常的話, 那么該消息就會(huì)無(wú)限地被重試(剛開(kāi)始我也暈, retry都關(guān)了居然會(huì)被無(wú)限地重試, 這個(gè)不是bug, 官方文檔就是這么寫的, 實(shí)測(cè)結(jié)果也是一樣). 通常我們的做法是拋出AmqpRejectAndDontRequeueException以reject該消息, 同時(shí)如果有dead-letter queue被設(shè)置的話該消息就會(huì)被置入, 否則被丟棄.

如果啟動(dòng)消費(fèi)端的重試機(jī)制, 我們可以設(shè)置其最大的嘗試次數(shù)(默認(rèn)為3次)

spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        max-attempts: 5

死信隊(duì)列

@Bean
public DirectExchange directExchange() {
  return new DirectExchange(DEFAULT_DIRECT_EXCHANGE, true, true);
}

@Bean
public Queue tradeQueue() {
  final ImmutableMap<String, Object> args = 
    ImmutableMap.of("x-dead-letter-exchange", DEFAULT_DIRECT_EXCHANGE,
                    "x-dead-letter-routing-key", TRADE_DEAD_ROUTE_KEY);
  return new Queue(TRADE_QUEUE, true, false, true, args);
}

@Bean
public Binding tradeBinding() {
  return BindingBuilder.bind(tradeQueue()).to(directExchange()).with(TRADE_ROUTE_KEY);
}

@Bean
public Queue deadLetterQueue() {
  return new Queue(TRADE_DEAD_QUEUE, true, false, true);
}

@Bean
public Binding deadLetterBinding() {
  return BindingBuilder.bind(deadLetterQueue()).to(directExchange()).with(TRADE_DEAD_ROUTE_KEY);
}

隊(duì)列定義不一致

對(duì)于已經(jīng)存在的Queue配置將不會(huì)被后來(lái)的覆蓋, 且會(huì)在Spring Boot控制臺(tái)拋出一條WARN日志

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'boot_task' in vhost 'prontera': received 'false' but current is 'true', class-id=50, method-id=10)

Spring Boot RabbitMQ Properties

# RABBIT (RabbitProperties)
spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect.
spring.rabbitmq.cache.channel.checkout-timeout= # Number of milliseconds to wait to obtain a channel if the cache size has been reached.
spring.rabbitmq.cache.channel.size= # Number of channels to retain in the cache.
spring.rabbitmq.cache.connection.mode=CHANNEL # Connection factory cache mode.
spring.rabbitmq.cache.connection.size= # Number of connections to cache.
spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite.
spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean.
spring.rabbitmq.host=localhost # RabbitMQ host.
spring.rabbitmq.listener.acknowledge-mode= # Acknowledge mode of container.
spring.rabbitmq.listener.auto-startup=true # Start the container automatically on startup.
spring.rabbitmq.listener.concurrency= # Minimum number of consumers.
spring.rabbitmq.listener.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`.
spring.rabbitmq.listener.max-concurrency= # Maximum number of consumers.
spring.rabbitmq.listener.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
spring.rabbitmq.listener.retry.enabled=false # Whether or not publishing retries are enabled.
spring.rabbitmq.listener.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message.
spring.rabbitmq.listener.retry.max-attempts=3 # Maximum number of attempts to deliver a message.
spring.rabbitmq.listener.retry.max-interval=10000 # Maximum interval between attempts.
spring.rabbitmq.listener.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval.
spring.rabbitmq.listener.retry.stateless=true # Whether or not retry is stateless or stateful.
spring.rabbitmq.listener.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count.
spring.rabbitmq.password= # Login to authenticate against the broker.
spring.rabbitmq.port=5672 # RabbitMQ port.
spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.
spring.rabbitmq.publisher-returns=false # Enable publisher returns.
spring.rabbitmq.requested-heartbeat= # Requested heartbeat timeout, in seconds; zero for none.
spring.rabbitmq.ssl.enabled=false # Enable SSL support.
spring.rabbitmq.ssl.key-store= # Path to the key store that holds the SSL certificate.
spring.rabbitmq.ssl.key-store-password= # Password used to access the key store.
spring.rabbitmq.ssl.trust-store= # Trust store that holds SSL certificates.
spring.rabbitmq.ssl.trust-store-password= # Password used to access the trust store.
spring.rabbitmq.ssl.algorithm= # SSL algorithm to use. By default configure by the rabbit client library.
spring.rabbitmq.template.mandatory=false # Enable mandatory messages.
spring.rabbitmq.template.receive-timeout=0 # Timeout for `receive()` methods.
spring.rabbitmq.template.reply-timeout=5000 # Timeout for `sendAndReceive()` methods.
spring.rabbitmq.template.retry.enabled=false # Set to true to enable retries in the `RabbitTemplate`.
spring.rabbitmq.template.retry.initial-interval=1000 # Interval between the first and second attempt to publish a message.
spring.rabbitmq.template.retry.max-attempts=3 # Maximum number of attempts to publish a message.
spring.rabbitmq.template.retry.max-interval=10000 # Maximum number of attempts to publish a message.
spring.rabbitmq.template.retry.multiplier=1.0 # A multiplier to apply to the previous publishing retry interval.
spring.rabbitmq.username= # Login user to authenticate to the broker.
spring.rabbitmq.virtual-host= # Virtual host to use when connecting to the broker.

作者:Chris
原博客:http://blog.chriscs.com

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末零酪,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子氏涩,更是在濱河造成了極大的恐慌,老刑警劉巖攒射,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蜡励,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡中捆,警方通過(guò)查閱死者的電腦和手機(jī)漂彤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門雾消,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人挫望,你說(shuō)我怎么就攤上這事立润。” “怎么了媳板?”我有些...
    開(kāi)封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵桑腮,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我蛉幸,道長(zhǎng)破讨,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任巨缘,我火速辦了婚禮添忘,結(jié)果婚禮上采呐,老公的妹妹穿的比我還像新娘若锁。我一直安慰自己,他們只是感情好斧吐,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布又固。 她就那樣靜靜地躺著,像睡著了一般煤率。 火紅的嫁衣襯著肌膚如雪仰冠。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天蝶糯,我揣著相機(jī)與錄音洋只,去河邊找鬼。 笑死昼捍,一個(gè)胖子當(dāng)著我的面吹牛识虚,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播妒茬,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼担锤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了乍钻?” 一聲冷哼從身側(cè)響起肛循,我...
    開(kāi)封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤铭腕,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后多糠,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體累舷,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年夹孔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了笋粟。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡析蝴,死狀恐怖害捕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情闷畸,我是刑警寧澤尝盼,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站佑菩,受9級(jí)特大地震影響盾沫,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜殿漠,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一赴精、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧绞幌,春花似錦蕾哟、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至票渠,卻和暖如春逐哈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背问顷。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工昂秃, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人杜窄。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓肠骆,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親羞芍。 傳聞我的和親對(duì)象是個(gè)殘疾皇子哗戈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容