Netty之Channel

分析的netty版本4.1.17

一鲸伴、Channel

Netty的抽象了一個(gè)頂層接口Channel相比原來NIO提供的Channel有更多的功能环戈,當(dāng)然也是相對復(fù)雜的筛婉。

1. Channel的功能

1.1 網(wǎng)絡(luò)的IO操作

網(wǎng)絡(luò)的IO操作包含read,write,flush,close,disconnect,connect,bind,config,localAddress,remoteAdress等IO的功能操作般堆。

1.2 其他功能

  • eventLoop():這個(gè)比較重要耿芹,channel是需要注冊到eventLoop多路復(fù)用器上的,通過這個(gè)方法可以獲取當(dāng)前channel所注冊的eventLoop浆竭;當(dāng)然eventLoop除了處理IO操作還可以執(zhí)行定時(shí)任務(wù)和自定義的NIOTask浸须。
  • metadata():在Netty中每一個(gè)channel都對應(yīng)一個(gè)物理連接,這個(gè)元數(shù)據(jù)表示的就是每一個(gè)連接對應(yīng)的TCP參數(shù)配置邦泄,通過這個(gè)方法可以獲取相對應(yīng)的配置信息删窒。

  • parent():對于服務(wù)端Channel來講,它的父channel是空顺囊,而客戶端的channel肌索,它的父channel就是創(chuàng)建它的ServerSocketChannel.

  • id():這個(gè)返回的是ChannelId對象,它是由mac地址特碳,進(jìn)程id诚亚,自增序列,系統(tǒng)時(shí)間數(shù)午乓,隨機(jī)數(shù)等構(gòu)成的站宗。

2.Channel結(jié)構(gòu)和源碼

2.1NioServerSocketChannel繼承結(jié)構(gòu)

YqoDb9.png

2.2 NioSocketChannel繼承結(jié)構(gòu)

YqoBDJ.png

簡單的看看上面兩個(gè)圖的,做下對比:

兩個(gè)相同之處很明顯AbstractChannel---->AbstractNioChannel及DefaultAttributeMap

主要不同點(diǎn)是 NioSocketChannel繼承的是AbstractNioByteChannel接口是SockerChannel;NioServerSocketChannle繼承是的AbstractNioMessageChannle以及實(shí)現(xiàn)接口ServerSocketChannel

2.3 channel的生命周期

Netty 有一個(gè)簡單但強(qiáng)大的狀態(tài)模型益愈,并完美映射到ChannelInboundHandler 的各個(gè)方法梢灭。下面是Channel 生命周期中四個(gè)不同的狀態(tài):

狀態(tài)描述

  • channelUnregistered() :Channel已創(chuàng)建,還未注冊到一個(gè)EventLoop上

  • channelRegistered(): Channel已經(jīng)注冊到一個(gè)EventLoop上

  • channelActive() :Channel是活躍狀態(tài)(連接到某個(gè)遠(yuǎn)端),可以收發(fā)數(shù)據(jù)

  • channelInactive(): Channel未連接到遠(yuǎn)端

一個(gè)Channel 正常的生命周期如下圖所示敏释。隨著狀態(tài)發(fā)生變化相應(yīng)的事件產(chǎn)生库快。這些事件被轉(zhuǎn)發(fā)到ChannelPipeline中的ChannelHandler 來觸發(fā)相應(yīng)的操作。

image

2.4 相關(guān)源碼

1)AbstractChannel

列出了主要的成員變量颂暇,和主要網(wǎng)絡(luò)IO操作的實(shí)現(xiàn)

