【RabbitMQ-9】自定義配置線程池(線程池資源不足-MQ初始化隊列&&MQ動態(tài)擴容影響)

1. 源碼注意點

源碼一:啟動消費者

@Override 
protected void doStart() {
    checkListenerContainerAware();
    super.doStart();
    synchronized(this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        int newConsumers = initializeConsumers();
        if (this.consumers == null) {
            logger.info("Consumers were initialized and then cleared " + "(presumably the container was stopped concurrently)");
            return;
        }
        if (newConsumers <= 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Consumers are already running");
            }
            return;
        }
        Set < AsyncMessageProcessingConsumer > processors = new HashSet < AsyncMessageProcessingConsumer > ();
        //每一個消費者,創(chuàng)建ConcurrentConsumers個線程
        for (BlockingQueueConsumer consumer: this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            //使用配置的線程池去開啟線程
            //(便會執(zhí)行run方法乔遮,run方法中啟動成功會使得processor的CountDownLatch-1)
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        //判斷所有的線程是否執(zhí)行run()方法啟動消費者成功价淌?沒有成功的話蝉衣,阻塞,直至所有消費者成功濒翻。
        waitForConsumersToStart(processors);
    }
}

若核心線程數(shù)滿了有送,但是依舊有消費者等待啟動,那么會在waitForConsumersToStart阻塞裸删。

源碼二:串行阻塞

private void waitForConsumersToStart(Set < AsyncMessageProcessingConsumer > processors) {
    for (AsyncMessageProcessingConsumer processor: processors) {
        FatalListenerStartupException startupException = null;
        try {
            startupException = processor.getStartupException();
        } catch(TimeoutException e) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        }
        if (startupException != null) {
            throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
        }
    }
}

源碼三:使用CountDownLatch阻塞

private FatalListenerStartupException getStartupException() throws TimeoutException,
InterruptedException {
    if (!this.start.await(SimpleMessageListenerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS)) {
        logger.error("Consumer failed to start in " + SimpleMessageListenerContainer.this.consumerStartTimeout + " milliseconds; does the task executor have enough threads to support the container " + "concurrency?");
    }
    return this.startupException;
}

啟動的線程是串行的阻塞涯塔。

例如:線程池只存在1個線程匕荸,但某個隊列消費者需要10個線程枷邪。

  1. 創(chuàng)建消費者線程;
  2. 使用配置的線程池啟動消費者践惑;
  3. 發(fā)布創(chuàng)建消費者的消息童本;
  4. 串行阻塞判斷所有消費者是否創(chuàng)建完畢(默認60s)脸候;
  5. 理論是等待9*60s的時間运沦,唯一的消費者才會開始執(zhí)行携添;

注意點:

  1. 隊列搶占線程池線程順序是按隊列初始化順序決定的篓叶,即先初始化的隊列先占用線程池資源。若線程不足左敌,MQ打印Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?信息矫限。

  2. 配置的線程池資源被消費者占用后,是不會被釋放的取董,while循環(huán)會一直監(jiān)聽MQ消息茵汰。

配置MQ的線程池不應該配置阻塞隊列孽鸡,因為getTaskExecutor().execute(processor);使用線程池啟動線程,若核心線程滿了之后画侣,會使用阻塞隊列配乱。而使用阻塞隊列皮迟,會導致消費者不能被啟動。

2. 實現(xiàn)方式

配置線程池模式:

@Slf4j
@Configuration
public class RabbitConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

        @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        /* setConnectionFactory:設置spring-amqp的ConnectionFactory忿檩。 */
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(2);
        factory.setPrefetchCount(1);
        factory.setDefaultRequeueRejected(true);
        //使用自定義線程池來啟動消費者燥透。
        factory.setTaskExecutor(taskExecutor());
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }


    @Bean("correctTaskExecutor")
    @Primary
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();
        // 設置核心線程數(shù)
        executor.setCorePoolSize(100);
        // 設置最大線程數(shù)
        executor.setMaxPoolSize(100);
        // 設置隊列容量
        executor.setQueueCapacity(0);
        // 設置線程活躍時間(秒)
        executor.setKeepAliveSeconds(300);
        // 設置默認線程名稱
        executor.setThreadNamePrefix("thread-xx-");
        // 設置拒絕策略rejection-policy:當pool已經(jīng)達到max size的時候班套,丟棄
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 等待所有任務結束后再關閉線程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

}

