線程池是一個在多線程場景中運用很廣泛的并發(fā)框架剑按,需要異步執(zhí)行或并發(fā)執(zhí)行任務的程序都可以使用線程池。有任務到來時澜术,如果不使用線程池艺蝴,我們需要不斷的創(chuàng)建/銷毀線程,還需要對線程進行管理鸟废;而使用線程池猜敢,直接將任務提交到線程池即可。使用線程池有幾個好處:無需重復創(chuàng)建/銷毀線程盒延,降低資源消耗缩擂;提高程序響應速度;提高線程的可管理性添寺。
3.1 實現原理
線程池內部一般包含一個核心線程池胯盯,其內部的線程在創(chuàng)建之后一般不會銷毀,執(zhí)行完任務后線程會阻塞等待新任務到來计露。
當向線程池提交任務時唆缴,線程池會做如下判斷:
- 核心線程池未滿讲竿,創(chuàng)建線程執(zhí)行任務
- 核心線程池已滿卵凑,若等待隊列未滿暮胧,則加入到等待隊列;若等待隊列已滿但線程池未滿该押,創(chuàng)建新線程執(zhí)行任務疗杉;若等待隊列和線程池均已滿,則按照指定策略退出/拒絕任務/丟棄任務等蚕礼。
了解了實現原理烟具,我們先來自己實現一個線程池,首先定義線程池的接口
ThreadPool
線程池的接口里面最重要的方法是execute執(zhí)行任務
public interface ThreadPool<Job extends Runnable> {
//提交一個Job奠蹬,這個Job需要實現Runnable接口
void execute(Job job);
//關閉線程池
void shutdown();
//增加工作者線程
void addWorkers(int num);
//減少工作者線程
void removeWorker(int num);
//得到正在等待執(zhí)行的任務數量
int getJobSize();
}
CommonThreadPool
在實現線程池時朝聋,我們需要定義線程池的大小,以及保存任務的列表jobs罩润,下面是變量定義:
// 線程池最大限制數
private static final int MAX_WORKER_NUMBERS = 100;
// 線程池默認的數量
private static final int DEFAULT_WORKER_NUMBERS = 1;
// 線程池最小數量
private static final int MIN_WORKER_NUMBERS = 1;
// 工作列表
private final LinkedList<Job> jobs = new LinkedList<Job>();
在線程池初始化時,我們要將核心線程池進行初始化翼馆,創(chuàng)建多個Worker線程割以,然后啟動Worker線程金度。
// num 為DEFAULT_WORKER_NUMBERS 默認線程池大小
private void initializeWokers(int num) {
// 創(chuàng)建多個線程,加入workers中严沥,并啟動
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-"
+ threadNum.getAndIncrement());
thread.start();
}
}
Worker啟動后猜极,一直沒有任務,需要阻塞在jobs上(jobs是上面定義的任務列表)消玄,Worker等待任務到來后喚醒獲取隊列中的任務并執(zhí)行跟伏。下面的代碼中,如果jobs為空翩瓜,則線程等待受扳;
// worker的代碼,首先要獲取jobs的鎖兔跌,
synchronized (jobs) {
while (jobs.isEmpty()) {// 如果jobs是空的勘高,則執(zhí)行jobs.wait,使用while而不是if坟桅,因為wait后可能已經為空了华望,需要繼續(xù)等待
try {
jobs.wait();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();// 中斷
return;// 結束
}
}
job = jobs.removeFirst();// 第一個job
if (job != null) {
try {
job.run();//注意,這里是run而不是start仅乓,傳入的Job
} catch (Exception e) {
// 忽略Job執(zhí)行中的Exception
e.printStackTrace();
}
}
}
提交任務時赖舟,只需要將任務加入jobs中,然后通知worker線程即可夸楣。worker線程獲得鎖后會取第一個任務執(zhí)行宾抓。執(zhí)行完畢,若jobs為空裕偿,worker線程繼續(xù)進行休眠等待任務到來洞慎。
@Override
public void execute(Job job) {
if (job == null)
return;
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
完整的代碼可以查看https://github.com/ssj234/JavaStudy_IO/tree/master/IOResearch/src/net/ssj/pool
3.2 Java的Executor框架
Java平臺本身提供了Executor框架用來幫助我們使用線程池。
Executor框架最核心的類是ThreadPoolExecutor嘿棘,這是各個線程池的實現類劲腿,有如下幾個屬性:
- corePool:核心線程池的大小 m
- maximumPool:最大線程池的大小
- keepAliveTime: 休眠等待時間
- TimeUnit unit : 休眠等待時間單位,如微秒/納秒等
- BlockingQueue workQueue:用來保存任務的工作隊列
- ThreadFactory: 創(chuàng)建線程的工廠
- RejectedExecutionHandler:當線程池已經關閉或線程池Executor已經飽和鸟妙,execute()方法將要調用的Handler
通過Executor框架的根據類Executors焦人,可以創(chuàng)建三種基本的線程池:
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
FixedThreadPool
FixedThreadPool被稱為可重用固定線程數的線程池。
// 獲取fixedThreadPool
ExecutorService fixedThreadPool=Executors.newFixedThreadPool(paramInt);
//內部會調用下面的方法重父,參數 corePoolSize花椭、maximumPoolSize、keepAliveTime房午、workQueue
return new ThreadPoolExecutor(paramInt, paramInt, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
FixedTheadPool設置的線程池大小和最大數量一樣矿辽;keepAliveTime為0,代表多余的空閑線程會立刻終止;保存任務的隊列使用LinkedBlockingQueue袋倔,當線程池中的線程執(zhí)行完任務后雕蔽,會循環(huán)反復從隊列中獲取任務來執(zhí)行。
FixedThreadPool適用于限制當前線程數量的應用場景宾娜,適用于負載比較重
的服務器批狐。
SingleThreadExecutor
SingleThreadExecutor的核心線程池數量corePoolSize和最大數量maximumPoolSize都設置為1,適用于需要保證順序執(zhí)行
的場景
ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
CachedThreadPool
CachedThreadPool是一個會根據需要創(chuàng)建新線程的線程池前塔,適用于短期異步的小任務嚣艇,或負載教輕
的服務器。
ExecutorService cachedThreadPool=Executors.newCachedThreadPool();
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
SynchronousQueue是一種阻塞隊列华弓,其中每個插入操作必須等待另一個線程的對應移除操作 食零,反之亦然。corePoolSize是0该抒,maximumPoolSize都最大慌洪,無界的。keepAliveTime為60秒凑保,空閑線程超過60S會被終止冈爹。
ScheduleThreadPoolExecutor
ScheduleThreadPoolExecutor和Timer類似,可以設置延時執(zhí)行或周期執(zhí)行欧引,但比Timer有更多的功能频伤。Timer和TimerTask只創(chuàng)建一個線程,任務執(zhí)行時間超過周期會產生一些問題芝此。Timer創(chuàng)建的線程沒有處理異常憋肖,因此一旦拋出非受檢異常,會立刻終止婚苹。
ScheduledThreadPoolExecutor executor=new ScheduledThreadPoolExecutor(5);
//可以直接執(zhí)行
executor.execute(new JobTaskR("executor", 0));
executor.execute(new JobTaskR("executor", 1));
System.out.println("5S后執(zhí)行executor3");
//隔5秒后執(zhí)行一次岸更,但只會執(zhí)行一次。
executor.schedule(new JobTaskR("executor", 3), 5, TimeUnit.SECONDS);
System.out.println("開始周期調度");
//設置周期執(zhí)行膊升,初始時6S后執(zhí)行怎炊,之后每2s執(zhí)行一次
executor.scheduleAtFixedRate(new JobTaskR("executor", 4), 6, 2, TimeUnit.SECONDS);
scheduleAtFixedRate或者scheduleWithFixedDelay方法,它們不同的是前者以固定頻率執(zhí)行廓译,后者以相對固定延遲之后執(zhí)行评肆。
3.3 Netty的EventLoop與線程池
Netty的事件循環(huán)和事件循環(huán)組的實現中,類的層級關系比較復雜非区,其底層是Java線程池的實現瓜挽,不過在netty的實際使用中還是比較簡單的,我們只需要使用如下的代碼即可征绸,
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup,workGroup)//設置事件循環(huán)組
Netty的事件循環(huán)機制有兩個基本接口:EventLoop和EventLoopGroup久橙。前者是事件循環(huán)俄占,后者是由多個事件循環(huán)組成的組。
EventLoop自身是一個不斷循環(huán)執(zhí)行的線程淆衷,以NioEventLoop為例颠放,其繼承了SingleThreadEventExecutor
,內部的executor
是創(chuàng)建NioEventLoop時傳入的線程池吭敢,用來將run方法放入線程池中執(zhí)行;此外還包含為一個TaskQueue暮芭,netty在處理io過程中的task可以提交到這個隊列中鹿驼,事件循環(huán)會不斷獲取task并執(zhí)行,因此但其本身也可以看做一個線程池辕宏。
NioEventLoop的run方法中畜晰,Nio的事件循環(huán)會不斷select后獲取任務并執(zhí)行,然后根據ioRatio的設置執(zhí)行TaskQueue的任務瑞筐。NioEventLoop的execute方法中凄鼻,其會將task加入到taskQueue等待事件循環(huán)執(zhí)行。因此聚假,我們可以將NioEventLoop當做一個不斷執(zhí)行的線程池块蚌,EventLoopGroup作為線程池組,線程池組的意義是采用給的的策略選取一個EventLoop并提交任務膘格。
EventLoop的定義如下峭范,其繼承了一個順序執(zhí)行的線程池接口和EventLoopGroup,也就是說EventLoop之間有父子關系瘪贱,通過parent();返回任務循環(huán)組纱控,通過next()選取一個事件循環(huán)。線程池組的register用于將Netty的Channel注冊到事件循環(huán)中菜秦。
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
public interface EventLoopGroup extends EventExecutorGroup {
EventLoop next();
ChannelFuture register(Channel channel);
}
NioEventLoopGroup
NioEventLoopGroup除了處理網絡的異步I/O任務甜害,還用于完成異步提交的系統(tǒng)任務。NioEventLoopGroup初始化時球昨,有如下幾個參數可以配置尔店,主要用于設置線程池的相關配置。
- nThreads 子線程池數量
- Executor executor 用來執(zhí)行事件循環(huán)的線程池
- chooserFactory :next()時選擇線程池的策略
- selectorProvider 用于打開selector
- selectStrategyFactory 用來控制select循環(huán)行為的策略
- RejectedExecutionHandlers 線程池執(zhí)行的異常處理策略
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
NioEventLoopGroup初始化過程為:
- 如果傳入的executor 為空褪尝,會默認使用
ThreadPerTaskExecutor
闹获,該線程池針對每個任務會創(chuàng)建一個線程,創(chuàng)建線程方式使用DefaultThreadFactory
提供的newThread方法河哑。 - 初始化開始避诽,首先會根據創(chuàng)建nThread個子線程池,保存在childrens變量中璃谨,創(chuàng)建邏輯比較簡單沙庐,將初始化NioEventLoopGroup時設置的參數傳遞給NioEventLoop對象鲤妥。在創(chuàng)建子線程池NioEventLoop的過程中,如果一旦有失敗的拱雏,就需要關閉已經創(chuàng)建的所有子線程池并等待這些線程池結束棉安。
- 之后,使用chooserFactory創(chuàng)建
chooser
铸抑,用來在next()選擇事件循環(huán)時從childrens變量選擇一個返回贡耽。默認使用2的倍數的策略,也可以設置為順序依次選擇鹊汛。 - 向組中所有的事件循環(huán)的
terminationFuture
注冊事件蒲赂,目的是等待所有事件循環(huán)結束后將事件循環(huán)組的terminatedChildren
設置為成功完成。 - 最后刁憋,將children復制保存為一個只讀的集合滥嘴,保存在變量
readonlyChildren
中。
至此至耻,NioEventLoopGroup的初始化過程就結束了若皱。我們可以看到,NioEventLoopGroup主要的用來聚合多個EventLoop尘颓,對其進行調度走触。
NioEventLoop
在NioEventLoopGroup的初始化過程中,會創(chuàng)建多個NioEventLoop疤苹,NioEventLoop用來執(zhí)行實際的事件循環(huán)饺汹,初始化時有如下幾個屬性:
NioEventLoopGroup parent 線程池所在的Group
Executor executor 執(zhí)行任務的線程池,默認是ThreadPerTaskExecutor
SelectorProvider selectorProvider 用來打開selector
SelectStrategy strategy 用來控制select循環(huán)行為的策略
RejectedExecutionHandlers 線程池執(zhí)行的異常處理策略
addTaskWakesUp addTask(Runnable)添加任務時是否喚醒線程池痰催,默認是false
maxPendingTasks 線程池中等待任務的最大數量
scheduledTaskQueue 保存定時任務的QUeue
tailTasks :保存任務的Queue兜辞,netty選擇使用jctools的MpscChunkedArrayQueue,原因是為了提高效率夸溶,因為Nio線程池的線程消費者只有一個逸吵,就是一直進行的select循環(huán),而生產者可能有多個缝裁。具體實現參見 http://blog.csdn.net/youaremoon/article/details/50351929
事件循環(huán)
- NioEventLoop初始化時扫皱,會根據配置參數
sun.nio.ch.bugLevel
和io.netty.selectorAutoRebuildThreshold
設置重建selector的閾值,這是為了解決jvm空輪詢導致cpu利用率100%的問題捷绑。 - openSelector的目的是打開選擇描述符Selector韩脑,并對
sun.nio.ch.SelectorImpl
的實現進行優(yōu)化,將selectedKeys和publicSelectedKeys屬性都修改為SelectedSelectionKeySet類粹污,這個類使用了兩個數組段多,使用空間換時間的方法,設置了兩個數組壮吩,每次使用其中的一個进苍。 - 打開Selector之后加缘,在服務器啟動后會調用register將選擇描述符注冊到EventLoopGroup,NioEventLoopGroup中會調用NioEventLoop的register觉啊,這樣拣宏,事件循環(huán)中的Selector就注冊到了channel上。
- 在run方法中杠人,會根據selectStrategy調用select方法勋乾,收到io事件后使用processSelectedKeys處理,處理完成后執(zhí)行TaskQueue中的方法嗡善。
提交任務
NioEventLoop初始化時市俊,會創(chuàng)建/設置其包含的屬性,最重要的是打開selector和創(chuàng)建tailTasks兩個步驟滤奈;這時,由于沒有任何任務撩满,NioEventLoop不會啟動線程蜒程。在netty中,向線程池提交任務可以使用下面的方法:
EventLoopGroup loop = new NioEventLoopGroup();
loop.next().submit(Callable<T> task)
loop.next().submit(Runnable task)
loop.next().execute(Runnable command);
也可以直接通過EventLoopGroup提交任務伺帘,只是EventLoopGroup內部會調用next()后再執(zhí)行相關的方法昭躺。
EventLoopGroup loop = new NioEventLoopGroup();
loop.submit(Callable<T> task)
loop.submit(Runnable task)
loop.execute(Runnable command);
submit方法的內部會將Callable或Runnable包裝后交給execute方法執(zhí)行。
// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 包裝task為 ftask
execute(ftask);
return ftask;
}
execute方法被NioEventLoop的父類SingleThreadEventExecutor覆蓋伪嫁,程序如下:
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task); // 添加到任務隊列
} else {
startThread(); // 啟動線程领炫,向EventLoop內部的線程池提交任務,會執(zhí)行NioEventLoop run
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
- 判斷當前線程(提交任務的線程)與當前線程池是同一個線程张咳,也就是說是如果是當前線程池提交的任務帝洪,則直接將任務加入線程池隊列即可;
- 如果不是脚猾,則需要啟動線程后添加任務葱峡。啟動線程的過程是,如果內部線程沒有啟動則啟動龙助,向NioEventLoop內部包含的executor提交一個任務砰奕,任務內部執(zhí)行NioEventLoop的run方法也就是事件循環(huán)(executor是實際使用的線程池,初始化是傳入提鸟,默認是ThreadPerTaskExecutor)军援。
- 最后根據addTaskWakesUp標志和任務是否實現了NonWakeupRunnable判斷是否需要喚醒,喚醒的方法是提交一個默認的空任務WAKEUP_TASK称勋。
3.4 事件循環(huán)解析
Nio事件循環(huán)在NioEventLoop中胸哥,主要功能:
- 處理網絡I/O讀寫事件
- 執(zhí)行系統(tǒng)任務和定時任務
在主循環(huán)中我們可以看到netty對I/O任務和提交到事件循環(huán)中的系統(tǒng)任務的調度。
3.4.1 I/O事件
- 由于NIO的I/O讀寫需要使用選擇符赡鲜,因此烘嘱,netty在NioEventLoop初始化時昆禽,會使用SelectorProvider打開selector。在類加載時蝇庭,netty會從系統(tǒng)設置中讀取相關配置參數:
- sun.nio.ch.bugLevel 用來修復JDK的NIO在Selector.open()的一個BUG
- io.netty.selectorAutoRebuildThreshold select()多少次數后重建selector
static {
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
}
- NioEventLoop的構造方法中醉鳖,會調用provider.openSelector()打開Selector;如果設置
io.netty.noKeySetOptimization
為true,則會啟動優(yōu)化哮内,優(yōu)化內容是將Selector的selectedKeys和publicSelectedKeys屬性設置為可寫并替換為Netty實現的集合以提供效率盗棵。
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
// 下面是優(yōu)化程序,此處省略
...
return selector;
}
- NioEventLoop最核心的地方在于事件循環(huán),具體代碼在NioEventLoop.java在run方法中
- 首先根據默認的選擇策略DefaultSelectStrategy判斷本次循環(huán)是否select北发,具體邏輯為:如果當前有任務則使用selectNow立刻查詢是否有準備就緒的I/O纹因;如果當前沒有任務則返回SelectStrategy.SELECT,并將wakenUp設置為false琳拨,并調用select()進行查詢瞭恰。
protected void run() {
for (;;) { // 事件循環(huán)
try {
// select策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); // select()
if (wakenUp.get()) {
selector.wakeup(); // 喚醒select()的線程
}
default:
// fallthrough
}
.... 后續(xù)處理
select()時需要判斷當前是否有scheduledTask(定時任務),如果有則需要計算任務delay的時間狱庇,如果定時任務需要立刻執(zhí)行了惊畏,那么必須馬上selectNow()并返回,之后執(zhí)行任務密任。如果沒有scheduledTask颜启,會判斷當前是否有任務在等待列表,如果有任務時將wakenUp設置為true并selectNow()浪讳;如果沒有任務缰盏,那么會 selector.select(1000); 阻塞等待1s,直到有I/O就緒淹遵,或者有任務等待口猜,或需要喚醒時退出,否則透揣,會繼續(xù)循環(huán)暮的,直到前面的幾種情況發(fā)生后退出。
之后淌实,事件循環(huán)開始處理IO和任務冻辩。如果查詢到有IO事件,會調用processSelectedKeysOptimized(優(yōu)化的情況下)拆祈,對SelectionKey進行處理恨闪。
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime; // io花費的時間
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按照iorate計算task的時間
}
}
- processSelectedKeysOptimized處理I/O,主要是NIO的select操作放坏,處理相關的事件咙咽。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
3.4.2 任務處理
- runAllTasks執(zhí)行提交到EventLoop的任務,首先從scheduledTaskQueue獲取需要執(zhí)行的任務淤年,加入到taskQueue钧敞,然后依次執(zhí)行taskQueue的任務蜡豹。
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue(); // 獲取定時任務
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
- ioRatio不為100時,會調用runAllTasks(ioTime * (100 - ioRatio) / ioRatio)溉苛,首先計算出I/O處理的事件镜廉,然后按照比例為執(zhí)行task分配事件,內部主要邏輯與runAllTasks()主要邏輯相同愚战。