在netty中忘分,每個channel都有僅有一個ChannelPipeline與之對應档押,ChannelPipeline中又維護了一個由ChannelHandlerContext 組成的雙向鏈表,這個鏈表的頭是 HeadContext, 鏈表的尾是 TailContext, 并且每個 ChannelHandlerContext 中關聯著一個 ChannelHandler.
我們來回顧下Channel的初識化過程:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
AbstractChannel 有一個 pipeline 字段, 在構造器中會初始化它為 DefaultChannelPipeline的實例. 也即每個 Channel 都有一個 ChannelPipeline.
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在 DefaultChannelPipeline 構造器中, 首先將與之關聯的 Channel 保存到字段 channel 中, 然后實例化兩個 ChannelHandlerContext, 一個是 HeadContext 實例 head, 另一個是 TailContext 實例 tail. 接著將 head 和 tail 互相指向, 其實在 DefaultChannelPipeline 中, 維護了一個以 AbstractChannelHandlerContext 為節(jié)點的雙向鏈表, 這個鏈表是 Netty 實現 Pipeline 機制的關鍵.
從類層次結構圖中可以很清楚地看到, head 實現了 ChannelInboundHandler, 而 tail 實現了 ChannelOutboundHandler 接口, 并且它們都實現了 ChannelHandlerContext 接口, 因此可以說 head 和 tail 即是一個 ChannelHandler, 又是一個 ChannelHandlerContext.
接著看一下 HeadContext 的構造器:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
}
它調用了父類 AbstractChannelHandlerContext 的構造器, 并傳入參數 inbound = false, outbound = true.
TailContext 的構造器與 HeadContext 的相反, 它調用了父類 AbstractChannelHandlerContext 的構造器, 并傳入參數 inbound = true, outbound = false.
ChannelInitializer 的添加
我們知道最開始的時候 ChannelPipeline 中含有兩個 ChannelHandlerContext(同時也是 ChannelHandler), 但是這個 Pipeline并不能實現什么特殊的功能, 因為我們還沒有給它添加自定義的 ChannelHandler.
通常來說, 我們在初始化 Bootstrap, 會添加我們自定義的 ChannelHandler, 就以我們熟悉的 EchoClient 來舉例吧:
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
在調用 handler 時, 傳入了 ChannelInitializer 對象, 它提供了一個 initChannel 方法供我們初始化 ChannelHandler.
ChannelInitializer 實現了 ChannelHandler, 那么它是在什么時候添加到 ChannelPipeline 中的呢? 它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的.
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
...
}
上面的代碼將 handler() 返回的 ChannelHandler 添加到 Pipeline 中, 而 handler() 返回的是handler 其實就是我們在初始化 Bootstrap 調用 handler 設置的 ChannelInitializer 實例,因此這里就是將 ChannelInitializer 插入到了 Pipeline 的末端.
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name); // 檢查此 handler 是否有重復的名字
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}
上面的 addLast 方法中, 首先檢查這個 ChannelHandler 的名字是否是重復的, 如果不重復的話, 則為這個 Handler 創(chuàng)建一個對應的 DefaultChannelHandlerContext 實例, 并與之關聯起來(Context 中有一個 handler 屬性保存著對應的 Handler 實例). 判斷此 Handler 是否重名的方法很簡單: Netty 中有一個 name2ctx Map 字段, key 是 handler 的名字, 而 value 則是 handler 本身. 因此通過如下代碼就可以判斷一個 handler 是否重名了:
private void checkDuplicateName(String name) {
if (name2ctx.containsKey(name)) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
為了添加一個 handler 到 pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext. 因此在上面的代碼中我們可以看到新實例化了一個 newCtx 對象, 并將 handler 作為參數傳遞到構造方法中.
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
super(pipeline, group, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
DefaultChannelHandlerContext 的構造器中, 調用了兩個很有意思的方法: isInbound 與 isOutbound, 這兩個方法是做什么的呢?
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
可以看到當一個 handler 實現了 ChannelInboundHandler 接口, 則 isInbound 返回真; 相似地, 當一個 handler 實現了 ChannelOutboundHandler 接口, 則 isOutbound 就返回真. 而這兩個 boolean 變量會傳遞到父類 AbstractChannelHandlerContext 中, 并初始化父類的兩個字段: inbound 與 outbound.
那么這里的 ChannelInitializer 所對應的 DefaultChannelHandlerContext 的 inbound 與 inbound 字段分別是什么呢? 那就看一下 ChannelInitializer 到底實現了哪個接口不就行了?
可以清楚地看到, ChannelInitializer 僅僅實現了 ChannelInboundHandler 接口, 因此這里實例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false. 其實這兩個字段關系到 pipeline 的事件的流向與分類塌西,這個我們后面再詳細分析脖阵。
當創(chuàng)建好 Context 后, 就將這個 Context 插入到 Pipeline 的雙向鏈表中:
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx);
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
name2ctx.put(name, newCtx);
callHandlerAdded(newCtx);
}
自定義 ChannelHandler 的添加過程
在上面, 我們已經分析了一個 ChannelInitializer 如何插入到 Pipeline 中的, 接下來就來探討一下 ChannelInitializer 在哪里被調用, ChannelInitializer 的作用, 以及我們自定義的 ChannelHandler 是如何插入到 Pipeline 中的.
在netty客戶端我們已經分析過 Channel 的注冊過程了, 這里簡單回顧一下:
首先在 AbstractBootstrap.initAndRegister中, 通過 group().register(channel), 調用 MultithreadEventLoopGroup.register 方法
在MultithreadEventLoopGroup.register 中, 通過 next() 獲取一個可用的 SingleThreadEventLoop, 然后調用它的 register
在 SingleThreadEventLoop.register 中, 通過 channel.unsafe().register(this, promise) 來獲取 channel 的 unsafe() 底層操作對象, 然后調用它的 register.
在 AbstractUnsafe.register 方法中, 調用 register0 方法注冊 Channel
在 AbstractUnsafe.register0 中, 調用 AbstractNioChannel#doRegister 方法
AbstractNioChannel.doRegister 方法通過 javaChannel().register(eventLoop().selector, 0, this) 將 Channel 對應的 Java NIO SockerChannel 注冊到一個 eventLoop 的 Selector 中, 并且將當前 Channel 作為 attachment.
而我們自定義 ChannelHandler 的添加過程, 發(fā)生在 AbstractUnsafe.register0 中, 在這個方法中調用了 pipeline.fireChannelRegistered() 方法, 其實現如下:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
@Override
public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}
上面的代碼很簡單, 就是調用了 head.fireChannelRegistered() 方法而已.
還記得 head 的 類層次結構圖不, head 是一個 AbstractChannelHandlerContext 實例, 并且它沒有重寫 fireChannelRegistered 方法, 因此 head.fireChannelRegistered 其實是調用的 AbstractChannelHandlerContext.fireChannelRegistered:
public ChannelHandlerContext fireChannelRegistered() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
return this;
}
這個方法的第一句是調用 findContextInbound 獲取一個 Context, 那么它返回的 Context 到底是什么呢? 我們跟進代碼中看一下:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
很顯然, 這個代碼會從 head 開始遍歷 Pipeline 的雙向鏈表, 然后找到第一個屬性 inbound 為 true 的 ChannelHandlerContext 實例. 想起來了沒? 我們在前面分析 ChannelInitializer 時, 花了大量的筆墨來分析了 inbound 和 outbound 屬性, 你看現在這里就用上了. 回想一下, ChannelInitializer 實現了 ChannelInboudHandler, 因此它所對應的 ChannelHandlerContext 的 inbound 屬性就是 true, 因此這里返回就是 ChannelInitializer 實例所對應的 ChannelHandlerContext. 即:
當獲取到 inbound 的 Context 后, 就調用它的 invokeChannelRegistered 方法:
private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
我們已經強調過了, 每個 ChannelHandler 都與一個 ChannelHandlerContext 關聯, 我們可以通過 ChannelHandlerContext 獲取到對應的 ChannelHandler. 因此很顯然了, 這里 handler() 返回的, 其實就是一開始我們實例化的 ChannelInitializer 對象, 并接著調用了 ChannelInitializer.channelRegistered 方法.
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
initChannel 這個方法我們很熟悉了吧, 它就是我們在初始化 Bootstrap 時, 調用 handler 方法傳入的匿名內部類所實現的方法:
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
因此當調用了這個方法后, 我們自定義的 ChannelHandler 就插入到 Pipeline 了, 此時的 Pipeline 如下圖所示:
當添加了自定義的 ChannelHandler 后, 會刪除 ChannelInitializer 這個 ChannelHandler, 即 "ctx.pipeline().remove(this)", 因此最后的 Pipeline 如下: