Netty源碼愫讀(三)ChannelPipeline饶氏、ChannelHandlerContext相關(guān)源碼學(xué)習(xí)

1、Channel打厘、ChannelPipeline、ChannelHandler贺辰、ChannelHandlerContext關(guān)系

四者關(guān)系如下圖:

關(guān)系png.png

Channel:Channel為通信載體户盯,負責底層傳輸層的具體事件及消息處理,其封裝底層處理的復(fù)雜性饲化,通過統(tǒng)一接口將事件及消息交給ChannelPipeline處理莽鸭。

ChannelPipeline:ChannelPipeline為消息的管道,一個Channel對應(yīng)唯一ChannelPipeline吃靠,ChannelPipeline中包含多個ChannelHandlerContext硫眨,各個ChannelHandlerContext以鏈表的形式構(gòu)成消息處理的責任鏈,而ChannelPipeline并不對消息做處理巢块,其只是轉(zhuǎn)發(fā)給ChannelHandlerContext處理礁阁,而ChannelHandlerContext又交給具體的ChannelHandler處理,并將處理后的消息沿著鏈表轉(zhuǎn)發(fā)給下一個ChannelHandlerContext夕冲。

ChannelHandlerContext:ChannelHandlerContext為ChannelPipeline和ChannelHandler的上下文,其保存對應(yīng)的ChannelPipeline及ChannelHandler裂逐,并且根據(jù)添加順序歹鱼,多個ChannelHandlerContext之間構(gòu)成鏈表。ChannelHandlerContext提供和ChannelPipeline類似的方法卜高,但調(diào)用ChannelHandlerContext上的方法只會從當前的ChannelHandler開始向下一個ChannelHandler傳播弥姻;而調(diào)用ChannelPipeline上的方法會從鏈表頭或尾向下傳播南片。

ChannelHandler:ChannelHandler為具體的消息處理類,其由應(yīng)用層定義庭敦。消息由某個ChannelHandler處理完后疼进,會沿著鏈表將消息交由下個ChannelHandler處理。

2秧廉、ChannelPipeline源碼分析

ChannelPipeline類繼圖:

ChannelPipeline類繼圖.png

說明:

Iterable:遍歷器接口伞广,具體接口為:Iterable<Entry<String, ChannelHandler>>,其提供iterator()疼电、forEach()等方法嚼锄,用于遍歷管道中的ChannelHandler。

ChannelInboundInvoker:管道的入口事件處理接口蔽豺,對于Channel中的入口事件都是通過此接口進行處理的区丑。消息類型包括:register、unregister修陡、active沧侥、inactive、exception魄鸦、read等宴杀;

ChannelOutboundInvoker:管道的出口事件處理接口,對于Channel相關(guān)的出口事件都是通過此接口進行處理的号杏。消息類型包括:bind婴氮、connect、close盾致、write主经、flush等。

ChannelPipeline:管道相關(guān)操作接口庭惜,提供了對管道中的ChannelHandler進行增刪改查等接口罩驻,包括:addFirst、addLast等护赊。

DefaultChannelPipeline:ChannelPipeline的默認實現(xiàn)類惠遏。

2.1、遍歷處理器

pipeline提供其對應(yīng)的Handler的遍歷處理接口骏啰。Iterable<Entry<String, ChannelHandler>及ChannelPipeline中的部分方法节吮。

2.1.1、方法說明

方法名稱 返回值 功能說明
iterator() Iterator<Map.Entry<String, ChannelHandler>> 返回Map.Entry<String, ChannelHandler>的遍歷器判耕,此map即為pipeline中所有Handler的name與Handler的map透绩。
names() List<String> 返回所有handler的name的集合
toMap() Map<String, ChannelHandler> 返回handler的name與ChannelHandler的map

2.1.2、方法實現(xiàn)

iterator()及toMap()方法實現(xiàn):

public final Map<String, ChannelHandler> toMap() {
    Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == tail) {
            return map;
        }
        map.put(ctx.name(), ctx.handler());
        ctx = ctx.next;
    }
}

