導(dǎo)讀
原創(chuàng)文章,轉(zhuǎn)載請(qǐng)注明出處蛛蒙。
本文源碼地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:帶注釋的netty源碼
Pipeline
這個(gè)詞翻譯過(guò)來(lái)就是“流水線”的意思爪膊,讀到這里有了解過(guò)設(shè)計(jì)模式的同學(xué)應(yīng)該已經(jīng)想到了捡遍,這里用到的是“責(zé)任鏈模式”。本文我們將以DefaultChannelPipeline
為例看一下Pipeline
的構(gòu)造以及其中重要的數(shù)據(jù)結(jié)構(gòu)眨业。
1 和Pipeline相關(guān)的其他組件
1.1 ChannnelHandler
這是ChannelHandler
中的注釋急膀,翻譯過(guò)來(lái)就是“處理IO事件或者攔截IO操作,并且將其向ChannelPipeline
中的下一個(gè)handler
傳遞”坛猪,說(shuō)白了就是在責(zé)任鏈中注冊(cè)的一系列回調(diào)方法脖阵。
Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
its ChannelPipeline
這里的I/O event
就是很多書中提到的“入站事件”,而I/O operation
就是很多書中提到的“出站事件”墅茉,前面我說(shuō)過(guò)命黔,這里我并不準(zhǔn)備這么叫,按我的理解我習(xí)慣把這兩者稱之為“事件”和“命令”就斤。很顯然這里event
和operation
的含義是不一樣的悍募,event
更多地多表示事件發(fā)生了,我們被動(dòng)地收到洋机,而operation
則表示我們主動(dòng)地發(fā)起一個(gè)動(dòng)作或者命令坠宴。
1.2 ChannelHandlerContext
每一個(gè)ChannelHandler
在被添加進(jìn)ChannelPipeline
時(shí)會(huì)被包裝進(jìn)一個(gè)ChannelHandlerContext
。有兩個(gè)特殊的ChannelHandlerContext
除外绷旗,分別是HeadContext
和TailContext
喜鼓,HeadContext
繼承了ChannelInBoundHandler
和ChannelOutBoundHandler
,而TailContext
繼承了ChannelInBoundHandler
衔肢。
每個(gè)ChannelHandlerContext
中有兩個(gè)指針next
和prev
庄岖,這是用來(lái)將多個(gè)ChannelHandlerContext
構(gòu)成雙向鏈表的。
2 Pipeline的構(gòu)造方法
我們以DefaultChannelPipeline
為例角骤,從它的構(gòu)造方法開始隅忿。這里首先將Channel
保存到Pipeline
的屬性中,又初始化了兩個(gè)屬性succeedFuture
和voidPromise
邦尊。這是兩個(gè)特殊的可以共享的Promise
背桐,這兩個(gè)Promise
不是重點(diǎn),不理解也沒(méi)關(guān)系蝉揍。
接下來(lái)的tail
和head
是兩個(gè)特殊的ChannelHandlerContext
链峭,這兩個(gè)是Pipeline
中的重要組件。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
Pipeline在執(zhí)行完構(gòu)造方法以后的結(jié)構(gòu)如下圖所示疑苫,head
和tail
構(gòu)成了最簡(jiǎn)單的雙向鏈表熏版。
圖中藍(lán)色填充的就是ChannelHandlerContext
纷责,目前只有HeadContext
和TailContext
,ChannelHandlerContext
中的較窄的矩形表示ChannelHandler
撼短,由于HeadContext
和TailContext
并沒(méi)有包含ChannelHandler
再膳,而是繼承ChannelHandler
,所以這里我們用虛線表示曲横。上下貫通的ChannelHandler
表示既是ChannelInBoundHandler
又是ChannelOutBoundHandler
喂柒,只有上半部分的表示是ChannelInBoundHandler
,只有下半部分的表示是ChannelOutBoundHandler
禾嫉。
3 添加ChannelHandler
在ChannelPipeline
中有很多以add
開頭的方法灾杰,這些方法就是向ChannelPipeline
中添加ChannelHandler
的方法。
-
addAfter
:向某個(gè)ChannelHandler
后邊添加 -
addBefore
:向某個(gè)ChannelHandler
前面添加 -
addFirst
:添加到頭部熙参,不能在head
的前面艳吠,而是緊挨著head
,在head
的后面 -
addLast
:添加到尾部孽椰,不能在tail
的后面昭娩,而是緊挨著tail
,在tail
的前面
我們以最常用的的addLast
方法為例來(lái)分析一下Pipeline
中添加ChannelHandler
的操作黍匾。
這里所貼出的addLast
方法其實(shí)我們已經(jīng)在“服務(wù)端啟動(dòng)流程”這篇文章中打過(guò)照面了栏渺。方法參數(shù)中的EventExecutorGroup
意味著我們可以為這個(gè)ChannelHandler
單獨(dú)設(shè)置Excutor
而不使用Channel
所綁定的EventLoop
,一般情況下我們不這么做锐涯,所以group
參數(shù)為null
磕诊。
這里先把ChannelHandler
包裝成ChannelHandlerContext
,再添加到尾部纹腌,隨后調(diào)用ChannelHandler
的HandlerAdded
方法霎终。
在調(diào)用HandlerAdded
方法時(shí)有一點(diǎn)問(wèn)題,添加ChannelHandler
的操作不需要在EventLoop
線程中進(jìn)行升薯,而HandlerAdded
方法則必須在EventLoop
線程中進(jìn)行神僵。也就是說(shuō)存在添加Handler
時(shí)還未綁定EventLoop
的情況,此時(shí)則調(diào)用newCtx.setAddPending()
將當(dāng)前HandlerContext
設(shè)置為ADD_PENDING
狀態(tài)覆劈,并且調(diào)用callHandlerCallbackLater(newCtx, true)
將一個(gè)異步任務(wù)添加到一個(gè)單向隊(duì)鏈表中,即pendingHandlerCallbackHead
這個(gè)鏈表沛励。
如果當(dāng)前已經(jīng)綁定了EventLoop
责语,則看當(dāng)前調(diào)用線程是否為EventLoop
線程,如果不是則向EventLoop
提交一個(gè)異步任務(wù)調(diào)用callHandlerAdded0
方法目派,否則直接調(diào)用callHandlerAdded0
方法坤候。
下面咱們依次分析一下newContext
,callHandlerCallbackLater
和callHandlerAdd0
方法企蹭。
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//先把`handler`包裝成`HandlerContext`
newCtx = newContext(group, filterName(name, handler), handler);
//添加到尾部
addLast0(newCtx);
//如果還未綁定`EventLoop`則稍后再發(fā)起對(duì)`HandlerAdded`方法的調(diào)用白筹。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//如果已經(jīng)綁定了EventLoop智末,并且當(dāng)前線程非EventLoop線程的話就提交一個(gè)異步任務(wù),就發(fā)起一個(gè)異步任務(wù)去調(diào)用HandlerAdded方法徒河。
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//如果當(dāng)前線程是EventLoop線程系馆,就直接調(diào)用HandlerAdded方法。
callHandlerAdded0(newCtx);
return this;
}
3.1 newContext
先看來(lái)一下newContext
方法顽照,這里直接調(diào)用了DefaultChannelHandlerContext
的構(gòu)造方法由蘑,咱們跟進(jìn)去看看。
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
在DefaultChannelHandlerContext
的構(gòu)造方法中又調(diào)用了父類AbstractChannelHandlerContext
的構(gòu)造方法代兵,保存了handler
屬性尼酿。在調(diào)用父類構(gòu)造方法之前調(diào)用了isInboud
和isOutbound
方法判斷當(dāng)前的Handler
是否為ChannelInBoundHandler
或者ChannelOutBoundHandler
,這兩個(gè)方法很簡(jiǎn)單植影,不再展開。
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
接下來(lái)看AbstractChannelHandlerContext
的構(gòu)造方法,這里非常簡(jiǎn)單抢野,保存了幾個(gè)屬性指孤,咱們看一下ordered
這個(gè)屬性恃轩。ordered
表示EventExecutor
在執(zhí)行異步任務(wù)時(shí)是否按添加順序執(zhí)行叉跛,這里一般情況下executor
為null
筷厘,表示使用Channel
所綁定的EventLoop
線程,而EventLoop
線程都是OrderedEventExecutor
的實(shí)現(xiàn)類充石。所以這里我們不考慮ordered
為false
的情況霞玄。
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
上面提到了ChannelHandlerContext
可以在構(gòu)造方法里單獨(dú)指定EventExecutor
,如果沒(méi)有單獨(dú)指定的話就使用Channel
所綁定的EventLoop
喊暖,代碼在哪里呢补鼻,就在AbstractChannelHandlerContext#executor
方法风范,非常簡(jiǎn)單硼婿,如果沒(méi)有為當(dāng)前ChannelHandler
指定excutor
則返回Channel
所綁定的EventLoop
寇漫。
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
3.2 callHandlerCallbackLater
在添加完ChannelHandler
之后將調(diào)用ChannledHandler
的handlerAdded
方法记焊,但是此時(shí)有可能還未綁定EventLoop
遍膜,而handlerAdded
方法的調(diào)用必須在EventLoop
線程內(nèi)執(zhí)行,此時(shí)就需要調(diào)用callHandlerCallbackLater
方法在pendingHandlerCallbackHead
鏈表中添加一個(gè)PendingHandlerAddedTask
挽懦。
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
接下來(lái)咱們看一下PendingHandlerAddedTask
的代碼信柿,邏輯在execute
方法里,這里直接調(diào)用了callHandlerAdded0
醒第。
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
}
}
}
}
3.3 callHandlerAdded0
不管是在未綁定EventLoop
的情況下延遲調(diào)用handlerAdded
還是在已經(jīng)綁定了EventLoop
的情況下立即調(diào)用HandlerAdded
,最終都會(huì)調(diào)用到callHandlerAdded0
方法淘讥。這里干了兩件事堤如,一是調(diào)用ChannelHandler
的handlerAdded
方法蒲列,二是將HandlerContext
的狀態(tài)設(shè)置為ADD_COMPLETE
狀態(tài)。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
}
3.4 添加多個(gè)ChannelHandler后的Pipeline
還記得咱們的“Netty整體架構(gòu)圖”嗎蝗岖,在這里咱們把Pipeline
部分單獨(dú)放大拿出來(lái)看一下侥猩,在添加完多個(gè)ChannelHandler
之后,Pipeline
的結(jié)構(gòu)是這樣的欺劳。
4 刪除ChannelHandler
在Pipeline
中有幾個(gè)以remove
開頭的方法划提,這些方法的作用就是刪除ChannelHandler
鹏往。
-
remove(ChannelHandler handler)
:從head
向tail
查找伊履,用==
判斷是否為同一實(shí)例唐瀑,只刪除第1個(gè)插爹。 -
remove(Class<T> handlerType)
:從head
向tail
查找哄辣,用isAssignableFrom
方法判斷是否為符合條件的類型,只刪除第1個(gè)递惋。 -
remove(String name)
:從head
向tail
查找柔滔,用name
精確匹配查找,只刪除第1個(gè)萍虽,因?yàn)?code>name不能重復(fù)睛廊,所以這里刪除第1個(gè)也是唯一的1個(gè)。 -
removeFirst
:刪除head
的后一個(gè)杉编,不能刪除tail
超全。 -
removeLast
:刪除tail
的前一個(gè),不能刪除head
邓馒。
上述無(wú)論哪種刪除方式在查找到對(duì)應(yīng)的HandlerContext
后都會(huì)調(diào)用到remove(final AbstractChannelHandlerContext ctx)
方法嘶朱,查找過(guò)程比較簡(jiǎn)單,咱們不再展開光酣,直接看remove(final AbstractChannelHandlerContext ctx)
方法疏遏。
看看這個(gè)方法的實(shí)現(xiàn),是不是和addLast(EventExecutorGroup group, String name, ChannelHandler handler)
很相似,非常相似财异。首先從雙向鏈表中刪除ChannelHandlerContext
倘零,再調(diào)用callHandlerRemoved0
方法,callHandlerRemoved0
方法內(nèi)會(huì)調(diào)用handlerRemoved
方法戳寸,這個(gè)調(diào)用必須在EventLoop
線程內(nèi)進(jìn)行呈驶。如果刪除時(shí)還未綁定EventLoop
則添加一個(gè)異步任務(wù)到鏈表pendingHandlerCallbackHead
中。
如果已經(jīng)綁定了EventLoop
并且當(dāng)前線程非EventLoop
線程則向EventLoop
提交一個(gè)異步任務(wù)疫鹊,否則直接調(diào)用callHandlerRemoved0
方法袖瞻。
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
synchronized (this) {
//從雙向鏈表中刪除`ChannelHandlerContext`
remove0(ctx);
//如果還未綁定`EventLoop`,則稍后調(diào)用`handlerRemoved`方法
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
//如果已經(jīng)綁定了`EventLoop`拆吆,但是當(dāng)前線程非`EventLoop`線程的話聋迎,就發(fā)起一個(gè)異步任務(wù)調(diào)用callHandlerRemoved0方法
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
//如果當(dāng)前線程就是`EventLoop`線程,則直接調(diào)用callHandlerRemoved0方法锈拨。
callHandlerRemoved0(ctx);
return ctx;
}
callHandlerCallbackLater
方法咱們前面已經(jīng)分析過(guò)砌庄,和添加ChannelHandler
時(shí)不同的是,這里向鏈表添加的是PendingHandlerRemovedTask
奕枢,這個(gè)類也很簡(jiǎn)單娄昆,不再展開。
這里咱們只看一下callHandlerRemoved0
方法缝彬。這個(gè)方法很簡(jiǎn)單萌焰,調(diào)用handlerRemoved
方法,再把ChannelHandlerContext
的狀態(tài)設(shè)置為REMOVE_COMPLETE
谷浅。
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
4 pendingHandlerCallbackHead鏈表中的任務(wù)什么時(shí)候調(diào)用
在AbstractUnsafe
的register0
方法中扒俯,在綁定EventLoop
以后,會(huì)調(diào)用pipeline.invokeHandlerAddedIfNeeded()
方法一疯,我們看一下pipeline.invokeHandlerAddedIfNeeded()
方法撼玄。
private void register0(ChannelPromise promise) {
try {
// 去完成那些在綁定EventLoop之前觸發(fā)的添加handler操作,這些操作被放在pipeline中的pendingHandlerCallbackHead中墩邀,是個(gè)鏈表
pipeline.invokeHandlerAddedIfNeeded();
}
invokeHandlerAddedIfNeeded
方法調(diào)用了callHandlerAddedForAllHandlers
方法掌猛,咱們接著看下去。
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
callHandlerAddedForAllHandlers
方法的邏輯咱就不再展開來(lái)說(shuō)了眉睹,非常簡(jiǎn)單荔茬,就是遍歷pendingHandlerCallbackHead
這個(gè)單向鏈表,依次調(diào)用每個(gè)元素的execute
方法竹海,并且清空這個(gè)單向鏈表慕蔚。
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
5 總結(jié)
Pipeline
中的最重要的數(shù)據(jù)結(jié)構(gòu)就是由多個(gè)ChannelHandlerContext
組成的雙向鏈表,而每個(gè)ChannelHandlerContext
中包含一個(gè)ChannelHandler
斋配,ChannelHandler
既可以添加也可以刪除孔飒。在Pipeline
中有兩個(gè)特殊的ChannelHandlerContext
分別是HeadContext
及TailContext
灌闺,這兩個(gè)ChannelHandlerContext
中不包含ChannelHandler
,而是采用繼承的方式十偶。HeadContext
實(shí)現(xiàn)了ChannelOutBoundHandler
和ChannelInBoundHandler
菩鲜,而TailContext
實(shí)現(xiàn)了ChannelInBoundHandler
。
關(guān)于作者
王建新惦积,轉(zhuǎn)轉(zhuǎn)架構(gòu)部資深Java工程師,主要負(fù)責(zé)服務(wù)治理猛频、RPC框架狮崩、分布式調(diào)用跟蹤、監(jiān)控系統(tǒng)等鹿寻。愛技術(shù)睦柴、愛學(xué)習(xí),歡迎聯(lián)系交流毡熏。
原創(chuàng)文章坦敌,碼字不易,點(diǎn)贊分享痢法,手有余香狱窘。