1. 源碼注意點
源碼一:啟動消費者
@Override
protected void doStart() {
checkListenerContainerAware();
super.doStart();
synchronized(this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " + "(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set < AsyncMessageProcessingConsumer > processors = new HashSet < AsyncMessageProcessingConsumer > ();
//每一個消費者,創(chuàng)建ConcurrentConsumers個線程
for (BlockingQueueConsumer consumer: this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
//使用配置的線程池去開啟線程
//(便會執(zhí)行run方法乔遮,run方法中啟動成功會使得processor的CountDownLatch-1)
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
//判斷所有的線程是否執(zhí)行run()方法啟動消費者成功价淌?沒有成功的話蝉衣,阻塞,直至所有消費者成功濒翻。
waitForConsumersToStart(processors);
}
}
若核心線程數(shù)滿了有送,但是依舊有消費者等待啟動,那么會在waitForConsumersToStart
阻塞裸删。
源碼二:串行阻塞
private void waitForConsumersToStart(Set < AsyncMessageProcessingConsumer > processors) {
for (AsyncMessageProcessingConsumer processor: processors) {
FatalListenerStartupException startupException = null;
try {
startupException = processor.getStartupException();
} catch(TimeoutException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
源碼三:使用CountDownLatch阻塞
private FatalListenerStartupException getStartupException() throws TimeoutException,
InterruptedException {
if (!this.start.await(SimpleMessageListenerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS)) {
logger.error("Consumer failed to start in " + SimpleMessageListenerContainer.this.consumerStartTimeout + " milliseconds; does the task executor have enough threads to support the container " + "concurrency?");
}
return this.startupException;
}
啟動的線程是串行的阻塞涯塔。
例如:線程池只存在1個線程匕荸,但某個隊列消費者需要10個線程枷邪。
- 創(chuàng)建消費者線程;
- 使用配置的線程池啟動消費者践惑;
- 發(fā)布創(chuàng)建消費者的消息童本;
- 串行阻塞判斷所有消費者是否創(chuàng)建完畢(默認60s)脸候;
- 理論是等待9*60s的時間运沦,唯一的消費者才會開始執(zhí)行携添;
注意點:
隊列搶占線程池線程順序是按隊列初始化順序決定的篓叶,即先初始化的隊列先占用線程池資源。若線程不足左敌,MQ打印
Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?
信息矫限。配置的線程池資源被消費者占用后,是不會被釋放的取董,while循環(huán)會一直監(jiān)聽MQ消息茵汰。
配置MQ的線程池不應該配置阻塞隊列孽鸡,因為
getTaskExecutor().execute(processor);
使用線程池啟動線程,若核心線程滿了之后画侣,會使用阻塞隊列配乱。而使用阻塞隊列皮迟,會導致消費者不能被啟動。
2. 實現(xiàn)方式
配置線程池模式:
@Slf4j
@Configuration
public class RabbitConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
/* setConnectionFactory:設置spring-amqp的ConnectionFactory忿檩。 */
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(2);
factory.setPrefetchCount(1);
factory.setDefaultRequeueRejected(true);
//使用自定義線程池來啟動消費者燥透。
factory.setTaskExecutor(taskExecutor());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
@Bean("correctTaskExecutor")
@Primary
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();
// 設置核心線程數(shù)
executor.setCorePoolSize(100);
// 設置最大線程數(shù)
executor.setMaxPoolSize(100);
// 設置隊列容量
executor.setQueueCapacity(0);
// 設置線程活躍時間(秒)
executor.setKeepAliveSeconds(300);
// 設置默認線程名稱
executor.setThreadNamePrefix("thread-xx-");
// 設置拒絕策略rejection-policy:當pool已經(jīng)達到max size的時候班套,丟棄
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 等待所有任務結束后再關閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
3. MQ自動擴容的影響
上面說到吱韭,mq在啟動時創(chuàng)建消費者時由于線程池資源不足鱼的,會導致阻塞(影響該queue的消費消息)。
那么若是代碼中配置了factory.setMaxConcurrentConsumers(2);
猿规,擴容時發(fā)現(xiàn)線程池資源不足坎拐,有什么影響呢?
3.1 源碼分析
- 消費者線程循環(huán)的消費消息
源碼位置org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
if (!isActive()) {
return;
}
...
try {
initialize();
//每個消費者線程循環(huán)的去獲取消息
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
} ...
}
- 循環(huán)體的操作
注意receiveAndExecute()
方法的返回值是checkAdjust()
方法的請求參數(shù)都伪,那么理解MQ動態(tài)擴容陨晶,就必須先明白receiveAndExecute()
的邏輯以及返回值的含義帝璧。
private void mainLoop() throws Exception { // NOSONAR Exception
try {
//該方法是獲取消息的烁,并執(zhí)行業(yè)務操作(并發(fā)送ACK或NACK到MQ)。返回值true表示已經(jīng)消費消息铃芦;false表示未獲取到消息
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
//判斷是否配置了maxConcurrentConsumers襟雷,是否進行動態(tài)擴容
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
checkAdjust(receivedOk);
}
...
}
}
2.1 receiveAndExecute—業(yè)務邏輯
該方法會執(zhí)行業(yè)務邏輯耸弄,并發(fā)送ACK或NACK到MQ中。完成一個消息的消費砰诵。
但是即使發(fā)送ACK后震叮,依舊在mainLoop()
循環(huán)中苇瓣,需要完成后續(xù)邏輯才能消費下一個消息击罪。(注:不是向MQ發(fā)送ACK或NACK后立即去消費后續(xù)消息L靶健!?⒒)
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
PlatformTransactionManager transactionManager = getTransactionManager();
if (transactionManager != null) {...事務操作毫别,不關注
}
//接受消息并進行處理
return doReceiveAndExecute(consumer);
}
若是執(zhí)行nextMessage()
沒有獲取到消息,那么執(zhí)行break操作台丛,最終會導致上面的receiveAndExecute()
方法返回false砾肺。而receiveAndExecute()
的值可以決定是否動態(tài)擴容
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
Channel channel = consumer.getChannel();
//默認txSize=1
for (int i = 0; i < this.txSize; i++) {
//在內(nèi)存隊列中獲取消息
Message message = consumer.nextMessage(this.receiveTimeout);
//未獲取到消息变汪,開始下次循環(huán)
if (message == null) {
break;
}
try {
//執(zhí)行業(yè)務邏輯
executeListener(channel, message);
}
...catch操作,不關注
}
//沒有獲取到消息实胸,這個方法會返回false
return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
//此處直接返回false
if (this.deliveryTags.isEmpty()) {
return false;
}
...
return true;
}
在內(nèi)存中獲取消息
由于配置了setPrefetchCount
參數(shù)童芹,所以內(nèi)存會去MQ中預取配置的消息數(shù)鲤拿,放到本地的BlockingQueue
中。
配置詳見:
【RabbitMQ-2】RabbitMQ的并發(fā)參數(shù)(concurrency和prefetch)
未獲取到消息
public Message nextMessage(long timeout) throws InterruptedException,
ShutdownSignalException {
if (logger.isTraceEnabled()) {
logger.trace("Retrieving delivery for " + this);
}
checkShutdown();
if (this.missingQueues.size() > 0) {
checkMissingQueues();
}
//poll的API描述:檢索并刪除此隊列的頭生音,等待指定的等待時間(如有必要)使元素變?yōu)榭捎谩? Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
//cancelled默認false不會執(zhí)行改邏輯
if (message == null && this.cancelled.get()) {
throw new ConsumerCancelledException();
}
//未獲取到消息返回null缀遍。
return message;
}
2.2 checkAdjust()—動態(tài)擴容業(yè)務
由上面源碼可以若是沒有獲取到消息域醇,receivedOk
返回false(注:若是獲取到消息蓉媳,但是NACK,receivedOk
返回值依舊是true)减宣。
如何保證是連續(xù)獲取或者連續(xù)空轉(zhuǎn)的玩荠?
答案:因為mainloop()
一直循環(huán)贼邓,每次均在本地queue獲取消息(最長阻塞1s)塑径。若連續(xù)9次均未獲取到消息晓勇,第10次獲取到消息灌旧,那么會重置consecutiveIdles=0
。
private void checkAdjust(boolean receivedOk) {
//成功獲取到消息
if (receivedOk) {
if (isActive(this.consumer)) {
//連續(xù)空轉(zhuǎn)標識設置為0
this.consecutiveIdles = 0;
//consecutiveActiveTrigger默認為10
if (this.consecutiveMessages++>SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
//開啟一個消費者線程
considerAddingAConsumer();
//練習消費的標識設置為0
this.consecutiveMessages = 0;
}
}
} else {
this.consecutiveMessages = 0;
if (this.consecutiveIdles++>SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
this.consecutiveIdles = 0;
}
}
}
開啟一個消費者線程:
private void considerAddingAConsumer() {
//加鎖
synchronized(this.consumersMonitor) {
//若是當前consumers數(shù)量小于配置maxConcurrentConsumers
if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
long now = System.currentTimeMillis();
//開啟消費者有間隔時間
if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
//增加消費者。
this.addAndStartConsumers(1);
this.lastConsumerStarted = now;
}
}
}
}
開啟消費者的操作
protected void addAndStartConsumers(int delta) {
synchronized(this.consumersMonitor) {
if (this.consumers != null) {
//每一次循環(huán)均是創(chuàng)建一個消費者
for (int i = 0; i < delta; i++) {
//判斷是否創(chuàng)建消費者
if (this.maxConcurrentConsumers != null && this.consumers.size() >= this.maxConcurrentConsumers) {
break;
}
//創(chuàng)建消費者
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
//(核心)屬性的consumers+1
this.consumers.add(consumer);
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
if (logger.isDebugEnabled()) {
logger.debug("Starting a new consumer: " + consumer);
}
//使用內(nèi)部的線程池執(zhí)行
getTaskExecutor().execute(processor);
//發(fā)布創(chuàng)建消費者事件
if (this.getApplicationEventPublisher() != null) {
this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
try {
//線程執(zhí)行完run方法后窿克,線程中的CountDownLatch-1毛甲。
//若線程池沒有資源玻募,那么會在此處阻塞(默認60s)
//阻塞完畢,startupException返回null跃惫。即成功創(chuàng)建
FatalListenerStartupException startupException = processor.getStartupException();
//若是線程池資源不足艾栋,只是返回null,不會執(zhí)行下面分支先较。
if (startupException != null) {
this.consumers.remove(consumer);
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
} catch(Exception e) {
consumer.stop();
logger.error("Error starting new consumer", e);
this.cancellationLock.release(consumer);
this.consumers.remove(consumer);
}
}
}
}
}
- 上述代碼中拇泣,創(chuàng)建消費者線程是同步的流程矮锈,即某個消費者線程加鎖去創(chuàng)建苞笨。某創(chuàng)建時線程池沒有資源,會阻塞消費者線程瀑凝。
- 若線程池沒有資源,阻塞完畢后谚中,只是打印異常日志宪塔,并拋出異常囊拜,此時內(nèi)存中消費者個數(shù)為n+1個,但是只有n個線程可以消費消息
- 當連續(xù)10次空轉(zhuǎn)時
consecutiveIdles =10
南誊,且消費者線程n+1蜜托,會回收臨時擴展的消費者線程。
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
synchronized(this.consumersMonitor) {
if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
long now = System.currentTimeMillis();
if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
//回收消費者的核心方式
consumer.basicCancel(true);
//本地消費者集合移除消費者
this.consumers.remove(consumer);
if (logger.isDebugEnabled()) {
logger.debug("Idle consumer terminating: " + consumer);
}
this.lastConsumerStopped = now;
}
}
}
}
上面說到幔托,內(nèi)存中的消費者數(shù)量n+1柑司,但是有效的消費者n個锅劝。當回收消費者時會回收有效的消費者使得內(nèi)存消費者數(shù)量n個,有效消費者數(shù)量n-1個玻粪。
若是線程池資源不足诬垂,且配置了消費者動態(tài)擴展參數(shù)后,最終會導致有效的消費者數(shù)量為0很洋,導致消息的大量積壓K矸恪N焦丁涝焙!
注:RabbitMQ使用默認的new SimpleAsyncTaskExecutor()
開啟消費者線程孕暇,即每當使用線程是,均是new出來的隧哮。
總結:不推薦使用自定義配置的線程池铛楣,若使用,每次增加隊列時均需要注意配置好線程數(shù)鉴竭。