@Override
public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
    return toMap().entrySet().iterator();
}

由以上toMap()實現(xiàn)可知,map中為pipeline中從head到tail的handler的map帚豪;

2.2碳竟、inbound事件

當發(fā)生I/O事件時,如鏈路建立連接狸臣、鏈路關(guān)閉莹桅、讀取數(shù)據(jù)完成等,都會產(chǎn)生一個事件烛亦,事件在pipeline中進行傳播和處理诈泼,它是實際處理的總?cè)肟凇etty將有限的網(wǎng)絡(luò)I/O事件進行統(tǒng)一抽象此洲,ChannelInboundInvoker即為pipeline抽象的入口接口厂汗。pipeline中以fireXXX命名的方法都是從I/O線程流向用戶業(yè)務(wù)Handler的inbound消息。

2.2.1呜师、方法說明

方法名稱 返回值 功能說明
fireChannelRegistered() ChannelInboundInvoker 當Channel 已經(jīng)注冊到它的EventLoop 并且能夠處理I/O 時被調(diào)用
fireChannelUnregistered() ChannelInboundInvoker 當Channel 從它的EventLoop 注銷并且無法處理任何I/O 時被調(diào)用
fireChannelActive() ChannelInboundInvoker 當Channel 處于活動狀態(tài)時被調(diào)用娶桦;Channel 已經(jīng)連接/綁定并且已經(jīng)就緒
fireChannelInactive() ChannelInboundInvoker 當Channel 離開活動狀態(tài)并且不再連接它的遠程節(jié)點時被調(diào)用
fireExceptionCaught(Throwable cause) ChannelInboundInvoker Channel異常事件
fireUserEventTriggered(Object event) ChannelInboundInvoker 當ChannelnboundHandler.fireUserEventTriggered()方法被調(diào)
fireChannelRead(Object msg) ChannelInboundInvoker 當從Channel 讀取數(shù)據(jù)時被調(diào)用
fireChannelReadComplete() ChannelInboundInvoker 當Channel上的一個讀操作完成時被調(diào)用
fireChannelWritabilityChanged() ChannelInboundInvoker 當Channel 的可寫狀態(tài)發(fā)生改變時被調(diào)用。用戶可以確保寫操作不會完成得太快(以避免發(fā)生OutOfMemoryError)或者可以在Channel 變?yōu)樵俅慰蓪憰r恢復(fù)寫入汁汗≈云瑁可以通過調(diào)用Channel 的isWritable()方法來檢測Channel 的可寫性。與可寫性相關(guān)的閾值可以通過Channel.config().setWriteHighWatesetWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法來設(shè)置

2.2.2知牌、方法實現(xiàn)

pipeline中inbound事件的處理都非常簡單祈争,其主要交由AbstractChannelHandlerContext中對應(yīng)的靜態(tài)方法進行處理。

部分處理源碼如下:

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelInactive() {
    AbstractChannelHandlerContext.invokeChannelInactive(head);
    return this;
}

@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}

@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
    AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
    return this;
}

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
    return this;
}

2.3角寸、outbound事件

ChannelOutboundInvoker是outbound消息的接口菩混,由用戶或者代碼發(fā)起的I/O操作被稱為outbound消息,即為從pipeline中流程的消息的統(tǒng)稱扁藕。

2.3.1沮峡、方法說明

