NettyClient與NettyServer通信流程

NettyClient端的channel為NioSocketChannel业踏,通過writeAndFlush方法將數據發(fā)送到NettyServer端划煮。

NioSocketChannel.writeAndFlush->AbstractChannel.writeAndFlush->pipeline.writeAndFlush->tail.writeAndFlush->AbstractChannelHandlerContext.writeAndFlush->AbstractChannelHandlerContext.write

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();(1)
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(msg, promise);(2)
        } else {
            next.invokeWrite(msg, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, msg, promise);
        }  else {
            task = WriteTask.newInstance(next, msg, promise);
        }
        safeExecute(executor, task, promise, msg);
    }
}

(1)處從TailContext向前找第一個outbounnd context灌灾,業(yè)務handler一般為inbound,因此會找到HeadContext卖毁。
(2)執(zhí)行HeadContext.invokeWriteAndFlush方法羡藐。

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

最終會調用HeadContext的write和flush方法贩毕,如下:

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

先看unsafe的write方法:

@Override
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            // If the outboundBuffer is null we know the channel was closed and so
            // need to fail the future right away. If it is not null the handling of the rest
            // will be done in flush0()
            // See https://github.com/netty/netty/issues/2362
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
            return;
        }

        int size;
        try {
            msg = filterOutboundMessage(msg);
            /**
             * pipeline.estimatorHandle()會得到DefaultMessageSizeEstimator,再調用size(msg)方法用于計算該msg的可寫到網絡的字節(jié)的數量仆嗦,即該buf的可讀字節(jié)數量
             * 如果msg為ByteBuf辉阶,那么返回該buf的可讀字節(jié)數,即可以發(fā)送出去的字節(jié)數
             */
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }

        outboundBuffer.addMessage(msg, size, promise);(3)
    }

(3)處的方法如下瘩扼,主要是將msg放到ChannelOutboundBuffer的雙向鏈表里:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        /**
         * 添加第1個msg時睛藻,將添加的msg包成entry,作為鏈表tail
         * 因為entry還沒flush邢隧,所以flushedEntry設為null
         */
        flushedEntry = null;
        tailEntry = entry;
    } else {
        /**
         * tail不為null店印,說明已經有msg在之前add進來了
         * 將新add的entry連在之前tail的后面,作為新的tail
         */
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        /**
         * 第1次add的entry倒慧,作為unflushedEntry
         */
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

放入第1個entry1后按摘,雙向鏈表為:

    entry1->null
    指針指向為:
    flushedEntry->null
    unflushedEntry->entry1(本次添加的entry)
    tailEntry->entry1

放入第2個entry2后,雙向鏈表為:

    entry1->entry2->null
    指針指向為:
    flushedEntry->null
    unflushedEntry->entry1
    tailEntry->entry2(本次添加的entry)

再看unsafe的flush方法:

    @Override
    public final void flush() {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }

        outboundBuffer.addFlush();(4)
        flush0();(5)
    }  

(4)處的方法為:

public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}

執(zhí)行addFlush前后纫谅,雙向鏈表的指針情況如下:

    addFlush前指針指向為:
    flushedEntry->null
    unflushedEntry->entry1
    tailEntry->entry1
    
    addFlush后指針指向為:
    flushedEntry->entry1
    unflushedEntry->null
    tailEntry->entry1

addFlush操作主要是從unflushedEntry開始鏈表后面掃炫贤,直到掃到null,并且置unflushedEntry為null付秕,將掃過的entry的promise的result都置為UNCANCELLABLE兰珍。

(5)處的方法為:

    protected void flush0() {
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }

        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }

        inFlush0 = true;

        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }

        try {
            doWrite(outboundBuffer);(6)
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                outboundBuffer.failFlushed(t, true);
            }
        } finally {
            inFlush0 = false;
        }
    }