???重點(diǎn)看了下網(wǎng)絡(luò)讀寫操作缺谴,網(wǎng)絡(luò)I/O操作時(shí)講到它會觸發(fā)ChannelPipeline中對應(yīng)的事件方法。Netty 基于事件驅(qū)動耳鸯,我們也可以理解為當(dāng)Chnanel進(jìn)行I/O操作時(shí)會產(chǎn)生對應(yīng)的I/O事件湿蛔,然后驅(qū)動事件在ChannelPipeline中傳播,由對應(yīng)的ChannelHandler對事件進(jìn)行攔截和處理县爬,不關(guān)心的事件可以直接忽略阳啥。采用事件驅(qū)動的方式可以非常輕松地通過事件定義來劃分事件攔截切面,方便業(yè)務(wù)的定制和功能擴(kuò)展财喳,相比AOP察迟,其性能更高,但是功能卻基本等價(jià)耳高。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
    private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
    private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
    private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
    private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");

    private final Channel parent;     //父channel
    private final ChannelId id;        //channel 的唯一id
    private final Unsafe unsafe;     //unsafe底層io操作應(yīng)用
    private final DefaultChannelPipeline pipeline;  //執(zhí)行channel鏈
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    private volatile SocketAddress localAddress;  
    private volatile SocketAddress remoteAddress;
    private volatile EventLoop eventLoop;   //channel所注冊的eventLoop
    private volatile boolean registered;       //變量是否完成注冊
    private boolean closeInitiated;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();    
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
  ...
    //主要的IO操作扎瓶,發(fā)先都是通過pipeline事件傳播實(shí)現(xiàn)
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }


   @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return pipeline.writeAndFlush(msg, promise);
    }

    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }

2) AbstractNioChannel

會使用到nio的相關(guān)類,Selector做相關(guān)操作位的使用

public abstract class AbstractNioChannel extends AbstractChannel {

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);

    private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractNioChannel.class, "doClose()");

    private final SelectableChannel ch; //Socketchannle和ServerSocketChannel的公共操作類泌枪,用來設(shè)置SelectableChannel相關(guān)參數(shù)和IO操作
    protected final int readInterestOp;    //代表JDK的SelectionKey的OP_READ
    volatile SelectionKey selectionKey;  //JDK的selectionKey
    boolean readPending;
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };

    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise; 
    private ScheduledFuture<?> connectTimeoutFuture;   //連接超時(shí)定時(shí)器future
    private SocketAddress requestedRemoteAddress;   //請求通信地址

核心API
A:doRegister() :定義一個(gè)布爾類型的局部變量selected來標(biāo)識注冊操作是否成功概荷,調(diào)用nio的AbstractSelectableChannel的register方法,將當(dāng)前的Channel注冊到EventLoop的多路復(fù)用器selector上碌燕。

  //核心操作误证,注冊操作

 //1) 如果當(dāng)前注冊返回的selectionKey已經(jīng)被取消,則拋出CancelledKeyException異常修壕,捕獲該異常進(jìn)行處理愈捅。
// 2) 如果是第一次處理該異常,調(diào)用多路復(fù)用器的selectNow()方法將已經(jīng)取消的selectionKey從多路復(fù)用器中刪除掉慈鸠。操作成功之后蓝谨,將selected置為true, 說明之前失效的selectionKey已經(jīng)被刪除掉青团。繼續(xù)發(fā)起下一次注冊操作像棘,如果成功則退出,
//3) 如果仍然發(fā)生CancelledKeyException異常,說明我們無法刪除已經(jīng)被取消的selectionKey,按照J(rèn)DK的API說明,這種意外不應(yīng)該發(fā)生壶冒。如果發(fā)生這種問題,則說明可能NIO的相關(guān)類庫存在不可恢復(fù)的BUG,直接拋出CancelledKeyException異常到上層進(jìn)行統(tǒng)一處理截歉。

  
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

}

channel注冊時(shí)候通過操作位表示對某個(gè)事件感興趣:

//SelectionKey
public static final int OP_READ = 1 << 0;   //讀操作位
public static final int OP_WRITE = 1 << 2;  //寫操作位
public static final int OP_CONNECT = 1 << 3;  //客戶端連接操作位
public static final int OP_ACCEPT = 1 << 4;  //服務(wù)端接受連接操作位

//如果注冊的操作位為0表示只是完成注冊功能胖腾,說明對任何事件都不感興趣

注冊時(shí)可以指定附件,后續(xù)獲取到事件通知時(shí)可以從SelectionKey中獲取到附件,上面是將當(dāng)前AbstractNioSocket實(shí)現(xiàn)子類自身當(dāng)做附件咸作,如果注冊成功則可以通過返回的SelectionKey從多路復(fù)用器中獲取channel對象锨阿。

B:doBeginRead() 讀之前的準(zhǔn)備

  @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;   //key無效的話直接返回
        }

        readPending = true;  //表示讀pending中

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {  //表示當(dāng)前沒有讀操作位
            selectionKey.interestOps(interestOps | readInterestOp);  //設(shè)置讀操作位
        }

    
    }

  //SelectionKey中定義的是否可讀操作
  public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }

3) AbstractNioByteChannel

先看成員變量

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(FileRegion.class) + ')';

    private Runnable flushTask;  //flush工作的task任務(wù),主要是繼續(xù)寫半包消息

}

接下來看核心API doWrite操作

配置中設(shè)置循環(huán)次數(shù)是避免半包中數(shù)據(jù)量過大,IO線程一直嘗試寫操作记罚,此時(shí)IO線程無法處理其他IO操作或者定時(shí)任務(wù)墅诡,比如新的消息或者定時(shí)任務(wù),如果網(wǎng)絡(luò)IO慢或者對方讀取慢等造成IO線程假死的狀態(tài)

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1; //        寫自選次數(shù)

        boolean setOpWrite = false;  //寫操作位為0
        for (;;) {
            Object msg = in.current();
            if (msg == null) {  //從環(huán)形數(shù)組ChannelOutboundBuffer彈出一條消息桐智,如果為null末早,表示消息已經(jīng)發(fā)送完成,
                // Wrote all messages.
                clearOpWrite();  //清除寫標(biāo)志位说庭,退出循環(huán)
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) { //如果可讀字節(jié)為0然磷,則丟棄該消息,循環(huán)處理其他消息
                    in.remove();
                    continue;
                }

                boolean done = false;    //消息是否全部發(fā)送完畢表示
                long flushedAmount = 0;  //發(fā)送的字節(jié)數(shù)量
                if (writeSpinCount == -1) {
                    //如果為-1的時(shí)候從配置中獲取寫循環(huán)次數(shù)
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf);  //由子類實(shí)現(xiàn)寫
                    if (localFlushedAmount == 0) {  //這里表示本次發(fā)送字節(jié)為0刊驴,發(fā)送TCP緩沖區(qū)滿了姿搜,所以此時(shí)為了避免空循環(huán)一直發(fā)送,這里就將半包寫表示設(shè)置為true并退出循環(huán)
                        setOpWrite = true;
                        break;
                    }
                    //發(fā)送成功就對發(fā)送的字節(jié)計(jì)數(shù)
                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) { //如果沒有可讀字節(jié)捆憎,表示已經(jīng)發(fā)送完畢
                        done = true; //表示發(fā)送完成舅柜,并退出循環(huán)
                        break;
                    }
                }
                //通知promise當(dāng)前寫的進(jìn)度
                in.progress(flushedAmount); 

                if (done) {  //如果發(fā)送完成,移除緩沖的數(shù)據(jù)
                    in.remove();
                } else {
                    如果沒有完成會調(diào)用incompleteWrite方法
                    // Break the loop and so incompleteWrite(...) is called.
                    break;
                }
            } else if (msg instanceof FileRegion) {  //這個(gè)是文件傳輸和上面類似
                FileRegion region = (FileRegion) msg;
                boolean done = region.transferred() >= region.count();

                if (!done) {
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }

                    for (int i = writeSpinCount - 1; i >= 0; i--) {
                        long localFlushedAmount = doWriteFileRegion(region);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }

                        flushedAmount += localFlushedAmount;
                        if (region.transferred() >= region.count()) {
                            done = true;
                            break;
                        }
                    }

                    in.progress(flushedAmount);
                }

                if (done) {
                    in.remove();
                } else {
                    // Break the loop and so incompleteWrite(...) is called.
                    break;
                }
            } else {
                // Should not reach here.
                throw new Error();
            }
        }
        //如果沒有完成寫看看需要做的事情
        incompleteWrite(setOpWrite);
    }

//未完成寫操作躲惰,看看操作
  protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {  //如果當(dāng)前的寫操作位true致份,那么當(dāng)前多路復(fù)用器繼續(xù)輪詢處理
            setOpWrite();
        } else {  //否則重新新建一個(gè)task任務(wù),讓eventLoop后面點(diǎn)執(zhí)行flush操作礁扮,這樣其他任務(wù)才能夠執(zhí)行
            // Schedule flush again later so other tasks can be picked up in the meantime
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = new Runnable() {
                    @Override
                    public void run() {
                        flush();
                    }
                };
            }
            eventLoop().execute(flushTask);
        }
    }