方法名稱 返回值 功能說明
bind(SocketAddress localAddress) ChannelFuture 當請求將Channel 綁定到本地地址時被調(diào)用,綁定成功或失敗都通過ChannelFuture進行通知
connect(SocketAddress remoteAddress) ChannelFuture 當請求將Channel 連接到遠程節(jié)點時被調(diào)用亿柑,當連接超時時拋出ConnectTimeoutException邢疙,當連接被拒絕時,將拋出ConnectException
connect(SocketAddress remoteAddress, SocketAddress localAddress) ChannelFuture
disconnect() ChannelFuture 當請求將Channel 從遠程節(jié)點斷開時被調(diào)用望薄,不論處理成功或失敗疟游,都會進行通知
close() ChannelFuture 當請求關(guān)閉Channel 時被調(diào)用
deregister() ChannelFuture 當請求將Channel 從它的EventLoop 注銷時被調(diào)用
bind(SocketAddress localAddress, ChannelPromise promise) ChannelFuture 當請求將Channel 綁定到本地地址時被調(diào)用
connect(SocketAddress remoteAddress, ChannelPromise promise) ChannelFuture 當請求將Channel 連接到遠程節(jié)點時被調(diào)用
connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) ChannelFuture 當請求將Channel 連接到遠程節(jié)點時被調(diào)用
disconnect(ChannelPromise promise) ChannelFuture 當請求將Channel 從遠程節(jié)點斷開時被調(diào)用
close(ChannelPromise promise) ChannelFuture 請求關(guān)閉Channel 時被調(diào)用
deregister(ChannelPromise promise) ChannelFuture 當請求將Channel 從它的EventLoop 注銷時被調(diào)用
read() ChannelFuture 當請求從Channel 讀取更多的數(shù)據(jù)時被調(diào)用
write(Object msg) ChannelFuture 當請求通過Channel 將數(shù)據(jù)寫到遠程節(jié)點時被調(diào)用
write(Object msg, ChannelPromise promise) ChannelFuture 當請求通過Channel 將數(shù)據(jù)寫到遠程節(jié)點時被調(diào)用
flush() ChannelOutboundInvoker 當請求通過Channel 將入隊數(shù)據(jù)沖刷到遠程節(jié)點時被調(diào)用
writeAndFlush(Object msg, ChannelPromise promise) ChannelFuture 當請求通過Channel 將入隊數(shù)據(jù)沖刷到遠程節(jié)點時被調(diào)用
writeAndFlush(Object msg) ChannelFuture 當請求通過Channel 將入隊數(shù)據(jù)沖刷到遠程節(jié)點時被調(diào)用
newPromise() ChannelPromise 返回一個新的ChannelPromise
newProgressivePromise() ChannelProgressivePromise 返回一個新的ChannelProgressivePromise
newSucceededFuture() ChannelFuture 返回一個已被標記為成功的ChannelFuture,所有與此ChannelFuture綁定的監(jiān)聽器都將被通知痕支,所有阻塞調(diào)用也將直接返回
newFailedFuture(Throwable cause) ChannelFuture 返回一個已被標記為失敗的ChannelFuture颁虐,所有與此ChannelFuture綁定的監(jiān)聽器都將被通知,所有阻塞調(diào)用也將直接返回
voidPromise() ChannelPromise 返回一個不同操作也重用的ChannelPromise卧须,但使用有一定限制另绩,需要小心使用

2.3.2瞬痘、實現(xiàn)源碼

實現(xiàn)源碼如下:

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return tail.connect(remoteAddress, localAddress);
}

@Override
public final ChannelFuture disconnect() {
    return tail.disconnect();
}

@Override
public final ChannelFuture close() {
    return tail.close();
}

@Override
public final ChannelFuture deregister() {
    return tail.deregister();
}

@Override
public final ChannelPipeline flush() {
    tail.flush();
    return this;
}

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

@Override
public final ChannelFuture connect(
        SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, localAddress, promise);
}

@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
    return tail.disconnect(promise);
}

@Override
public final ChannelFuture close(ChannelPromise promise) {
    return tail.close(promise);
}

@Override
public final ChannelFuture deregister(final ChannelPromise promise) {
    return tail.deregister(promise);
}

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
    return tail.write(msg, promise);
}

@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return tail.writeAndFlush(msg, promise);
}

@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

@Override
public final ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel);
}

@Override
public final ChannelProgressivePromise newProgressivePromise() {
    return new DefaultChannelProgressivePromise(channel);
}

@Override
public final ChannelFuture newSucceededFuture() {
    return succeededFuture;
}

@Override
public final ChannelFuture newFailedFuture(Throwable cause) {
    return new FailedChannelFuture(channel, null, cause);
}

