Elasticsearch 6.1.0線程池介紹

開篇

?這篇文章主要是用來講解ES線程池(EsExecutors)的實(shí)現(xiàn),然后象征性的和JDK的Executors實(shí)現(xiàn)進(jìn)行了簡單的對比挑宠,看了這篇文章以后要對Executors和ThreadPoolExecutor的使用更有信心才對菲盾。


elasticsearch線程池配置

public class ThreadPool extends AbstractComponent implements Scheduler, Closeable {

  final int availableProcessors = EsExecutors.numberOfProcessors(settings);
  final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
  final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
  final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);

  builders.put(Names.GENERIC, 
  new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));

  builders.put(Names.INDEX, 
  new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));

  builders.put(Names.BULK, 
  new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); 

  builders.put(Names.GET, 
  new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));

  builders.put(Names.SEARCH, 
  new AutoQueueAdjustingExecutorBuilder(settings,
                        Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));

  builders.put(Names.MANAGEMENT, 
  new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));

  builders.put(Names.LISTENER, 
  new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));

  builders.put(Names.FLUSH, 
  new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

  builders.put(Names.REFRESH, 
  new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));

  builders.put(Names.WARMER, 
  new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

  builders.put(Names.SNAPSHOT, 
  new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

  builders.put(Names.FETCH_SHARD_STARTED, 
  new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));

  builders.put(Names.FORCE_MERGE, 
  new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));

  builders.put(Names.FETCH_SHARD_STORE, 
  new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
}

說明:

  • elasticsearch線程池根據(jù)作用的不同主要分為兩大類 ScalingExecutor和FixedExecutor。
  • ScalingExecutor表示線程池中的線程數(shù)是動(dòng)態(tài)可變的各淀。
  • FixedExecutor表示線程池中的線程池是不可變的懒鉴。


elasticsearch線程池分類

elasticsearch線程池的線程按照源碼的實(shí)現(xiàn)來看分為FIXED和SCALING兩大類FIXED的意思是固定線程的數(shù)量(core thread個(gè)數(shù) = max thread個(gè)數(shù))揪阿,SCALING的意思是動(dòng)態(tài)調(diào)整線程數(shù)量(core thread個(gè)數(shù) != max thread個(gè)數(shù))疗我。

FIXED

說明:大小固定設(shè)置的threadpool,它有一個(gè)queue來存放pending的請求南捂,其中pool的大小默認(rèn)是core*5吴裤,queue_size默認(rèn)是-1(即是無限制)。

  • LISTENER:用作client的操作溺健,默認(rèn)大小halfProcMaxAt10麦牺,queue_size=-1無限制;

  • GET:用作get操作鞭缭,默認(rèn)大小availableProcessors剖膳,queue_size為1000;

  • INDEX:用作index或delete操作岭辣,默認(rèn)大小availableProcessors吱晒,queue_size為200;

  • BULK:用作bulk操作沦童,默認(rèn)大小為availableProcessors仑濒,queue_size為200叹话;

  • SEARCH:用作count或是search操作,默認(rèn)大小((availableProcessors * 3) / 2) + 1墩瞳;queue_size為1000驼壶;

  • SUGGEST:用作suggest操作,默認(rèn)大小availableProcessors喉酌,queue_size為1000热凹;

  • PERCOLATE:用作percolate,默認(rèn)大小為availableProcessors泪电,queue_size為1000般妙;

  • FORCE_MERGE:用作force_merge操作(2.1之前叫做optimize),默認(rèn)大小為1歪架;


SCALING