3. MQ自動擴容的影響

上面說到吱韭,mq在啟動時創(chuàng)建消費者時由于線程池資源不足鱼的,會導致阻塞(影響該queue的消費消息)。

那么若是代碼中配置了factory.setMaxConcurrentConsumers(2);猿规,擴容時發(fā)現(xiàn)線程池資源不足坎拐,有什么影響呢?

3.1 源碼分析

  1. 消費者線程循環(huán)的消費消息

源碼位置org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
    if (!isActive()) {
        return;
    }
    ...
    try {
        initialize();
        //每個消費者線程循環(huán)的去獲取消息
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            mainLoop();
        }
    } ...
}
  1. 循環(huán)體的操作

注意receiveAndExecute()方法的返回值是checkAdjust()方法的請求參數(shù)都伪,那么理解MQ動態(tài)擴容陨晶,就必須先明白receiveAndExecute()的邏輯以及返回值的含義帝璧。

private void mainLoop() throws Exception { // NOSONAR Exception
    try {
        //該方法是獲取消息的烁,并執(zhí)行業(yè)務操作(并發(fā)送ACK或NACK到MQ)。返回值true表示已經(jīng)消費消息铃芦;false表示未獲取到消息
        boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
        //判斷是否配置了maxConcurrentConsumers襟雷,是否進行動態(tài)擴容
        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
            checkAdjust(receivedOk);
        }
        ...
    }
}

2.1 receiveAndExecute—業(yè)務邏輯

該方法會執(zhí)行業(yè)務邏輯耸弄,并發(fā)送ACK或NACK到MQ中。完成一個消息的消費砰诵。
但是即使發(fā)送ACK后震叮,依舊在mainLoop()循環(huán)中苇瓣,需要完成后續(xù)邏輯才能消費下一個消息击罪。(注:不是向MQ發(fā)送ACK或NACK后立即去消費后續(xù)消息L靶健!?⒒)

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
    PlatformTransactionManager transactionManager = getTransactionManager();
    if (transactionManager != null) {...事務操作毫别,不關注
    }
    //接受消息并進行處理
    return doReceiveAndExecute(consumer);
}

若是執(zhí)行nextMessage()沒有獲取到消息,那么執(zhí)行break操作台丛,最終會導致上面的receiveAndExecute()方法返回false砾肺。而receiveAndExecute()的值可以決定是否動態(tài)擴容

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
    Channel channel = consumer.getChannel();
    //默認txSize=1
    for (int i = 0; i < this.txSize; i++) {

        //在內(nèi)存隊列中獲取消息
        Message message = consumer.nextMessage(this.receiveTimeout);
        //未獲取到消息变汪,開始下次循環(huán)
        if (message == null) {
            break;
        }
        try {
            //執(zhí)行業(yè)務邏輯  
            executeListener(channel, message);
        } 
       ...catch操作,不關注
    }
    //沒有獲取到消息实胸,這個方法會返回false
    return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
    //此處直接返回false
    if (this.deliveryTags.isEmpty()) {
        return false;
    }
    ...
    return true;
}

在內(nèi)存中獲取消息

由于配置了setPrefetchCount參數(shù)童芹,所以內(nèi)存會去MQ中預取配置的消息數(shù)鲤拿,放到本地的BlockingQueue中。
配置詳見:
【RabbitMQ-2】RabbitMQ的并發(fā)參數(shù)(concurrency和prefetch)

未獲取到消息

