分析的netty版本4.1.17
一鲸伴、Channel
Netty的抽象了一個(gè)頂層接口Channel相比原來NIO提供的Channel有更多的功能环戈,當(dāng)然也是相對復(fù)雜的筛婉。
1. Channel的功能
1.1 網(wǎng)絡(luò)的IO操作
網(wǎng)絡(luò)的IO操作包含read,write,flush,close,disconnect,connect,bind,config,localAddress,remoteAdress等IO的功能操作般堆。
1.2 其他功能
- eventLoop():這個(gè)比較重要耿芹,channel是需要注冊到eventLoop多路復(fù)用器上的,通過這個(gè)方法可以獲取當(dāng)前channel所注冊的eventLoop浆竭;當(dāng)然eventLoop除了處理IO操作還可以執(zhí)行定時(shí)任務(wù)和自定義的NIOTask浸须。
metadata():在Netty中每一個(gè)channel都對應(yīng)一個(gè)物理連接,這個(gè)元數(shù)據(jù)表示的就是每一個(gè)連接對應(yīng)的TCP參數(shù)配置邦泄,通過這個(gè)方法可以獲取相對應(yīng)的配置信息删窒。
parent():對于服務(wù)端Channel來講,它的父channel是空顺囊,而客戶端的channel肌索,它的父channel就是創(chuàng)建它的ServerSocketChannel.
id():這個(gè)返回的是ChannelId對象,它是由mac地址特碳,進(jìn)程id诚亚,自增序列,系統(tǒng)時(shí)間數(shù)午乓,隨機(jī)數(shù)等構(gòu)成的站宗。
2.Channel結(jié)構(gòu)和源碼
2.1NioServerSocketChannel繼承結(jié)構(gòu)
2.2 NioSocketChannel繼承結(jié)構(gòu)
簡單的看看上面兩個(gè)圖的,做下對比:
兩個(gè)相同之處很明顯AbstractChannel---->AbstractNioChannel及DefaultAttributeMap
主要不同點(diǎn)是 NioSocketChannel繼承的是AbstractNioByteChannel接口是SockerChannel;NioServerSocketChannle繼承是的AbstractNioMessageChannle以及實(shí)現(xiàn)接口ServerSocketChannel
2.3 channel的生命周期
Netty 有一個(gè)簡單但強(qiáng)大的狀態(tài)模型益愈,并完美映射到ChannelInboundHandler 的各個(gè)方法梢灭。下面是Channel 生命周期中四個(gè)不同的狀態(tài):
狀態(tài)描述
channelUnregistered() :Channel已創(chuàng)建,還未注冊到一個(gè)EventLoop上
channelRegistered(): Channel已經(jīng)注冊到一個(gè)EventLoop上
channelActive() :Channel是活躍狀態(tài)(連接到某個(gè)遠(yuǎn)端),可以收發(fā)數(shù)據(jù)
channelInactive(): Channel未連接到遠(yuǎn)端
一個(gè)Channel 正常的生命周期如下圖所示敏释。隨著狀態(tài)發(fā)生變化相應(yīng)的事件產(chǎn)生库快。這些事件被轉(zhuǎn)發(fā)到ChannelPipeline中的ChannelHandler 來觸發(fā)相應(yīng)的操作。
2.4 相關(guān)源碼
1)AbstractChannel
列出了主要的成員變量颂暇,和主要網(wǎng)絡(luò)IO操作的實(shí)現(xiàn)
???重點(diǎn)看了下網(wǎng)絡(luò)讀寫操作缺谴,網(wǎng)絡(luò)I/O操作時(shí)講到它會觸發(fā)ChannelPipeline中對應(yīng)的事件方法。Netty 基于事件驅(qū)動耳鸯,我們也可以理解為當(dāng)Chnanel進(jìn)行I/O操作時(shí)會產(chǎn)生對應(yīng)的I/O事件湿蛔,然后驅(qū)動事件在ChannelPipeline中傳播,由對應(yīng)的ChannelHandler對事件進(jìn)行攔截和處理县爬,不關(guān)心的事件可以直接忽略阳啥。采用事件驅(qū)動的方式可以非常輕松地通過事件定義來劃分事件攔截切面,方便業(yè)務(wù)的定制和功能擴(kuò)展财喳,相比AOP察迟,其性能更高,但是功能卻基本等價(jià)耳高。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
private final Channel parent; //父channel
private final ChannelId id; //channel 的唯一id
private final Unsafe unsafe; //unsafe底層io操作應(yīng)用
private final DefaultChannelPipeline pipeline; //執(zhí)行channel鏈
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop; //channel所注冊的eventLoop
private volatile boolean registered; //變量是否完成注冊
private boolean closeInitiated;
/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
...
//主要的IO操作扎瓶,發(fā)先都是通過pipeline事件傳播實(shí)現(xiàn)
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public Channel flush() {
pipeline.flush();
return this;
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return pipeline.writeAndFlush(msg, promise);
}
@Override
public Channel read() {
pipeline.read();
return this;
}
2) AbstractNioChannel
會使用到nio的相關(guān)類,Selector做相關(guān)操作位的使用
public abstract class AbstractNioChannel extends AbstractChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
private final SelectableChannel ch; //Socketchannle和ServerSocketChannel的公共操作類泌枪,用來設(shè)置SelectableChannel相關(guān)參數(shù)和IO操作
protected final int readInterestOp; //代表JDK的SelectionKey的OP_READ
volatile SelectionKey selectionKey; //JDK的selectionKey
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture; //連接超時(shí)定時(shí)器future
private SocketAddress requestedRemoteAddress; //請求通信地址
核心API
A:doRegister() :定義一個(gè)布爾類型的局部變量selected來標(biāo)識注冊操作是否成功概荷,調(diào)用nio的AbstractSelectableChannel的register方法,將當(dāng)前的Channel注冊到EventLoop的多路復(fù)用器selector上碌燕。
//核心操作误证,注冊操作
//1) 如果當(dāng)前注冊返回的selectionKey已經(jīng)被取消,則拋出CancelledKeyException異常修壕,捕獲該異常進(jìn)行處理愈捅。
// 2) 如果是第一次處理該異常,調(diào)用多路復(fù)用器的selectNow()方法將已經(jīng)取消的selectionKey從多路復(fù)用器中刪除掉慈鸠。操作成功之后蓝谨,將selected置為true, 說明之前失效的selectionKey已經(jīng)被刪除掉青团。繼續(xù)發(fā)起下一次注冊操作像棘,如果成功則退出,
//3) 如果仍然發(fā)生CancelledKeyException異常,說明我們無法刪除已經(jīng)被取消的selectionKey,按照J(rèn)DK的API說明,這種意外不應(yīng)該發(fā)生壶冒。如果發(fā)生這種問題,則說明可能NIO的相關(guān)類庫存在不可恢復(fù)的BUG,直接拋出CancelledKeyException異常到上層進(jìn)行統(tǒng)一處理截歉。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
}
channel注冊時(shí)候通過操作位表示對某個(gè)事件感興趣:
//SelectionKey
public static final int OP_READ = 1 << 0; //讀操作位
public static final int OP_WRITE = 1 << 2; //寫操作位
public static final int OP_CONNECT = 1 << 3; //客戶端連接操作位
public static final int OP_ACCEPT = 1 << 4; //服務(wù)端接受連接操作位
//如果注冊的操作位為0表示只是完成注冊功能胖腾,說明對任何事件都不感興趣
注冊時(shí)可以指定附件,后續(xù)獲取到事件通知時(shí)可以從SelectionKey中獲取到附件,上面是將當(dāng)前AbstractNioSocket實(shí)現(xiàn)子類自身當(dāng)做附件咸作,如果注冊成功則可以通過返回的SelectionKey從多路復(fù)用器中獲取channel對象锨阿。
B:doBeginRead() 讀之前的準(zhǔn)備
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return; //key無效的話直接返回
}
readPending = true; //表示讀pending中
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) { //表示當(dāng)前沒有讀操作位
selectionKey.interestOps(interestOps | readInterestOp); //設(shè)置讀操作位
}
}
//SelectionKey中定義的是否可讀操作
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}
3) AbstractNioByteChannel
先看成員變量
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final String EXPECTED_TYPES =
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
StringUtil.simpleClassName(FileRegion.class) + ')';
private Runnable flushTask; //flush工作的task任務(wù),主要是繼續(xù)寫半包消息
}
接下來看核心API doWrite操作
配置中設(shè)置循環(huán)次數(shù)是避免半包中數(shù)據(jù)量過大,IO線程一直嘗試寫操作记罚,此時(shí)IO線程無法處理其他IO操作或者定時(shí)任務(wù)墅诡,比如新的消息或者定時(shí)任務(wù),如果網(wǎng)絡(luò)IO慢或者對方讀取慢等造成IO線程假死的狀態(tài)
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1; // 寫自選次數(shù)
boolean setOpWrite = false; //寫操作位為0
for (;;) {
Object msg = in.current();
if (msg == null) { //從環(huán)形數(shù)組ChannelOutboundBuffer彈出一條消息桐智,如果為null末早,表示消息已經(jīng)發(fā)送完成,
// Wrote all messages.
clearOpWrite(); //清除寫標(biāo)志位说庭,退出循環(huán)
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) { //如果可讀字節(jié)為0然磷,則丟棄該消息,循環(huán)處理其他消息
in.remove();
continue;
}
boolean done = false; //消息是否全部發(fā)送完畢表示
long flushedAmount = 0; //發(fā)送的字節(jié)數(shù)量
if (writeSpinCount == -1) {
//如果為-1的時(shí)候從配置中獲取寫循環(huán)次數(shù)
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf); //由子類實(shí)現(xiàn)寫
if (localFlushedAmount == 0) { //這里表示本次發(fā)送字節(jié)為0刊驴,發(fā)送TCP緩沖區(qū)滿了姿搜,所以此時(shí)為了避免空循環(huán)一直發(fā)送,這里就將半包寫表示設(shè)置為true并退出循環(huán)
setOpWrite = true;
break;
}
//發(fā)送成功就對發(fā)送的字節(jié)計(jì)數(shù)
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) { //如果沒有可讀字節(jié)捆憎,表示已經(jīng)發(fā)送完畢
done = true; //表示發(fā)送完成舅柜,并退出循環(huán)
break;
}
}
//通知promise當(dāng)前寫的進(jìn)度
in.progress(flushedAmount);
if (done) { //如果發(fā)送完成,移除緩沖的數(shù)據(jù)
in.remove();
} else {
如果沒有完成會調(diào)用incompleteWrite方法
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) { //這個(gè)是文件傳輸和上面類似
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
//如果沒有完成寫看看需要做的事情
incompleteWrite(setOpWrite);
}
//未完成寫操作躲惰,看看操作
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) { //如果當(dāng)前的寫操作位true致份,那么當(dāng)前多路復(fù)用器繼續(xù)輪詢處理
setOpWrite();
} else { //否則重新新建一個(gè)task任務(wù),讓eventLoop后面點(diǎn)執(zhí)行flush操作礁扮,這樣其他任務(wù)才能夠執(zhí)行
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
}
4) AbstractNioMessageChannel
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
boolean inputShutdown; //只有一個(gè)成員變量知举,表示是否數(shù)據(jù)讀取完畢
}
主要實(shí)現(xiàn)方法是doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
}
通過代碼分析我們發(fā)現(xiàn),AbstractNioMessageChannel 和AbstractNioByteChannel的消息發(fā)送實(shí)現(xiàn)比較相似太伊,不同之處在于:一個(gè)發(fā)送的是ByteBuf或者FileRegion雇锡,它們可以直接被發(fā)送;另一個(gè)發(fā)送的則是POJO對象。
5) NioServerSocketChannel
先看成員變量
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); //有channel的元數(shù)據(jù)
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a >#2308</a>.
*/
return provider.openServerSocketChannel(); //打開ServerSocketChannel
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
private final ServerSocketChannelConfig config; //用于配置serversocketchannel的tcp相關(guān)參數(shù)
再看看對應(yīng)的接口實(shí)現(xiàn)操作:
@Override
public boolean isActive() {
return javaChannel().socket().isBound(); //判斷端口是否屬于綁定狀態(tài)S
}
@Override
public InetSocketAddress remoteAddress() {
return null;
}
@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
@Override
protected SocketAddress localAddress0() {
return SocketUtils.localSocketAddress(javaChannel().socket());
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
//綁定端口以及最大接受的客戶端數(shù)量
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
看重點(diǎn)的API接口
doReadMessage():下面很明顯通過Nio的接受客戶端連接并新建一個(gè)NioSocketChannel并封裝父類和nio的SocketChannel放到buf中僚焦,返回1表示服務(wù)端接受成功
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
剩下其他的一些connect锰提,remoteAddress0等是serverSocket不支持的所以調(diào)用直接拋異常。
6) NioSocketChannel
這個(gè)類相對比較重要芳悲,通信主要是它實(shí)現(xiàn)的立肘。
先看成員變量
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private final SocketChannelConfig config; //這個(gè)是socketchannel配置信息
private static SocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
*
* See <a >#2308</a>.
*/
return provider.openSocketChannel(); //open一個(gè)soketChannel
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
A: connect操作
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress); //先看本地地址是否為null,不為空直接綁定
}
boolean success = false;
try {
//連接遠(yuǎn)程地址名扛,
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) { // 連接沒有應(yīng)答谅年,再次 注冊連接連接標(biāo)識位
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected; //返回連接失敗
} finally { //如果服務(wù)端拒絕或者REST拋出連接異常,則直接關(guān)閉連接
if (!success) {
doClose();
}
}
}
B:doWrite 看寫標(biāo)識
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int size = in.size();
if (size == 0) { //flushed字節(jié)為0直接清除可寫標(biāo)識位,標(biāo)識沒有可寫的肮韧。
// All written so clear OP_WRITE
clearOpWrite();
break;
}
long writtenBytes = 0; //已經(jīng)寫出的字節(jié)數(shù)
boolean done = false; //是否寫完
boolean setOpWrite = false; //寫標(biāo)識
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0: //標(biāo)識沒有可寫
// We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in);
return;
case 1:
// Only one ByteBuf so use non-gathering write
ByteBuffer nioBuffer = nioBuffers[0];
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final int localWrittenBytes = ch.write(nioBuffer);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
default:
//默認(rèn)的寫方法
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
break;
}
// Release the fully written buffers, and update the indexes of the partially written buffer.
in.removeBytes(writtenBytes);
if (!done) {
// Did not write all buffers completely.
incompleteWrite(setOpWrite);
break;
}
}
}
這個(gè)是AbstractNioByteChannel的寫類似融蹂。
C:讀寫
具體的讀寫操作還是如下旺订,還是比較簡單的。
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
3.Unsafe
Unsafe就是channel的輔助接口超燃,我們實(shí)際的IO操作最后還是交給Unsafe操作区拳,Unsafe接口的定義就是放在Channel中的;具體如下:
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
.....
/**
* Returns an <em>internal-use-only</em> object that provides unsafe operations.
*/
Unsafe unsafe();
interface Unsafe {
/**
* Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
* receiving data.
*/
RecvByteBufAllocator.Handle recvBufAllocHandle();
/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.
*/
SocketAddress localAddress();
/**
* Return the {@link SocketAddress} to which is bound remote or
* {@code null} if none is bound yet.
*/
SocketAddress remoteAddress();
/**
* Register the {@link Channel} of the {@link ChannelPromise} and notify
* the {@link ChannelFuture} once the registration was complete.
*/
void register(EventLoop eventLoop, ChannelPromise promise);
/**
* Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
* it once its done.
*/
void bind(SocketAddress localAddress, ChannelPromise promise);
/**
* Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
* If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
* pass {@code null} to it.
*
* The {@link ChannelPromise} will get notified once the connect operation was complete.
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void disconnect(ChannelPromise promise);
/**
* Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void close(ChannelPromise promise);
/**
* Closes the {@link Channel} immediately without firing any events. Probably only useful
* when registration attempt failed.
*/
void closeForcibly();
/**
* Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
* {@link ChannelPromise} once the operation was complete.
*/
void deregister(ChannelPromise promise);
/**
* Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
* {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.
*/
void beginRead();
/**
* Schedules a write operation.
*/
void write(Object msg, ChannelPromise promise);
/**
* Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
*/
void flush();
/**
* Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
* It will never be notified of a success or error and so is only a placeholder for operations
* that take a {@link ChannelPromise} as argument but for which you not want to get notified.
*/
ChannelPromise voidPromise();
/**
* Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
*/
ChannelOutboundBuffer outboundBuffer();
}
}
總結(jié)下:
netty自定義了channel接口,通過組合的jdk的channel實(shí)現(xiàn)IO操作操作功能意乓;當(dāng)然channel需要注冊到eventLoop的多路復(fù)用器上樱调。一個(gè)channel對應(yīng)一條實(shí)際的物理連接;這里主要詳解了NioServersocketChannel和NioSocketChannel届良。下一章節(jié)我們看看EventLoop的實(shí)現(xiàn)細(xì)節(jié)
參考《Netty權(quán)威指南》