(6)處的方法為AbstractNioByteChannel的doWrite方法:

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    /**
     * 當發(fā)送緩沖區(qū)已經滿,沒能將完整數據包發(fā)送出去時询吴,需要設置setOpWrite為true掠河,
     * 這樣在因為緩沖區(qū)滿不能將完整數據包發(fā)送出去提前退出循環(huán)后重新設置寫感興趣事件,
     * 待下次可寫事件發(fā)生時繼續(xù)把剩下的數據包發(fā)送出去猛计。
     */
    boolean setOpWrite = false;
    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
                in.remove();
                continue;
            }

            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                /**
                 * 這里的localFlushedAmount是真正寫入到channel的字節(jié)數
                 */
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    /**
                     * 寫入的字節(jié)數為0唠摹,說明channel的寫緩沖區(qū)已滿,不能再寫入
                     */
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                /**
                 * buf不可讀說明寫入buf的數據已經全部發(fā)送到channel
                 */
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            if (done) {
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else if (msg instanceof FileRegion) {
            FileRegion region = (FileRegion) msg;
            boolean done = region.transfered() >= region.count();

            if (!done) {
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }

                for (int i = writeSpinCount - 1; i >= 0; i--) {
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (region.transfered() >= region.count()) {
                        done = true;
                        break;
                    }
                }

                in.progress(flushedAmount);
            }

            if (done) {
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
    }
    incompleteWrite(setOpWrite);
}

doWrite方法執(zhí)行的操作主要為循環(huán)從ChannelOutboundBuffer中取出msg寫到channel的socket發(fā)送緩沖區(qū)奉瘤,直到寫完ChannelOutboundBuffer的所有msg勾拉,但有時因為socket發(fā)送緩沖區(qū)滿了不能及時發(fā)出去,那么設置setOpWrite為true,標識寫半包情況藕赞,退出循環(huán)后要注冊channel的write感興趣事件成肘,等待下次可寫了繼續(xù)發(fā)。

對于server端斧蜕,在NioEventLoop select循環(huán)中獲取到讀事件双霍,解析好client發(fā)送來的數據并響應給client,同樣是執(zhí)行NioSocketChannel.writeAndFlush操作惩激,與client端一樣店煞,不在詳細闡述蟹演。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末风钻,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子酒请,更是在濱河造成了極大的恐慌骡技,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,997評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件羞反,死亡現(xiàn)場離奇詭異布朦,居然都是意外死亡,警方通過查閱死者的電腦和手機昼窗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,603評論 3 392
  • 文/潘曉璐 我一進店門是趴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人澄惊,你說我怎么就攤上這事唆途。” “怎么了掸驱?”我有些...
    開封第一講書人閱讀 163,359評論 0 353
  • 文/不壞的土叔 我叫張陵肛搬,是天一觀的道長。 經常有香客問我毕贼,道長温赔,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,309評論 1 292
  • 正文 為了忘掉前任鬼癣,我火速辦了婚禮陶贼,結果婚禮上,老公的妹妹穿的比我還像新娘待秃。我一直安慰自己骇窍,他們只是感情好,可當我...
    茶點故事閱讀 67,346評論 6 390
  • 文/花漫 我一把揭開白布锥余。 她就那樣靜靜地躺著腹纳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嘲恍,一...
    開封第一講書人閱讀 51,258評論 1 300
  • 那天足画,我揣著相機與錄音,去河邊找鬼佃牛。 笑死淹辞,一個胖子當著我的面吹牛,可吹牛的內容都是我干的俘侠。 我是一名探鬼主播象缀,決...
    沈念sama閱讀 40,122評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼爷速!你這毒婦竟也來了央星?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,970評論 0 275
  • 序言:老撾萬榮一對情侶失蹤惫东,失蹤者是張志新(化名)和其女友劉穎莉给,沒想到半個月后,有當地人在樹林里發(fā)現(xiàn)了一具尸體廉沮,經...
    沈念sama閱讀 45,403評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡颓遏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,596評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了滞时。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叁幢。...
    茶點故事閱讀 39,769評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖坪稽,靈堂內的尸體忽然破棺而出曼玩,到底是詐尸還是另有隱情,我是刑警寧澤刽漂,帶...
    沈念sama閱讀 35,464評論 5 344
  • 正文 年R本政府宣布演训,位于F島的核電站,受9級特大地震影響贝咙,放射性物質發(fā)生泄漏样悟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,075評論 3 327
  • 文/蒙蒙 一庭猩、第九天 我趴在偏房一處隱蔽的房頂上張望窟她。 院中可真熱鬧,春花似錦蔼水、人聲如沸震糖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,705評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吊说。三九已至论咏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間颁井,已是汗流浹背厅贪。 一陣腳步聲響...
    開封第一講書人閱讀 32,848評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留雅宾,地道東北人养涮。 一個月前我還...
    沈念sama閱讀 47,831評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像眉抬,于是被迫代替她去往敵國和親贯吓。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,678評論 2 354

推薦閱讀更多精彩內容