MultithreadEventLoopGroup
-
MultithreadEventLoopGroup
是之前提到的NioEventLoopGroup
的父類(lèi)敛助,是一個(gè)抽象類(lèi)萌业。在上節(jié)中提到它有一套計(jì)算線程數(shù)量的邏輯茄袖。 - 它繼承自
MultithreadEventExecutorGroup
,而且真正的構(gòu)造函數(shù)邏輯就在它的父類(lèi)中恒界。
MultithreadEventExecutorGroup
構(gòu)造函數(shù)如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
......
//創(chuàng)建一個(gè)‘線程任務(wù)執(zhí)行器’喂柒,傳入一個(gè)線程工廠作為參數(shù)
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
......
}
第5行創(chuàng)建了一個(gè)線程任務(wù)執(zhí)行器 ThreadPerTaskExecutor
,傳入一個(gè)線程工廠對(duì)象作為參數(shù)棚放。
線程工廠接口--ThreadFactory
ThreadFactory
是一個(gè)接口枚粘,只定義了一個(gè)newThread
方法如下:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
線程工廠使得線程的創(chuàng)建
和線程的執(zhí)行邏輯
得到解耦,線程工廠可以設(shè)置線程的優(yōu)先級(jí)飘蚯、名字馍迄、是否后臺(tái)線程、所屬的ThreadGroup
等信息局骤。例如常用的工具類(lèi)Executors
的內(nèi)部類(lèi)DefaultThreadFactory
就是接口ThreadFactory
的一個(gè)實(shí)現(xiàn)類(lèi):
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//創(chuàng)建新線程攀圈,設(shè)置是否后臺(tái)運(yùn)行,所屬的ThreadGroup庄涡,優(yōu)先級(jí)
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
執(zhí)行器接口--Executor
Executor
接口是一個(gè)函數(shù)式接口量承,定義了一個(gè)execute
方法:
public interface Executor {
void execute(Runnable command);
}
上面的ThreadPerTaskExecutor
就是Executor
接口的實(shí)現(xiàn)類(lèi):
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
它持有一個(gè)線程工廠ThreadFactory
對(duì)象,使用代理模式
穴店,執(zhí)行時(shí)使用線程工廠創(chuàng)建了一個(gè)新的線程撕捍,并啟動(dòng)。
Executor
接口提供了一種執(zhí)行任務(wù)的新方式泣洞,不是直接新建線程并運(yùn)行new Thread(Runnable).start()
忧风。創(chuàng)建Executor
接口的實(shí)現(xiàn)類(lèi),將Runnable
對(duì)象傳入執(zhí)行器中執(zhí)行球凰。
Executor executor = new XXXExeutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
具體例如:
-
直接執(zhí)行
Runnable
對(duì)象class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }}
ThreadPerTaskExecutor
的例子是新建了一個(gè)線程執(zhí)行任務(wù)狮腿。-
將任務(wù)按照一定的順序執(zhí)行,如
io.grpc.internal.SerializingExecutor
呕诉, 它將任務(wù)首先添加到隊(duì)列中缘厢,然后按序執(zhí)行。public final class SerializingExecutor implements Executor, Runnable { private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<Runnable>(); public SerializingExecutor(Executor executor) { Preconditions.checkNotNull(executor, "'executor' must not be null."); this.executor = executor; } //將任務(wù)添加到隊(duì)列中甩挫,按序執(zhí)行 @Override public void execute(Runnable r) { runQueue.add(checkNotNull(r, "'r' must not be null.")); schedule(r); } }