說明:擁有可變大小的pool股冗,其值可在1和設(shè)置值之間。

  • GENERIC:通用的操作和蚪,比如node的discovery,默認(rèn)大小genericThreadPoolMax烹棉,默認(rèn)keep alive時(shí)間是30sec;

  • MANAGEMENT:用作ES的管理攒霹,比如集群的管理;默認(rèn)大小5浆洗,keep alive時(shí)間為5min催束;

  • FLUSH:用作flush操作,默認(rèn)大小為halfProcMaxAt5伏社,keep alive時(shí)間為5min抠刺;

  • REFRESH:用作refresh操作,默認(rèn)大小為halfProcMaxAt10摘昌,keep alive時(shí)間為5min速妖;

  • WARMER:用作index warm-up操作,默認(rèn)大小為halfProcMaxAt5聪黎,keep alive時(shí)間為5min罕容;

  • SNAPSHOT:用作snapshot操作,默認(rèn)大小為halfProcMaxAt5稿饰,keep alive時(shí)間為5min锦秒;

  • FETCH_SHARD_STARTED:用作fetch shard開始操作,默認(rèn)大小availableProcessors * 2喉镰,keep alive時(shí)間為5min旅择;

  • FETCH_SHARD_STORE:用作fetch shard存儲(chǔ)操作,默認(rèn)大小availableProcessors * 2侣姆,keep alive時(shí)間為5min生真;


JDK的Executors

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
}

說明:

  • Executors的newFixedThreadPool創(chuàng)建固定線程數(shù)量的線程池脖咐。
  • Executors的newCachedThreadPool創(chuàng)建可動(dòng)態(tài)調(diào)整線程數(shù)量的線程池。
  • Executors創(chuàng)建的是ThreadPoolExecutor對象汇歹。


Elasticsearch的EsExecutors

public class EsExecutors {

    public static final Setting<Integer> PROCESSORS_SETTING =
        Setting.intSetting("processors", Runtime.getRuntime().availableProcessors(), 1, Property.NodeScope);

    public static int numberOfProcessors(final Settings settings) {
        return PROCESSORS_SETTING.get(settings);
    }

    public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, 
                          TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
        ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
        EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, 
                            unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder);
        queue.executor = executor;
        return executor;
    }

    public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, 
                             ThreadFactory threadFactory, ThreadContext contextHolder) {
        BlockingQueue<Runnable> queue;
        if (queueCapacity < 0) {
            queue = ConcurrentCollections.newBlockingQueue();
        } else {
            queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
        }
        return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, 
                                        threadFactory, new EsAbortPolicy(), contextHolder);
    }

    public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
                                                         int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
                                                         ThreadFactory threadFactory, ThreadContext contextHolder) {
        if (initialQueueCapacity <= 0) {
            throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
                            initialQueueCapacity);
        }
        ResizableBlockingQueue<Runnable> queue =
                new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
        return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
                queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
                new EsAbortPolicy(), contextHolder);
    }
}

說明:

  • newScaling()方法創(chuàng)建可擴(kuò)展線程數(shù)量的線程池屁擅,淘汰策略使用ForceQueuePolicy。
  • newFixed()方法創(chuàng)建創(chuàng)建固定線程數(shù)量的線程池产弹,淘汰策略使用EsAbortPolicy派歌。
  • newAutoQueueFixed()方法創(chuàng)建固定線程數(shù)量但是Queue隊(duì)列數(shù)量可以動(dòng)態(tài)調(diào)整的線程池,淘汰策略使用EsAbortPolicy痰哨。胶果。
  • EsExecutors內(nèi)部創(chuàng)建的是EsThreadPoolExecutor對象。
  • EsExecutors的實(shí)現(xiàn)借鑒了JDK的Executors接口斤斧,給我們提供了自定Executors的思路早抠。


EsExecutors的EsThreadPoolExecutor

public class EsThreadPoolExecutor extends ThreadPoolExecutor {
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
        this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, 
             workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
            ThreadContext contextHolder) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }

    @Override
    public void execute(final Runnable command) {
        doExecute(wrapRunnable(command));
    }

    protected void doExecute(final Runnable command) {
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();

                }
            } else {
                throw ex;
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        assert assertDefaultContext(r);
    }
}