public Message nextMessage(long timeout) throws InterruptedException,
ShutdownSignalException {
    if (logger.isTraceEnabled()) {
        logger.trace("Retrieving delivery for " + this);
    }
    checkShutdown();
    if (this.missingQueues.size() > 0) {
        checkMissingQueues();
    }
    //poll的API描述:檢索并刪除此隊列的頭生音,等待指定的等待時間(如有必要)使元素變?yōu)榭捎谩?    Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
    //cancelled默認false不會執(zhí)行改邏輯
    if (message == null && this.cancelled.get()) {
        throw new ConsumerCancelledException();
    }
    //未獲取到消息返回null缀遍。
    return message;
}

2.2 checkAdjust()—動態(tài)擴容業(yè)務

由上面源碼可以若是沒有獲取到消息域醇,receivedOk返回false(注:若是獲取到消息蓉媳,但是NACK,receivedOk返回值依舊是true)减宣。

如何保證是連續(xù)獲取或者連續(xù)空轉(zhuǎn)的玩荠?
答案:因為mainloop()一直循環(huán)贼邓,每次均在本地queue獲取消息(最長阻塞1s)塑径。若連續(xù)9次均未獲取到消息晓勇,第10次獲取到消息灌旧,那么會重置consecutiveIdles=0

private void checkAdjust(boolean receivedOk) {
    //成功獲取到消息
    if (receivedOk) {
        if (isActive(this.consumer)) {
            //連續(xù)空轉(zhuǎn)標識設置為0
            this.consecutiveIdles = 0;
            //consecutiveActiveTrigger默認為10
            if (this.consecutiveMessages++>SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
               //開啟一個消費者線程
                considerAddingAConsumer();
                //練習消費的標識設置為0
                this.consecutiveMessages = 0;
            }
        }
    } else {
        this.consecutiveMessages = 0;
        if (this.consecutiveIdles++>SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
            considerStoppingAConsumer(this.consumer);
            this.consecutiveIdles = 0;
        }
    }
}

開啟一個消費者線程:

private void considerAddingAConsumer() {
    //加鎖
    synchronized(this.consumersMonitor) {
      //若是當前consumers數(shù)量小于配置maxConcurrentConsumers
        if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
            long now = System.currentTimeMillis();
            //開啟消費者有間隔時間
            if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
                //增加消費者。
                this.addAndStartConsumers(1);
                this.lastConsumerStarted = now;
            }
        }
    }
}

開啟消費者的操作

protected void addAndStartConsumers(int delta) {
    synchronized(this.consumersMonitor) {
        if (this.consumers != null) {
            //每一次循環(huán)均是創(chuàng)建一個消費者
            for (int i = 0; i < delta; i++) {
                 //判斷是否創(chuàng)建消費者
                if (this.maxConcurrentConsumers != null && this.consumers.size() >= this.maxConcurrentConsumers) {
                    break;
                }
                //創(chuàng)建消費者
                BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                //(核心)屬性的consumers+1
                this.consumers.add(consumer);
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                if (logger.isDebugEnabled()) {
                    logger.debug("Starting a new consumer: " + consumer);
                }  
                //使用內(nèi)部的線程池執(zhí)行
                getTaskExecutor().execute(processor);
                //發(fā)布創(chuàng)建消費者事件
                if (this.getApplicationEventPublisher() != null) {
                    this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                }
                try {
                    //線程執(zhí)行完run方法后窿克,線程中的CountDownLatch-1毛甲。
                    //若線程池沒有資源玻募,那么會在此處阻塞(默認60s)
                    //阻塞完畢,startupException返回null跃惫。即成功創(chuàng)建
                    FatalListenerStartupException startupException = processor.getStartupException();
                    //若是線程池資源不足艾栋,只是返回null,不會執(zhí)行下面分支先较。
                    if (startupException != null) {
                        this.consumers.remove(consumer);
                        throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                    }
                } catch(InterruptedException ie) {
                    Thread.currentThread().interrupt();
                } catch(Exception e) {
                    consumer.stop();
                    logger.error("Error starting new consumer", e);
                    this.cancellationLock.release(consumer);
                    this.consumers.remove(consumer);
                }
            }
        }
    }
}
  1. 上述代碼中拇泣,創(chuàng)建消費者線程是同步的流程矮锈,即某個消費者線程加鎖去創(chuàng)建苞笨。某創(chuàng)建時線程池沒有資源,會阻塞消費者線程瀑凝。
  2. 若線程池沒有資源,阻塞完畢后谚中,只是打印異常日志宪塔,并拋出異常囊拜,此時內(nèi)存中消費者個數(shù)為n+1個,但是只有n個線程可以消費消息
  3. 當連續(xù)10次空轉(zhuǎn)時consecutiveIdles =10南誊,且消費者線程n+1蜜托,會回收臨時擴展的消費者線程。
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
    synchronized(this.consumersMonitor) {
        if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
            long now = System.currentTimeMillis();
            if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
                //回收消費者的核心方式
                consumer.basicCancel(true);
                //本地消費者集合移除消費者
                this.consumers.remove(consumer);
                if (logger.isDebugEnabled()) {
                    logger.debug("Idle consumer terminating: " + consumer);
                }
                this.lastConsumerStopped = now;
            }
        }
    }
}

