Netty學(xué)習(xí)筆記七-Channel分析

Channel介紹

Channel是JDK 的NIO類庫中的重要組成部分版姑,我們?cè)谥暗拇a中也經(jīng)常用到io.netty.channel.socket.nio.NioSocketChannel和io.netty.channel.socket.nio.NioServerSocketChannel用于阻塞性IO操作
NioServerSocketChannel類的繼承圖


image.png

從上可以看出,Channel是最頂層的抽象戴卜,一個(gè)Channel抽象為網(wǎng)絡(luò)socket的連接或者讀寫魄懂,連接飘言,綁定IO的一個(gè)組件峦睡。
一個(gè)Channel需要提供給用戶:
1韧骗、channel當(dāng)前的狀態(tài)(打開還是已連接?)
2零聚、channel的配置參數(shù)(channelConfig)
3袍暴、channel支持哪些IO操作
4、處理channel IO操作的事件
在Netty中所有的IO操作都是異步的隶症,也就意味著IO的請(qǐng)求會(huì)立刻返回政模,并不能保證請(qǐng)求是否已完成。在Netty中IO請(qǐng)求會(huì)返回ChannelFuture實(shí)例蚂会。
Channel是有層級(jí)的淋样,例如,對(duì)于服務(wù)端而言胁住,父channel為空趁猴,對(duì)于客戶端NioSocketChannel,它的父Channel就是創(chuàng)建它的ServerSocketChannel.

Channel主要API方法

下面對(duì)我們對(duì)Channel的主要API做簡(jiǎn)單介紹
Channel read() 從channel中讀取數(shù)據(jù)到第一個(gè)buffer彪见,如果數(shù)據(jù)被成功讀取觸發(fā)channelRead事件儡司,如果數(shù)據(jù)被讀取完成會(huì)觸發(fā)channelReadComplete事件,如果有讀操作被掛起余指,那么后續(xù)讀操作會(huì)取消捕犬。
ChannelFuture write(Object msg) 將當(dāng)前msg通過ChannelPipeLine寫入到Channel,這個(gè)方法不會(huì)真正執(zhí)行flush操作,所以當(dāng)寫入完成后要執(zhí)行flush()方法才能將數(shù)據(jù)真正發(fā)送出去碉碉。
write(Object msg, ChannelPromise promise) 跟write功能一樣柴钻,只是寫入完成后會(huì)回調(diào)promise。
Channel flush() 將channel中所有緩存消息全部寫入到目標(biāo)channel垢粮,發(fā)送給通信對(duì)方贴届。
ChannelFuture writeAndFlush(Object msg) 作用等于write+flush
ChannelFuture disconnect() 請(qǐng)求與遠(yuǎn)程通信對(duì)端斷開連接,這個(gè)操作會(huì)級(jí)聯(lián)觸發(fā)ChannelHandler.disconnect(ChannelHandlerContext, ChannelPromise)事件
ChannelFuture connect(SocketAddress remoteAddress) 使用指定的remoteAddress發(fā)起連接請(qǐng)求足丢,并且返回ChannelFuture對(duì)象粱腻,如果連接超時(shí)會(huì)ChannelFuture返回操作結(jié)果是ConnectTimeoutException,如果連接被拒絕操作結(jié)果是ConnectException斩跌,該操作會(huì)觸發(fā)ChannelHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)事件绍些。
EventLoop eventLoop() 返回channel注冊(cè)的eventLoop
ChannelConfig config() 返回channel的配置
ChannelMetadata metadata() 返回當(dāng)前channel的元數(shù)據(jù)信息,比如TCP參數(shù)配置耀鸦。
SocketAddress localAddress() 返回channel綁定的本地地址
SocketAddress remoteAddress() 返回channel通信的遠(yuǎn)程地址
Channel parent() 返回父級(jí)channel

AbstractChannel分析