說明:

  • EsThreadPoolExecutor繼承自ThreadPoolExecutor對象,構(gòu)造函數(shù)內(nèi)部初始化ThreadPoolExecutor對象撬讽。
  • EsThreadPoolExecutor的核心的execute方法內(nèi)部也是調(diào)用了ThreadPoolExecutor的execute方法蕊连。
  • EsThreadPoolExecutor重新了execute和afterExecute方法。
  • EsThreadPoolExecutor給我們提供了重寫ThreadPoolExecutor的思路游昼,值得學(xué)習(xí)甘苍。


EsExecutors的ThreadFactory

public class EsExecutors {

    public static String threadName(Settings settings, String ... names) {
        String namePrefix =
                Arrays
                        .stream(names)
                        .filter(name -> name != null)
                        .collect(Collectors.joining(".", "[", "]"));
        return threadName(settings, namePrefix);
    }

    public static ThreadFactory daemonThreadFactory(String namePrefix) {
        return new EsThreadFactory(namePrefix);
    }

    static class EsThreadFactory implements ThreadFactory {

        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        EsThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
                    0);
            t.setDaemon(true);
            return t;
        }
    }

    private EsExecutors() {
    }
}

說明:

  • 1.EsExecutors給我們提供一種創(chuàng)建線程工廠的標(biāo)準(zhǔn)方法,實(shí)現(xiàn)ThreadFactory接口重新newThread()方法烘豌。
  • 2.通過AtomicInteger threadNumber = new AtomicInteger(1)變量生成線程自增的線程id载庭。
  • 3.線程池的線程都有具體意義的線程名非常重要有利于排查問題,非常推薦使用廊佩。


EsExecutors的AbortPolicy

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {
    long rejected();
}

public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }

    public long rejected() {
        return rejected.count();
    }
}


static class ForceQueuePolicy implements XRejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                throw new EsRejectedExecutionException(e);
            }
        }

        @Override
        public long rejected() {
            return 0;
        }
    }

說明:

  • EsAbortPolicy的過期策略提供了我們自定實(shí)現(xiàn)過期策略的案例囚聚。


參考文章

源碼Elasticsearch源碼3(線程池)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市标锄,隨后出現(xiàn)的幾起案子顽铸,更是在濱河造成了極大的恐慌,老刑警劉巖鸯绿,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跋破,死亡現(xiàn)場離奇詭異,居然都是意外死亡瓶蝴,警方通過查閱死者的電腦和手機(jī)毒返,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來舷手,“玉大人拧簸,你說我怎么就攤上這事∧锌撸” “怎么了盆赤?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵贾富,是天一觀的道長。 經(jīng)常有香客問我牺六,道長颤枪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任淑际,我火速辦了婚禮畏纲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘春缕。我一直安慰自己盗胀,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布锄贼。 她就那樣靜靜地躺著票灰,像睡著了一般。 火紅的嫁衣襯著肌膚如雪宅荤。 梳的紋絲不亂的頭發(fā)上屑迂,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天,我揣著相機(jī)與錄音膘侮,去河邊找鬼屈糊。 笑死,一個(gè)胖子當(dāng)著我的面吹牛琼了,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播夫晌,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼雕薪,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了晓淀?” 一聲冷哼從身側(cè)響起所袁,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎凶掰,沒想到半個(gè)月后燥爷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡懦窘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年前翎,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片畅涂。...
    茶點(diǎn)故事閱讀 40,505評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡港华,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出午衰,到底是詐尸還是另有隱情立宜,我是刑警寧澤冒萄,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站橙数,受9級特大地震影響尊流,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜灯帮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一崖技、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧施流,春花似錦响疚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至银受,卻和暖如春践盼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背宾巍。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工咕幻, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人顶霞。 一個(gè)月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓肄程,卻偏偏與公主長得像,于是被迫代替她去往敵國和親选浑。 傳聞我的和親對象是個(gè)殘疾皇子蓝厌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評論 2 359