@Override
public final ChannelPromise voidPromise() {
    return voidPromise;
}

由以上源碼可知,outbound的具體實現(xiàn)都是交由tail(ChannelHandlerContext)來實現(xiàn)的板熊。

2.4、ChannelPipeline鏈表維護

ChannelPipeline中維護了一個ChannelHandlerContext的鏈表察绷,I/O事件通過鏈表在用戶的Handler中傳播干签。

2.4.1、鏈表維護接口

方法名稱 返回值 功能說明
addFirst(String name, ChannelHandler handler) ChannelPipeline 將handler添加到pipeline隊列的頭部
addLast(String name, ChannelHandler handler) ChannelPipeline 將handler添加到pipeline隊列的尾部
addBefore(String baseName, String name, ChannelHandler handler) ChannelPipeline 將handler添加到baseName對應(yīng)的handler之前
addAfter(String baseName, String name, ChannelHandler handler) ChannelPipeline 將handler添加到baseName對應(yīng)的handler之后
addFirst(ChannelHandler... handlers) ChannelPipeline 按順序批量添加Handler到隊列頭部
addLast(ChannelHandler... handlers) ChannelPipeline 按順序批量添加Handler到隊列尾部
remove(ChannelHandler handler) ChannelPipeline 移除handler
remove(String name) ChannelHandler 移除名字為name的handler
remove(Class<T> handlerType) ChannelPipeline 移除類型為handlerType的handler
removeFirst() ChannelPipeline 移除第一個handler
removeLast() ChannelPipeline 移除最后一個handler
replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) ChannelPipeline 用newHandler替換oldHandler
first() ChannelHandler 獲取第一個Handler
firstContext() ChannelHandlerContext 獲取第一個Context
last() ChannelHandler 獲取最后一個Handler
lastContext() ChannelHandlerContext 獲取最后一個Context
get(String name) ChannelHandler 通過名字獲取Handler
context(ChannelHandler handler) ChannelHandlerContext 通過Handler獲取其對應(yīng)的Context
context(String name) ChannelHandlerContext 通過Handler的名字獲取其對應(yīng)的Context

注:以上接口中添加的頭尾不包括head節(jié)點和tail節(jié)點拆撼,這兩節(jié)點為netty框架的節(jié)點容劳,不允許用戶修改。

2.4.2闸度、接口實現(xiàn)

以下對主要接口的源碼進行分析竭贩。

addFirst():

public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        name = filterName(name, handler);

        newCtx = newContext(group, name, handler);

        addFirst0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
private void addFirst0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext nextCtx = head.next;
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
}

主要流程為:

  • 檢查Handler是否在多個pipeline中重復(fù)添加。被注解為@Sharable的Handler是可以在多個pipeline中重復(fù)添加的莺禁,否則為保證線程安全留量,不允許在多個pipeline中添加。
  • 檢查handler名字是否重復(fù)哟冬。如果添加時的name為空楼熄,則由框架自動生成name,生成規(guī)則為:[SimpleName] + "#" + [數(shù)字]浩峡,數(shù)字從0累加可岂,知道名字不重復(fù)為止。如果添加時的name不空翰灾,則檢查name是否重復(fù)缕粹,重復(fù)則拋出IllegalArgumentException異常,否則驗證通過纸淮;
  • 根據(jù)pipeline平斩、EventExecutorGroup、name萎馅、handler新建一個ChannelHandlerContext双戳;
  • 挑用addFirst0()將新建的context添加到pipeline中head的下一個節(jié)點;
  • 若Channel還未在EventLoop中注冊糜芳,則注冊PendingHandlerAddedTask任務(wù)飒货,當Channel注冊成功時,調(diào)用ChannelHandler.handlerAdded()方法峭竣;若Channel已經(jīng)注冊成功則直接調(diào)用callHandlerAdded0()方法來通過管道調(diào)用所有Handler的ChannelHandler.handlerAdded()方法塘辅。

addLast():

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

