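Introduction
This article walks through the implementation of the Elasticsearch thread pool factory (EsExecutors) and briefly compares it with the JDK's Executors. After reading it, you should feel more confident using Executors and ThreadPoolExecutor.
Elasticsearch thread pool configuration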
public class ThreadPool extends AbstractComponent implements Scheduler, Closeable {
    // Excerpt: the statements below sit inside the ThreadPool constructor; its signature
    // and the declaration of the builders map are omitted here.
    final int availableProcessors = EsExecutors.numberOfProcessors(settings);
    final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
    final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
    final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
    builders.put(Names.GENERIC,
        new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
    builders.put(Names.INDEX,
        new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
    builders.put(Names.BULK,
        new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200));
    builders.put(Names.GET,
        new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
    builders.put(Names.SEARCH,
        new AutoQueueAdjustingExecutorBuilder(settings,
            Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
    builders.put(Names.MANAGEMENT,
        new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.LISTENER,
        new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
    builders.put(Names.FLUSH,
        new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.REFRESH,
        new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
    builders.put(Names.WARMER,
        new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.SNAPSHOT,
        new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FETCH_SHARD_STARTED,
        new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FORCE_MERGE,
        new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
    builders.put(Names.FETCH_SHARD_STORE,
        new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
}
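Notes:
- Elasticsearch's thread pools fall into two main categories according to their purpose: ScalingExecutor and FixedExecutor. SEARCH is built with AutoQueueAdjustingExecutorBuilder, a fixed-size variant whose queue capacity can be adjusted.
- ScalingExecutor means the number of threads in the pool can change dynamically.
- FixedExecutor means the number of threads in the pool is fixed.
Elasticsearch thread pool types
Going by the source code, Elasticsearch's thread pools come in two types, FIXED and SCALING. FIXED means the number of threads is fixed (core thread count = max thread count); SCALING means the number of threads is adjusted dynamically (core thread count != max thread count).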
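FIXED
Note: a thread pool of fixed size with a queue that holds pending requests. The general defaults given for this type are a pool size of core*5 and a queue_size of -1 (unbounded); the individual pools below set their own defaults.
LISTENER: used for client operations; default size halfProcMaxAt10, queue_size -1 (unbounded).
GET: used for get operations; default size availableProcessors, queue_size 1000.
INDEX: used for index or delete operations; default size availableProcessors, queue_size 200.
BULK: used for bulk operations; default size availableProcessors, queue_size 200.
SEARCH: used for count or search operations; default size ((availableProcessors * 3) / 2) + 1, queue_size 1000.
SUGGEST: used for suggest operations; default size availableProcessors, queue_size 1000 (not part of the constructor excerpt above).
PERCOLATE: used for percolate operations; default size availableProcessors, queue_size 1000 (not part of the constructor excerpt above).
FORCE_MERGE: used for force_merge operations (called optimize before 2.1); default size 1, queue_size -1 (unbounded).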
SCALING
Note: a pool of variable size. The thread count ranges between the core size (1 for most pools, 4 for GENERIC) and the configured maximum.
GENERIC: general-purpose operations such as node discovery; default maximum size genericThreadPoolMax, keep-alive 30 s.
MANAGEMENT: used for managing ES, for example cluster management; default maximum size 5, keep-alive 5 min.
FLUSH: used for flush operations; default maximum size halfProcMaxAt5, keep-alive 5 min.
REFRESH: used for refresh operations; default maximum size halfProcMaxAt10, keep-alive 5 min.
WARMER: used for index warm-up operations; default maximum size halfProcMaxAt5, keep-alive 5 min.
SNAPSHOT: used for snapshot operations; default maximum size halfProcMaxAt5, keep-alive 5 min.
FETCH_SHARD_STARTED: used for fetch-shard-started operations; default maximum size availableProcessors * 2, keep-alive 5 min.
FETCH_SHARD_STORE: used for fetch-shard-store operations; default maximum size availableProcessors * 2, keep-alive 5 min.
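To make the default sizes above concrete, the following sketch works through the arithmetic for a given processor count. Only the SEARCH formula and the boundedBy(4 * availableProcessors, 128, 512) call come from the text; the helper bodies (clamping, and "half" meaning (n + 1) / 2) are assumptions inferred from the helper names, and the class and method names are hypothetical.

```java
final class PoolSizingSketch {

    // Assumed behaviour: clamp value into the range [min, max].
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    // Assumed behaviour: roughly half the processors, at least 1 and at most 5.
    static int halfProcessorsMaxFive(int processors) {
        return boundedBy((processors + 1) / 2, 1, 5);
    }

    // Assumed behaviour: roughly half the processors, at least 1 and at most 10.
    static int halfProcessorsMaxTen(int processors) {
        return boundedBy((processors + 1) / 2, 1, 10);
    }

    // Formula quoted in the SEARCH entry.
    static int searchPoolSize(int processors) {
        return ((processors * 3) / 2) + 1;
    }

    // Upper bound used for the GENERIC pool in the constructor excerpt.
    static int genericMax(int processors) {
        return boundedBy(4 * processors, 128, 512);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("processors      = " + processors);
        System.out.println("halfProcMaxAt5  = " + halfProcessorsMaxFive(processors));
        System.out.println("halfProcMaxAt10 = " + halfProcessorsMaxTen(processors));
        System.out.println("search size     = " + searchPoolSize(processors));
        System.out.println("generic max     = " + genericMax(processors));
    }
}
```

Under these assumptions, a 16-processor node would get a SEARCH pool of 25 threads, a GENERIC maximum of 128, and caps of 5 and 8 for the two "half" values.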
The JDK's Executors
public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            threadFactory);
    }
}
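Notes:
- Executors.newFixedThreadPool creates a pool with a fixed number of threads (corePoolSize = maximumPoolSize = nThreads, backed by an unbounded LinkedBlockingQueue).
- Executors.newCachedThreadPool creates a pool whose thread count adjusts dynamically (0 core threads, up to Integer.MAX_VALUE threads, a 60-second keep-alive, and a SynchronousQueue for handoff).
- Both factory methods return ThreadPoolExecutor objects.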
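The difference between the two factory methods can be observed at runtime by inspecting the pools they return. This is a small sketch; the class and method names are hypothetical, and the cast to ThreadPoolExecutor relies on the factory methods being implemented as in the listing above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class JdkPoolInspection {

    // Summarises the settings that distinguish a fixed pool from a cached one.
    static String describe(ExecutorService service) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        return "core=" + pool.getCorePoolSize()
            + ", max=" + pool.getMaximumPoolSize()
            + ", keepAliveSeconds=" + pool.getKeepAliveTime(TimeUnit.SECONDS)
            + ", queue=" + pool.getQueue().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(4);
        ExecutorService cached = Executors.newCachedThreadPool();
        try {
            System.out.println("fixed:  " + describe(fixed));
            System.out.println("cached: " + describe(cached));
        } finally {
            fixed.shutdown();
            cached.shutdown();
        }
    }
}
```

The fixed pool reports equal core and maximum sizes with a LinkedBlockingQueue, while the cached pool reports a core size of 0, a maximum of Integer.MAX_VALUE and a SynchronousQueue.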
Elasticsearch's EsExecutors
public class EsExecutors {
    public static final Setting<Integer> PROCESSORS_SETTING =
        Setting.intSetting("processors", Runtime.getRuntime().availableProcessors(), 1, Property.NodeScope);
    public static int numberOfProcessors(final Settings settings) {
        return PROCESSORS_SETTING.get(settings);
    }
    public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime,
                                                  TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
        ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
        EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime,
            unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder);
        queue.executor = executor;
        return executor;
    }
    public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
                                                ThreadFactory threadFactory, ThreadContext contextHolder) {
        BlockingQueue<Runnable> queue;
        if (queueCapacity < 0) {
            queue = ConcurrentCollections.newBlockingQueue();
        } else {
            queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
        }
        return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue,
            threadFactory, new EsAbortPolicy(), contextHolder);
    }
    public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
                                                         int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
                                                         ThreadFactory threadFactory, ThreadContext contextHolder) {
        if (initialQueueCapacity <= 0) {
            throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
                initialQueueCapacity);
        }
        ResizableBlockingQueue<Runnable> queue =
            new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
        return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
            queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
            new EsAbortPolicy(), contextHolder);
    }
}
Notes:
- newScaling() creates a thread pool whose thread count can scale; its rejection handler is ForceQueuePolicy.
- newFixed() creates a thread pool with a fixed thread count; its rejection handler is EsAbortPolicy.
- newAutoQueueFixed() creates a thread pool with a fixed thread count whose queue capacity can be adjusted dynamically; its rejection handler is EsAbortPolicy.
- Internally, EsExecutors creates EsThreadPoolExecutor objects.
- EsExecutors is modeled on the JDK's Executors factory class and shows one way to write an Executors-style factory of your own.
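The listing does not show ExecutorScalingQueue, so the following is only a sketch of why newScaling() pairs a special queue with ForceQueuePolicy. A plain ThreadPoolExecutor adds threads beyond the core size only when the queue refuses a task, so an unbounded queue keeps it at the core size. The ScalingQueue class below is hypothetical and built from JDK classes only: it declines offers while the pool is allowed to grow, and the rejection handler enqueues the task once no more threads can be added.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ScalingPoolSketch {

    /** Hypothetical queue that declines offers while the pool is allowed to grow. */
    static final class ScalingQueue<E> extends LinkedTransferQueue<E> {
        volatile ThreadPoolExecutor executor;

        @Override
        public boolean offer(E e) {
            if (tryTransfer(e)) {
                return true; // an idle worker was already waiting and took the task
            }
            if (executor.getMaximumPoolSize() > executor.getCorePoolSize()) {
                return false; // decline, so the executor tries to start another thread
            }
            return super.offer(e);
        }

        @Override
        public void put(E e) {
            // Unconditional enqueue that bypasses the overridden offer above.
            super.offer(e);
        }
    }

    static ThreadPoolExecutor newScaling(int min, int max, long keepAlive, TimeUnit unit) {
        ScalingQueue<Runnable> queue = new ScalingQueue<>();
        // Force-queue handler: when no thread can be added, enqueue instead of failing.
        RejectedExecutionHandler forceQueue = (task, pool) -> queue.put(task);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(min, max, keepAlive, unit, queue, forceQueue);
        queue.executor = executor;
        return executor;
    }

    /** Submits blocking tasks and reports {pool size, queue length} before releasing them. */
    static int[] sizesUnderLoad(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < tasks; i++) {
            pool.execute(blocker);
        }
        int[] sizes = {pool.getPoolSize(), pool.getQueue().size()};
        release.countDown();
        pool.shutdown();
        return sizes;
    }

    public static void main(String[] args) {
        int[] plain = sizesUnderLoad(
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>()), 5);
        int[] scaling = sizesUnderLoad(newScaling(1, 3, 30, TimeUnit.SECONDS), 5);
        System.out.println("plain:   threads=" + plain[0] + ", queued=" + plain[1]);
        System.out.println("scaling: threads=" + scaling[0] + ", queued=" + scaling[1]);
    }
}
```

With five blocking tasks and limits of 1 and 3, the plain pool stays at one thread with four tasks queued, while the scaling sketch grows to three threads and queues the remaining two. The put override matters because the handler must enqueue even though offer is overridden to decline.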
The EsThreadPoolExecutor behind EsExecutors
public class EsThreadPoolExecutor extends ThreadPoolExecutor {
    // Excerpt: the name and contextHolder fields and the wrapRunnable and
    // assertDefaultContext helpers are omitted here.
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                         BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
        this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
    }
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                         BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
                         ThreadContext contextHolder) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }
    @Override
    public void execute(final Runnable command) {
        doExecute(wrapRunnable(command));
    }
    protected void doExecute(final Runnable command) {
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // Let the task handle its own rejection instead of propagating the exception.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();
                }
            } else {
                throw ex;
            }
        }
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        assert assertDefaultContext(r);
    }
}
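Notes:
- EsThreadPoolExecutor extends ThreadPoolExecutor; its constructors initialize the underlying ThreadPoolExecutor through super(...).
- Its core execute method wraps the command and then delegates to ThreadPoolExecutor.execute. If the task is rejected and is an AbstractRunnable, its onRejection callback is invoked instead of rethrowing the exception.
- EsThreadPoolExecutor overrides the execute and afterExecute methods.
- EsThreadPoolExecutor is a useful pattern to study for extending ThreadPoolExecutor.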
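The same pattern can be tried with JDK classes alone. The sketch below is an illustration rather than the Elasticsearch code: RejectionAwareTask is a hypothetical stand-in for AbstractRunnable, and the afterExecute hook merely counts finished tasks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CallbackExecutorSketch {

    /** Hypothetical task type that wants to be told when it is rejected. */
    interface RejectionAwareTask extends Runnable {
        void onRejection(Exception e);
    }

    static final class CallbackThreadPoolExecutor extends ThreadPoolExecutor {
        final AtomicInteger finished = new AtomicInteger();

        CallbackThreadPoolExecutor(int size, int queueCapacity) {
            super(size, size, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
        }

        @Override
        public void execute(Runnable command) {
            try {
                super.execute(command);
            } catch (RejectedExecutionException ex) {
                if (command instanceof RejectionAwareTask) {
                    ((RejectionAwareTask) command).onRejection(ex); // notify instead of throwing
                } else {
                    throw ex;
                }
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            finished.incrementAndGet(); // runs on the worker thread after every task
        }
    }

    static String demo() {
        CallbackThreadPoolExecutor pool = new CallbackThreadPoolExecutor(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only thread
        pool.execute(blocker); // fills the single queue slot
        AtomicBoolean notified = new AtomicBoolean();
        pool.execute(new RejectionAwareTask() {
            @Override public void run() { }
            @Override public void onRejection(Exception e) { notified.set(true); }
        });
        boolean thrown = false;
        try {
            pool.execute(() -> { }); // a plain Runnable gets the exception
        } catch (RejectedExecutionException e) {
            thrown = true;
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "notified=" + notified.get() + ", thrown=" + thrown + ", finished=" + pool.finished.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The callback only works for tasks passed to execute directly: submit wraps the task in a FutureTask, so the instanceof check would no longer match.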
The ThreadFactory in EsExecutors
public class EsExecutors {
    public static String threadName(Settings settings, String ... names) {
        String namePrefix =
            Arrays
                .stream(names)
                .filter(name -> name != null)
                .collect(Collectors.joining(".", "[", "]"));
        // Calls the threadName(Settings, String) overload, which is omitted from this excerpt.
        return threadName(settings, namePrefix);
    }
    public static ThreadFactory daemonThreadFactory(String namePrefix) {
        return new EsThreadFactory(namePrefix);
    }
    static class EsThreadFactory implements ThreadFactory {
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;
        EsThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        }
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
                0);
            t.setDaemon(true);
            return t;
        }
    }
    private EsExecutors() {
    }
}
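Notes:
- EsExecutors shows a standard way to create a thread factory: implement the ThreadFactory interface and override newThread().
- The AtomicInteger threadNumber = new AtomicInteger(1) field generates an auto-incrementing sequence number that becomes part of each thread's name.
- Giving pool threads meaningful names makes troubleshooting much easier and is highly recommended.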
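A stripped-down version of this idea, using only the JDK, might look as follows. The class name NamedDaemonThreadFactory and the prefix in the usage are hypothetical; the naming scheme mirrors the "[T#n]" suffix from the listing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedDaemonThreadFactory implements ThreadFactory {
    private final String namePrefix;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    NamedDaemonThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Each thread gets the shared prefix plus an increasing sequence number.
        Thread t = new Thread(r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]");
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2, new NamedDaemonThreadFactory("demo[search]"));
        try {
            // Prints the name of whichever pool thread runs the task.
            pool.submit(() -> System.out.println(Thread.currentThread().getName())).get();
        } finally {
            pool.shutdown();
        }
    }
}
```

In a thread dump, names built this way show at a glance which pool a stuck thread belongs to.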
The AbortPolicy in EsExecutors
public interface XRejectedExecutionHandler extends RejectedExecutionHandler {
    long rejected();
}
public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                // Forced tasks are put on the queue even when it is full.
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }
    public long rejected() {
        return rejected.count();
    }
}
// ForceQueuePolicy is a nested class of EsExecutors (hence static); shown here outside its enclosing class.
static class ForceQueuePolicy implements XRejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            throw new EsRejectedExecutionException(e);
        }
    }
    @Override
    public long rejected() {
        return 0;
    }
}
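Notes:
- EsAbortPolicy is a worked example of a custom rejection policy: it force-queues AbstractRunnable tasks marked for forced execution, and otherwise counts the rejection and throws EsRejectedExecutionException.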
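A reduced version of the counting part can be written against the JDK's RejectedExecutionHandler interface. This is a sketch under simplifying assumptions: CountingAbortPolicy is a hypothetical name, a LongAdder stands in for CounterMetric, and the forced-execution branch is left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

final class CountingAbortPolicySketch {

    /** Hypothetical handler: counts every rejection, then aborts like the JDK's AbortPolicy. */
    static final class CountingAbortPolicy implements RejectedExecutionHandler {
        private final LongAdder rejected = new LongAdder();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.increment();
            throw new RejectedExecutionException("rejected execution of " + r + " on " + executor);
        }

        long rejected() {
            return rejected.sum();
        }
    }

    /** Overloads a pool with one thread and one queue slot; returns the rejection count. */
    static long demo() {
        CountingAbortPolicy policy = new CountingAbortPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 4; i++) {
            try {
                pool.execute(blocker); // first runs, second is queued, the rest are rejected
            } catch (RejectedExecutionException e) {
                // expected once the thread and the queue slot are both taken
            }
        }
        release.countDown();
        pool.shutdown();
        return policy.rejected();
    }

    public static void main(String[] args) {
        System.out.println("rejected=" + demo());
    }
}
```

Exposing the counter, as XRejectedExecutionHandler does through rejected(), lets monitoring code report how often a pool turns work away.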