Prefetch設(shè)置
當(dāng)我們進(jìn)入RabbitMQ的GUI管理界面, 點(diǎn)入某個(gè)隊(duì)列查看消費(fèi)者的屬性時(shí), 有記錄如下
Channel | Consumer tag | Ack required | Exclusive | Prefetch count | Arguments | |
---|---|---|---|---|---|---|
172.22.0.1:57382 | amq.ctag-Gsix2DEjaFI9zVlsJJZp3Q | ● | ○ | 1 | ||
172.22.0.1:57378 | amq.ctag-_FIcIOpflMXXaBQN7xLYcA | ● | ○ | 1 |
上面的表格說(shuō)明消息的消費(fèi)需要手工ack, 且是公平分發(fā)的. 設(shè)置prefetch的方式有兩種
-
全局式設(shè)定
在application.yml文件中設(shè)定
spring.rabbitmq.listener.prefetch
即可, 這會(huì)影響到本Spring Boot應(yīng)用中所有使用默認(rèn)SimpleRabbitListenerContainerFactory
的消費(fèi)者spring: rabbitmq: host: localhost username: chris password: 123123 virtual-host: prontera listener: prefetch: 100
-
特定消費(fèi)者設(shè)置
在消費(fèi)者的配置中自定義一個(gè)
SimpleRabbitListenerContainerFactory
@Bean public SimpleRabbitListenerContainerFactory myContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setPrefetchCount(100); configurer.configure(factory, connectionFactory); return factory; }
然后在消費(fèi)者上聲明使用該ContainerFactory即可達(dá)到對(duì)特定消費(fèi)者配置prefetch的作用
@RabbitListener(queues = "#{rabbitConfiguration.TOPIC_QUEUE}", containerFactory = "myContainerFactory") public void processBootTask2(WorkUnit content) { System.out.println(content); }
Ack機(jī)制
Spring Boot Rabbit使用手工應(yīng)答機(jī)制, 當(dāng)@RabbitListener修飾的方法被調(diào)用且沒(méi)有拋出異常時(shí), Spring Boot會(huì)為我們自動(dòng)應(yīng)答. 否則會(huì)根據(jù)設(shè)定的重試機(jī)制而作出nack或reject等行為.
重試機(jī)制
重試分兩種, template的重試與listener的重試, 分別代表生產(chǎn)者與消費(fèi)者
生產(chǎn)者端的重試
spring:
rabbitmq:
template:
retry:
enabled: true
通過(guò)以上配置可以啟動(dòng)AmqpTemplate
的重試機(jī)制, 例如與RabbitMQ連接丟失的時(shí)候?qū)?huì)自動(dòng)重試事件的發(fā)布, 這個(gè)特性默認(rèn)是關(guān)閉的
消費(fèi)者端的重試
消費(fèi)者一端, 即@RabbitListener
也有像AmqpTemplate
一樣的重試機(jī)制, 當(dāng)重試次數(shù)(默認(rèn)是3)耗盡的時(shí)候, 該特性同樣也是默認(rèn)關(guān)閉的, 可以通過(guò)以下配置打開(kāi)
spring:
rabbitmq:
host: localhost
username: chris
password: 123123
virtual-host: prontera
listener:
retry:
enabled: true
如果消費(fèi)一端的重試機(jī)制沒(méi)有被啟動(dòng), 而且Listener拋出異常的話, 那么該消息就會(huì)無(wú)限地被重試(剛開(kāi)始我也暈, retry都關(guān)了居然會(huì)被無(wú)限地重試, 這個(gè)不是bug, 官方文檔就是這么寫的, 實(shí)測(cè)結(jié)果也是一樣). 通常我們的做法是拋出AmqpRejectAndDontRequeueException
以reject該消息, 同時(shí)如果有dead-letter queue被設(shè)置的話該消息就會(huì)被置入, 否則被丟棄.
如果啟動(dòng)消費(fèi)端的重試機(jī)制, 我們可以設(shè)置其最大的嘗試次數(shù)(默認(rèn)為3次)
spring:
rabbitmq:
listener:
retry:
enabled: true
max-attempts: 5
死信隊(duì)列
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DEFAULT_DIRECT_EXCHANGE, true, true);
}
@Bean
public Queue tradeQueue() {
final ImmutableMap<String, Object> args =
ImmutableMap.of("x-dead-letter-exchange", DEFAULT_DIRECT_EXCHANGE,
"x-dead-letter-routing-key", TRADE_DEAD_ROUTE_KEY);
return new Queue(TRADE_QUEUE, true, false, true, args);
}
@Bean
public Binding tradeBinding() {
return BindingBuilder.bind(tradeQueue()).to(directExchange()).with(TRADE_ROUTE_KEY);
}
@Bean
public Queue deadLetterQueue() {
return new Queue(TRADE_DEAD_QUEUE, true, false, true);
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(directExchange()).with(TRADE_DEAD_ROUTE_KEY);
}
隊(duì)列定義不一致
對(duì)于已經(jīng)存在的Queue配置將不會(huì)被后來(lái)的覆蓋, 且會(huì)在Spring Boot控制臺(tái)拋出一條WARN日志
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'boot_task' in vhost 'prontera': received 'false' but current is 'true', class-id=50, method-id=10)
Spring Boot RabbitMQ Properties
# RABBIT (RabbitProperties)
spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect.
spring.rabbitmq.cache.channel.checkout-timeout= # Number of milliseconds to wait to obtain a channel if the cache size has been reached.
spring.rabbitmq.cache.channel.size= # Number of channels to retain in the cache.
spring.rabbitmq.cache.connection.mode=CHANNEL # Connection factory cache mode.
spring.rabbitmq.cache.connection.size= # Number of connections to cache.
spring.rabbitmq.connection-timeout= # Connection timeout, in milliseconds; zero for infinite.
spring.rabbitmq.dynamic=true # Create an AmqpAdmin bean.
spring.rabbitmq.host=localhost # RabbitMQ host.
spring.rabbitmq.listener.acknowledge-mode= # Acknowledge mode of container.
spring.rabbitmq.listener.auto-startup=true # Start the container automatically on startup.
spring.rabbitmq.listener.concurrency= # Minimum number of consumers.
spring.rabbitmq.listener.default-requeue-rejected= # Whether or not to requeue delivery failures; default `true`.
spring.rabbitmq.listener.max-concurrency= # Maximum number of consumers.
spring.rabbitmq.listener.prefetch= # Number of messages to be handled in a single request. It should be greater than or equal to the transaction size (if used).
spring.rabbitmq.listener.retry.enabled=false # Whether or not publishing retries are enabled.
spring.rabbitmq.listener.retry.initial-interval=1000 # Interval between the first and second attempt to deliver a message.
spring.rabbitmq.listener.retry.max-attempts=3 # Maximum number of attempts to deliver a message.
spring.rabbitmq.listener.retry.max-interval=10000 # Maximum interval between attempts.
spring.rabbitmq.listener.retry.multiplier=1.0 # A multiplier to apply to the previous delivery retry interval.
spring.rabbitmq.listener.retry.stateless=true # Whether or not retry is stateless or stateful.
spring.rabbitmq.listener.transaction-size= # Number of messages to be processed in a transaction. For best results it should be less than or equal to the prefetch count.
spring.rabbitmq.password= # Login to authenticate against the broker.
spring.rabbitmq.port=5672 # RabbitMQ port.
spring.rabbitmq.publisher-confirms=false # Enable publisher confirms.
spring.rabbitmq.publisher-returns=false # Enable publisher returns.
spring.rabbitmq.requested-heartbeat= # Requested heartbeat timeout, in seconds; zero for none.
spring.rabbitmq.ssl.enabled=false # Enable SSL support.
spring.rabbitmq.ssl.key-store= # Path to the key store that holds the SSL certificate.
spring.rabbitmq.ssl.key-store-password= # Password used to access the key store.
spring.rabbitmq.ssl.trust-store= # Trust store that holds SSL certificates.
spring.rabbitmq.ssl.trust-store-password= # Password used to access the trust store.
spring.rabbitmq.ssl.algorithm= # SSL algorithm to use. By default configure by the rabbit client library.
spring.rabbitmq.template.mandatory=false # Enable mandatory messages.
spring.rabbitmq.template.receive-timeout=0 # Timeout for `receive()` methods.
spring.rabbitmq.template.reply-timeout=5000 # Timeout for `sendAndReceive()` methods.
spring.rabbitmq.template.retry.enabled=false # Set to true to enable retries in the `RabbitTemplate`.
spring.rabbitmq.template.retry.initial-interval=1000 # Interval between the first and second attempt to publish a message.
spring.rabbitmq.template.retry.max-attempts=3 # Maximum number of attempts to publish a message.
spring.rabbitmq.template.retry.max-interval=10000 # Maximum number of attempts to publish a message.
spring.rabbitmq.template.retry.multiplier=1.0 # A multiplier to apply to the previous publishing retry interval.
spring.rabbitmq.username= # Login user to authenticate to the broker.
spring.rabbitmq.virtual-host= # Virtual host to use when connecting to the broker.
作者:Chris
原博客:http://blog.chriscs.com