原文鏈接:https://wangwei.one/posts/netty-pipeline-source-analyse-1.html
前面,我們分析了Netty EventLoop的 創(chuàng)建 與 啟動(dòng) 原理,接下里我們來(lái)分析Netty中另外兩個(gè)重要組件—— ChannelHandler 與 Pipeline张惹。Netty中I/O事件的傳播機(jī)制均由它負(fù)責(zé)困介,下面我們來(lái)看看它是如何實(shí)現(xiàn)的。
Netty版本:4.1.30
我們前面在講 Channel創(chuàng)建 時(shí)敛滋,在AbstractChannel的構(gòu)造函數(shù)中田篇, 一筆帶過(guò)地提到了Pipeline替废,現(xiàn)在我們來(lái)深入分析一下它的原理。
概述
Netty channel lifecycle
前面斯辰,我們?cè)诜治?Netty channel 源碼時(shí)舶担,分析了Channel的創(chuàng)建坡疼、初始化彬呻、注冊(cè)、綁定過(guò)程柄瑰。在Netty中闸氮,channel的生命周期如下所示:
- ChannelRegistered:Channel注冊(cè)到了EventLoop上
- ChannelActive:Channel激活,連接到了遠(yuǎn)程某一個(gè)節(jié)點(diǎn)上教沾,可以收發(fā)數(shù)據(jù)了
- ChannelInactive:斷開連接
- ChannelUnregistered:Channel從EventLoop上取消注冊(cè)
Netty channelHandler
Channel 每一次狀態(tài)的變化蒲跨,都會(huì)產(chǎn)生一個(gè)事件,調(diào)用 ChannelHandler 中對(duì)應(yīng)的方法進(jìn)行處理授翻,我們看下 ChannelHandler的UML或悲,其中最為重要的兩個(gè)ChannelHandler:
- ChannelInboundHandler:處理入站數(shù)據(jù)以及channel的各種狀態(tài)變化
- ChannelOutboundHandler:處理出站數(shù)據(jù)并允許攔截所有操作
Netty ChannelPipeline
前面我們?cè)诜治鯟hannel創(chuàng)建過(guò)程時(shí)孙咪,每一個(gè)新創(chuàng)建的Channel都將會(huì)被分配一個(gè)新的ChannelPipeline。ChannelPipeline是一個(gè)攔截流經(jīng)Channel的入站和出站事件的ChannelHandler實(shí)例鏈巡语,如圖所示:
一個(gè) Channel 包含了一個(gè) ChannelPipeline翎蹈,ChannelPipeline內(nèi)部是一個(gè)雙向的鏈表結(jié)構(gòu),內(nèi)部由一個(gè)個(gè)的ChannelHandlerContext節(jié)點(diǎn)組成男公,ChannelPipeline有頭尾兩個(gè)固定的節(jié)點(diǎn)HeadContext與TailContext荤堪。用戶自定的ChannelHandler就是由ChannelHandlerContext包裝成Pipeline的節(jié)點(diǎn),參與Channel整個(gè)生命周期中所觸發(fā)的入站事件與出站事件以及相應(yīng)數(shù)據(jù)流的攔截處理枢赔。
根據(jù)事件的起源澄阳,事件將會(huì)被ChannelInboundHandler(入站處理器)或者ChannelOutboundHandler(出站處理器)處理。隨后踏拜,通過(guò)調(diào)用ChannelHandlerContext實(shí)現(xiàn)碎赢,它將被轉(zhuǎn)發(fā)給同一超類型的下一個(gè)ChannelHandler,如圖所示:
Pipeline UML
我們先來(lái)看下 ChannelPipeline 以及 ChannelHandlerContext 的類圖結(jié)構(gòu)速梗,它們都實(shí)現(xiàn)了ChannelInboundInvoker與ChannelOutboundInvoker接口揩抡。
Pipeline初始化
AbstractChannel構(gòu)造函數(shù)如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 創(chuàng)建默認(rèn)Pipeline
pipeline = newChannelPipeline();
}
// 創(chuàng)建默認(rèn)Pipeline
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
DefaultChannelPipeline 構(gòu)造函數(shù)如下:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 設(shè)置尾部節(jié)點(diǎn)
tail = new TailContext(this);
// 設(shè)置頭部節(jié)點(diǎn)
head = new HeadContext(this);
// 將tail與head串聯(lián)起來(lái)
head.next = tail;
tail.prev = head;
}
我們可以看到Pipeline其實(shí)是一個(gè)雙向鏈表的結(jié)構(gòu),剛剛初始化的時(shí)候镀琉,Pipeline(管道)中只有兩個(gè)節(jié)點(diǎn)峦嗤,如圖:
接下來(lái)我們看看組成Pipeline節(jié)點(diǎn)的對(duì)象—— ChannelHandlerContext。
ChannelHandlerContext
ChannelHandlerContext 實(shí)現(xiàn)了AttributeMap屋摔、ChannelInboundInvoker烁设、ChannelOutboundInvoker接口。Pipeline中的事件傳播钓试,都是由ChannelHandlerContext負(fù)責(zé)装黑,將發(fā)生的事件從一個(gè)節(jié)點(diǎn)傳到下一個(gè)節(jié)點(diǎn)。
ChannelHandlerContext接口
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
// 返回ChannelHandlerContext中綁定的Channel
Channel channel();
// 返回專用于執(zhí)行任務(wù)的 EventExecutor
EventExecutor executor();
// 返回ChannelHandlerContext的唯一名稱弓熏。該名字將在ChannelHandler被添加到ChannelPipeline時(shí)會(huì)被用到恋谭,從ChannelPipeline中訪問(wèn)注冊(cè)的ChannelHandler時(shí),也會(huì)被用到挽鞠。
String name();
// 返回ChannelHandlerContext中綁定的ChannelHandler
ChannelHandler handler();
// 屬于這個(gè)ChannelHandlerContext的ChannelHandler從ChannelPipeline移除了疚颊,返回true
boolean isRemoved();
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
// 返回分配的ChannelPipeline
ChannelPipeline pipeline();
// 返回用于分配ByteBuf的ByteBufAllocator
ByteBufAllocator alloc();
}
AttributeMap接口
實(shí)現(xiàn) AttributeMap 接口,表示ChannelHandlerContext節(jié)點(diǎn)可以存儲(chǔ)自定義的屬性信认。
// 屬性Map接口
public interface AttributeMap {
// 通過(guò)Key獲取屬性
<T> Attribute<T> attr(AttributeKey<T> key);
// 判斷屬性是否存在
<T> boolean hasAttr(AttributeKey<T> key);
}
ChannelInboundInvoker接口
實(shí)現(xiàn)ChannelInboundInvoker接口材义,表示節(jié)點(diǎn)可以用于傳播入站相關(guān)的事件。
public interface ChannelInboundInvoker {
// 當(dāng)Channel注冊(cè)到EventLoop上時(shí)
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelRegistered();
// 當(dāng)Channel從EventLoop上取消注冊(cè)
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelUnregistered(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelUnregistered();
// 當(dāng)Channel處理激活狀態(tài)嫁赏,意味著連接已經(jīng)建立
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelActive(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelActive();
// 當(dāng)Channel處理失效狀態(tài)其掂,意味著連接已經(jīng)斷開
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelInactive(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelInactive();
// 在pipeline中某個(gè)一個(gè)入站(inbound)操作出現(xiàn)了異常
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的exceptionCaught(ChannelHandlerContext)方法
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
// 收到用戶自定義的事件
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的userEventTriggered(ChannelHandlerContext)方法
ChannelInboundInvoker fireUserEventTriggered(Object event);
// Channel接收到了消息
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelRead(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelRead(Object msg);
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelReadComplete();
// 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelWritabilityChanged(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelWritabilityChanged();
}
ChannelOutboundInvoker接口
實(shí)現(xiàn)ChannelOutboundInvoker接口,意味著節(jié)點(diǎn)可以用來(lái)處理出站相關(guān)的事件潦蝇。
public interface ChannelOutboundInvoker {
// 將Channel綁定到一個(gè)本地地址款熬,這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutboundHandler的bind(ChannelHandlerContext, Socket- Address, ChannelPromise)方法
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
// 將Channel連接到一個(gè)遠(yuǎn)程地址深寥,這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutboundHandler的connect(ChannelHandlerContext, Socket- Address, ChannelPromise)方法
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
// 將Channel斷開連接。這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutbound- Handler的disconnect(ChannelHandlerContext, Channel Promise)方法
ChannelFuture disconnect();
ChannelFuture disconnect(ChannelPromise promise);
// 將Channel關(guān)閉贤牛。這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutbound- Handler的close(ChannelHandlerContext, ChannelPromise)方法
ChannelFuture close();
ChannelFuture close(ChannelPromise promise);
// 將Channel從它先前所分配的EventExecutor(即EventLoop)中注銷翩迈。這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutboundHandler的deregister (ChannelHandlerContext, ChannelPromise)方法
ChannelFuture deregister();
ChannelFuture deregister(ChannelPromise promise);
// 請(qǐng)求從Channel中讀取更多的數(shù)據(jù)。這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutboundHandler的read(ChannelHandlerContext)方法
ChannelOutboundInvoker read();
// 將消息寫入Channel盔夜。這將調(diào)用ChannelPipeline中的下一個(gè)Channel- OutboundHandler的write(ChannelHandlerContext, Object msg, Channel- Promise)方法负饲。注意:這并不會(huì)將消息寫入底層的Socket,而只會(huì)將它放入隊(duì)列中喂链。要將它寫入Socket返十,需要調(diào)用flush()或者writeAndFlush()方法
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
// 沖刷Channel所有掛起的寫入。這將調(diào)用ChannelPipeline中的下一個(gè)Channel- OutboundHandler的flush(ChannelHandlerContext)方法
ChannelOutboundInvoker flush();
// 這是一個(gè)先調(diào)用write()方法再接著調(diào)用flush()方法的便利方法
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelPromise voidPromise();
}
TailContext & HeadContext
接下來(lái)椭微,我們看看Pipeline中的頭部與尾部節(jié)點(diǎn)洞坑。
TailContext節(jié)點(diǎn)
TailContext是尾部節(jié)點(diǎn),inbound類型蝇率,主要處理Pipeline中數(shù)據(jù)流的收尾工作迟杂。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
// 調(diào)用AbstractChannelHandlerContext構(gòu)造器
// TailContext是一個(gè)inbound(入站)節(jié)點(diǎn)
super(pipeline, null, TAIL_NAME, true, false);
// 設(shè)置添加完成
setAddComplete();
}
// 返回Handler,就是它自身
@Override
public ChannelHandler handler() {
return this;
}
...
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
...
}
// 如果pipeline中有異常沒做處理本慕,最終會(huì)由TailContext打贏一個(gè)警告日志
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
// 釋放對(duì)象
ReferenceCountUtil.release(cause);
}
}
// 如果pipeline中有read消息沒有處理排拷,最終會(huì)由TailContext打贏一個(gè)警告日志
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
// 設(shè)置 ChannelHandlerContext 狀態(tài)為添加完成,狀態(tài)=2
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
AbstractChannelHandlerContext
AbstractChannelHandlerContext 是 ChannelHandlerContext 的抽象實(shí)現(xiàn):
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
...
// 下一個(gè)節(jié)點(diǎn)
volatile AbstractChannelHandlerContext next;
// 上一個(gè)節(jié)點(diǎn)
volatile AbstractChannelHandlerContext prev;
// 是否為inBound類型
private final boolean inbound;
// 是否為outbound類型
private final boolean outbound;
// 綁定的默認(rèn)pipeline
private final DefaultChannelPipeline pipeline;
// 節(jié)點(diǎn)名
private final String name;
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
...
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
// 設(shè)置HandlerContext名稱
this.name = ObjectUtil.checkNotNull(name, "name");
// 綁定pipeline
this.pipeline = pipeline;
// 綁定executor(這里為null)
this.executor = executor;
// 如果節(jié)點(diǎn)為inbound類型就設(shè)置為true
this.inbound = inbound;
// 如果節(jié)點(diǎn)為outbound類型就設(shè)置為true
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
...
}
DefaultChannelHandlerContext
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
// 調(diào)用 AbstractChannelHandlerContext 構(gòu)造函數(shù)
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
// 是否為inBound類型
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
// 是否為outBound類型
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
HeadContext
HeadContext是頭部節(jié)點(diǎn)锅尘,outbound類型监氢,用于傳播事件和進(jìn)行一些底層socket操作。
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
// 調(diào)用AbstractChannelHandlerContext構(gòu)造器
// HeadContext是一個(gè)outbound(出站)節(jié)點(diǎn)
super(pipeline, null, HEAD_NAME, false, true);
// 設(shè)置Unsafe對(duì)象
unsafe = pipeline.channel().unsafe();
// 設(shè)置添加完成
setAddComplete();
}
// 返回ChannelHandler藤违,就只它自身
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
// 調(diào)用 unsafe 進(jìn)行bind操作
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進(jìn)行 connect 操作
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進(jìn)行 disconnect 操作
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進(jìn)行 close 操作
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進(jìn)行 deregister 操作
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
// 調(diào)用 unsafe 進(jìn)行 read 操作
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 調(diào)用 unsafe 進(jìn)行 write 操作
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// 調(diào)用 unsafe 進(jìn)行 flush 操作
unsafe.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 傳播ExceptionCaught事件
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
// 傳播channelRegistered事件
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
// 傳播channelUnregistered事件
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 傳播 channelActive 事件
ctx.fireChannelActive();
// 在 https://wangwei.one/posts/netty-channel-source-analyse.html 中分析過(guò)了
// 主要是在channel激活之后浪腐,向底層的selector注冊(cè)一個(gè)SelectionKey.OP_ACCEPT監(jiān)聽事件
// 這樣channel在連接之后,就可以監(jiān)聽到一個(gè)read事件
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 傳播 channelInactive 事件
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 傳播 channelRead 事件
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 傳播 channelReadComplete 事件
ctx.fireChannelReadComplete();
//
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 傳播 userEventTriggered 事件
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// 傳播 channelWritabilityChanged 事件
ctx.fireChannelWritabilityChanged();
}
}
Pipeline 節(jié)點(diǎn)添加
上面我們分析了Pipeline的基本結(jié)構(gòu)顿乒,接下來(lái)我們看看Pipeline添加節(jié)點(diǎn)(也就是Handler處理器)的過(guò)程议街。該過(guò)程主要分為三步:
- 判斷是否重復(fù)添加
- 創(chuàng)建節(jié)點(diǎn)并添加至鏈表
- 回調(diào)添加完成事件
以這段常見的代碼為例:
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 添加 serverHandler
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();
我們從 ChannelPipeline.addLast() 方法進(jìn)去:
public class DefaultChannelPipeline implements ChannelPipeline {
...
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// 循環(huán)處理
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 檢查是否重復(fù)
checkMultiplicity(handler);
// 創(chuàng)建新節(jié)點(diǎn)
newCtx = newContext(group, filterName(name, handler), handler);
// 添加新節(jié)點(diǎn)
addLast0(newCtx);
// 如果 registered 為 false,則表示這個(gè)channel還未注冊(cè)到EventLoop上.
// 在這種情況下,我們添加一個(gè)Task到PendingHandlerCallback中璧榄,
// 等到這個(gè)channel注冊(cè)成功之后特漩,將會(huì)調(diào)用立即調(diào)用 ChannelHandler.handlerAdded(...) 方法,已達(dá)到channel添加的目的
if (!registered) {
// 設(shè)置為待添加狀態(tài)
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 獲取executor
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
// 設(shè)置為待添加狀態(tài)
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
// 回調(diào)添加完成事件
callHandlerAdded0(newCtx);
}
});
return this;
}
}
// 回調(diào)添加完成事件
callHandlerAdded0(newCtx);
return this;
}
// 檢查是否重復(fù)
private static void checkMultiplicity(ChannelHandler handler) {
// handler是否為ChannelHandlerAdapter類型犹菱,不是則不做處理
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 判斷handler是否添加了Sharable注解 && 是否添加過(guò)了
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
// 創(chuàng)建新的節(jié)點(diǎn)
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// 調(diào)用DefaultChannelHandlerContext的構(gòu)造函數(shù)
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 在tail節(jié)點(diǎn)之前添加新節(jié)點(diǎn)
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
// 回調(diào)ChannelHandler中的handlerAdded方法
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 我們必須在handlerAdded方法之前調(diào)用setAddComplete方法拾稳。否則的話吮炕,一旦handlerAdded方法產(chǎn)生了任何pipeline事件腊脱,由于狀態(tài)的緣故,ctx.handler()將會(huì)丟失這些事件的處理龙亲。
// 設(shè)置新節(jié)點(diǎn)的狀態(tài)為添加完成狀態(tài)
ctx.setAddComplete();
// 調(diào)用handlerAdded接口
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
...
// 如果添加失敗陕凹,則刪除新節(jié)點(diǎn)
remove0(ctx);
...
}
}
...
}
我們來(lái)看下setAddComplete()方法:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
...
// 通過(guò)自旋操作悍抑,設(shè)置狀態(tài)為ADD_COMPLETE
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
...
// 設(shè)置為 ADD_PENDING 狀態(tài)
final void setAddPending() {
boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
assert updated;
// This should always be true as it MUST be called before setAddComplete() or setRemoved().
}
...
}
回調(diào)用戶自定義Handler中的handlerAdded方法:
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.printf("ServerHandler added ....");
}
...
}
ChannelInitializer
關(guān)于回調(diào)ChannelHandler中的handlerAdded()方法,最常見的一個(gè)場(chǎng)景就是杜耙,使用 ChannelInitializer 來(lái)添加我們自定義的ChannelHandler搜骡。ChannelInitializer被添加完成之后,會(huì)回調(diào)到它的 initChannel 方法佑女。
接下來(lái)记靡,我們看看 ChannelInitializer 這個(gè)類,它是一個(gè)特殊的ChannelInboundHandler团驱,它提供了一種在Channel注冊(cè)到EventLoop后初始化Channel的簡(jiǎn)便方法摸吠。
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();
/**
* 當(dāng) ch 注冊(cè)成功之后,該方法就會(huì)被調(diào)用嚎花,該方法結(jié)束返回之后寸痢,此ChannelInitializer實(shí)例將會(huì)從Channel所綁定的ChannelPipeline中移除
*
* @param ch 所注冊(cè)的Channel
*
*/
protected abstract void initChannel(C ch) throws Exception;
...
// ChannelInitializer 添加成功后,會(huì)回調(diào)到handlerAdded()接口
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
initChannel(ctx);
}
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 標(biāo)記ctx為true,且之前沒有標(biāo)記過(guò)紊选。防止重復(fù)執(zhí)行
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
// 調(diào)用initChannel方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
// 最終會(huì)刪除 ChannelInitializer 實(shí)例
remove(ctx);
}
return true;
}
return false;
}
// 刪除 ChannelInitializer 實(shí)例
private void remove(ChannelHandlerContext ctx) {
try {
// 獲取 Pipeline
ChannelPipeline pipeline = ctx.pipeline();
// 從 Pipeline 中返回 ChannelInitializer 實(shí)例
if (pipeline.context(this) != null) {
// 刪除 ChannelInitializer 實(shí)例
// 刪除邏輯請(qǐng)看下一小節(jié)
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
}
遍歷 ChannelHandlerContext 節(jié)點(diǎn)查詢出ChannelHandler實(shí)例
public class DefaultChannelPipeline implements ChannelPipeline {
...
// 通過(guò)handler獲取ChannelHandlerContext
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
...
}
Pipeline中除了addLast方法外啼止, 還有addFirst、addBefore兵罢、addAfter等方法献烦,邏輯類似,可以自行研究學(xué)習(xí)卖词。
Pipeline 節(jié)點(diǎn)刪除
上面仿荆,我們講了Pipeline節(jié)點(diǎn)的添加,這小結(jié)我們看看Pipeline節(jié)點(diǎn)的刪除功能坏平。
netty 有個(gè)最大的特性之一就是Handler可插拔拢操,做到動(dòng)態(tài)編織pipeline,比如在首次建立連接的時(shí)候舶替,需要通過(guò)進(jìn)行權(quán)限認(rèn)證令境,在認(rèn)證通過(guò)之后,就可以將此context移除顾瞪,下次pipeline在傳播事件的時(shí)候就就不會(huì)調(diào)用到權(quán)限認(rèn)證處理器舔庶。
下面是權(quán)限認(rèn)證Handler最簡(jiǎn)單的實(shí)現(xiàn),第一個(gè)數(shù)據(jù)包傳來(lái)的是認(rèn)證信息陈醒,如果校驗(yàn)通過(guò)惕橙,就刪除此Handler,否則钉跷,直接關(guān)閉連接
// 鑒權(quán)Handler
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
...
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
if (verify(authDataPacket)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
private boolean verify(ByteBuf byteBuf) {
//...
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("AuthHandler has been removed ! ");
}
}
我們來(lái)看看 DefaultChannelPipeline 中的 remove 方法:
public class DefaultChannelPipeline implements ChannelPipeline {
...
// 從Pipeline中刪除ChannelHandler
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
...
// 獲取 ChannelHandler 弥鹦,獲取不到就拋出異常
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
...
// 刪除
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
// ctx不能為heand與tail
assert ctx != head && ctx != tail;
synchronized (this) {
// 從pipeline中刪除ChannelHandlerContext節(jié)點(diǎn)
remove0(ctx);
// 如果為false,則表明channel還沒有注冊(cè)到eventloop上
// 在刪除這種場(chǎng)景下,我們先添加一個(gè)Task彬坏,一旦channel注冊(cè)成功就會(huì)調(diào)用這個(gè)Task朦促,這個(gè)Task就會(huì)立即調(diào)用ChannelHandler.handlerRemoved(...)方法,來(lái)從pipeline中刪除context栓始。
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
// 回調(diào) handlerRemoved 方法
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
// 回調(diào) handlerRemoved 方法
callHandlerRemoved0(ctx);
return ctx;
}
...
// 刪除節(jié)點(diǎn) ChannelHandlerContext
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
...
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
// 回調(diào) handlerRemoved 方法
// 也就是我們前面例子 AuthHandler 中的 handlerRemoved() 方法
ctx.handler().handlerRemoved(ctx);
} finally {
// 設(shè)置為ctx 狀態(tài)為 REMOVE_COMPLETE
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
...
}
好了务冕, 刪除的邏輯就分析到這里了。
小結(jié)
這一講我們分析了Pipeline的創(chuàng)建過(guò)程幻赚,了解Pipeline中的鏈表結(jié)構(gòu)以及每個(gè)節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)禀忆。還分析了Pipeline是如何添加節(jié)點(diǎn)的,又是如何刪除節(jié)點(diǎn)的落恼。接下來(lái) 油湖,我們會(huì)分析Pipeline如何進(jìn)行事件傳播的。