addLast()的實現(xiàn)源碼與addFirst基本一樣,唯一區(qū)別是將handler添加的pipeline的tail節(jié)點的前一個節(jié)點皆撩。

2.5扣墩、DefaultChannelPipeline源碼分析

DefaultChannelPipeline為ChannelPipeline接口的實現(xiàn)哲银。也定義了Pipeline中的head和tail節(jié)點及實現(xiàn)等。

2.5.1呻惕、基本屬性

private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
        AtomicReferenceFieldUpdater.newUpdater(
                DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();

private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;

/**
 * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
 * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
 *
 * We only keep the head because it is expected that the list is used infrequently and its size is small.
 * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
 * complexity.
 */
private PendingHandlerCallback pendingHandlerCallbackHead;

/**
 * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
 * change.
 */
private boolean registered;

HEAD_NAME:head對應(yīng)的Handler的名字荆责;

TAIL_NAME:tail對應(yīng)的handler的名字;

nameCaches:Handler與其name的map的緩存亚脆;

ESTIMATOR:消息中字節(jié)大小統(tǒng)計器做院;

head:pipeline隊列的頭節(jié)點,其是ChannelHandlerContext與ChannelHandler的實現(xiàn)濒持。

tail:pipeline隊列的尾節(jié)點键耕,其是ChannelHandlerContext與ChannelHandler的實現(xiàn)。

channel:pipeline對應(yīng)的Channel柑营;

succeededFuture:處理成功的異步結(jié)果屈雄;

voidPromise:通用的異步處理結(jié)果;

childExecutors:子執(zhí)行器官套;

estimatorHandle:消息字節(jié)大小統(tǒng)計器的處理器酒奶;

firstRegistration:是否第一次注冊

pendingHandlerCallbackHead:頭節(jié)點一些事件的異步回調(diào)任務(wù);

2.5.2奶赔、構(gòu)造函數(shù)

構(gòu)造函數(shù)源碼:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

構(gòu)造函數(shù)比較簡單讥蟆,主要新建的succeededFuture和voidPromise異步通知,以及鏈表的頭結(jié)點(head)和尾節(jié)點(tail)纺阔。

2.5.3瘸彤、HeadContext源碼解析

HeadContext為Pipeline的頭節(jié)點實現(xiàn),其即時ChannelHandlerContext的實現(xiàn)笛钝,也是ChannelHandler的實現(xiàn)质况。

2.5.3.1、HeadContext類繼承關(guān)系

HeadContext類繼承圖:

HeadContext類繼承圖.png

HeadContext實現(xiàn)ChannelHandler的inbound接口和outbound接口玻靡,也實現(xiàn)了ChannelHandlerContext的inbound及outbound接口结榄。

2.5.4、TailContext源碼解析

TailContext為pipeline的尾節(jié)點實現(xiàn)囤捻,其即時ChannelHandlerContext的實現(xiàn)臼朗,也是ChannelHandler的實現(xiàn)。

TailContext類繼承圖:

TailContext類繼承圖.png

TailContext在實現(xiàn)ChannelHandlerContext接口蝎土,同時實現(xiàn)ChannelHandler的inbound接口视哑。

3、ChannelHandlerContext源碼解析

ChannelHandlerContext 代表了ChannelHandler 和ChannelPipeline 之間的關(guān)聯(lián)誊涯,每當有ChannelHandler 添加到ChannelPipeline 中時挡毅,都會創(chuàng)建ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所關(guān)聯(lián)的ChannelHandler 和在同一個ChannelPipeline 中的其他ChannelHandler 之間的交互暴构。

ChannelHandlerContext中的一些接口在ChannelPipeline中也有實現(xiàn)跪呈,但傳播方向有一點重要的不同段磨。如果調(diào)用Channel 或者ChannelPipeline 上的這些方法,它們將沿著整個ChannelPipeline 進行傳播耗绿。而調(diào)用位于ChannelHandlerContext上的相同方法苹支,則將從當前所關(guān)聯(lián)的ChannelHandler 開始,并且只會傳播給位于該ChannelPipeline 中的下一個能夠處理該事件的ChannelHandler误阻。

ChannelHandlerContext類繼承圖:

ChannelHandlerContext類繼承圖.png

ChannelInboundInvoker:是網(wǎng)絡(luò)I/O的事件的統(tǒng)一抽象沐序,即為inbound事件,方法都以fireXXX開頭堕绩,pipeline也實現(xiàn)此接口。

ChannelOutboundInvoker:是用戶線程或代碼發(fā)起的I/O操作邑时,被稱為outbound事件奴紧。

AttributeMap:存儲屬性鍵值對;

AbstractChannelHandlerContext:ChannelHandlerContext的抽象實現(xiàn)類晶丘,對通用處理進行了處理黍氮;

DefaultChannelHandlerContext:ChannelHandlerContext的默認實現(xiàn),netty框架即使用此實現(xiàn)浅浮;

HeadContext/TailContext:pipeline的頭結(jié)點和尾節(jié)點實現(xiàn)沫浆;

3.1、AbstractChannelHandlerContext源碼分析

AbstractChannelHandlerContext為ChannelHandlerContext的抽象實現(xiàn)滚秩。

3.1.1专执、基本屬性

volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
 */
private static final int ADD_PENDING = 1;
/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
 */
private static final int ADD_COMPLETE = 2;
/**
 * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
private static final int REMOVE_COMPLETE = 3;
/**
 * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
 * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
private static final int INIT = 0;

private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
private final boolean ordered;

// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;

// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;

private volatile int handlerState = INIT;

next:pipeline中的下一個ChannelHandlerContext節(jié)點;

prev:pipeline中的上一個ChannelHandlerContext節(jié)點郁油;

inbound:標識此Context對應(yīng)的Handler是否為ChannelInboundHandler類型本股;

outbound:標識此Context對應(yīng)的Handler是否為ChannelOutboundHandler類型;

pipeline:此Context對應(yīng)的Pipeline桐腌;

name:此Context的名字拄显;

ordered:事件順序標志;

executor:事件執(zhí)行線程案站;

succeededFuture:成功的異步處理結(jié)果躬审;

invokeChannelReadCompleteTask:讀完成處理任務(wù);

invokeReadTask:讀數(shù)據(jù)任務(wù)蟆盐;

invokeChannelWritableStateChangedTask:Channel寫狀態(tài)變更任務(wù)承边;

invokeFlushTask:沖刷數(shù)據(jù)任務(wù);

handlerState:當前Handler的狀態(tài)

handlerState有以下四種狀態(tài):

// 初始狀態(tài) 
private static final int INIT = 0; 
// 對應(yīng)Handler的handlerAdded方法將要被調(diào)用但還未調(diào)用 
private static final int ADD_PENDING = 1; 
// 對應(yīng)Handler的handlerAdded方法被調(diào)用 
private static final int ADD_COMPLETE = 2; 
// 對應(yīng)Handler的handlerRemoved方法被調(diào)用 
private static final int REMOVE_COMPLETE = 3;

3.1.1石挂、構(gòu)造函數(shù)

構(gòu)造函數(shù)源碼:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

3.1.2炒刁、inbound事件

AbstractChannelHandlerContext中對inbound事件的處理大同小異,本處只對fireChannelRegistered進行分析誊稚,其他事件處理流程基本相同翔始;

源碼:

@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound());
    return this;
}

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

fireChannelRegistered():此方法主要是找到下個inbound類型的Context罗心,并交由invokeChannelRegistered(final AbstractChannelHandlerContext next):靜態(tài)方法進行處理;

invokeChannelRegistered(final AbstractChannelHandlerContext next):此靜態(tài)方法主要判斷事件處理是否在執(zhí)行線程中城瞎,是則直接處理渤闷;否則異步處理。同時脖镀,pipeline中也會調(diào)用此方法對注冊事件進行傳播飒箭,pipeline中fireChannelRegistered事件的處理就是調(diào)用此靜態(tài)方法,而參數(shù)為HeadContext蜒灰,即從head節(jié)點開始傳播注冊事件弦蹂;

invokeChannelRegistered():此方法首先判斷Context的Handler是否已經(jīng)在pipeline中添加完成,完成則直接調(diào)用對應(yīng)Handler的channelRegistered()方法對注冊事件進行處理强窖;否則直接調(diào)用fireChannelRegistered()將事件交由下個inbound類型的Context處理凸椿。

3.1.3、outbound事件

與inbound事件相同翅溺,Context的outbound事件的傳播流程也大體相同脑漫,本處以bind()事件為例進行傳播流程的分析,其他事件傳播流程類似咙崎。

源碼:

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

從bind()方法可知优幸,其主要查找下一個Context并調(diào)用invokeBind()進行處理,而invokeBind()又調(diào)用Handler的bind()褪猛;Handler的bind()通用處理是沿著outbound的Context向head節(jié)點傳播网杆,其最終調(diào)用的是pipeline中head節(jié)點的Handler的bind()方法,而head節(jié)點的bind的方法會調(diào)用底層Channel的Unsafe的bind()方法進行最終的bind()操作伊滋。

3.2跛璧、DefaultChannelHandlerContext源碼分析

DefaultChannelHandlerContext為netty的默認ChannelHandlerContext實現(xiàn),其實現(xiàn)非常簡單新啼。

源碼:

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

3.2.1追城、基本屬性

handler:context對應(yīng)的ChannelHandler;

3.2.1燥撞、構(gòu)造函數(shù)

構(gòu)造函數(shù)主要通過isInbound()方法和isOutbound()方法判斷此ChannelHandler為inbound或outbound處理器座柱。其他處理都交由AbstractChannelHandlerContext。

相關(guān)閱讀:
Netty源碼愫讀(一)ByteBuf相關(guān)源碼學(xué)習(xí) 【http://www.reibang.com/p/016daa404957
Netty源碼愫讀(二)Channel相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/02eac974258e
Netty源碼愫讀(四)ChannelHandler相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/6ee0a3b9d73a
Netty源碼愫讀(五)EventLoop與EventLoopGroup相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/05096995d296
Netty源碼愫讀(六)ServerBootstrap相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/a71a9a0291f3

參考書籍:
《Netty實戰(zhàn)》
《Netty權(quán)威指南》

參考博客:

http://www.reibang.com/p/4c35541eec10
http://www.reibang.com/p/0b79872eb515
http://www.reibang.com/p/a0a51fd79f62
http://www.wolfbe.com/detail/201609/379.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末物舒,一起剝皮案震驚了整個濱河市色洞,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌冠胯,老刑警劉巖火诸,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異荠察,居然都是意外死亡置蜀,警方通過查閱死者的電腦和手機奈搜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來盯荤,“玉大人馋吗,你說我怎么就攤上這事∏锍樱” “怎么了宏粤?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長灼卢。 經(jīng)常有香客問我绍哎,道長,這世上最難降的妖魔是什么鞋真? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任崇堰,我火速辦了婚禮,結(jié)果婚禮上灿巧,老公的妹妹穿的比我還像新娘。我一直安慰自己揽涮,他們只是感情好抠藕,可當我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蒋困,像睡著了一般盾似。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上雪标,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天零院,我揣著相機與錄音,去河邊找鬼村刨。 笑死告抄,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的嵌牺。 我是一名探鬼主播打洼,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼逆粹!你這毒婦竟也來了募疮?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤僻弹,失蹤者是張志新(化名)和其女友劉穎阿浓,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蹋绽,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡芭毙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年筋蓖,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稿蹲。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡扭勉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出苛聘,到底是詐尸還是另有隱情涂炎,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布设哗,位于F島的核電站唱捣,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏网梢。R本人自食惡果不足惜震缭,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望战虏。 院中可真熱鬧拣宰,春花似錦、人聲如沸烦感。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽手趣。三九已至晌该,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間绿渣,已是汗流浹背朝群。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留中符,地道東北人姜胖。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像淀散,于是被迫代替她去往敵國和親谭期。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容