一. NioUnsafe 接口
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
* 返回底層的NIO通道
*/
SelectableChannel ch();
/**
* 完成連接;
* 在 NioEventLoop 的 `processSelectedKey` 方法中調(diào)用专酗,
* 當(dāng)?shù)讓拥腘IO通道接收到連接事件 OP_CONNECT 時(shí)調(diào)用
*/
void finishConnect();
/**
* NIO的 SelectableChannel通道中獲取到遠(yuǎn)端的數(shù)據(jù)
* 在 NioEventLoop 的 `processSelectedKey` 方法中調(diào)用盗扇,
* 當(dāng)?shù)讓拥腘IO通道接收到讀取事件 OP_READ 時(shí)調(diào)用祷肯。
*/
void read();
/**
* 強(qiáng)制刷新粱玲;
* 在 NioEventLoop 的 `processSelectedKey` 方法中調(diào)用,
* 當(dāng)?shù)讓拥腘IO通道接收到可寫(xiě)事件 OP_WRITE 時(shí)調(diào)用抽减。
*/
void forceFlush();
}
NioUnsafe
接口比 Unsafe
多了四個(gè)方法:
-
SelectableChannel ch()
返回底層的NIO通道 - 剩下三個(gè)方法都與
NioEventLoop
類(lèi)中接收到的NIO
通道事件有關(guān):-
finishConnect()
接收到連接事件OP_CONNECT
時(shí)調(diào)用。 -
read()
接收到可讀事件OP_READ
時(shí)調(diào)用卵沉。 -
forceFlush()
收到可寫(xiě)事件OP_WRITE
時(shí)調(diào)用。
-
二. AbstractNioUnsafe 類(lèi)
2.1 實(shí)現(xiàn) NioUnsafe
接口中三個(gè)方法
2.1.1 ch()
// 在 AbstractNioChannel 中的方法
protected SelectableChannel javaChannel() {
return ch;
}
@Override
public final SelectableChannel ch() {
return javaChannel();
}
2.1.2 finishConnect()
@Override
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
// 注意琼掠,只有在連接嘗試既沒(méi)有被取消也沒(méi)有超時(shí)時(shí),事件循環(huán)才會(huì)調(diào)用此方法瓷蛙。
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
// 調(diào)用 AbstractNioChannel 的 doFinishConnect 方法進(jìn)行完成連接操作
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// See https://github.com/netty/netty/issues/1770
// 檢查是否為null戈毒,因?yàn)閏onnectTimeoutFuture僅在超時(shí)時(shí)間 connectTimeoutMillis > 0時(shí)才創(chuàng)建
// 連接已經(jīng)完成艰猬,取消連接超時(shí)任務(wù)
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
- NIO通道接收到連接事件
OP_CONNECT
,表示已經(jīng)成功建立連接了埋市。- 調(diào)用
AbstractNioChannel
的doFinishConnect
方法進(jìn)行完成連接操作。- 調(diào)用
fulfillConnectPromise
方法道宅,完成ChannelPromise
的通知胸蛛,以及是否發(fā)送ChannelActive
事件和ChannelInactive
事件樱报。- 最后因?yàn)檫B接已經(jīng)完成葬项,就需要取消連接超時(shí)任務(wù)肃弟。
2.1.3 forceFlush()
@Override
protected final void flush0() {
//只有當(dāng)沒(méi)有掛起的刷新時(shí)才立即刷新零蓉。
//如果有一個(gè)掛起的刷新操作笤受,事件循環(huán)將在稍后調(diào)用forceFlush()敌蜂,因此不需要現(xiàn)在調(diào)用它。
if (!isFlushPending()) {
super.flush0();
}
}
@Override
public final void forceFlush() {
// 直接調(diào)用super.flush0()章喉,強(qiáng)制立即刷新
super.flush0();
}
/**
* 返回是否準(zhǔn)備刷新
*/
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
// 選擇鍵有效,且有可讀事件 OP_WRITE時(shí)秸脱,返回 true
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
調(diào)用父類(lèi)
flush0()
方法進(jìn)行刷新。
還會(huì)復(fù)寫(xiě)父類(lèi)的flush0()
的方法摊唇,只有當(dāng)沒(méi)有掛起的刷新時(shí)才立即刷新。
2.1.4 read()
這個(gè)是 AbstractNioUnsafe
中唯一沒(méi)有實(shí)現(xiàn)NioUnsafe
的方法巷查,它在 AbstractNioByteChannel
和 AbstractNioMessageChannel
中提供不同的實(shí)現(xiàn),等后面再說(shuō)岛请。
2.2 實(shí)現(xiàn)AbstractUnsafe
中連接方法
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 檢查通道是否仍然打開(kāi)
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
// 當(dāng) connectPromise 不為空,說(shuō)明已經(jīng)有人嘗試連接了
// 防止重復(fù)嘗試連接
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
// 調(diào)用 AbstractNioChannel 的 doConnect 方法進(jìn)行連接
if (doConnect(remoteAddress, localAddress)) {
// 完成 ChannelPromise 的通知盅称,
// 以及是否發(fā)送 ChannelActive 事件和 ChannelInactive 事件
fulfillConnectPromise(promise, wasActive);
} else {
/**
* 因?yàn)檫B接操作是一個(gè)異步操作,
* 是否連接成功缩膝,是由底層 NIO通道接收到連接事件 OP_CONNECT 為準(zhǔn)的咧擂,
* 所以這里要設(shè)置一個(gè)超時(shí)任務(wù)逞盆,當(dāng)規(guī)定時(shí)間內(nèi)松申,還沒(méi)有連接成功俯逾,
* 那么就要關(guān)閉通道和相關(guān)的通知操作。
*
* 還要考慮用戶(hù)主動(dòng)取消這次連接請(qǐng)求桌肴。
*/
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
// 設(shè)置一個(gè)超時(shí)任務(wù),規(guī)定時(shí)間內(nèi)坠七,它沒(méi)有被取消旗笔,就會(huì) close(voidPromise()) 關(guān)閉通道
// 在 finishConnect() 和 doClose() 方法中彪置,會(huì)取消這個(gè)超時(shí)任務(wù)
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
if (connectPromise != null && !connectPromise.isDone()
&& connectPromise.tryFailure(new ConnectTimeoutException(
"connection timed out: " + remoteAddress))) {
// 連接超時(shí)蝇恶,關(guān)閉通道
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 用戶(hù)主動(dòng)取消這次連接請(qǐng)求, 要取消連接超時(shí)任務(wù)撮弧,以及關(guān)閉通道
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
/**
* 完成 ChannelPromise 的通知,以及是否發(fā)送 ChannelActive 事件和 ChannelInactive 事件
*/
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
return;
}
boolean active = isActive();
// 嘗試設(shè)置 promise 為成功完成贿衍,
// 如果設(shè)置失敗,即返回 false贸辈,表示用戶(hù)取消了這次連接請(qǐng)求
boolean promiseSet = promise.trySuccess();
// 無(wú)論用戶(hù)是否取消了這次連接請(qǐng)求,
// 都判斷是否發(fā)送 ChannelActive 事件
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// 如果用戶(hù)取消了這次連接請(qǐng)求裙椭,
// 則關(guān)閉通道,然后可能會(huì)發(fā)送 ChannelInactive 事件
if (!promiseSet) {
close(voidPromise());
}
}
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// 使用 tryFailure() 而不是 setFailure(),
// 來(lái)避免與cancel()的競(jìng)爭(zhēng)扫尺。
promise.tryFailure(cause);
closeIfClosed();
}
方法流程:
- 調(diào)用
AbstractNioChannel
的doConnect
方法進(jìn)行連接。如果返回
true
,表示一直阻塞等待連接成功正驻。
如果返回false
,表示是一個(gè)非阻塞連接,需要等待底層NIO
通道接收到連接事件OP_CONNECT
,才代表連接成功姑曙。 - 阻塞連接成功
那么就調(diào)用
fulfillConnectPromise(...)
方法,完成ChannelPromise
的通知,以及是否發(fā)送ChannelActive
事件和ChannelInactive
事件伤靠。 - 非阻塞連接
- 需要?jiǎng)?chuàng)建一個(gè)超時(shí)任務(wù),當(dāng)規(guī)定時(shí)間內(nèi)宴合,還沒(méi)有連接成功焕梅,那么就要關(guān)閉通道和相關(guān)的通知操作卦洽。
- 再考慮用戶(hù)主動(dòng)取消這次連接請(qǐng)求時(shí),要取消連接超時(shí)任務(wù)阀蒂,以及關(guān)閉通道。
三. AbstractNioChannel 中實(shí)現(xiàn)的方法
3.1 EventLoop
的兼容性
@Override
protected boolean isCompatible(EventLoop loop) {
// 與當(dāng)前通道兼容的事件輪詢(xún)器必須是 NioEventLoop 的子類(lèi)
return loop instanceof NioEventLoop;
}
與
AbstractNioChannel
匹配的事件輪詢(xún)器必須是NioEventLoop
的子類(lèi)蚤霞。
3.2 注冊(cè)
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 通過(guò)NIO SelectableChannel 的register方法,
// 將NIO通道注冊(cè)到事件輪詢(xún)器的 Selector 上争便,
// 這樣就可以監(jiān)聽(tīng)NIO通道的 IO事件断医,包括接收,連接鉴嗤,可讀,可寫(xiě)醉锅。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
通過(guò)
NIO SelectableChannel
的register
方法,將NIO
通道注冊(cè)到事件輪詢(xún)器的Selector
上硬耍。
這樣就可以監(jiān)聽(tīng)NIO
通道的IO
事件,包括接收经柴,連接,可讀坯认,可寫(xiě)。
3.3 取消注冊(cè)
@Override
protected void doDeregister() throws Exception {
// 將通道從已注冊(cè)的事件輪詢(xún)器中取消
eventLoop().cancel(selectionKey());
}
將通道從已注冊(cè)的事件輪詢(xún)器中取消牛哺。
3.4 開(kāi)始讀
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() 方法調(diào)用,
// 會(huì)調(diào)用到這里
final SelectionKey selectionKey = this.selectionKey;
// 當(dāng)前選擇鍵是否有效
if (!selectionKey.isValid()) {
return;
}
// 設(shè)置當(dāng)前通道是 可讀狀態(tài)
readPending = true;
final int interestOps = selectionKey.interestOps();
/**
* 設(shè)置底層NIO通道讀事件 OP_READ 或 OP_ACCEPT
* 與 AbstractNioUnsafe 的 removeReadOp() 方法正好相反引润。
*/
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
就是設(shè)置底層
NIO
通道讀事件。
3.5 關(guān)閉
@Override
protected void doClose() throws Exception {
ChannelPromise promise = connectPromise;
if (promise != null) {
// 使用tryFailure()而不是setFailure() 方法淳附,
// 來(lái)避免與取消 cancel()的競(jìng)爭(zhēng)凰荚。
promise.tryFailure(new ClosedChannelException());
connectPromise = null;
}
// 關(guān)閉操作時(shí)褒脯,需要取消連接超時(shí)任務(wù)
Future<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
}
注意這個(gè)方法便瑟,沒(méi)有調(diào)用底層
NIO
通道的關(guān)閉close
方法番川;也就是說(shuō)子類(lèi)一般都需要復(fù)寫(xiě)它。
3.6 小結(jié)
AbstractNioChannel
沒(méi)有實(shí)現(xiàn)寫(xiě)操作相關(guān)的方法颁督,以及連接操作相關(guān)方法 doConnect(...)
和 doFinishConnect()
。
四. 讀數(shù)據(jù)操作
在 AbstractNioByteChannel
和 AbstractNioMessageChannel
類(lèi)中沉御,實(shí)現(xiàn)了兩種方式的讀數(shù)據(jù)操作。
4.1 AbstractNioByteChannel
中讀數(shù)據(jù)
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 通過(guò) allocHandle吠裆,在接收數(shù)據(jù)時(shí)分配緩存區(qū) ByteBuf
byteBuf = allocHandle.allocate(allocator);
// 通過(guò) doReadBytes(byteBuf) 方法,從底層 NIO 通道中讀取數(shù)據(jù)到 ByteBuf 中试疙,
// 并返回讀取數(shù)據(jù)的大小祝旷;
// 通過(guò) lastBytesRead 方法記錄上次讀操作已讀取的字節(jié)。
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
// 沒(méi)有可讀數(shù)據(jù)了怀跛;釋放緩沖區(qū)。
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
// 增加當(dāng)前讀循環(huán)中已讀消息的數(shù)量
allocHandle.incMessagesRead(1);
readPending = false;
// 通過(guò)管道 pipeline 發(fā)送 ChannelRead 讀取事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 通過(guò)allocHandle.continueReading()方法吻谋,
// 判斷是否需要繼續(xù)讀取。
} while (allocHandle.continueReading());
// 這次讀取已完成
allocHandle.readComplete();
// 通過(guò)管道 pipeline 發(fā)送 ChannelReadComplete 讀取完成事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
- 通過(guò)
doReadBytes(byteBuf)
方法,從底層NIO
通道中讀取數(shù)據(jù)到輸入緩沖區(qū)ByteBuf
中滨溉。- 通過(guò)
pipeline.fireChannelRead(...)
方法,發(fā)送ChannelRead
讀取事件晦攒。- 通過(guò)
allocHandle.continueReading()
判斷是否需要繼續(xù)讀取。- 這次讀取完成脯颜,調(diào)用
pipeline.fireChannelReadComplete()
方法,發(fā)送ChannelReadComplete
讀取完成事件。
4.2 AbstractNioMessageChannel
中讀數(shù)據(jù)
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 將消息讀入給定數(shù)組并返回所讀入的數(shù)量
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
// 小于 0饱亮,表示已經(jīng)關(guān)閉
closed = true;
break;
}
// 增加當(dāng)前讀循環(huán)中已讀消息的數(shù)量
allocHandle.incMessagesRead(localRead);
// 判斷是否需要繼續(xù)讀取
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
// 遍歷讀取消息的數(shù)組readBuf
for (int i = 0; i < size; i ++) {
readPending = false;
// 通過(guò)管道 pipeline 發(fā)送 ChannelRead 讀取事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// 這次讀取已完成
allocHandle.readComplete();
// 通過(guò)管道 pipeline 發(fā)送 ChannelReadComplete 讀取完成事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
- 使用
readBuf
數(shù)組舍沙,一次讀取操作所有的數(shù)據(jù)對(duì)象。- 通過(guò)
doReadMessages(readBuf)
方法拂铡,將消息讀入給定數(shù)組readBuf
壹无,并返回所讀入的數(shù)量localRead
感帅。- 通過(guò)
localRead
的值,判斷是否讀取完成失球,或者通道已經(jīng)關(guān)閉。- 通過(guò)
continueReading(allocHandle)
方法实苞,判斷是否需要繼續(xù)讀取豺撑。- 遍歷讀取消息的數(shù)組
readBuf
, 通過(guò)管道pipeline
發(fā)送ChannelRead
讀取事件硬梁;遍歷完成,通過(guò)管道pipeline
發(fā)送ChannelReadComplete
讀取完成事件荧止。