Dubbo 線程池策略和線程模型

開篇

?這篇文章的目的主要是分析下Dubbo當(dāng)中關(guān)于線程池的策略和線程模型浮还,主要從源碼角度出發(fā)并結(jié)合網(wǎng)上一些現(xiàn)成的文章來進(jìn)行闡述。

?個(gè)人閱讀過源碼以后的第一感覺就是成熟的框架也是靠使用了基礎(chǔ)的線程池來實(shí)現(xiàn)的闽巩,唯一的遺憾就是這篇文章沒有理順dubbo的配置和真正線程池參數(shù)之間的關(guān)系碑定,這個(gè)后面再補(bǔ)充一篇類似的文章流码。


Dubbo線程池策略

resources目錄下的com.alibaba.dubbo.common.threadpool.ThreadPool的文件

fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool

說明:

  • dubbo的線程池策略通過SPI配置文件對(duì)外提供,在com.alibaba.dubbo.common.threadpool.ThreadPool文件當(dāng)中定義延刘。
  • dubbo的線程池策略對(duì)外提供了三種策略漫试,分別是fixed、cached碘赖、limited三類驾荣。
  • 每類策略的定義見下述源碼。


CachedThreadPool

public class CachedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

說明:

  • CachedThreadPool的實(shí)現(xiàn)可以理解為JDK當(dāng)中Executors.newCachedThreadPool()方法普泡。


FixedThreadPool

public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

說明:

  • FixedThreadPool的實(shí)現(xiàn)可以理解為JDK當(dāng)中Executors.newFixedThreadPool()方法播掷。


LimitedThreadPool

public class LimitedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

說明:

  • LimitedThreadPool的實(shí)現(xiàn)可以理解為JDK當(dāng)中Executors.newCachedThreadPool()方法。
  • LimitedThreadPool的區(qū)別在于線程池中的線程永遠(yuǎn)不會(huì)過期撼班,因?yàn)閍live時(shí)間為最大值歧匈。


NamedThreadFactory

public class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);

    private final AtomicInteger mThreadNum = new AtomicInteger(1);

    private final String mPrefix;

    private final boolean mDaemon;

    private final ThreadGroup mGroup;

    public NamedThreadFactory() {
        this("pool-" + POOL_SEQ.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemon) {
        mPrefix = prefix + "-thread-";
        mDaemon = daemon;
        SecurityManager s = System.getSecurityManager();
        mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return mGroup;
    }
}

說明

  • 提供了線程池中線程創(chuàng)建的工廠,推薦使用線程池時(shí)候一定要自定義線程池工廠砰嘁,便于定位線程的用途件炉。


Dubbo線程模型

resources目錄下的com.alibaba.dubbo.remoting.Dispatcher文件

all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

說明:

  • Dubbo線程模型在resources目錄下的com.alibaba.dubbo.remoting.Dispatcher文件當(dāng)中。
  • 提供5類線程模型矮湘,分別是all斟冕、direct、message缅阳、execution磕蛇、connection。
  • 每類的用途見下面的介紹十办。


Dubbo線程模型對(duì)比

根據(jù)請(qǐng)求的消息類被IO線程處理還是被業(yè)務(wù)線程池處理秀撇,Dubbo提供了下面幾種線程模型:

  • all : (AllDispatcher類)所有消息都派發(fā)到業(yè)務(wù)線程池,這些消息包括請(qǐng)求/響應(yīng)/連接事件/斷開事件/心跳等向族,這些線程模型如下圖:
AllDispatcher
  • direct : (DirectDispacher類)所有消息都不派發(fā)到業(yè)務(wù)線程池呵燕,全部在IO線程上直接執(zhí)行,模型如下圖:
DirectDispacher
  • message : (MessageOnlyDispatcher類)只有請(qǐng)求響應(yīng)消息派發(fā)到業(yè)務(wù)線程池炸枣,其他連接斷開事件/心跳等消息,直接在IO線程上執(zhí)行弄唧,模型圖如下:
