Netty源碼分析系列文章已接近尾聲,本文再來分析Netty中兩個(gè)常見組件:FastThreadLoca與HashedWheelTimer缅茉。
源碼分析基于Netty 4.1.52
FastThreadLocal
FastThreadLocal比較簡(jiǎn)單。
FastThreadLocal和FastThreadLocalThread是配套使用的埃叭。
FastThreadLocalThread繼承了Thread暂氯,F(xiàn)astThreadLocalThread#threadLocalMap 是一個(gè)InternalThreadLocalMap秸侣,該InternalThreadLocalMap對(duì)象只能用于當(dāng)前線程。
InternalThreadLocalMap#indexedVariables是一個(gè)數(shù)組罢绽,存放了當(dāng)前線程所有FastThreadLocal對(duì)應(yīng)的值畏线。
而每個(gè)FastThreadLocal都有一個(gè)index,用于定位InternalThreadLocalMap#indexedVariables良价。
FastThreadLocal#get
public final V get() {
// #1
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// #2
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// #3
return initialize(threadLocalMap);
}
#1
獲取該線程的InternalThreadLocalMap
如果是FastThreadLocalThread寝殴,直接獲取FastThreadLocalThread#threadLocalMap。
否則明垢,從UnpaddedInternalThreadLocalMap.slowThreadLocalMap獲取該線程InternalThreadLocalMap杯矩。
注意,UnpaddedInternalThreadLocalMap.slowThreadLocalMap是一個(gè)ThreadLocal袖外,這里實(shí)際回退到使用ThreadLocal了史隆。
#2
每個(gè)FastThreadLocal都有一個(gè)index。
通過該index曼验,獲取InternalThreadLocalMap#indexedVariables中存放的值
#3
找不到值泌射,通過initialize方法構(gòu)建新對(duì)象。
可以看到鬓照,F(xiàn)astThreadLocal中連hash算法都不用熔酷,通過下標(biāo)獲取對(duì)應(yīng)的值,復(fù)雜度為log(1)豺裆,自然很快啦拒秘。
HashedWheelTimer
HashedWheelTimer是Netty提供的時(shí)間輪調(diào)度器。
時(shí)間輪是一種充分利用線程資源進(jìn)行批量化任務(wù)調(diào)度的調(diào)度模型臭猜,能夠高效的管理各種延時(shí)任務(wù)躺酒。
簡(jiǎn)單說,就是將延時(shí)任務(wù)存放到一個(gè)環(huán)形隊(duì)列中蔑歌,并通過執(zhí)行線程定時(shí)執(zhí)行該隊(duì)列的任務(wù)羹应。
例如,
環(huán)形隊(duì)列上有60個(gè)格子次屠,
執(zhí)行線程每秒移動(dòng)一個(gè)格子园匹,則環(huán)形隊(duì)列每輪可存放1分鐘內(nèi)的任務(wù)雳刺。
現(xiàn)在有兩個(gè)定時(shí)任務(wù)
task1,32秒后執(zhí)行
task2裸违,2分25秒后執(zhí)行
而執(zhí)行線程當(dāng)前位于第6格子
則task1放到32+6=38格掖桦,輪數(shù)為0
task2放到25+6=31個(gè),輪數(shù)為2
執(zhí)行線程將執(zhí)行當(dāng)前格子輪數(shù)為0的任務(wù)供汛,并將其他任務(wù)輪數(shù)減1枪汪。
缺點(diǎn),時(shí)間輪調(diào)度器的時(shí)間精度不高紊馏。
因?yàn)闀r(shí)間輪算法的精度取決于執(zhí)行線程移動(dòng)速度料饥。
例如上面例子中執(zhí)行線程每秒移動(dòng)一個(gè)格子蒲犬,則調(diào)度精度小于一秒的任務(wù)就無(wú)法準(zhǔn)時(shí)調(diào)用朱监。
HashedWheelTimer關(guān)鍵字段
// 任務(wù)執(zhí)行器,負(fù)責(zé)執(zhí)行任務(wù)
Worker worker = new Worker();
// 任務(wù)執(zhí)行線程
Thread workerThread;
// HashedWheelTimer狀態(tài)原叮, 0 - init, 1 - started, 2 - shut down
int workerState;
// 時(shí)間輪隊(duì)列赫编,使用數(shù)組實(shí)現(xiàn)
HashedWheelBucket[] wheel;
// 暫存新增的任務(wù)
Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// 已取消任務(wù)
Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
添加延遲任務(wù) HashedWheelTimer#newTimeout
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
...
// #1
start();
// #2
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
...
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
#1
如果HashedWheelTimer未啟動(dòng),則啟動(dòng)該HashedWheelTimer
HashedWheelTimer#start方法負(fù)責(zé)是啟動(dòng)workerThread線程
#2
startTime是HashedWheelTimer啟動(dòng)時(shí)間
deadline是相對(duì)HashedWheelTimer啟動(dòng)的延遲時(shí)間
構(gòu)建HashedWheelTimeout奋隶,添加到HashedWheelTimer#timeouts
時(shí)間輪運(yùn)行 Worker#run
public void run() {
...
// #1
startTimeInitialized.countDown();
do {
// #2
final long deadline = waitForNextTick();
if (deadline > 0) {
// #3
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
// #4
transferTimeoutsToBuckets();
// #5
bucket.expireTimeouts(deadline);
// #6
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// #7
...
}
#1
HashedWheelTimer#start方法阻塞HashedWheelTimer線程直到Worker啟動(dòng)完成擂送,這里解除HashedWheelTimer線程阻塞。
#2
計(jì)算下一格子開始執(zhí)行的時(shí)間唯欣,然后sleep到下次格子開始執(zhí)行時(shí)間
#2
tick是從HashedWheelTimer啟動(dòng)后移動(dòng)的總格子數(shù)嘹吨,這里獲取tick對(duì)應(yīng)的格子索引。
由于Long類型足夠大境氢,這里并不考慮溢出問題蟀拷。
#4
將HashedWheelTimer#timeouts的任務(wù)遷移到對(duì)應(yīng)的格子中
#5
處理已到期任務(wù)
#6
移動(dòng)到下一個(gè)格子
#7
這里是HashedWheelTimer#stop后的邏輯處理,取消任務(wù)萍聊,停止時(shí)間輪
遷移任務(wù) Worker#transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() {
// #1
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
// #2
long calculated = timeout.deadline / tickDuration;
// #3
timeout.remainingRounds = (calculated - tick) / wheel.length;
// #4
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
// #5
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
#1
注意问芬,每次只遷移100000個(gè)任務(wù),以免阻塞線程
#2
任務(wù)延遲時(shí)間/每格時(shí)間數(shù)寿桨, 得到該任務(wù)需延遲的總格子移動(dòng)數(shù)
#3
(總格子移動(dòng)數(shù) - 已移動(dòng)格子數(shù))/每輪格子數(shù)此衅,得到輪數(shù)
#4
如果任務(wù)在timeouts隊(duì)列放得太久導(dǎo)致已經(jīng)過了執(zhí)行時(shí)間,則使用當(dāng)前tick亭螟, 也就是放到當(dāng)前bucket挡鞍,以便盡快執(zhí)行該任務(wù)
#5
計(jì)算tick對(duì)應(yīng)格子索引,放到對(duì)應(yīng)的格子位置
執(zhí)行到期任務(wù) HashedWheelBucket#expireTimeouts
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// #1
if (timeout.remainingRounds <= 0) {
// #2
next = remove(timeout);
if (timeout.deadline <= deadline) {
// #3
timeout.expire();
} else {
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
// #4
timeout.remainingRounds --;
}
timeout = next;
}
}
#1
選擇輪數(shù)小于等于0的任務(wù)
#2
移除任務(wù)
#3
修改狀態(tài)為過期预烙,并執(zhí)行任務(wù)
#4
其他任務(wù)輪數(shù)減1
ScheduledExecutorService使用堆(DelayedWorkQueue)維護(hù)任務(wù)匕累,新增任務(wù)復(fù)雜度為O(logN)。
而 HashedWheelTimer 新增任務(wù)復(fù)雜度為O(1)默伍,所以在任務(wù)非常多時(shí)欢嘿, HashedWheelTimer 可以表現(xiàn)出它的優(yōu)勢(shì)衰琐。
但是任務(wù)較少甚至沒有任務(wù)時(shí),HashedWheelTimer的執(zhí)行線程都需要不斷移動(dòng)炼蹦,也會(huì)造成性能消耗羡宙。
注意,HashedWheelTimer使用同一個(gè)線程調(diào)用和執(zhí)行任務(wù)掐隐,如果某些任務(wù)執(zhí)行時(shí)間過久狗热,則影響后續(xù)定時(shí)任務(wù)執(zhí)行。當(dāng)然虑省,我們也可以考慮在任務(wù)中另起線程執(zhí)行邏輯匿刮。
另外,如果任務(wù)過多探颈,也會(huì)導(dǎo)致任務(wù)長(zhǎng)期滯留在HashedWheelTimer#timeouts中而不能及時(shí)執(zhí)行熟丸。
如果您覺得本文不錯(cuò),歡迎關(guān)注我的微信公眾號(hào)伪节,系列文章持續(xù)更新中光羞。您的關(guān)注是我堅(jiān)持的動(dòng)力!