一精续、Channel概述
?在Java NIO中Channel是提供與I/O服務的直接連接。Channel用在字節(jié)緩沖區(qū)和位于通道另一側的實體(通常是一個文件或套接字)之間有效地傳輸數據肆资。
?Channel可以形象地比喻為銀行出納窗口使用的導管。您的薪水支票就是您要傳送的信息,載體(Carrier)就好比一個緩沖區(qū)垫竞。您先填充緩沖區(qū)(將您的支票放到載體上),接著將緩沖“寫”到通道中(將載體丟進導管中),然后信息負載就被傳遞到通道另一側的 I/O 服務(銀行出納員)欢瞪。
?該過程的回應是:出納員填充緩沖區(qū)(將您的收據放到載體上)活烙,接著開始一個反方向的通道傳輸(將載體丟回到導管中)。載體就到了通道的您這一側(一個填滿了的緩沖區(qū)正等待您的查驗)遣鼓,然后您就會 flip 緩沖區(qū)(打開蓋子)并將它清空(移除您的收據)⌒フ担現(xiàn)在您可以開車走了,下一個對象(銀行客戶)將使用同樣的載體(Buffer)和導管(Channel)對象來重復上述過程骑祟。
?多數情況下回懦,通道與操作系統(tǒng)的文件描述符(File Descriptor)和文件句柄(File Handle)有著一對一的關系。雖然通道比文件描述符更廣義次企,但您將經常使用到的多數通道都是連接到開放的文件描述符的怯晕。 Channel 類提供維持平臺獨立性所需的抽象過程,不過仍然會模擬現(xiàn)代操作系統(tǒng)本身的 I/O 性能缸棵。
?通道是一種途徑舟茶,借助該途徑,可以用最小的總開銷來訪問操作系統(tǒng)本身的 I/O 服務堵第。緩沖區(qū)則是通道內部用來發(fā)送和接收數據的端點吧凉。
?與在NIO中類似,在netty中踏志,Channel就是連接網絡Socket和具有read, write, connect, and bind能力組件的一條通道客燕,并且大大降低了直接使用Socket進行通信的復雜度;Channel是網絡操作抽象類狰贯,聚合了一組功能也搓,包括但不限于網絡讀寫、客戶端發(fā)起連接涵紊、主動關閉連接傍妒,同時也包含了 Netty 框架相關的一些功能,包括獲取 Channel 的 EventLoop摸柄,獲取緩沖區(qū)分配器 ByteBufAllocator 和 Pipeline 等颤练。
二、 原理
如圖所示:
- 一旦客戶端連接成功驱负,將新建一個Channel同該客戶端進行綁定嗦玖;
- Channel從EventLoopGroup獲取一個EventLoop,并注冊到該EventLoop跃脊,Channel生命周期內都和該EventLoop在一起(注冊時獲得SelectionKey)宇挫;
- Channel同客戶端進行網絡連接、關閉和讀寫酪术,生成相對應的Event(改變SelectionKey信息)器瘪,觸發(fā)EventLoop調度線程進行執(zhí)行翠储;
- 如果是讀事件,執(zhí)行線程調度Pipeline來處理用戶業(yè)務邏輯橡疼;
?Channel是Netty抽象出來的網絡IO讀寫相關的接口援所,為什么不直接使用JDK NIO原生的Channel而要另起灶爐呢,主要原因如下:
- JDK的SocketChannel和ServerSocketChannel沒有提供統(tǒng)一的Channel接口供業(yè)務開發(fā)者使用欣除,對于用戶而言住拭,沒有統(tǒng)一的操作視圖,使用起來并不方便历帚;
- JDK的SocketChannel和ServerSocketChannel的主要職責就是網絡操作滔岳,由于它們是SPI類接口,由具體的虛擬機廠家來提供抹缕,所以通過繼承SPI功能類來擴展其功能的難度很大。直接實現(xiàn)ServerSocketChannel和SocketChannel抽象類墨辛,其工作量和重新開發(fā)一個新的Channel的功能類差不多卓研;
- Netty的Channel需要能跟Netty整體框架融合在一起,例如IO模型睹簇、基于ChannelPipeline的定制模型奏赘,以及基于元數據描述配置化的TCP參數等,這些JDK的SocketChannel和ServerSocketChannel都沒有提供太惠,需要重新封裝磨淌。
- 自定義的Channel,功能實現(xiàn)更加靈活凿渊。
?基于以上的4個原因梁只,Netty重新設計了Channel接口,并且給予了很多不同的實現(xiàn)埃脏,它的設計原理很簡單搪锣,但是功能卻比較復雜,主要的設計理念如下:
- 在Channel接口層彩掐,采用Facade模式進行統(tǒng)一封裝构舟,將網絡IO操作,及相關聯(lián)的其他操作封裝起來堵幽,統(tǒng)一對外提供狗超。
- Channel接口的定義盡量大而全,為SocketChannel和ServerSocketChannel提供統(tǒng)一的視圖朴下,由不同的子類實現(xiàn)不同的功能努咐,公共功能在抽象父類實現(xiàn),最大限度上實現(xiàn)功能和接口的重用殴胧。
-
具體采用聚合而非包含的方式麦撵,將相關的功能類聚合在Channel中愁憔,由Channel統(tǒng)一負責分配和調度,功能實現(xiàn)更加靈活
损俭。
三辣恋、 Channel功能介紹
?Channel的功能比較繁雜,通過分類的方式對它主要的功能進行介紹羔沙。
3.1 網絡IO操作
?Channel網絡IO相關的方法定義如下:
下面對這些API的功能進行分類說明躺涝,讀寫相關的API列表。
- Chanenl read():從當前Channel讀取數據到第一個inbound緩沖區(qū)中扼雏,如果數據被成功讀取坚嗜,觸發(fā)ChannelHandler.channelRead(ChannelHandlerContext,Object)事件,讀取操作調用完之后诗充,緊接著會觸發(fā)ChannelHandler.channelReadComplete(ChannelHandlerContext)事件苍蔬,這樣業(yè)務的ChannelHandler可以決定是否需要繼續(xù)讀取數據。如果已經有讀操作請求被掛起蝴蜓,則后續(xù)的讀操作會被忽略碟绑。
- ChannelFuture write(Object msg):請求將當前的msg通過ChannelPipeline寫入到目標Channel中。注意茎匠,write操作只是將消息存入到消息發(fā)送環(huán)形數組中格仲,并沒有真正被發(fā)送,只有調用flush操作才會被寫入到Channel中诵冒,發(fā)送給對方凯肋。
- ChannelFuture write(Object msg, ChannelPromise promise):功能與write(Object msg)相同,但是攜帶了ChannelPromise參數負責設置寫入操作的結果汽馋。
- ChannelFuture writeAndFlush(Object msg,ChannelPromise promise):與方法3功能類似侮东,不同之處在于它會將消息寫入Channel中發(fā)送,等價于單獨調用write和flush操作的組合豹芯。
- ChannelFuture writeAndFlush(Object msg):功能等于方法4苗桂,但是沒有ChannelPromise promise參數。
- Channel flush():將之前寫入到發(fā)送環(huán)形數組中的消息全部寫入到目標Channel中告组,發(fā)送給通信對方煤伟。
- ChannelFuture close(ChannelPromise promise):主動關閉當前連接,通過ChannelPromise設置操作結果并進行結果通知木缝,無論操作是否成功便锨,都可以通過ChannelPromise獲取操作結果。改操作會級聯(lián)觸發(fā)ChannelPipe中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件我碟。
- ChannelFuture disconnect(ChannelPromise promise):請求斷開與遠程通信對端的連接并使用ChannelPromise獲取操作結果的通知信息放案。該方法會級聯(lián)觸發(fā)ChannelHandler.disconnect(ChannelHandlerContext,ChannelPromise)事件。
- ChannelFuture connect(SocketAddress remoteAddress):客戶端使用指定的服務端地址發(fā)起連接請求矫俺,如果連接因為應答超時而失敗吱殉,ChannelFuture中的操作結果ConnectTimeoutException異常掸冤,如果連接被拒絕,操作結果為ConnectException友雳。該方法會級聯(lián)觸發(fā)ChannelHandler.connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)事件稿湿。
- ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress):功能與方法9類似,唯一不同的是先綁定本地地址localAddress押赊,然后在連接服務端饺藤。
- ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise):與方式9功能類似,唯一不同的是多了一個ChannelPromise參數用于寫入操作結果流礁。
- ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise):與方法11功能類似涕俗,唯一不同的就是綁定了本地地址。
- ChannelFuture bind(SocketAddress localAddress):綁定指定的本地Socket地址神帅,該方法會級聯(lián)觸發(fā)ChannelHandler.bind(ChannelHandlerContext,SocketAddress,ChannelPromise)事件再姑。
- ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise):與13方法功能類似,多了一個ChannelPromise參數用于寫入操作結果找御。
- ChannelConfig config():獲取當前Channel的配置信息元镀,例如CONNECT_TIMEOUT_MILLIS。
- blooean is opoen():判斷當前Channel是否已經打開
- boolean isRegistered():判斷當前Channel是否已經注冊到EventLoop上萎坷。
- boolean isActive():判斷當前Channel是否已經處于激活狀態(tài)
- ChannelMetadata metadata():獲取當前Channel的元數據描述信息凹联,包括TCP參數配等沐兰。
- SocketAddress localAddress():獲取當前Channel的本地綁定地址
- SocketAddress remoteAddress():獲取當前Channel通信的遠程Socket地址哆档。
3.2 其他常用的API功能說明
?第一個比較重要的方法是eventLoop,Channel需要注冊到EventLoop的多路復用器上住闯,用于處理IO事件瓜浸,通過eventLoop方法可以獲取到Channel注冊的EventLoop。EventLoop本質上就是處理網絡讀寫事件的Reactor線程比原。在Netty中插佛,它不僅僅用來處理網絡事件,也可以用來執(zhí)行定時任務和用戶自定義NIO Task等任務量窘。
?第二個比較常用的方法時metadata方法雇寇,熟悉TCP協(xié)議的同學都可能知道,當創(chuàng)建Socket的時候需要指定TCP參數蚌铜,例如接收和發(fā)送的TCP緩沖區(qū)大小锨侯,TCP的超時事件,是否重用地址等等冬殃。在Netty中囚痴,每個Channel對應一個物理連接,每個鏈接都有自己的TCP參數配置审葬。所以Channel會聚合一個ChannelMetadata用來對TCP參數提供元數據描述信息深滚,通過metadata方法就可以獲取當前Channel的TCP參數配置奕谭。
?第三個方法時parent,對于服務端Channel而言痴荐,它的父Channel為空血柳,對于客戶端Channel,它的父Channel就是創(chuàng)建它的ServerSocketChannel蹬昌。
?第四個方法是用戶獲取Channel標識id混驰,它返回ChannelId對象,ChannelId是Channel的唯一標識皂贩,它的可能生成策略如下:
- (1)機器的MAC地址(EUI-48或者EUI-64)等可以代表全局唯一的信息栖榨。
- (2)當前進程的ID。
- (3)當前系統(tǒng)時間的毫秒----System.currentTimeMillis
- (4)當前系統(tǒng)時間的納秒----System.nanoTime
- (5)32位的隨機整型數
- (6)32位的自增的序列
四明刷、Channel狀態(tài)
4.1 狀態(tài)相關API
boolean isOpen(); // 是否開放
boolean isRegistered(); // 是否注冊到一個EventLoop
boolean isActive(); // 是否激活
boolean isWritable(); // 是否可寫
?open表示Channel的開放狀態(tài)婴栽,True表示Channel可用,F(xiàn)alse表示Channel已關閉不再可用辈末。registered表示Channel的注冊狀態(tài)愚争,True表示已注冊到一個EventLoop,F(xiàn)alse表示沒有注冊到EventLoop挤聘。active表示Channel的激活狀態(tài)轰枝,對于ServerSocketChannel,True表示Channel已綁定到端口组去;對于SocketChannel鞍陨,表示Channel可用(open)且已連接到對端。Writable表示Channel的可寫狀態(tài)从隆,當Channel的寫緩沖區(qū)outboundBuffer非null且可寫時返回True诚撵。
一個正常結束的Channel狀態(tài)轉移有以下兩種情況:
REGISTERED->CONNECT/BIND->ACTIVE->CLOSE->INACTIVE->UNREGISTERED
REGISTERED->ACTIVE->CLOSE->INACTIVE->UNREGISTERED
?其中第一種是服務端用于綁定的Channel或者客戶端用于發(fā)起連接的Channel,第二種是服務端接受的SocketChannel键闺。一個異常關閉的Channel則不會服從這樣的狀態(tài)轉移寿烟。
五、 Channel源碼分析
?Channel的實現(xiàn)類非常多辛燥,繼承關系復雜筛武,從學習的角度我們抽取最重要的兩個NioServerSocketChannel和NioSocketChannel。
?服務端NioServerSocketChannel的繼承關系類圖如下:
客戶端NioSocketChannel的繼承關系類圖如下:
5.1 AbstractChannel源碼分析
成員變量定義
?在分析Abstract源碼之前先了解下它的成員變量定義挎塌,首先定義了五個靜態(tài)全局異常:
- FLUSH0_CLOSED_CHANNEL_EXCEPTION
- ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION
- CLOSE_CLOSED_CHANNEL_EXCEPTION
- WRITE_CLOSED_CHANNEL_EXCEPTION
- FLUSH0_NOT_YET_CONNECTED_EXCEPTION
然后看AbstractChannel的字段:
private final Channel parent; // 父Channel
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline; // 處理通道
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress; // 本地地址
private volatile SocketAddress remoteAddress; // 遠端地址
private volatile EventLoop eventLoop; // EventLoop線程
private volatile boolean registered; // 是否注冊到EventLoop
private boolean closeInitiated;
/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
然后徘六,我們看其中的構造方法:
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
?newUnsafe()和newChannelPipeline()可由子類覆蓋實現(xiàn)。在Netty的實現(xiàn)中每一個Channel都有一個對應的Unsafe內部類:AbstractChannel--AbstractUnsafe勃蜘,AbstractNioChannel--AbstractNioUnsafe等等硕噩,newUnsafe()方法正好用來生成這樣的對應關系。ChannelPipeline將在之后講解缭贡,這里先了解它的功能:作為用戶處理器Handler的容器為用戶提供自定義處理I/O事件的能力即為用戶提供業(yè)務邏輯處理炉擅。AbstractChannel中對I/O事件的處理辉懒,都委托給ChannelPipeline處理,代碼基本一樣:
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
?通過AbstractChannel源碼看下網絡IO相關的操作谍失,前面有提到網絡IO操作時講到它會觸發(fā)Channelpipeline中對應的事件方法眶俩,Netty基于事件驅動,我們也可以理解為當Channel進行IO操作時會產生對應的IO事件快鱼,然后驅動事件在ChannelPipe中傳播颠印,由相對應的ChannelHandler對事件進行攔截和處理,不關心的事件可以直接忽略抹竹。采用事件驅動的方式可以非常輕松的通過事件定義來劃分事件攔截切面线罕,方便業(yè)務的定制和功能擴展,相比AOP窃判,其性能更高钞楼,但是功能卻基本等價。
?網絡IO操作直接調用DefaultPipeline的相關方法袄琳,由DefaultChannelPipeline中對應的ChannelHandler進行具體的邏輯處理询件。
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public ChannelFuture deregister() {
return pipeline.deregister();
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
return pipeline.disconnect(promise);
}
@Override
public ChannelFuture close(ChannelPromise promise) {
return pipeline.close(promise);
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
return pipeline.deregister(promise);
}
@Override
public Channel read() {
pipeline.read();
return this;
}
AbstractChannel其他方法都比較簡單,主要關注狀態(tài)判定的方法:
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.isWritable(); // 寫緩沖區(qū)不為null且可寫
}
@Override
public boolean isRegistered() {
return registered;
}
對于Channel的實現(xiàn)來說唆樊,其中的內部類Unsafe才是關鍵宛琅,因為其中含有I/O事件處理的細節(jié)。AbstractUnsafe作為AbstractChannel的內部類逗旁,定義了I/O事件處理的基本框架嘿辟,其中的細節(jié)留給子類實現(xiàn)。我們將依次對各個事件框架進行分析痢艺。
- register事件框架
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) { // 已經注冊則失敗
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) { // EventLoop不兼容當前Channel
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// 當前線程為EventLoop線程直接執(zhí)行仓洼;否則提交任務給EventLoop線程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly(); // 異常時關閉Channel
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
?類似eventLoop.inEventLoop()及之后的一段代碼在Netty中使用了很多次介陶,這是為了保證I/O事件以及用戶定義的I/O事件處理邏輯(業(yè)務邏輯)在一個線程中處理堤舒。我們看提交的任務register0():
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // 模板方法,細節(jié)由子類完成
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded(); // 將用戶Handler添加到ChannelPipeline
safeSetSuccess(promise);
pipeline.fireChannelRegistered(); // 觸發(fā)Channel注冊事件
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// ServerSocketChannel接受的Channel此時已被激活
if (isActive()) {
// 首次注冊且激活觸發(fā)Channel激活事件
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead(); // 可視為模板方法
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly(); // 可視為模板方法
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
register0()方法定義了注冊到EventLoop的整體框架哺呜,整個流程如下:
(1).注冊的具體細節(jié)由doRegister()方法完成舌缤,子類中實現(xiàn)。
(2).注冊后將處理業(yè)務邏輯的用戶Handler添加到ChannelPipeline某残。
(3).異步結果設置為成功国撵,觸發(fā)Channel的Registered事件。
(4).對于服務端接受的客戶端連接玻墅,如果首次注冊介牙,觸發(fā)Channel的Active事件,如果已設置autoRead澳厢,則調用beginRead()開始讀取數據环础。
對于(4)的是因為fireChannelActive()中也根據autoRead配置囚似,調用了beginRead()方法。beginRead()方法其實也是一個框架线得,細節(jié)由doBeginRead()方法在子類中實現(xiàn):
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
異常處理的closeForcibly()方法也是一個框架饶唤,細節(jié)由doClose()方法在子類中實現(xiàn):
@Override
public final void closeForcibly() {
assertEventLoop();
try {
doClose();
} catch (Exception e) {
logger.warn("Failed to close a channel.", e);
}
}
register框架中有一對safeSetXXX()方法,將未完成的Promise標記為完成且成功或失敗贯钩,其實現(xiàn)如下:
/**
* Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message.
*/
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
至此募狂,register事件框架分析完畢。
- bind事件框架
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
?bind事件框架較為簡單角雷,主要完成在Channel綁定完成后觸發(fā)Channel的Active事件祸穷。其中的invokeLater()方法向Channel注冊到的EventLoop提交一個任務:
private void invokeLater(Runnable task) {
try {
// This method is used by outbound operation implementations to trigger an inbound event later.
// They do not trigger an inbound event immediately because an outbound operation might have been
// triggered by another inbound event handler method. If fired immediately, the call stack
// will look like this for example:
//
// handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
// -> handlerA.ctx.close()
// -> channel.unsafe.close()
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
//
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
eventLoop().execute(task);
} catch (RejectedExecutionException e) {
logger.warn("Can't invoke task later as EventLoop rejected it", e);
}
}
?closeIfClosed()方法當Channel不再打開時關閉Channel,代碼如下:
protected final void closeIfClosed() {
if (isOpen()) {
return;
}
close(voidPromise());
}
close()也是一個框架勺三,后面會單獨進行分析粱哼;
- disconnect事件框架
@Override
public final void disconnect(final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect(); // 模板方法,細節(jié)由子類實現(xiàn)
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() { // 觸發(fā)Inactive事件
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel
}
- close事件框架
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise); // 已經關閉檩咱,保證底層close只執(zhí)行一次
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess(); // 當Channel關閉時揭措,將此次close異步請求結果也設置為成功
}
});
}
return;
}
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. // 設置為空禁止write操作,同時作為標記字段表示正在關閉
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise); // 由上面的prepareToClose返回的executor執(zhí)行
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() { // Channel注冊的EventLoop執(zhí)行
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
// 寫緩沖隊列中的數據全部設置失敗
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise); // 當前調用線程執(zhí)行
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
doClose(); // 模板方法刻蚯,細節(jié)由子類實現(xiàn)
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
?close事件框架保證只有一個線程執(zhí)行了真正關閉的doClose()方法绊含,prepareToClose()做一些關閉前的清除工作并返回一個Executor,如果不為空炊汹,需要在該Executor里執(zhí)行doClose0()方法躬充;為空,則在當前線程執(zhí)行(為什么這樣設計讨便?)充甚。寫緩沖區(qū)outboundBuffer同時也作為一個標記字段,為空表示Channel正在關閉此時禁止寫操作霸褒。fireChannelInactiveAndDeregister()方法需要invokeLater()使用EventLoop執(zhí)行伴找,是因為其中會調用deRegister()方法觸發(fā)Inactive事件,而事件執(zhí)行需要在EventLoop中執(zhí)行废菱。
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
deregister(voidPromise(), wasActive && !isActive());
}
- deregister事件框架
@Override
public final void deregister(final ChannelPromise promise) {
assertEventLoop();
deregister(promise, false);
}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) { // 已經deregister
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister(); // 模板方法技矮,子類實現(xiàn)具體細節(jié)
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive(); // 根據參數觸發(fā)Inactive事件
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered(); // 首次調用觸發(fā)Unregistered事件
}
safeSetSuccess(promise);
}
}
});
}
?deregister事件框架的處理流程很清晰,其中殊轴,使用invokeLater()方法是因為:用戶可能會在ChannlePipeline中將當前Channel注冊到新的EventLoop衰倦,確保ChannelPipiline事件和doDeregister()在同一個EventLoop完成。
?需要注意的是:事件之間可能相互調用旁理,比如:disconnect->close->deregister樊零。
- write事件框架
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
// 聯(lián)系close操作,outboundBuffer為空表示Channel正在關閉孽文,禁止寫數據
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg); // 釋放msg 防止泄露
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
?事實上驻襟,這是Netty定義的write操作的全部代碼十性,完成的功能是將要寫的消息Msg加入到寫緩沖區(qū)。其中的filterOutboundMessage()可對消息進行過濾整理塑悼,例如把HeapBuffer轉為DirectBuffer劲适,具體實現(xiàn)由子類負責:
protected Object filterOutboundMessage(Object msg) throws Exception {
return msg; // 默認實現(xiàn)
}
- flush事件框架
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) { // Channel正在關閉直接返回
return;
}
outboundBuffer.addFlush(); // 添加一個標記
flush0();
}
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return; // 正在flush返回防止多次調用
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return; // Channel正在關閉或者已沒有需要寫的數據
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
// Channel已經非激活,將所有進行中的寫請求標記為失敗
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer); // 模板方法厢蒜,細節(jié)由子類實現(xiàn)
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
?flush事件中執(zhí)行真正的底層寫操作霞势,Netty對于寫的處理引入了一個寫緩沖區(qū)ChannelOutboundBuffer,由該緩沖區(qū)控制Channel的可寫狀態(tài)斑鸦,其具體實現(xiàn)愕贡,將會在緩沖區(qū)一章中分析。
?至此巷屿,Unsafe中的事件方法已經分析完7個固以,但還有connect和read沒有引入,下一節(jié)將進行分析嘱巾。(見下一篇文章)