Channel介紹
Channel是JDK 的NIO類庫中的重要組成部分版姑,我們?cè)谥暗拇a中也經(jīng)常用到io.netty.channel.socket.nio.NioSocketChannel和io.netty.channel.socket.nio.NioServerSocketChannel用于阻塞性IO操作
NioServerSocketChannel類的繼承圖
從上可以看出,Channel是最頂層的抽象戴卜,一個(gè)Channel抽象為網(wǎng)絡(luò)socket的連接或者讀寫魄懂,連接飘言,綁定IO的一個(gè)組件峦睡。
一個(gè)Channel需要提供給用戶:
1韧骗、channel當(dāng)前的狀態(tài)(打開還是已連接?)
2零聚、channel的配置參數(shù)(channelConfig)
3袍暴、channel支持哪些IO操作
4、處理channel IO操作的事件
在Netty中所有的IO操作都是異步的隶症,也就意味著IO的請(qǐng)求會(huì)立刻返回政模,并不能保證請(qǐng)求是否已完成。在Netty中IO請(qǐng)求會(huì)返回ChannelFuture實(shí)例蚂会。
Channel是有層級(jí)的淋样,例如,對(duì)于服務(wù)端而言胁住,父channel為空趁猴,對(duì)于客戶端NioSocketChannel,它的父Channel就是創(chuàng)建它的ServerSocketChannel.
Channel主要API方法
下面對(duì)我們對(duì)Channel的主要API做簡(jiǎn)單介紹
Channel read() 從channel中讀取數(shù)據(jù)到第一個(gè)buffer彪见,如果數(shù)據(jù)被成功讀取觸發(fā)channelRead事件儡司,如果數(shù)據(jù)被讀取完成會(huì)觸發(fā)channelReadComplete事件,如果有讀操作被掛起余指,那么后續(xù)讀操作會(huì)取消捕犬。
ChannelFuture write(Object msg) 將當(dāng)前msg通過ChannelPipeLine寫入到Channel,這個(gè)方法不會(huì)真正執(zhí)行flush操作,所以當(dāng)寫入完成后要執(zhí)行flush()方法才能將數(shù)據(jù)真正發(fā)送出去碉碉。
write(Object msg, ChannelPromise promise) 跟write功能一樣柴钻,只是寫入完成后會(huì)回調(diào)promise。
Channel flush() 將channel中所有緩存消息全部寫入到目標(biāo)channel垢粮,發(fā)送給通信對(duì)方贴届。
ChannelFuture writeAndFlush(Object msg) 作用等于write+flush
ChannelFuture disconnect() 請(qǐng)求與遠(yuǎn)程通信對(duì)端斷開連接,這個(gè)操作會(huì)級(jí)聯(lián)觸發(fā)ChannelHandler.disconnect(ChannelHandlerContext, ChannelPromise)事件
ChannelFuture connect(SocketAddress remoteAddress) 使用指定的remoteAddress發(fā)起連接請(qǐng)求足丢,并且返回ChannelFuture對(duì)象粱腻,如果連接超時(shí)會(huì)ChannelFuture返回操作結(jié)果是ConnectTimeoutException,如果連接被拒絕操作結(jié)果是ConnectException斩跌,該操作會(huì)觸發(fā)ChannelHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)事件绍些。
EventLoop eventLoop() 返回channel注冊(cè)的eventLoop
ChannelConfig config() 返回channel的配置
ChannelMetadata metadata() 返回當(dāng)前channel的元數(shù)據(jù)信息,比如TCP參數(shù)配置耀鸦。
SocketAddress localAddress() 返回channel綁定的本地地址
SocketAddress remoteAddress() 返回channel通信的遠(yuǎn)程地址
Channel parent() 返回父級(jí)channel
AbstractChannel分析
AbstractChannel是Channel的基本實(shí)現(xiàn)類柬批,它采用聚合的方式封裝了各種功能,從成員變量聚合了以下內(nèi)容:
private final Channel parent :父級(jí)channel
private final ChannelId id = DefaultChannelId.newInstance() 全局唯一id
private final Unsafe unsafe : unsafe示例
private final DefaultChannelPipeline pipeline; 當(dāng)前channle對(duì)應(yīng)的DefaultChannelPipeline
private final EventLoop eventLoop; 當(dāng)前channel注冊(cè)的EventLoop
前面提到channel的IO操作會(huì)觸發(fā)會(huì)產(chǎn)生對(duì)應(yīng)的IO事件袖订,然后事件在ChannelPipeLine中傳播氮帐,并由對(duì)應(yīng)的ChannleHandler處理。
AbstractChannel的IO操作直接調(diào)用DefaultChannelPipeline的相關(guān)方法由DefaultChannelPipeline處理相關(guān)邏輯
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
……
AbstractNioChannel分析
AbstractChannel是Channel的基本實(shí)現(xiàn)類洛姑,那么AbstractNioChannel就是Channel基于選擇器的實(shí)現(xiàn)類上沐,它實(shí)現(xiàn)了核心的將Channel注冊(cè)到Selector的功能。
首先看下成員變量
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
private final SelectableChannel ch;//JDK NIO SelectableChannel
protected final int readInterestOp;//JDK selectionKey 的OP_READ
private volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
核心注冊(cè)功能源碼:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
定義布爾類型變量selected標(biāo)識(shí)是否注冊(cè)成功楞艾,調(diào)用SelectableChannel的register將當(dāng)前channel注冊(cè)到EventLoop的多路復(fù)用器Selector上参咙。
其中SelectableChannel的注冊(cè)方法定義如下:
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
第二個(gè)參數(shù)是 指定監(jiān)聽的網(wǎng)絡(luò)操作位,表示channel對(duì)哪幾類網(wǎng)絡(luò)事件感興趣硫眯,具體定義如下:
public static final int OP_READ = 1 << 0; //讀事件
public static final int OP_WRITE = 1 << 2;//寫事件
public static final int OP_CONNECT = 1 << 3;//客戶端連接服務(wù)端事件
public static final int OP_ACCEPT = 1 << 4;//服務(wù)端接收客戶端連接事件
其中注冊(cè)時(shí)傳的ops等于0蕴侧,表示不對(duì)任何事件感興趣,只是完成注冊(cè)操作两入,注冊(cè)成功之后返回selectionKey,通過selectionKey可以獲取注冊(cè)的channel.
如果注冊(cè)返回的selectionKey被取消净宵,則拋出CancelledKeyException異常,如果是第一次拋異常則調(diào)用selectNow將取消的selectionKey刪除并將selected置為true并再次注冊(cè)裹纳,如果仍未成功則拋出CancelledKeyException異常择葡。
處理讀操作之前設(shè)置網(wǎng)絡(luò)操作位為讀
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
先判斷Channel是否關(guān)閉,如果處于關(guān)閉則直接返回痊夭,然后獲取當(dāng)前的SelectKey的操作位與讀操作位位于刁岸,如果結(jié)果為0標(biāo)識(shí)沒有設(shè)置過讀操作位,最后通過或設(shè)置讀操作位
AbstractNioByteChannel分析
AbstractNioByteChannel是Channel操作字節(jié)的實(shí)現(xiàn)她我,核心代碼是protected void doWrite(ChannelOutboundBuffer in)
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
for (;;) {
Object msg = in.current(true);
if (msg == null) {
// Wrote all messages.
clearOpWrite();
break;
}
首先從ChannelOutboundBuffer環(huán)形數(shù)組中讀取數(shù)據(jù)虹曙,如果數(shù)據(jù)為空迫横,調(diào)用clearOpWrite清除讀標(biāo)志位
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
如果數(shù)據(jù)不為空,判斷是否是ByteBuf類型酝碳,若是強(qiáng)制轉(zhuǎn)換為ByteBuf矾踱,并判斷ByteBuf中是否有可讀字節(jié),如果沒有則將該消息從數(shù)組中刪除疏哗,并繼續(xù)處理其他消息呛讲。
boolean setOpWrite = false;
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
設(shè)置半包標(biāo)志setOpWrite,消息是否全部發(fā)送標(biāo)識(shí)done返奉,發(fā)送總消息數(shù)flushedAmount贝搁,對(duì)循環(huán)發(fā)送次數(shù)writeSpinCount判斷,如果為-1從channel配置重獲取循環(huán)發(fā)送次數(shù)芽偏,調(diào)用doWriteBytes進(jìn)行循環(huán)發(fā)送雷逆,如果本次發(fā)送的字節(jié)數(shù)為0那么說明TCP緩沖區(qū)已滿,所以講寫半包標(biāo)識(shí)置為true并跳出循環(huán)污尉。
如果發(fā)送字節(jié)數(shù)大于0那么對(duì)發(fā)送字節(jié)數(shù)進(jìn)行累加膀哲,如果當(dāng)前buf沒有可讀字節(jié)數(shù)了則標(biāo)識(shí)buf寫入完成,設(shè)置消息發(fā)送標(biāo)識(shí)為true并跳出循環(huán)被碗。
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
break;
}
調(diào)用progress更新進(jìn)度信息某宪,如果消息已完全發(fā)送則將該buf從環(huán)形數(shù)組中移除,否則調(diào)用incompleteWrite方法
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
setOpWrite();
} else {
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
}
如果寫半包標(biāo)識(shí)setOpWrite為true,調(diào)用setOpWrite()重新設(shè)置寫操作位setOpWrite()
protected final void setOpWrite() {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
SelectKey設(shè)置為OP_WRITE后锐朴,Selector會(huì)不斷輪詢對(duì)應(yīng)的Channel處理沒有發(fā)送完成的半包消息兴喂,直到清除OP_WRITE標(biāo)志為止。
如果沒有設(shè)置半包標(biāo)識(shí)焚志,則需要新起一個(gè)線程來處理瞻想。