上面說到幔托,內(nèi)存中的消費者數(shù)量n+1柑司,但是有效的消費者n個锅劝。當回收消費者時會回收有效的消費者使得內(nèi)存消費者數(shù)量n個,有效消費者數(shù)量n-1個玻粪。

若是線程池資源不足诬垂,且配置了消費者動態(tài)擴展參數(shù)后,最終會導致有效的消費者數(shù)量為0很洋,導致消息的大量積壓K矸恪N焦丁涝焙!

注:RabbitMQ使用默認的new SimpleAsyncTaskExecutor()開啟消費者線程孕暇,即每當使用線程是,均是new出來的隧哮。

總結:不推薦使用自定義配置的線程池铛楣,若使用,每次增加隊列時均需要注意配置好線程數(shù)鉴竭。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末搏存,一起剝皮案震驚了整個濱河市矢洲,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌责静,老刑警劉巖盖桥,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件揩徊,死亡現(xiàn)場離奇詭異,居然都是意外死亡熄赡,警方通過查閱死者的電腦和手機齿税,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來溜在,“玉大人他托,你說我怎么就攤上這事赏参⊙刂眩” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵韧掩,是天一觀的道長疗锐。 經(jīng)常有香客問我,道長滑臊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任关划,我火速辦了婚禮贮折,結果婚禮上脱货,老公的妹妹穿的比我還像新娘。我一直安慰自己振峻,他們只是感情好择份,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布荣赶。 她就那樣靜靜地躺著鸽斟,像睡著了一般富蓄。 火紅的嫁衣襯著肌膚如雪慢逾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天口注,我揣著相機與錄音寝志,去河邊找鬼。 笑死材部,一個胖子當著我的面吹牛舰攒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播兽叮,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼鹦聪,長吁一口氣:“原來是場噩夢啊……” “哼蒂秘!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起规丽,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤撇贺,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后艘狭,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡遵倦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年梧躺,在試婚紗的時候發(fā)現(xiàn)自己被綠了歧寺。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蛀缝,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情嗤练,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布煞抬,位于F島的核電站革答,受9級特大地震影響曙强,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜碟嘴,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一娜扇、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧雀瓢,春花似錦、人聲如沸登疗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽智政。三九已至,卻和暖如春续捂,著一層夾襖步出監(jiān)牢的瞬間垦垂,已是汗流浹背牙瓢。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工矾克, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人酒繁。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓控妻,卻偏偏與公主長得像,于是被迫代替她去往敵國和親弓候。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內(nèi)容