Dubbo之線程池設(shè)計

前言

Dubbo的線程模型中可使用4種線程池

  • CachedThreadPool
  • LimitedThreadPool
  • FixedThreadPool
  • EagerThreadPool

想深入了解線程池原理的同學渣蜗,可以閱讀我的線程池那些事專欄酥宴。

在Dubbo中什么時候會用到線程池

我們的線程主要執(zhí)行2種邏輯,一是普通IO事件宽气,比如建立連接俄认,斷開連接,二是請求IO事件,執(zhí)行業(yè)務(wù)邏輯共屈。
在Dubbo的Dispatcher擴展點會使用到這些線程池,Dispatcher這個擴展點用于決定Netty ChannelHandler中的那些事件在Dubbo提供的線程池中執(zhí)行党窜。

源碼解析

CachedThreadPool

public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

緩沖線程池拗引,默認配置如下

配置 配置值
corePoolSize 0
maximumPoolSize Integer.MAX_VALUE
keepAliveTime 60s
workQueue 根據(jù)queue決定是SynchronousQueue還是LinkedBlockingQueue,默認queue=0幌衣,所以是SynchronousQueue
threadFactory NamedInternalThreadFactory
rejectHandler AbortPolicyWithReport

就默認配置來看矾削,和Executors創(chuàng)建的差不多,存在內(nèi)存溢出風險。
NamedInternalThreadFactory主要用于修改線程名哼凯,方便我們排查問題欲间。
AbortPolicyWithReport對拒絕的任務(wù)打印日志,也是方便排查問題断部。

LimitedThreadPool

public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
配置 配置值
corePoolSize 0
maximumPoolSize 200
keepAliveTime Long.MAX_VALUE,相當于無限長
workQueue 根據(jù)queue決定是SynchronousQueue還是LinkedBlockingQueue猎贴,默認queue=0,所以是SynchronousQueue
threadFactory NamedInternalThreadFactory
rejectHandler AbortPolicyWithReport

從keepAliveTime的配置可以看出來家坎,LimitedThreadPool線程池的特性是線程數(shù)只會增加不會減少嘱能。

FixedThreadPool

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
配置 配置值
corePoolSize 200
maximumPoolSize 200
keepAliveTime 0
workQueue 根據(jù)queue決定是SynchronousQueue還是LinkedBlockingQueue,默認queue=0虱疏,所以是SynchronousQueue
threadFactory NamedInternalThreadFactory
rejectHandler AbortPolicyWithReport

Dubbo的默認線程池惹骂,固定200個線程,就配置來看和LimitedThreadPool基本一致做瞪。
如果一定要說區(qū)別对粪,那就是FixedThreadPool等到創(chuàng)建完200個線程,再往隊列放任務(wù)装蓬。而LimitedThreadPool是先放隊列放任務(wù)著拭,放滿了之后才創(chuàng)建線程。

EagerThreadPool

public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}
配置 配置值
corePoolSize 0
maximumPoolSize Integer.MAX_VALUE
keepAliveTime 60s
workQueue 自定義實現(xiàn)TaskQueue牍帚,默認長度為1儡遮,使用時要自己配置下
threadFactory NamedInternalThreadFactory
rejectHandler AbortPolicyWithReport

我們知道,當線程數(shù)量達到corePoolSize之后暗赶,只有當workqueue滿了之后鄙币,才會增加工作線程。
這個線程池就是對這個特性做了優(yōu)化蹂随,首先繼承ThreadPoolExecutor實現(xiàn)EagerThreadPoolExecutor十嘿,對當前線程池提交的任務(wù)數(shù)submittedTaskCount進行記錄。
其次是通過自定義TaskQueue作為workQueue岳锁,它會在提交任務(wù)時判斷是否currentPoolSize<submittedTaskCount<maxPoolSize绩衷,然后通過workQueue的offer方法返回false導致增加工作線程。

為什么返回false會增加工作線程激率,我們回顧下ThreadPoolExecutor的execute方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //在這一步如果offer方法返回false咳燕,那么會進入到下一個else分支判斷
        //如果這個時候當前工作線程數(shù)沒有達到上限,那么就會增加一個工作線程
        //對于普通的workQueue在沒有滿的情況下是不會返回false的
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

然后看下TaskQueue的offer方法邏輯

 public boolean offer(Runnable runnable) {
        //TaskQueue持有executor引用乒躺,用于獲取當前提交任務(wù)數(shù)
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        //如果提交任務(wù)數(shù)小于當前工作線程數(shù)迟郎,說明當前工作線程足夠處理任務(wù),將提交的任務(wù)插入到工作隊列
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        //如果提交任務(wù)數(shù)大于當前工作線程數(shù)并且小于最大線程數(shù)聪蘸,說明提交的任務(wù)量線程已經(jīng)處理不過來,那么需要增加線程數(shù),返回false
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        //工作線程數(shù)到達最大線程數(shù)健爬,插入到workqueue
        return super.offer(runnable);
    }

最后看下EagerThreadPoolExecutor是如何統(tǒng)計已提交的任務(wù)

    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    //任務(wù)執(zhí)行完畢后控乾,submittedTaskCount--
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        //執(zhí)行任務(wù)前,submittedTaskCount++
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
           //失敗嘗試重新加入到workqueue
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    //如果重新加入失敗娜遵,那么拋出異常蜕衡,并且統(tǒng)計數(shù)-1
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
        }
    }

最后

一般來講使用Dubbo的默認配置,我們公司的業(yè)務(wù)量還沒到需要對線程池進行特殊配置的地步设拟。本文主要目的是慨仿,通過一個成熟框架對線程池的配置點,指導我們在實際使用線程池中需要注意的點纳胧。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末镰吆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子跑慕,更是在濱河造成了極大的恐慌万皿,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件核行,死亡現(xiàn)場離奇詭異牢硅,居然都是意外死亡,警方通過查閱死者的電腦和手機芝雪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進店門减余,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人惩系,你說我怎么就攤上這事位岔。” “怎么了蛆挫?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵赃承,是天一觀的道長。 經(jīng)常有香客問我悴侵,道長瞧剖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任可免,我火速辦了婚禮抓于,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘浇借。我一直安慰自己捉撮,他們只是感情好,可當我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布妇垢。 她就那樣靜靜地躺著巾遭,像睡著了一般肉康。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上灼舍,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天吼和,我揣著相機與錄音,去河邊找鬼骑素。 笑死炫乓,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的献丑。 我是一名探鬼主播末捣,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼创橄!你這毒婦竟也來了箩做?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤筐摘,失蹤者是張志新(化名)和其女友劉穎卒茬,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體咖熟,經(jīng)...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡圃酵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了馍管。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片郭赐。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖确沸,靈堂內(nèi)的尸體忽然破棺而出捌锭,到底是詐尸還是另有隱情,我是刑警寧澤罗捎,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布观谦,位于F島的核電站,受9級特大地震影響桨菜,放射性物質(zhì)發(fā)生泄漏豁状。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一倒得、第九天 我趴在偏房一處隱蔽的房頂上張望泻红。 院中可真熱鬧,春花似錦霞掺、人聲如沸谊路。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缠劝。三九已至潮梯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間剩彬,已是汗流浹背酷麦。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留喉恋,地道東北人。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓母廷,卻偏偏與公主長得像轻黑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子琴昆,可洞房花燭夜當晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內(nèi)容