AbstractChannel是Channel的基本實(shí)現(xiàn)類柬批,它采用聚合的方式封裝了各種功能,從成員變量聚合了以下內(nèi)容:
private final Channel parent :父級(jí)channel
private final ChannelId id = DefaultChannelId.newInstance() 全局唯一id
private final Unsafe unsafe : unsafe示例
private final DefaultChannelPipeline pipeline; 當(dāng)前channle對(duì)應(yīng)的DefaultChannelPipeline
private final EventLoop eventLoop; 當(dāng)前channel注冊(cè)的EventLoop
前面提到channel的IO操作會(huì)觸發(fā)會(huì)產(chǎn)生對(duì)應(yīng)的IO事件袖订,然后事件在ChannelPipeLine中傳播氮帐,并由對(duì)應(yīng)的ChannleHandler處理。
AbstractChannel的IO操作直接調(diào)用DefaultChannelPipeline的相關(guān)方法由DefaultChannelPipeline處理相關(guān)邏輯

@Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }
 @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }
……

AbstractNioChannel分析

AbstractChannel是Channel的基本實(shí)現(xiàn)類洛姑,那么AbstractNioChannel就是Channel基于選擇器的實(shí)現(xiàn)類上沐,它實(shí)現(xiàn)了核心的將Channel注冊(cè)到Selector的功能。
首先看下成員變量

private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);

    private final SelectableChannel ch;//JDK NIO SelectableChannel
    protected final int readInterestOp;//JDK selectionKey 的OP_READ
    private volatile SelectionKey selectionKey;
    private volatile boolean inputShutdown;

    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;

核心注冊(cè)功能源碼:

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

定義布爾類型變量selected標(biāo)識(shí)是否注冊(cè)成功楞艾,調(diào)用SelectableChannel的register將當(dāng)前channel注冊(cè)到EventLoop的多路復(fù)用器Selector上参咙。
其中SelectableChannel的注冊(cè)方法定義如下:

public abstract SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;

第二個(gè)參數(shù)是 指定監(jiān)聽的網(wǎng)絡(luò)操作位,表示channel對(duì)哪幾類網(wǎng)絡(luò)事件感興趣硫眯,具體定義如下:

    public static final int OP_READ = 1 << 0; //讀事件
    public static final int OP_WRITE = 1 << 2;//寫事件
    public static final int OP_CONNECT = 1 << 3;//客戶端連接服務(wù)端事件
    public static final int OP_ACCEPT = 1 << 4;//服務(wù)端接收客戶端連接事件

其中注冊(cè)時(shí)傳的ops等于0蕴侧,表示不對(duì)任何事件感興趣,只是完成注冊(cè)操作两入,注冊(cè)成功之后返回selectionKey,通過selectionKey可以獲取注冊(cè)的channel.
如果注冊(cè)返回的selectionKey被取消净宵,則拋出CancelledKeyException異常,如果是第一次拋異常則調(diào)用selectNow將取消的selectionKey刪除并將selected置為true并再次注冊(cè)裹纳,如果仍未成功則拋出CancelledKeyException異常择葡。
處理讀操作之前設(shè)置網(wǎng)絡(luò)操作位為讀

protected void doBeginRead() throws Exception {
        if (inputShutdown) {
            return;
        }

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

先判斷Channel是否關(guān)閉,如果處于關(guān)閉則直接返回痊夭,然后獲取當(dāng)前的SelectKey的操作位與讀操作位位于刁岸,如果結(jié)果為0標(biāo)識(shí)沒有設(shè)置過讀操作位,最后通過或設(shè)置讀操作位

AbstractNioByteChannel分析

AbstractNioByteChannel是Channel操作字節(jié)的實(shí)現(xiàn)她我,核心代碼是protected void doWrite(ChannelOutboundBuffer in)

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;

        for (;;) {
            Object msg = in.current(true);
            if (msg == null) {
                // Wrote all messages.
                clearOpWrite();
                break;
            }

首先從ChannelOutboundBuffer環(huán)形數(shù)組中讀取數(shù)據(jù)虹曙,如果數(shù)據(jù)為空迫横,調(diào)用clearOpWrite清除讀標(biāo)志位

if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

如果數(shù)據(jù)不為空,判斷是否是ByteBuf類型酝碳,若是強(qiáng)制轉(zhuǎn)換為ByteBuf矾踱,并判斷ByteBuf中是否有可讀字節(jié),如果沒有則將該消息從數(shù)組中刪除疏哗,并繼續(xù)處理其他消息呛讲。

boolean setOpWrite = false;
                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) {
                        done = true;
                        break;
                    }
                }

