上兩篇博文(netty源碼分析之揭開reactor線程的面紗(一),netty源碼分析之揭開reactor線程的面紗(二))已經(jīng)描述了netty的reactor線程前兩個(gè)步驟所處理的工作清笨,在這里月杉,我們用這張圖片來回顧一下:
簡(jiǎn)單總結(jié)一下reactor線程三部曲
- 輪詢出IO事件
- 處理IO事件
- 處理任務(wù)隊(duì)列
今天,我們要進(jìn)行的是三部曲中的最后一曲【處理任務(wù)隊(duì)列】抠艾,也就是上面圖中的紫色部分沙合。
讀完本篇文章,你將了解到netty的異步task機(jī)制跌帐,定時(shí)任務(wù)的處理邏輯首懈,這些細(xì)節(jié)可以更好地幫助你寫出netty應(yīng)用
netty中的task的常見使用場(chǎng)景
我們?nèi)∪N典型的task使用場(chǎng)景來分析
一. 用戶自定義普通任務(wù)
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
//...
}
});
我們跟進(jìn)execute
方法,看重點(diǎn)
@Override
public void execute(Runnable task) {
//...
addTask(task);
//...
}
execute
方法調(diào)用 addTask
方法
protected void addTask(Runnable task) {
// ...
if (!offerTask(task)) {
reject(task);
}
}
然后調(diào)用offerTask
方法谨敛,如果offer失敗究履,那就調(diào)用reject
方法,通過默認(rèn)的 RejectedExecutionHandler
直接拋出異常
final boolean offerTask(Runnable task) {
// ...
return taskQueue.offer(task);
}
跟到offerTask
方法脸狸,基本上task就落地了最仑,netty內(nèi)部使用一個(gè)taskQueue
將task保存起來,那么這個(gè)taskQueue
又是何方神圣炊甲?
我們查看 taskQueue
定義的地方和被初始化的地方
private final Queue<Runnable> taskQueue;
taskQueue = newTaskQueue(this.maxPendingTasks);
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
我們發(fā)現(xiàn) taskQueue
在NioEventLoop中默認(rèn)是mpsc隊(duì)列泥彤,mpsc隊(duì)列,即多生產(chǎn)者單消費(fèi)者隊(duì)列卿啡,netty使用mpsc吟吝,方便的將外部線程的task聚集,在reactor線程內(nèi)部用單線程來串行執(zhí)行颈娜,我們可以借鑒netty的任務(wù)執(zhí)行模式來處理類似多線程數(shù)據(jù)上報(bào)剑逃,定時(shí)聚合的應(yīng)用
在本節(jié)討論的任務(wù)場(chǎng)景中,所有代碼的執(zhí)行都是在reactor線程中的官辽,所以蛹磺,所有調(diào)用 inEventLoop()
的地方都返回true,既然都是在reactor線程中執(zhí)行同仆,那么其實(shí)這里的mpsc隊(duì)列其實(shí)沒有發(fā)揮真正的作用萤捆,mpsc大顯身手的地方其實(shí)在第二種場(chǎng)景
二. 非當(dāng)前reactor線程調(diào)用channel的各種方法
// non reactor thread
channel.write(...)
上面一種情況在push系統(tǒng)中比較常見,一般在業(yè)務(wù)線程里面俗批,根據(jù)用戶的標(biāo)識(shí)俗或,找到對(duì)應(yīng)的channel引用,然后調(diào)用write類方法向該用戶推送消息扶镀,就會(huì)進(jìn)入到這種場(chǎng)景
關(guān)于channel.write()類方法的調(diào)用鏈蕴侣,后面會(huì)單獨(dú)拉出一篇文章來深入剖析,這里臭觉,我們只需要知道昆雀,最終write方法串至以下方法
AbstractChannelHandlerContext.java
private void write(Object msg, boolean flush, ChannelPromise promise) {
// ...
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
外部線程在調(diào)用write
的時(shí)候辱志,executor.inEventLoop()
會(huì)返回false,直接進(jìn)入到else分支狞膘,將write封裝成一個(gè)WriteTask
(這里僅僅是write而沒有flush揩懒,因此flush
參數(shù)為false), 然后調(diào)用 safeExecute
方法
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
// ...
executor.execute(runnable);
// ...
}
接下來的調(diào)用鏈就進(jìn)入到第一種場(chǎng)景了,但是和第一種場(chǎng)景有個(gè)明顯的區(qū)別就是挽封,第一種場(chǎng)景的調(diào)用鏈的發(fā)起線程是reactor線程已球,第二種場(chǎng)景的調(diào)用鏈的發(fā)起線程是用戶線程,用戶線程可能會(huì)有很多個(gè)辅愿,顯然多個(gè)線程并發(fā)寫taskQueue
可能出現(xiàn)線程同步問題智亮,于是,這種場(chǎng)景下点待,netty的mpsc queue就有了用武之地
三. 用戶自定義定時(shí)任務(wù)
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
}
}, 60, TimeUnit.SECONDS);
第三種場(chǎng)景就是定時(shí)任務(wù)邏輯了阔蛉,用的最多的便是如上方法:在一定時(shí)間之后執(zhí)行任務(wù)
我們跟進(jìn)schedule
方法
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
//...
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
通過 ScheduledFutureTask
, 將用戶自定義任務(wù)再次包裝成一個(gè)netty內(nèi)部的任務(wù)
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
// ...
scheduledTaskQueue().add(task);
// ...
return task;
}
到了這里,我們有點(diǎn)似曾相識(shí)癞埠,在非定時(shí)任務(wù)的處理中状原,netty通過一個(gè)mpsc隊(duì)列將任務(wù)落地,這里苗踪,是否也有一個(gè)類似的隊(duì)列來承載這類定時(shí)任務(wù)呢颠区?帶著這個(gè)疑問,我們繼續(xù)向前
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
果不其然通铲,scheduledTaskQueue()
方法毕莱,會(huì)返回一個(gè)優(yōu)先級(jí)隊(duì)列,然后調(diào)用 add
方法將定時(shí)任務(wù)加入到隊(duì)列中去测暗,但是央串,這里為什么要使用優(yōu)先級(jí)隊(duì)列,而不需要考慮多線程的并發(fā)碗啄?
因?yàn)槲覀儸F(xiàn)在討論的場(chǎng)景,調(diào)用鏈的發(fā)起方是reactor線程稳摄,不會(huì)存在多線程并發(fā)這些問題
但是稚字,萬一有的用戶在reactor之外執(zhí)行定時(shí)任務(wù)呢?雖然這類場(chǎng)景很少見厦酬,但是netty作為一個(gè)無比健壯的高性能io框架胆描,必須要考慮到這種情況。
對(duì)此仗阅,netty的處理是昌讲,如果是在外部線程調(diào)用schedule,netty將添加定時(shí)任務(wù)的邏輯封裝成一個(gè)普通的task减噪,這個(gè)task的任務(wù)是添加[添加定時(shí)任務(wù)]的任務(wù)短绸,而不是添加定時(shí)任務(wù)车吹,其實(shí)也就是第二種場(chǎng)景,這樣醋闭,對(duì) PriorityQueue
的訪問就變成單線程窄驹,即只有reactor線程
完整的schedule方法
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
// 進(jìn)入到場(chǎng)景二,進(jìn)一步封裝任務(wù)
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
在閱讀源碼細(xì)節(jié)的過程中证逻,我們應(yīng)該多問幾個(gè)為什么乐埠?這樣會(huì)有利于看源碼的時(shí)候不至于犯困!比如這里囚企,為什么定時(shí)任務(wù)要保存在優(yōu)先級(jí)隊(duì)列中丈咐,我們可以先不看源碼,來思考一下優(yōu)先級(jí)對(duì)列的特性
優(yōu)先級(jí)隊(duì)列按一定的順序來排列內(nèi)部元素龙宏,內(nèi)部元素必須是可以比較的棵逊,聯(lián)系到這里每個(gè)元素都是定時(shí)任務(wù),那就說明定時(shí)任務(wù)是可以比較的烦衣,那么到底有哪些地方可以比較歹河?
每個(gè)任務(wù)都有一個(gè)下一次執(zhí)行的截止時(shí)間,截止時(shí)間是可以比較的花吟,截止時(shí)間相同的情況下秸歧,任務(wù)添加的順序也是可以比較的,就像這樣衅澈,閱讀源碼的過程中键菱,一定要多和自己對(duì)話,多問幾個(gè)為什么
帶著猜想今布,我們研究與一下ScheduledFutureTask
经备,抽取出關(guān)鍵部分
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
private static final AtomicLong nextTaskId = new AtomicLong();
private static final long START_TIME = System.nanoTime();
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
private final long id = nextTaskId.getAndIncrement();
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
@Override
public int compareTo(Delayed o) {
//...
}
// 精簡(jiǎn)過的代碼
@Override
public void run() {
}
這里,我們一眼就找到了compareTo
方法部默,cmd+u
跳轉(zhuǎn)到實(shí)現(xiàn)的接口侵蒙,發(fā)現(xiàn)就是Comparable
接口
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
進(jìn)入到方法體內(nèi)部,我們發(fā)現(xiàn)傅蹂,兩個(gè)定時(shí)任務(wù)的比較纷闺,確實(shí)是先比較任務(wù)的截止時(shí)間,截止時(shí)間相同的情況下份蝴,再比較id犁功,即任務(wù)添加的順序,如果id再相同的話婚夫,就拋Error
這樣浸卦,在執(zhí)行定時(shí)任務(wù)的時(shí)候,就能保證最近截止時(shí)間的任務(wù)先執(zhí)行
下面案糙,我們?cè)賮砜聪耼etty是如何來保證各種定時(shí)任務(wù)的執(zhí)行的限嫌,netty里面的定時(shí)任務(wù)分以下三種
1.若干時(shí)間后執(zhí)行一次
2.每隔一段時(shí)間執(zhí)行一次
3.每次執(zhí)行結(jié)束靴庆,隔一定時(shí)間再執(zhí)行一次
netty使用一個(gè) periodNanos
來區(qū)分這三種情況,正如netty的注釋那樣
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
了解這些背景之后萤皂,我們來看下netty是如何來處理這三種不同類型的定時(shí)任務(wù)的
public void run() {
if (periodNanos == 0) {
V result = task.call();
setSuccessInternal(result);
} else {
task.call();
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
scheduledTaskQueue.add(this);
}
}
}
if (periodNanos == 0)
對(duì)應(yīng) 若干時(shí)間后執(zhí)行一次
的定時(shí)任務(wù)類型撒穷,執(zhí)行完了該任務(wù)就結(jié)束了。
否則裆熙,進(jìn)入到else代碼塊端礼,先執(zhí)行任務(wù),然后再區(qū)分是哪種類型的任務(wù)入录,periodNanos
大于0蛤奥,表示是以固定頻率執(zhí)行某個(gè)任務(wù),和任務(wù)的持續(xù)時(shí)間無關(guān)僚稿,然后凡桥,設(shè)置該任務(wù)的下一次截止時(shí)間為本次的截止時(shí)間加上間隔時(shí)間periodNanos
,否則蚀同,就是每次任務(wù)執(zhí)行完畢之后缅刽,間隔多長(zhǎng)時(shí)間之后再次執(zhí)行,截止時(shí)間為當(dāng)前時(shí)間加上間隔時(shí)間蠢络,-p
就表示加上一個(gè)正的間隔時(shí)間衰猛,最后,將當(dāng)前任務(wù)對(duì)象再次加入到隊(duì)列刹孔,實(shí)現(xiàn)任務(wù)的定時(shí)執(zhí)行
netty內(nèi)部的任務(wù)添加機(jī)制了解地差不多之后啡省,我們就可以查看reactor第三部曲是如何來調(diào)度這些任務(wù)的
reactor線程task的調(diào)度
首先,我們將目光轉(zhuǎn)向最外層的外觀代碼
runAllTasks(long timeoutNanos);
顧名思義髓霞,這行代碼表示了盡量在一定的時(shí)間內(nèi)卦睹,將所有的任務(wù)都取出來run一遍。timeoutNanos
表示該方法最多執(zhí)行這么長(zhǎng)時(shí)間方库,netty為什么要這么做结序?我們可以想一想,reactor線程如果在此停留的時(shí)間過長(zhǎng)纵潦,那么將積攢許多的IO事件無法處理(見reactor線程的前面兩個(gè)步驟)笼痹,最終導(dǎo)致大量客戶端請(qǐng)求阻塞,因此酪穿,默認(rèn)情況下坡氯,netty將控制內(nèi)部隊(duì)列的執(zhí)行時(shí)間
好摊灭,我們繼續(xù)跟進(jìn)
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
//...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
這段代碼便是reactor執(zhí)行task的所有邏輯,可以拆解成下面幾個(gè)步驟
- 從scheduledTaskQueue轉(zhuǎn)移定時(shí)任務(wù)到taskQueue(mpsc queue)
- 計(jì)算本次任務(wù)循環(huán)的截止時(shí)間
- 執(zhí)行任務(wù)
- 收尾
按照這個(gè)步驟贴妻,我們一步步來分析下
從scheduledTaskQueue轉(zhuǎn)移定時(shí)任務(wù)到taskQueue(mpsc queue)
首先調(diào)用 fetchFromScheduledTaskQueue()
方法涧团,將到期的定時(shí)任務(wù)轉(zhuǎn)移到mpsc queue里面
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
可以看到只磷,netty在把任務(wù)從scheduledTaskQueue轉(zhuǎn)移到taskQueue的時(shí)候還是非常小心的经磅,當(dāng)taskQueue無法offer的時(shí)候,需要把從scheduledTaskQueue里面取出來的任務(wù)重新添加回去
從scheduledTaskQueue從拉取一個(gè)定時(shí)任務(wù)的邏輯如下钮追,傳入的參數(shù)nanoTime
為當(dāng)前時(shí)間(其實(shí)是當(dāng)前納秒減去ScheduledFutureTask
類被加載的納秒個(gè)數(shù))
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
可以看到预厌,每次 pollScheduledTask
的時(shí)候,只有在當(dāng)前任務(wù)的截止時(shí)間已經(jīng)到了元媚,才會(huì)取出來
計(jì)算本次任務(wù)循環(huán)的截止時(shí)間
Runnable task = pollTask();
//...
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
這一步將取出第一個(gè)任務(wù)轧叽,用reactor線程傳入的超時(shí)時(shí)間 timeoutNanos
來計(jì)算出當(dāng)前任務(wù)循環(huán)的deadline,并且使用了runTasks
刊棕,lastExecutionTime
來時(shí)刻記錄任務(wù)的狀態(tài)
循環(huán)執(zhí)行任務(wù)
for (;;) {
safeExecute(task);
runTasks ++;
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
這一步便是netty里面執(zhí)行所有任務(wù)的核心代碼了炭晒。
首先調(diào)用safeExecute
來確保任務(wù)安全執(zhí)行,忽略任何異常
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
然后將已運(yùn)行任務(wù) runTasks
加一甥角,每隔0x3F
任務(wù)网严,即每執(zhí)行完64個(gè)任務(wù)之后,判斷當(dāng)前時(shí)間是否超過本次reactor任務(wù)循環(huán)的截止時(shí)間了嗤无,如果超過震束,那就break掉,如果沒有超過当犯,那就繼續(xù)執(zhí)行垢村。可以看到灶壶,netty對(duì)性能的優(yōu)化考慮地相當(dāng)?shù)闹艿礁味希僭O(shè)netty任務(wù)隊(duì)列里面如果有海量小任務(wù),如果每次都要執(zhí)行完任務(wù)都要判斷一下是否到截止時(shí)間驰凛,那么效率是比較低下的
收尾
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
收尾工作很簡(jiǎn)單胸懈,調(diào)用一下 afterRunningAllTasks
方法
@Override
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
NioEventLoop
可以通過父類SingleTheadEventLoop
的executeAfterEventLoopIteration
方法向tailTasks
中添加收尾任務(wù),比如恰响,你想統(tǒng)計(jì)一下一次執(zhí)行一次任務(wù)循環(huán)花了多長(zhǎng)時(shí)間就可以調(diào)用此方法
public final void executeAfterEventLoopIteration(Runnable task) {
// ...
if (!tailTasks.offer(task)) {
reject(task);
}
//...
}
this.lastExecutionTime = lastExecutionTime;
簡(jiǎn)單記錄一下任務(wù)執(zhí)行的時(shí)間趣钱,搜了一下該field的引用,發(fā)現(xiàn)這個(gè)field并沒有使用過胚宦,只是每次不停地賦值首有,賦值,賦值...枢劝,改天再去向netty官方提個(gè)issue...
reactor線程第三曲到了這里基本上就給你講完了井联,如果你讀到這覺得很輕松,那么恭喜你您旁,你對(duì)netty的task機(jī)制已經(jīng)非常比較熟悉了烙常,也恭喜一下我,把這些機(jī)制給你將清楚了鹤盒。我們最后再來一次總結(jié)蚕脏,以tips的方式
- 當(dāng)前reactor線程調(diào)用當(dāng)前eventLoop執(zhí)行任務(wù)侦副,直接執(zhí)行,否則驼鞭,添加到任務(wù)隊(duì)列稍后執(zhí)行
- netty內(nèi)部的任務(wù)分為普通任務(wù)和定時(shí)任務(wù)秦驯,分別落地到MpscQueue和PriorityQueue
- netty每次執(zhí)行任務(wù)循環(huán)之前,會(huì)將已經(jīng)到期的定時(shí)任務(wù)從PriorityQueue轉(zhuǎn)移到MpscQueue
- netty每隔64個(gè)任務(wù)檢查一下是否該退出任務(wù)循環(huán)
如果你覺得看的不過癮挣棕,想系統(tǒng)學(xué)習(xí)Netty原理译隘,那么你一定不要錯(cuò)過我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html