MessageOnlyDispatcher
  • execution:(ExecutionDispatcher類)只把請(qǐng)求類消息派發(fā)到業(yè)務(wù)線程池處理适肠,但是響應(yīng)和其它連接斷開事件,心跳等消息直接在IO線程上執(zhí)行候引,模型如下圖:
execution
  • connection:(ConnectionOrderedDispatcher類)在IO線程上侯养,將連接斷開事件放入隊(duì)列,有序逐個(gè)執(zhí)行澄干,其它消息派發(fā)到業(yè)務(wù)線程池處理逛揩,模型如下圖:
ConnectionOrderedDispatcher


AllDispatcher

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {}
    }

    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {}
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {}
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}

說明:

  • AllChannelHandler的connected、disconnected、received統(tǒng)一通過業(yè)務(wù)線程池處理采章。
  • cexecutor.execute(new ChannelEventRunnable())統(tǒng)一提交葛菇。


ConnectionOrderedDispatcher

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }
}


public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {}
    }

    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {}
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {}
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}

說明:

  • ConnectionOrderedDispatcher的connected和disconnected事件通過connectionExecutor實(shí)現(xiàn)。
  • ConnectionOrderedDispatcher的received事件通過單獨(dú)的executor去實(shí)現(xiàn)逞泄。
  • ConnectionOrderedDispatcher的連接處理和消息處理通過不同的executor處理患整。


DirectDispatcher

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}

說明:

  • DirectDispatcher內(nèi)部的處理沒有用到線程池,統(tǒng)一由IO線程去處理喷众。


ExecutionDispatcher

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }
}

public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    }

    public void disconnected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    }

    public void received(Channel channel, Object message) throws RemotingException {
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
    }

}

說明:

  • ExecutionDispatcher的實(shí)現(xiàn)和AllDispatcher幾乎相同各谚。
  • ExecutionDispatcher的連接和消息處理統(tǒng)一由業(yè)務(wù)線程池處理。


MessageOnlyDispatcher

public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }
}


public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

}

說明:

  • MessageOnlyDispatcher的消息處理通過業(yè)務(wù)線程池去執(zhí)行到千。
  • MessageOnlyDispatcher的連接事件通過IO線程去執(zhí)行昌渤。


參考文章

Dubbo學(xué)習(xí)筆記8:Dubbo的線程模型與線程池策略

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市憔四,隨后出現(xiàn)的幾起案子膀息,更是在濱河造成了極大的恐慌,老刑警劉巖加矛,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件履婉,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡斟览,警方通過查閱死者的電腦和手機(jī)毁腿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來苛茂,“玉大人已烤,你說我怎么就攤上這事〖搜颍” “怎么了胯究?”我有些...
    開封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)躁绸。 經(jīng)常有香客問我裕循,道長(zhǎng),這世上最難降的妖魔是什么净刮? 我笑而不...
    開封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任剥哑,我火速辦了婚禮,結(jié)果婚禮上淹父,老公的妹妹穿的比我還像新娘株婴。我一直安慰自己,他們只是感情好暑认,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開白布困介。 她就那樣靜靜地躺著大审,像睡著了一般。 火紅的嫁衣襯著肌膚如雪座哩。 梳的紋絲不亂的頭發(fā)上徒扶,一...
    開封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音八回,去河邊找鬼酷愧。 笑死,一個(gè)胖子當(dāng)著我的面吹牛缠诅,可吹牛的內(nèi)容都是我干的溶浴。 我是一名探鬼主播,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼管引,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼士败!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起褥伴,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤谅将,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后重慢,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體饥臂,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年似踱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了隅熙。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡核芽,死狀恐怖囚戚,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情轧简,我是刑警寧澤驰坊,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站哮独,受9級(jí)特大地震影響拳芙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜皮璧,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一舟扎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧恶导,春花似錦浆竭、人聲如沸浸须。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至裂垦,卻和暖如春顺囊,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蕉拢。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來泰國打工特碳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人晕换。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓午乓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親闸准。 傳聞我的和親對(duì)象是個(gè)殘疾皇子益愈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容