設(shè)置半包標(biāo)志setOpWrite,消息是否全部發(fā)送標(biāo)識(shí)done返奉,發(fā)送總消息數(shù)flushedAmount贝搁,對(duì)循環(huán)發(fā)送次數(shù)writeSpinCount判斷,如果為-1從channel配置重獲取循環(huán)發(fā)送次數(shù)芽偏,調(diào)用doWriteBytes進(jìn)行循環(huán)發(fā)送雷逆,如果本次發(fā)送的字節(jié)數(shù)為0那么說明TCP緩沖區(qū)已滿,所以講寫半包標(biāo)識(shí)置為true并跳出循環(huán)污尉。

如果發(fā)送字節(jié)數(shù)大于0那么對(duì)發(fā)送字節(jié)數(shù)進(jìn)行累加膀哲,如果當(dāng)前buf沒有可讀字節(jié)數(shù)了則標(biāo)識(shí)buf寫入完成,設(shè)置消息發(fā)送標(biāo)識(shí)為true并跳出循環(huán)被碗。

in.progress(flushedAmount);

                if (done) {
                    in.remove();
                } else {
                    incompleteWrite(setOpWrite);
                    break;
                }

調(diào)用progress更新進(jìn)度信息某宪,如果消息已完全發(fā)送則將該buf從環(huán)形數(shù)組中移除,否則調(diào)用incompleteWrite方法

protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {
            setOpWrite();
        } else {
            // Schedule flush again later so other tasks can be picked up in the meantime
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = new Runnable() {
                    @Override
                    public void run() {
                        flush();
                    }
                };
            }
            eventLoop().execute(flushTask);
        }
    }

如果寫半包標(biāo)識(shí)setOpWrite為true,調(diào)用setOpWrite()重新設(shè)置寫操作位setOpWrite()

 protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }

SelectKey設(shè)置為OP_WRITE后锐朴,Selector會(huì)不斷輪詢對(duì)應(yīng)的Channel處理沒有發(fā)送完成的半包消息兴喂,直到清除OP_WRITE標(biāo)志為止。
如果沒有設(shè)置半包標(biāo)識(shí)焚志,則需要新起一個(gè)線程來處理瞻想。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市娩嚼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌滴肿,老刑警劉巖岳悟,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異泼差,居然都是意外死亡贵少,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門堆缘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來滔灶,“玉大人,你說我怎么就攤上這事吼肥÷计剑” “怎么了麻车?”我有些...
    開封第一講書人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)斗这。 經(jīng)常有香客問我动猬,道長(zhǎng),這世上最難降的妖魔是什么表箭? 我笑而不...
    開封第一講書人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任赁咙,我火速辦了婚禮,結(jié)果婚禮上免钻,老公的妹妹穿的比我還像新娘彼水。我一直安慰自己,他們只是感情好极舔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開白布凤覆。 她就那樣靜靜地躺著,像睡著了一般姆怪。 火紅的嫁衣襯著肌膚如雪叛赚。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評(píng)論 1 305
  • 那天稽揭,我揣著相機(jī)與錄音俺附,去河邊找鬼。 笑死溪掀,一個(gè)胖子當(dāng)著我的面吹牛事镣,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播揪胃,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼璃哟,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了喊递?” 一聲冷哼從身側(cè)響起随闪,我...
    開封第一講書人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤驶悟,失蹤者是張志新(化名)和其女友劉穎朽肥,沒想到半個(gè)月后翔怎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體稚叹,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡汗茄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年答倡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了董济。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片踱阿。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡泽疆,死狀恐怖户矢,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情殉疼,我是刑警寧澤梯浪,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布捌年,位于F島的核電站,受9級(jí)特大地震影響驱证,放射性物質(zhì)發(fā)生泄漏延窜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一抹锄、第九天 我趴在偏房一處隱蔽的房頂上張望逆瑞。 院中可真熱鬧,春花似錦伙单、人聲如沸获高。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽念秧。三九已至,卻和暖如春布疼,著一層夾襖步出監(jiān)牢的瞬間摊趾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來泰國打工游两, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留砾层,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓贱案,卻偏偏與公主長(zhǎng)得像肛炮,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子宝踪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容