4) AbstractNioMessageChannel

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    boolean inputShutdown;  //只有一個(gè)成員變量知举,表示是否數(shù)據(jù)讀取完畢

}

主要實(shí)現(xiàn)方法是doWrite

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();

        for (;;) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                }
                break;
            }
            try {
                boolean done = false;
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                    if (doWriteMessage(msg, in)) {
                        done = true;
                        break;
                    }
                }

                if (done) {
                    in.remove();
                } else {
                    // Did not write all messages.
                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                        key.interestOps(interestOps | SelectionKey.OP_WRITE);
                    }
                    break;
                }
            } catch (Exception e) {
                if (continueOnWriteError()) {
                    in.remove(e);
                } else {
                    throw e;
                }
            }
        }
    }

通過代碼分析我們發(fā)現(xiàn),AbstractNioMessageChannel 和AbstractNioByteChannel的消息發(fā)送實(shí)現(xiàn)比較相似太伊,不同之處在于:一個(gè)發(fā)送的是ByteBuf或者FileRegion雇锡,它們可以直接被發(fā)送;另一個(gè)發(fā)送的則是POJO對象。

5) NioServerSocketChannel

先看成員變量

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);  //有channel的元數(shù)據(jù)
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a >#2308</a>.
             */
            return provider.openServerSocketChannel();  //打開ServerSocketChannel
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    private final ServerSocketChannelConfig config;  //用于配置serversocketchannel的tcp相關(guān)參數(shù)

再看看對應(yīng)的接口實(shí)現(xiàn)操作:

    @Override
    public boolean isActive() {
        return javaChannel().socket().isBound(); //判斷端口是否屬于綁定狀態(tài)S
    }

    @Override
    public InetSocketAddress remoteAddress() {
        return null;
    }

    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }

    @Override
    protected SocketAddress localAddress0() {
        return SocketUtils.localSocketAddress(javaChannel().socket());
    }

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());  
        } else {
            //綁定端口以及最大接受的客戶端數(shù)量
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

看重點(diǎn)的API接口
doReadMessage():下面很明顯通過Nio的接受客戶端連接并新建一個(gè)NioSocketChannel并封裝父類和nio的SocketChannel放到buf中僚焦,返回1表示服務(wù)端接受成功

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }

剩下其他的一些connect锰提,remoteAddress0等是serverSocket不支持的所以調(diào)用直接拋異常。

6) NioSocketChannel

這個(gè)類相對比較重要芳悲,通信主要是它實(shí)現(xiàn)的立肘。
先看成員變量

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

 private final SocketChannelConfig config;  //這個(gè)是socketchannel配置信息

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
             *
             *  See <a >#2308</a>.
             */
            return provider.openSocketChannel();  //open一個(gè)soketChannel
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }

   

A: connect操作

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);  //先看本地地址是否為null,不為空直接綁定
        }

        boolean success = false;
        try {
            //連接遠(yuǎn)程地址名扛,
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) { //  連接沒有應(yīng)答谅年,再次 注冊連接連接標(biāo)識位
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected; //返回連接失敗
        } finally {   //如果服務(wù)端拒絕或者REST拋出連接異常,則直接關(guān)閉連接
            if (!success) {
                doClose();
            }
        }
    }

B:doWrite 看寫標(biāo)識

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {  //flushed字節(jié)為0直接清除可寫標(biāo)識位,標(biāo)識沒有可寫的肮韧。
                // All written so clear OP_WRITE
                clearOpWrite();
                break;
            }
            long writtenBytes = 0;   //已經(jīng)寫出的字節(jié)數(shù)
            boolean done = false;   //是否寫完
            boolean setOpWrite = false;  //寫標(biāo)識

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:  //標(biāo)識沒有可寫
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    super.doWrite(in);
                    return;
                case 1:
                    // Only one ByteBuf so use non-gathering write
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                  //默認(rèn)的寫方法
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }

            // Release the fully written buffers, and update the indexes of the partially written buffer.
            in.removeBytes(writtenBytes);

            if (!done) {
                // Did not write all buffers completely.
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }

這個(gè)是AbstractNioByteChannel的寫類似融蹂。

C:讀寫
具體的讀寫操作還是如下旺订,還是比較簡單的。

   @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

    @Override
    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

