上一節(jié)我們講了 Netty 的啟動(dòng)流程,從啟動(dòng)流程入手分析了 Reactor 模型的第一步:channel 如何綁定 Selector正驻。然后講到了 EventLoop 在啟動(dòng)的時(shí)候發(fā)揮了什么作用劲藐。整個(gè)啟動(dòng)類我們從頭到尾過了一遍锌钮,今天我們來解決上節(jié)遺留的問題:Selector 如何將請求交給對應(yīng)的 handler處理。
1. handler 的初始化
還是先從啟動(dòng)類入手:
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer());
跟到childHandler(ChannelHandler childHandler)
方法里面:
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
這里做了一個(gè)賦值的操作扑浸。那么 childHandler
是在哪里被使用呢烧给?用 idea 的查看引用功能可以看到:
這里有個(gè) int()
方法看似比較關(guān)鍵,繼續(xù)跟進(jìn):
// 這是ServerBootStrapt對 他父類初始化 channel的實(shí)現(xiàn), 用于初始化 NioServerSocketChannel
void init(Channel channel) throws Exception {
//ChannelOption 是在配置 Channel 的 ChannelConfig 的信息
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
// 把 NioserverSocketChannel 和 options Map傳遞進(jìn)去, 給Channel里面的屬性賦值
// 這些常量值全是關(guān)于和諸如TCP協(xié)議相關(guān)的信息
channel.config().setOptions(options);
}
//再次一波 給Channel里面的屬性賦值 attrs0()是獲取到用戶自定義的業(yè)務(wù)邏輯屬性 -- AttributeKey
final Map<AttributeKey<?>, Object> attrs = attrs0();
// 這個(gè)map中維護(hù)的是 程序運(yùn)行時(shí)的 動(dòng)態(tài)的 業(yè)務(wù)數(shù)據(jù) , 可以實(shí)現(xiàn)讓業(yè)務(wù)數(shù)據(jù)隨著
//netty的運(yùn)行原來存進(jìn)去的數(shù)據(jù)還能取出來
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// ChannelPipeline 本身 就是一個(gè)重要的組件, 他里面是一個(gè)一個(gè)的處理器,
//說他是高級過濾器,交互的數(shù)據(jù) 會一層一層經(jīng)過它
// 下面直接就調(diào)用了 p , 說明,在channel調(diào)用pipeline方法之前, pipeline已經(jīng)被創(chuàng)建出來了
// 在創(chuàng)建NioServerSocketChannel這個(gè)通道對象時(shí),在他的頂級抽象父類(AbstractChannel)中創(chuàng)建了一個(gè)默認(rèn)的pipeline對象
// ChannelHandlerContext 是 ChannelHandler和Pipeline 交互的橋梁
ChannelPipeline p = channel.pipeline();
// workerGroup 處理IO線程
final EventLoopGroup currentChildGroup = childGroup;
//添加我們自定義的 Initializer
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//默認(rèn) 往NioServerSocketChannel的管道里面添加了一個(gè) ChannelInitializer
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
//這個(gè)handler 針對bossgroup的Channel
//給他添加上我們在server類中添加的handler()里面添加處理器
ChannelHandler handler = config.handler();
if (handler != null) {
//將ServerSocketChannel的Handler添加到pipeline
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//將ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init()
方法的傳參是 channel喝噪,感覺很好奇是被哪里調(diào)用的础嫡,往上找它的調(diào)用鏈:
-AbstractBootstrap.doBind()/Bootstrap.doResolveAndConnect()->
-AbstractBootstrap.initAndRegister()->
-AbstractBootstrap.init(Channel channel)
從調(diào)用鏈上看是不是很熟悉,是我們上一節(jié)分析過了的初始化里面的邏輯酝惧。上一節(jié) 在分析Bootstrap#initAndRegister
方法的時(shí)候驰吓,在這里初始化了 channel 并 綁定到 Reactor 線程上。但是唯獨(dú)漏掉了 init()
方法:
因?yàn)檫@里的東西太多系奉,還是單獨(dú)抽出來一節(jié)來聊才通透檬贰。
可以看到 init()
傳入的是某個(gè) ServerSocketChannel
創(chuàng)建連接時(shí)候創(chuàng)建的 channel,回到init()
方法內(nèi)部:
ChannelPipeline p = channel.pipeline();
從 channel 中獲取ChannelPipeline
對象缺亮,即一個(gè) channel 是跟一個(gè)特定的 ChannelPipeline 綁定的翁涤。這里又引申出另一個(gè)關(guān)鍵點(diǎn):ChannelPipeline桥言,我們喝口茶再討論。
2. ChannelPipeline
ChannelPipeline 是一個(gè)接口葵礼,實(shí)現(xiàn)類有兩個(gè):
DefaultChannelPipeline
EmbeddedChannelPipeline
其中 EmbeddedChannelPipeline 又繼承了 DefaultChannelPipeline号阿,所以我們來看一下 DefaultChannelPipeline的實(shí)現(xiàn),首先看默認(rèn)的構(gòu)造方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
從構(gòu)造方法上看鸳粉,這很顯然是在構(gòu)造一個(gè)鏈表的結(jié)構(gòu)扔涧。不難看出 ChannelPipeline 內(nèi)部應(yīng)該是保存了一個(gè)鏈?zhǔn)降拇鎯壿嫛D擎湵碇斜4娴挠质鞘裁茨亟焯福渴呛芏嗟腃hannelPipeline 對象嗎枯夜?我們看看 tail 是什么類型:
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
很顯然 ChannelPipeline 里面又包裝了一層,即我們插入到 ChannelPipeline 中的 handler 被包裝為 ChannelHandlerContext 對象艰山。
回想到我們自己的 ServerChannelInitializer湖雹,是不是通過 pipeline.addLast()
方法來添加各種 handler 的處理邏輯呢,ChannelPipeline 的類聲明上有很大一段 doc曙搬,不看白不看:
/**
* A list of {@link ChannelHandler}s which handles or intercepts inbound events and outbound operations of a
* {@link Channel}. {@link ChannelPipeline} implements an advanced form of the
* <a >Intercepting Filter</a> pattern
* to give a user full control over how an event is handled and how the {@link ChannelHandler}s in a pipeline
* interact with each other.
*
* <h3>Creation of a pipeline</h3>
* 每一個(gè)channel 都有自己的pipeline摔吏,就是在channel 創(chuàng)建的時(shí)候自動(dòng)創(chuàng)建一個(gè)pipeline
* Each channel has its own pipeline and it is created automatically when a new channel is created.
*ChannelPipeline是一個(gè)處理或者攔截Channel的出棧事件或者入棧操作的ChannelHandler列表,
*ChannelPipeline實(shí)現(xiàn)了一種高效的攔截過濾器模式的形式來讓用戶完全控制一個(gè)事件怎樣處理
*和pipeline的ChannelHandler怎樣和其他ChannelHandler交互纵装。
*
* <h3>How an event flows in a pipeline</h3>
* 下面的圖描述了一個(gè)I/O事件一般是怎樣在ChannelPipeline里的ChannelHandler處理的征讲,
* 一個(gè)I/O事件要么被ChannelInboundHandler處理,要么里邊的事件傳播方法轉(zhuǎn)發(fā)給最近的一個(gè)處理器橡娄,比如
* ChannelHandlerContext#fireChannelRead(Object)和ChannelHandlerContext#write(Object)诗箍。
* The following diagram describes how I/O events are processed by {@link ChannelHandler}s in a {@link ChannelPipeline}
* typically. An I/O event is handled by either a {@link ChannelInboundHandler} or a {@link ChannelOutboundHandler}
* and be forwarded to its closest handler by calling the event propagation methods defined in
* {@link ChannelHandlerContext}, such as {@link ChannelHandlerContext#fireChannelRead(Object)} and
* {@link ChannelHandlerContext#write(Object)}.
*
* <pre>
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
* </pre>
* 通過上圖可以看到入棧的和出棧的處理器互不干擾。
*在左圖瀑踢,一個(gè)入棧事件是從下到上的順序被綁定的處理器處理的,一個(gè)入棧處理器通常處理從I/O線程生成的數(shù)據(jù)才避,
*一個(gè)入棧事件是從下到上的順序被綁定的處理器處理的橱夭,一個(gè)入棧處理器通常處理從I/O線程生成的數(shù)據(jù),
* An inbound event is handled by the inbound handlers in the bottom-up direction as shown on the left side of the
* diagram. An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the
* diagram. The inbound data is often read from a remote peer via the actual input operation such as
* {@link SocketChannel#read(ByteBuffer)}. If an inbound event goes beyond the top inbound handler, it is discarded
* silently, or logged if it needs your attention.
* <p>
*右圖桑逝,一個(gè)出棧事件會被出棧處理器處理棘劣,一個(gè)出棧處理器生成或者傳輸出棧數(shù)據(jù),比如寫請求楞遏,
*如果一個(gè)出棧事件超出最底層的處理器茬暇,那么他將會被I/O,線程處理,與其關(guān)聯(lián)的SocketChannel#write(ByteBuffer)操作寡喝。
* An outbound event is handled by the outbound handler in the top-down direction as shown on the right side of the
* diagram. An outbound handler usually generates or transforms the outbound traffic such as write requests.
* If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the
* {@link Channel}. The I/O thread often performs the actual output operation such as
* {@link SocketChannel#write(ByteBuffer)}.
* <p>
* For example, let us assume that we created the following pipeline:
* <pre>
* {@link ChannelPipeline} p = ...;
* p.addLast("1", new InboundHandlerA());//入棧處理器
* p.addLast("2", new InboundHandlerB());//入棧處理器
* p.addLast("3", new OutboundHandlerA());//出棧處理器
* p.addLast("4", new OutboundHandlerB());//出棧處理器
* p.addLast("5", new InboundOutboundHandlerX());//既是入棧又是出棧處理器
* </pre>
*在前邊提到的例子中糙俗,以Inbound開頭的都是入棧處理器,以O(shè)utbound開頭的都是出棧處理器预鬓。
* In the example above, the class whose name starts with {@code Inbound} means it is an inbound handler.
* The class whose name starts with {@code Outbound} means it is a outbound handler.
* <p>
* In the given example configuration, the handler evaluation order is 1, 2, 3, 4, 5 when an event goes inbound.
* When an event goes outbound, the order is 5, 4, 3, 2, 1. On top of this principle, {@link ChannelPipeline} skips
* the evaluation of certain handlers to shorten the stack depth:
* <ul>
* <li>3 and 4 don't implement {@link ChannelInboundHandler}, and therefore the actual evaluation order of an inbound
* event will be: 1, 2, and 5.</li>
* <li>1 and 2 don't implement {@link ChannelOutboundHandler}, and therefore the actual evaluation order of a
* outbound event will be: 5, 4, and 3.</li>
* <li>If 5 implements both {@link ChannelInboundHandler} and {@link ChannelOutboundHandler}, the evaluation order of
* an inbound and a outbound event could be 125 and 543 respectively.</li>
* </ul>
*
* <h3>Forwarding an event to the next handler</h3>
*
* As you might noticed in the diagram shows, a handler has to invoke the event propagation methods in
* {@link ChannelHandlerContext} to forward an event to its next handler. Those methods include:
*一個(gè)處理器調(diào)用ChannelHandlerContext的事件傳播方法轉(zhuǎn)發(fā)給下一個(gè)處理器巧骚,這些方法包括:
* <ul>
* <li>Inbound event propagation methods:
* <ul>
* 入棧事件傳播方法
* <li>{@link ChannelHandlerContext#fireChannelRegistered()}</li>
* <li>{@link ChannelHandlerContext#fireChannelActive()}</li>
* <li>{@link ChannelHandlerContext#fireChannelRead(Object)}</li>
* <li>{@link ChannelHandlerContext#fireChannelReadComplete()}</li>
* <li>{@link ChannelHandlerContext#fireExceptionCaught(Throwable)}</li>
* <li>{@link ChannelHandlerContext#fireUserEventTriggered(Object)}</li>
* <li>{@link ChannelHandlerContext#fireChannelWritabilityChanged()}</li>
* <li>{@link ChannelHandlerContext#fireChannelInactive()}</li>
* <li>{@link ChannelHandlerContext#fireChannelUnregistered()}</li>
* </ul>
* </li>
* <li>Outbound event propagation methods:
* <ul>
* 出棧事件傳播方法
* <li>{@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#write(Object, ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#flush()}</li>
* <li>{@link ChannelHandlerContext#read()}</li>
* <li>{@link ChannelHandlerContext#disconnect(ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#close(ChannelPromise)}</li>
* <li>{@link ChannelHandlerContext#deregister(ChannelPromise)}</li>
* </ul>
* </li>
* </ul>
*
* and the following example shows how the event propagation is usually done:
*
* <pre>
* public class MyInboundHandler extends {@link ChannelInboundHandlerAdapter} {
* {@code @Override}
* public void channelActive({@link ChannelHandlerContext} ctx) {
* System.out.println("Connected!");
* ctx.fireChannelActive();
* }
* }
*
* public clas MyOutboundHandler extends {@link ChannelOutboundHandlerAdapter} {
* {@code @Override}
* public void close({@link ChannelHandlerContext} ctx, {@link ChannelPromise} promise) {
* System.out.println("Closing ..");
* ctx.close(promise);
* }
* }
* </pre>
*
* <h3>Building a pipeline</h3>
* <p>
* A user is supposed to have one or more {@link ChannelHandler}s in a pipeline to receive I/O events (e.g. read) and
* to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers
* in each channel's pipeline, but your mileage may vary depending on the complexity and characteristics of the
* protocol and business logic:
*
* <ol>
* <li>Protocol Decoder - translates binary data (e.g. {@link ByteBuf}) into a Java object.</li>
* <li>Protocol Encoder - translates a Java object into binary data.</li>
* <li>Business Logic Handler - performs the actual business logic (e.g. database access).</li>
* </ol>
*
* and it could be represented as shown in the following example:
*
* <pre>
* static final {@link EventExecutorGroup} group = new {@link DefaultEventExecutorGroup}(16);
* ...
*
* {@link ChannelPipeline} pipeline = ch.pipeline();
*
* pipeline.addLast("decoder", new MyProtocolDecoder());
* pipeline.addLast("encoder", new MyProtocolEncoder());
*
* // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
* // in a different thread than an I/O thread so that the I/O thread is not blocked by
* // a time-consuming task.
* //如果你的業(yè)務(wù)邏輯是完全同步的或者完成的非常快,你不需要添加提供特別的線程池去異步執(zhí)行劈彪,反之竣蹦,
* //你就需要一個(gè)異步的處理邏輯以保證不要阻塞IO的速度
* // If your business logic is fully asynchronous or finished very quickly, you don't
* // need to specify a group.
* pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
* </pre>
*
* <h3>Thread safety</h3>
* <p>
* A {@link ChannelHandler} can be added or removed at any time because a {@link ChannelPipeline} is thread safe.
* For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it
* after the exchange.
*/
上面的注釋解釋了 ChannelPipeline 做了什么事情:這是一個(gè) handler 的 list,handler 用于處理或攔截入站事件和出站事件沧奴,pipeline 實(shí)現(xiàn)了過濾器的高級形式痘括,以便用戶完全控制事件如何處理以及 handler 在 pipeline 中如何交互。
再把視線收回到 init()
方法滔吠,在 init()
中調(diào)用 p.addLast()
方法纲菌,將 ChannelInitializer 插入到鏈表的末端。
接著看addLast()
方法:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//檢查 Channelhandler 的名字是否重復(fù), 如果不重復(fù), 則調(diào)用 newContext() 方法為這個(gè)Handler 創(chuàng)建一個(gè)對應(yīng)的 DefaultChannelHandlerContext 實(shí)例
// 檢查handler是否是@Sharable屠凶,是否已添加
checkMultiplicity(handler);
//為了添加一個(gè) handler 到pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// 如果 registered 為 false,則表示這個(gè)channel還未注冊到EventLoop上.
// 在這種情況下驰后,我們添加一個(gè)Task到PendingHandlerCallback中,
// 等到這個(gè)channel注冊成功之后矗愧,將會調(diào)用立即調(diào)用 ChannelHandler.handlerAdded(...) 方法灶芝,已達(dá)到channel添加的目的
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
// 檢查是否重復(fù)
private static void checkMultiplicity(ChannelHandler handler) {
// handler是否為ChannelHandlerAdapter類型,不是則不做處理
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 判斷handler是否添加了Sharable注解 && 是否添加過了
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
// 創(chuàng)建新的節(jié)點(diǎn)
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// 調(diào)用DefaultChannelHandlerContext的構(gòu)造函數(shù)
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
在 addLast()
方法中, 首先檢查 Channelhandler 的名字是否重復(fù)唉韭,如果不重復(fù)則調(diào)用 newContext()
方法為這個(gè) Handler 創(chuàng)建一個(gè)對應(yīng)的 DefaultChannelHandlerContext 實(shí)例夜涕,并與之關(guān)聯(lián)起來( Context 中有一個(gè) handler 屬性保存著對應(yīng)的 handler 實(shí)例)。
checkMultiplicity()
方法中使用一個(gè)成員變量 added 標(biāo)識一個(gè) Channel 是否已經(jīng)添加過属愤,如果當(dāng)前要添加的Handler 是非共享并且已經(jīng)添加過那就拋出異常女器,否則標(biāo)識該 Handler 已經(jīng)添加。由此可見一個(gè) Handler 如果是 Sharable 的就可以無限次被添加到 Pipeline 中住诸,我們客戶端代碼如果要讓一個(gè) Handler 被共用驾胆,只需要加一個(gè) @Sharable 標(biāo)注即。
為了添加一個(gè) handler 到 pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext贱呐。先看一下創(chuàng)建 ChannelHandlerContext
的這一句:
newCtx = newContext(group, filterName(name, handler), handler);
filterName()
這個(gè)方法其實(shí)作用還是挺重要丧诺,即你再 addLast()方法中給當(dāng)前 handler 取的名字,在這里做檢查:
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
// 1.如果傳入的name為空 則生成
// Netty生成的name默認(rèn)為=> 簡單類名#0
// 如果簡單類名#0已存在則將基數(shù)+1 生成name為簡單類名#1 以此遞增
return generateName(handler);
}
// 2.檢查是否有重名 檢查通過則返回
checkDuplicateName(name);
return name;
}
這個(gè)方法用于給handler創(chuàng)建一個(gè)唯一性的名字奄薇。
繼續(xù)跟進(jìn)newContext()
驳阎,在上面的代碼中我們看到新實(shí)例化了一個(gè) newCtx 對象,并將 handler 作為參數(shù)傳遞到構(gòu)造方法中。 那么 DefaultChannelHandlerContext 會做什么呢馁蒂?繼續(xù)看看:
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
上面是 DefaultChannelHandlerContext 的構(gòu)造函數(shù)呵晚,我們看到在調(diào)用父類構(gòu)造器中傳入兩個(gè)方法:
isInbound(handler)
isOutbound(handler)
從方法名稱上大概可以知道:一個(gè)是判斷當(dāng)前 handler 是否是進(jìn)入事件的處理器,另一個(gè)是判斷當(dāng)前 handler 是否是返回事件的處理器沫屡。這兩個(gè) boolean 變量會傳遞到父類 AbstractChannelHandlerContext 中, 并初始化父類的這兩個(gè)字段:inbound 與 outbound饵隙。
- inbound 為真時(shí), 表示對應(yīng)的 ChannelHandler 實(shí)現(xiàn)了 ChannelInboundHandler 方法;
- outbound 為真時(shí), 表示對應(yīng)的 ChannelHandler 實(shí)現(xiàn)了 ChannelOutboundHandler 方法沮脖。
插播要點(diǎn):
Netty是如何判斷ChannelHandler類型的癞季?
在添加ChannelHandler并創(chuàng)建 ChannelHandlerContext 的時(shí)候劫瞳,通過 instanceof 判斷 handler 是否是 ChannelInboundHandler 和 ChannelOutboundHanler,并將結(jié)果保存到 AbstractChannelHandlerContext 的 inbound 和 outbound 兩個(gè)boolean 變量中绷柒。
在添加ChannelHandler并創(chuàng)建ChannelHandlerContext的時(shí)候志于,通過instanceof判斷handler是否是ChannelInboundHandler和ChannelOutboundHanler,并將結(jié)果保存到AbstractChannelHandlerContext的inbound和outbound兩個(gè)boolean變量中废睦。
而我們在自定義的 ServerChannelInitializer 類中是繼承了 ChannelInitializer
:
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 字符串解碼 和 編碼
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 自己的邏輯Handler
pipeline.addLast("handler", new HwServerHandler());
}
}
Netty 中通過ChannelPipeline
來保證 ChannelHandler 之間的處理順序伺绽。每一個(gè) Channel 對象創(chuàng)建的時(shí)候,都會自動(dòng)創(chuàng)建一個(gè)關(guān)聯(lián)的 ChannelPipeline 對象嗜湃,我們可以通過 io.netty.channel.Channel
對象的pipeline()
方法獲取這個(gè)對象實(shí)例奈应。
ChannelInitializer
又是實(shí)現(xiàn)ChannelInboundHandler
接口,那這就意味著
DefaultChannelHandlerContext 的 inbound = true购披,outbound = false杖挣。
即在初始化 handler 的時(shí)候默認(rèn)所有的 handler 都是入棧處理器。
當(dāng)創(chuàng)建好 Context 之后刚陡,就將這個(gè)Context 插入到 Pipeline 的雙向鏈表中:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
以上就是新增 handler 插入pipeline 的過程惩妇,下面接著看 Pipeline 節(jié)點(diǎn)的刪除功能。
netty 有個(gè)最大的特性之一就是 Handler 可插拔筐乳,做到動(dòng)態(tài)編織 pipeline歌殃,比如在首次建立連接的時(shí)候,需要通過進(jìn)行權(quán)限認(rèn)證蝙云,在認(rèn)證通過之后氓皱,就可以將此 context 移除,下次 pipeline 在傳播事件的時(shí)候就就不會調(diào)用到權(quán)限認(rèn)證處理器勃刨。
下面是權(quán)限認(rèn)證 Handler 最簡單的實(shí)現(xiàn)波材,第一個(gè)數(shù)據(jù)包傳來的是認(rèn)證信息,如果校驗(yàn)通過身隐,就刪除此 Handler廷区,否則,直接關(guān)閉連接:
// 鑒權(quán)Handler
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
...
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
if (verify(authDataPacket)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
private boolean verify(ByteBuf byteBuf) {
//...
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("AuthHandler has been removed ! ");
}
}
我們來看看 DefaultChannelPipeline 中的 remove 方法:
public class DefaultChannelPipeline implements ChannelPipeline {
...
// 從Pipeline中刪除ChannelHandler
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
...
// 獲取 ChannelHandler 抡医,獲取不到就拋出異常
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
...
// 刪除
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
// ctx不能為heand與tail
assert ctx != head && ctx != tail;
synchronized (this) {
// 從pipeline中刪除ChannelHandlerContext節(jié)點(diǎn)
remove0(ctx);
// 如果為false躲因,則表明channel還沒有注冊到eventloop上
// 在刪除這種場景下早敬,我們先添加一個(gè)Task忌傻,一旦channel注冊成功就會調(diào)用這個(gè)Task,這個(gè)Task就會立即調(diào)用ChannelHandler.handlerRemoved(...)方法搞监,來從pipeline中刪除context水孩。
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
// 回調(diào) handlerRemoved 方法
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
// 回調(diào) handlerRemoved 方法
callHandlerRemoved0(ctx);
return ctx;
}
...
// 刪除節(jié)點(diǎn) ChannelHandlerContext
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
...
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
// 回調(diào) handlerRemoved 方法
// 也就是我們前面例子 AuthHandler 中的 handlerRemoved() 方法
ctx.handler().handlerRemoved(ctx);
} finally {
// 設(shè)置為ctx 狀態(tài)為 REMOVE_COMPLETE
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
...
}
好了, 刪除的邏輯就分析到這里了琐驴。通過以上的分析俘种,我們先總結(jié)一下:能發(fā)現(xiàn)在 handler 初始化的時(shí)候有三個(gè)關(guān)鍵的對象:
- ChannelHandler
- ChannelPipeline
- ChannelHandlerContext
他們之間的關(guān)系是:ChannelHandler 通過 ChannelPipeline 來封裝一個(gè)雙向鏈表維持各個(gè) handler 關(guān)系秤标。在 ChannelPipeline 中 不能包裝 ChannelPipeline 對象,原因是處理的邏輯都是有上下文關(guān)聯(lián)的宙刘,所以封裝了 ChannelHandlerContext 對象來包裝 handler 苍姜,維護(hù)處理邏輯以及他們之間的上下文關(guān)系。
另外悬包,ChannelHandler 又分為兩個(gè)大類的事件:
- ChannelInboundHandler:處理輸入數(shù)據(jù)和所有類型的狀態(tài)變化
- ChannelOutboundHandler:處理輸出數(shù)據(jù)衙猪,可以攔截所有操作
除了分開的 輸入 和 輸出 事件以外, Netty 還提供了一個(gè) 支持同時(shí)處理輸入和輸出事件的全能處理器:ChannelDuplexHandler布近,ChannelDuplexHandler 則同時(shí)實(shí)現(xiàn)了 ChannelInboundHandler 和 ChannelOutboundHandler 接口垫释。如果一個(gè)所需的 ChannelHandler 既要處理入站事件又要處理出站事件,推薦繼承此類撑瞧。
從 ChannelPipeline 類的 doc 文檔中給的注釋看:
* <pre>
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
inbound 事件和 outbound 事件的流向是不一樣的棵譬,inbound 的傳遞方式是通過調(diào)用相應(yīng)的 ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是通過調(diào)用 ChannelHandlerContext.OUT_EVT() 方法预伺。
inbound 事件
ChannelInboundHandler
中有如下方法:
ChannelInboundHandler
中的方法具體說明見下表:
方法名 | 方法說明 |
---|---|
channelInactive | 激活事件订咸,綁定端口成功后調(diào)用pipeline.fireChannelActive()
|
channelInactive | 非激活事件,連接關(guān)閉后調(diào)用pipeline.fireChannelInactive()
|
channelRead | 讀事件扭屁,channel 有數(shù)據(jù)時(shí)調(diào)用pipeline.fireChannelRead()
|
channelReadComplete | 讀完事件算谈,channel 讀完之后調(diào)用pipeline.fireChannelReadComplete()
|
channelRegistered | 注冊事件,channel 注冊到 EventLoop 上后調(diào)用料滥,例如服務(wù)崗啟動(dòng)時(shí)pipeline.fireChannelRegistered()
|
channelUnregistered | 注銷事件然眼,channel 從 EventLoop 上注銷后調(diào)用,例如關(guān)閉連接成功后pipeline.fireChannelUnregistered()
|
channelWritabilityChanged | 可寫狀態(tài)變更事件葵腹,當(dāng)一個(gè) Channel 的可寫的狀態(tài)發(fā)生改變的時(shí)候執(zhí)行高每,可以保證寫的操作不要太快,防止 OOM践宴,pipeline.fireChannelWritabilityChanged() |
exceptionCaught | 異常事件 說明:我們可以看出鲸匿,Inbound 事件都是由 I/O 線程觸發(fā),用戶實(shí)現(xiàn)部分關(guān)注的事件被動(dòng)調(diào)用 |
userEventTriggered | 用戶事件觸發(fā)阻肩,例如心跳檢測ctx.fireUserEventTriggered(evt)
|
我們可以看出带欢,Inbound 事件都是由I/O線程觸發(fā),Netty 為了更好的處理 channel 中的數(shù)據(jù),給 JDK 原生的channel 添加了 pipeline 組件烤惊,Netty 會把原生 JDK 的 channel 中的數(shù)據(jù)導(dǎo)向這個(gè) pipeline乔煞,從 pipeline 中的 header 開始往下傳播,用戶對這個(gè)過程擁有百分百的控制權(quán)柒室,可以把數(shù)據(jù)拿出來處理也可以往下傳播渡贾。一直傳播到tail節(jié)點(diǎn),tail節(jié)點(diǎn)會進(jìn)行回收雄右。如果在傳播的過程中最終沒到尾節(jié)點(diǎn)自己也沒回收就會面臨內(nèi)存泄露的問題空骚。
所以對于 inbound 事件纺讲,操作者是有 100% 的主動(dòng)權(quán)來控制應(yīng)該做什么,在什么時(shí)候做什么囤屹。下面以channelRead()
事件的傳播為例熬甚,來分析 inbound 事件的傳播。
我們在客戶端新增3個(gè) handler 用于處理事件:
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
ctx.fireChannelRead(msg);
}
}
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
ctx.fireChannelRead(msg);
}
public void channelActive(ChannelHandlerContext ctx) {
ctx.channel().pipeline().fireChannelRead("hello world"); // 1
// ctx.fireChannelRead("hello world"); // 2
}
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
ctx.fireChannelRead(msg);
}
}
此刻客戶端處理事件的順序?yàn)椋?/p>
在新連接接入時(shí)肋坚,會先回調(diào) channelActive()
方法则涯,此時(shí) InBoundHandlerB 的 channelActive()
方法得到執(zhí)行,觸發(fā)客戶端 pipeline.fireChannelRead()
方法冲簿,將 channlRead 事件傳播至 pipeline粟判。在實(shí)際工作中一般是由 NioEventLoop 輪詢到讀IO事件,并觸發(fā)NioByteUnsafe.read()操作峦剔。
這里為了分析方便档礁,使用 InBoundHandlerB 的 channelActive()
方法模擬觸發(fā)客戶端 channel 讀取到數(shù)據(jù)并傳播至 pipeline 的邏輯,并分為兩種情況分析:
-
ctx.channel().pipeline().fireChannelRead("hello world");
: 調(diào)用pipeline#fireChannelRead()
方法傳播事件吝沫; -
ctx.fireChannelRead("hello world");
: 調(diào)用ChannelHandlerContext#fireChannelRead()
方法傳播事件呻澜。
分析兩種觸發(fā)方式的區(qū)別。
-
ctx.channel().pipeline().fireChannelRead("hello world");
: 調(diào)用pipeline#fireChannelRead()
方法傳播事件惨险。
// DefaultChannelPipeline
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
// HeadContext
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
DefaultChannelPipeline.fireChannelRead()
方法首先調(diào)用到 HeadContext.channelRead()
方法:
// HeadContext
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
HeadContext.channelRead()
方法將事件往后傳播:
// HeadContext
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
// 遍歷鏈表
AbstractChannelHandlerContext ctx = this;
do {
// 往后查找inbound handler
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
此時(shí)找到 InBoundHandlerA羹幸,并調(diào)用invokeChannelRead()
方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
就這樣,InBoundHandlerA 的 channelRead()
方法就會回調(diào)到辫愉。類似 InBoundHandlerC栅受、InBoundHandlerB的 channelRead()
方法也會回調(diào)到。最終 channelRead()
事件到達(dá) TailContext:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
根據(jù)上面的分析可知恭朗,通過 pipleline 觸發(fā) inbound 事件傳播時(shí)屏镊,從 HeadContext 開始傳播。對于 inbound 事件痰腮,會按照 ChannelInboundHandler 添加的順序處理該事件而芥,HeadContext 首先處理該事件,然后依次傳遞到 pipeline 中的 ChannelInboundHandler 中膀值。
-
ctx.fireChannelRead("hello world");
: 調(diào)用ChannelHandlerContext#fireChannelRead()
方法傳播事件棍丐。
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
通過調(diào)用 ChannelHandlerContext#fireChannelRead()
方法傳播 channelRead 事件時(shí),直接查找到當(dāng)前節(jié)點(diǎn)的下一個(gè) inbound
節(jié)點(diǎn)沧踏,將事件傳播至該節(jié)點(diǎn)歌逢,不會從 HeadContext 開始傳遞。
以上就是 inbound 事件的大概傳播過程悦冀,我們要注意一點(diǎn):inbound 事件是按照 addLast()
的添加順序依次執(zhí)行的趋翻。下面接著看 outbound 事件睛琳。
outbound事件
ChannelOutboundHandler
中有如下方法:
ChannelOutboundHandler
中的事件看起來見名知意:
方法名 | 方法說明 |
---|---|
bind | 綁定端口 |
close | 關(guān)閉 channel |
connect | 用于客戶端盒蟆,連接一個(gè)遠(yuǎn)程機(jī)器 |
disconnect | 關(guān)閉遠(yuǎn)程連接 |
deregister | 執(zhí)行斷開連接 disconnect 操作后調(diào)用踏烙,將 channel 從 EventLoop 中注銷 |
flush | 將通道排隊(duì)的數(shù)據(jù)刷新到遠(yuǎn)程機(jī)器上 |
read | 用于新接入連接時(shí),注冊成功多路復(fù)用器上后历等,修改監(jiān)聽為 OP_READ 操作位 |
write | 向通道寫數(shù)據(jù) |
從 ChannelOutboundHandler 接口的定義可以看出讨惩,outbound 事件包括端口綁定 bind、連接 connect寒屯、斷開連接 disconnect荐捻、關(guān)閉連接 close、取消 channel 在 EventLoop 的注冊寡夹、讀寫數(shù)據(jù)处面、刷新數(shù)據(jù)等。這些操作一般都是由用戶主動(dòng)觸發(fā)的菩掏,這與 inbound 事件(如 channelRead)被動(dòng)觸發(fā)的情況不同魂角。
下面以write事件的傳播為例,來分析outbound事件的傳播細(xì)節(jié)智绸。在服務(wù)端新增3個(gè)事件處理器:
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerA: " + msg);
ctx.write(msg, promise);
}
}
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
ctx.write(msg, promise);
}
public void handlerAdded(final ChannelHandlerContext ctx) {
// 定時(shí)任務(wù)野揪。模擬用戶寫操作
ctx.executor().schedule(() -> {
ctx.channel().write("hello world"); // 1
// ctx.write("hello world"); // 2
}, 3, TimeUnit.SECONDS);
}
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerC: " + msg);
ctx.write(msg, promise);
}
}
此時(shí)客戶端 channel pipeline 的結(jié)構(gòu)如下圖:
還記得我們上面看 ChannelPipeline 的 doc 分析, 入站事件是從前到后的順序執(zhí)行瞧栗,出站事件是從后往前執(zhí)行的嗎斯稳?按照這邏輯,可以看到在新連接接入時(shí)迹恐,會先回調(diào) OutBoundHandlerB#handlerAdded()
方法挣惰,該方法會調(diào)度一個(gè)定時(shí)任務(wù),模擬用戶觸發(fā)的寫操作殴边,將 write 事件傳播至 pipeline通熄。
這里分兩種情況進(jìn)行分析,介紹兩種觸發(fā)方式的區(qū)別:
-
ctx.channel().write("hello world");
: 調(diào)用channel(也即pipeline)#write()
方法傳播事件找都; -
ctx.write("hello world");
: 調(diào)用ChannelHandlerContext#write()
方法傳播事件唇辨。
-
ctx.channel().write("hello world");
: 調(diào)用channel(也即pipeline)#write()
方法傳播事件。
// DefaultChannelPipeline
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
// TailContext
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
// TailContext
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
try {
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// 這里
write(msg, false, promise);
return promise;
}
// TailContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 找到下一個(gè)outbound handler
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
執(zhí)行 ctx.channel().write("hello world");
時(shí)能耻,會調(diào)用到 TailContxt.write()
方法赏枚。TailContxt.write()
方法中首先找出下一個(gè) outbound handler:
// TailContext
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
// 反向遍歷
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
findContextOutbound()
方法通過鏈表反向遍歷的方式查找下一個(gè) outbound handler,這里是找到了OutBoundHandlerB晓猛,并調(diào)用 OutBoundHandlerB.invokeWrite()
方法饿幅。
// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
// OutBoundHandlerB
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
ctx.write(msg, promise);
}
在調(diào)用完 OutBoundHandlerB.write()
方法后,通過ctx.write(msg, promise);
繼續(xù)傳播事件:
// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
try {
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
跟上面類似的邏輯戒职,調(diào)用ctx.write(msg, promise);
時(shí)直接查找下一個(gè) outbound handler栗恩,這里是OutBoundHandlerC。接下來是通過next.invokeWrite(m, promise);
調(diào)用 OutBoundHandlerC.write()
方法洪燥,與上面相同磕秤。就這樣乳乌,write 事件將傳播至 HeadContext。
// HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
HeadContext 將調(diào)用 NioByteUnsafe.write()
方法市咆,最終處理這個(gè)寫出的數(shù)據(jù):
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// mas轉(zhuǎn)換汉操,比如堆內(nèi)存轉(zhuǎn)為直接內(nèi)存
msg = filterOutboundMessage(msg);
// 計(jì)算消息的大小
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 消息添加到ChannelOutboundBuffer
outboundBuffer.addMessage(msg, size, promise);
}
根據(jù)上面的分析可知,通過 channel(也即pipleline)觸發(fā) outbound 事件傳播時(shí)蒙兰,從 TailContext 開始傳播磷瘤。對于 outbound 事件,會按照 ChannelOutboundHandler 添加的順序逆序處理該事件搜变,TailContext 由于是 inbound 類型的 ChannelHandler采缚,它直接將 outbound 事件傳播至下一個(gè) outbound 節(jié)點(diǎn),然后逐漸傳遞到 pipeline 中的 HeadContext 節(jié)點(diǎn)挠他,最終事件由 HeadContext 節(jié)點(diǎn)處理仰担。
-
ctx.write("hello world");
: 調(diào)用ChannelHandlerContext#write()
方法傳播事件。
// OutBoundHandlerB對應(yīng)的ChannelHandlerContext
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
// ... 省略
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 直接查找下一個(gè)outbound handler
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
調(diào)用 ChannelHandlerContext#write()
方法傳播 outbound 事件時(shí)绩社,直接從當(dāng)前節(jié)點(diǎn)開始反向遍歷 context 鏈表摔蓝,查找下一個(gè) outbound handler,并調(diào)用其 write 方法愉耙,然后將 write 事件傳播至 HeadContext贮尉。outbound事件傳播我們就先分析到這里。
記住一個(gè)要點(diǎn)就是:Outbound類型的事件是從鏈表的 tail 開始傳播的,所以執(zhí)行的順序和我們的添加進(jìn)去的順序相反朴沿。
異常事件的傳播
上面都是正常的事件猜谚,那如果是發(fā)生異常的情況,異常會以什么樣的方式傳播下去呢赌渣?我們來寫個(gè)demo魏铅。
在服務(wù)端添加如下的 handler:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
Handler 如下:
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InBoundHandlerA.exceptionCaught()");
ctx.fireExceptionCaught(cause);
}
}
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 拋出異常
throw new RunTimeException("from InBoundHandlerB");
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InBoundHandlerB.exceptionCaught()");
ctx.fireExceptionCaught(cause);
}
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InBoundHandlerC.exceptionCaught()");
ctx.fireExceptionCaught(cause);
}
}
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("OutBoundHandlerA.exceptionCaught()");
ctx.fireExceptionCaught(cause);
}
}
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("OutBoundHandlerB.exceptionCaught()");
ctx.fireExceptionCaught(cause);
}
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("OutBoundHandlerC.exceptionCaught()");
ctx.fireExceptionCaught(cause);
}
}
public class BusinessException extends Exception {
public BusinessException(String message) {
super(message);
}
}
從 InBoundHandlerB 的定義可以看出,在接收到 channelRead 事件時(shí)將拋出 RunTimeException坚芜,這種情況模擬了 inbound 事件在 pipeline 傳播以及處理過程中發(fā)生的異常览芳。下面先來分析 inbound 事件傳播過程中發(fā)生異常時(shí),異常事件傳播的細(xì)節(jié)鸿竖。
1.inbound事件傳播過程中發(fā)生異常
假設(shè) channel 讀取到了一定數(shù)據(jù)沧竟,并回調(diào)了InBoundHandlerB.channelRead()
方法,此時(shí)拋出RunTimeException 異常:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
throw new BusinessException("from InBoundHandlerB");
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// handler(): InBoundHandlerB
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
此時(shí)將進(jìn)入notifyHandlerException(t);
:
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
invokeExceptionCaught(cause);
}
notifyHandlerException()
方法直接調(diào)用invokeExceptionCaught(cause);
傳播異常事件:
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
// ...
}
} else {
fireExceptionCaught(cause);
}
}
在 inbound 事件傳播過程中發(fā)生異常時(shí)缚忧,首先調(diào)用發(fā)生異常所在 handler 的 exceptionCaught 方法悟泵,即 InBoundHandlerB.exceptionCaught()
:
// InBoundHandlerB
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InBoundHandlerB.exceptionCaught()");
ctx.fireExceptionCaught(cause);
}
然后調(diào)用ctx.fireExceptionCaught(cause);
繼續(xù)傳播異常事件:
// InBoundHandlerB對應(yīng)的ChannelHandlerContext
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
// next: InBoundHandlerC
// 直接調(diào)用next節(jié)點(diǎn)的exceptionCaught方法
invokeExceptionCaught(next, cause);
return this;
}
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause); // InBoundHandlerC
} catch (Throwable error) {
// ..
}
} else {
fireExceptionCaught(cause);
}
}
可以看到,在異常發(fā)生節(jié)點(diǎn) InBoundHandlerB 繼續(xù)傳播事件時(shí)闪水,是直接調(diào)用了 InBoundHandlerB 對應(yīng) context 節(jié)點(diǎn)的 next 節(jié)點(diǎn)InBoundHandlerC#exceptionCaught(
) 方法糕非,而不管下一個(gè)節(jié)點(diǎn)是 inbound 還是 outbound 類型。異常事件按順序經(jīng)過 InBoundHandlerB、InBoundHandlerC朽肥、OutBoundHandlerA禁筏、OutBoundHandlerB、OutBoundHandlerC鞠呈,最終到達(dá) TailContext:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
}
如果 TailContext 之前的 handler 都未處理該異常事件,在 TailContext 將以 warn 日志的方式記錄該異常信息右钾,并釋放內(nèi)存蚁吝。
2.outbound事件傳播過程中發(fā)生異常
下面以channel.writeAndFlush()
事件的傳播為例,分析 outbound 事件傳播過程中發(fā)生異常時(shí)舀射,異常事件的傳播細(xì)節(jié)窘茁。
// AbstractChannelHandlerContext
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 某個(gè)outbound handler
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
// 異常捕獲
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
假設(shè) writeAndFlush 事件傳播至 context 鏈表中的某個(gè)節(jié)點(diǎn),因此將調(diào)用以上的invokeWriteAndFlush()
方法脆烟。繼續(xù)看 invokeWrite0(msg, promise);
中的邏輯:
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise)) {
// promise設(shè)置為失敗
PromiseNotificationUtil.tryFailure(promise, cause, logger);
}
}
可見,如果在 ChannelOutboundHandler.write()
方法中發(fā)生異常,只是調(diào)用notifyOutboundHandlerException()
方法倔矾,將promise
設(shè)置為失敗狀態(tài)浮还,不拋出任何異常。
再來看invokeFlush0();
的邏輯:
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
可見拜鹤,如果在ChannelOutboundHandler.flush()
方法中發(fā)生異常框冀,將調(diào)用notifyHandlerException()
方法:
private void notifyHandlerException(Throwable cause) {
// ...省略
invokeExceptionCaught(cause);
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
// 省略...
}
} else {
fireExceptionCaught(cause);
}
}
同樣的,會觸發(fā)異常事件從當(dāng)前節(jié)點(diǎn)向后傳播敏簿,最后到達(dá)TailContext明也。
插播要點(diǎn):
對于ChannelHandler的添加應(yīng)該遵循什么樣的順序?
對于 inbound 事件的傳播惯裕,事件的處理順序與 ChannelInboundHandler 的添加順序相同温数;
對于 outbound 事件的傳播,事件的處理順序與 ChannelOutboundHandler 的添加順序相反蜻势。對于異常事件的傳播撑刺,事件的處理順序與 ChannelHandler 的添加順序相同,與 inbound握玛、outbound 無關(guān)猜煮。
用戶手動(dòng)觸發(fā)事件傳播,不同的觸發(fā)方式有什么樣的區(qū)別败许?
-
ctx.channel.xxx()
: 對于 inbound 事件王带,從 pipeline 頭部節(jié)點(diǎn) head 開始傳播;對于 outbound 事件市殷,從 pipeline尾 部節(jié)點(diǎn) tail 開始傳播愕撰。 -
ctx.xxx()
: 對于 inbound 事件,從當(dāng)前節(jié)點(diǎn)下一節(jié)點(diǎn)開始傳播(指向尾部tail);對于 outbound 事件搞挣,從當(dāng)前節(jié)點(diǎn)下一節(jié)點(diǎn)開始傳播(指向頭部head)带迟。
xxx()
方法指 fireChannelRead、write 等方法囱桨。
無論我們是調(diào)用 inbound 中的方法仓犬,還是 outbound 中的方法,當(dāng)你在某一個(gè) handler 中捕獲了一個(gè)事件舍肠,如果你還想繼續(xù)讓該事件傳下去搀继,那么你就應(yīng)該在處理邏輯之后調(diào)用 ChannelHandlerContext 繼續(xù)將事件傳遞下去。
這里再重新畫一張完整的圖翠语,展示EventLoop叽躯,Channel,以及 handler 的關(guān)系:
3. 自定義 handler 如何實(shí)現(xiàn)
Netty 自身提供了一些 handler肌括,包括編解碼處理点骑,心跳檢測,超時(shí)檢測等等谍夭。那如果是我們自定義的處理器改如何實(shí)現(xiàn)呢黑滴?
拿 inbound 事件來舉例,Netty 提供兩種方式供我們實(shí)現(xiàn) ChannelHandler
紧索,一種是繼承 ChannelInboundHandlerAdapter
跷跪,一種是繼承 SimpleChannelInboundHandler
。
// ChannelInboundHandlerAdapter
public class ServerHandler1 extends ChannelInboundHandlerAdapter {
// todo
}
// SimpleChannelInboundHandler
public class ServerHandler2 extends SimpleChannelInboundHandler<Object> {
// todo
}
這兩個(gè)類從名字上可知道大概的區(qū)別齐板,ChannelInboundHandlerAdapter
相當(dāng)于是一個(gè)簡寫版吵瞻,SimpleChannelInboundHandler
是完整版,各取所需甘磨。SimpleChannelInboundHandler
是一個(gè)泛型類橡羞,而 ChannelInboundHandlerAdapter
并不是一個(gè)泛型類。這里的泛型主要用于消息格式轉(zhuǎn)換使用济舆。在使用 ChannelInboundHandlerAdapter
進(jìn)行操作時(shí)卿泽,我們需要自己轉(zhuǎn)換消息格式,而 SimpleChannelInboundHandler
就是 Netty 幫我們轉(zhuǎn)換消息格式滋觉,只要我們在泛型中聲明轉(zhuǎn)換的類型签夭,并且這個(gè)類型支持被轉(zhuǎn)換,那么我們就可以使用這個(gè)轉(zhuǎn)換后的消息進(jìn)行操作椎侠。
本節(jié)我們就到這里打住第租,沒說到的也不說了,口干舌燥我纪!自己有點(diǎn)暈菜了慎宾。代碼太繞丐吓,大家努力學(xué)習(xí)。