3.Unsafe

Unsafe就是channel的輔助接口超燃,我們實(shí)際的IO操作最后還是交給Unsafe操作区拳,Unsafe接口的定義就是放在Channel中的;具體如下:


public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

  .....
    /**
     * Returns an <em>internal-use-only</em> object that provides unsafe operations.
     */
    Unsafe unsafe();

    interface Unsafe {

        /**
         * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
         * receiving data.
         */
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        /**
         * Return the {@link SocketAddress} to which is bound local or
         * {@code null} if none.
         */
        SocketAddress localAddress();

        /**
         * Return the {@link SocketAddress} to which is bound remote or
         * {@code null} if none is bound yet.
         */
        SocketAddress remoteAddress();

        /**
         * Register the {@link Channel} of the {@link ChannelPromise} and notify
         * the {@link ChannelFuture} once the registration was complete.
         */
        void register(EventLoop eventLoop, ChannelPromise promise);

        /**
         * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
         * it once its done.
         */
        void bind(SocketAddress localAddress, ChannelPromise promise);

        /**
         * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
         * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
         * pass {@code null} to it.
         *
         * The {@link ChannelPromise} will get notified once the connect operation was complete.
         */
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

        /**
         * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
         * operation was complete.
         */
        void disconnect(ChannelPromise promise);

        /**
         * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
         * operation was complete.
         */
        void close(ChannelPromise promise);

        /**
         * Closes the {@link Channel} immediately without firing any events.  Probably only useful
         * when registration attempt failed.
         */
        void closeForcibly();

        /**
         * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
         * {@link ChannelPromise} once the operation was complete.
         */
        void deregister(ChannelPromise promise);

        /**
         * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
         * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
         */
        void beginRead();

        /**
         * Schedules a write operation.
         */
        void write(Object msg, ChannelPromise promise);

        /**
         * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
         */
        void flush();

        /**
         * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
         * It will never be notified of a success or error and so is only a placeholder for operations
         * that take a {@link ChannelPromise} as argument but for which you not want to get notified.
         */
        ChannelPromise voidPromise();

        /**
         * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
         */
        ChannelOutboundBuffer outboundBuffer();
    }
}

總結(jié)下:
netty自定義了channel接口,通過組合的jdk的channel實(shí)現(xiàn)IO操作操作功能意乓;當(dāng)然channel需要注冊到eventLoop的多路復(fù)用器上樱调。一個(gè)channel對應(yīng)一條實(shí)際的物理連接;這里主要詳解了NioServersocketChannel和NioSocketChannel届良。下一章節(jié)我們看看EventLoop的實(shí)現(xiàn)細(xì)節(jié)

參考《Netty權(quán)威指南》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末笆凌,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子伙窃,更是在濱河造成了極大的恐慌菩颖,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件为障,死亡現(xiàn)場離奇詭異晦闰,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)鳍怨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進(jìn)店門呻右,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人鞋喇,你說我怎么就攤上這事声滥。” “怎么了侦香?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵落塑,是天一觀的道長。 經(jīng)常有香客問我罐韩,道長憾赁,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任散吵,我火速辦了婚禮龙考,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘矾睦。我一直安慰自己晦款,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布枚冗。 她就那樣靜靜地躺著缓溅,像睡著了一般。 火紅的嫁衣襯著肌膚如雪赁温。 梳的紋絲不亂的頭發(fā)上肛宋,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天州藕,我揣著相機(jī)與錄音,去河邊找鬼酝陈。 笑死,一個(gè)胖子當(dāng)著我的面吹牛毁涉,可吹牛的內(nèi)容都是我干的沉帮。 我是一名探鬼主播,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼贫堰,長吁一口氣:“原來是場噩夢啊……” “哼穆壕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起其屏,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤喇勋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后偎行,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體川背,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年蛤袒,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了熄云。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,680評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡妙真,死狀恐怖缴允,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情珍德,我是刑警寧澤练般,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站锈候,受9級特大地震影響薄料,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜晴及,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一都办、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧虑稼,春花似錦琳钉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至溯壶,卻和暖如春及皂,著一層夾襖步出監(jiān)牢的瞬間甫男,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工验烧, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留板驳,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓碍拆,卻偏偏與公主長得像若治,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子感混,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評論 2 361