Netty 數(shù)據(jù)流處理 - write

Netty Data Stream Handling - write

上篇文章中介紹了Netty是讀數(shù)據(jù)的流程:EventLoop不停的select IO督函;一旦發(fā)現(xiàn)OP_READ可用則利用Channel.Unsafe讀取數(shù)據(jù)触幼,并把數(shù)據(jù)傳給Pipeline墩划;Pipeline拿到數(shù)據(jù)化撕,并交由內(nèi)部的Handler處理譬正。還有上篇文章也說了Pipeline中的兩種數(shù)據(jù)流向:inbound和outbound。read操作符合inbound流向盛撑;而write則符合outbound劉翔碎节。這些知識對我們理解write操作大有裨益,因為write操作在Pipeline中是read的反向操作抵卫。

那我們開始介紹Netty的write寫數(shù)據(jù)流程狮荔。先找到EchoServerHandler的channelRead方法,在這個示例中介粘,它會把讀到的數(shù)據(jù)再寫回客戶端:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.write(msg);
}

這個方法直接調(diào)用了ChannelHandlerContext的write方法:

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

沒什么好解釋的殖氏,給出write的調(diào)用鏈:

@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    write(msg, false, promise);

    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

invokeWrite0接下來會調(diào)用HeadContext中的write方法(沒什么好解釋的了,read的反向操作):

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

這個方法調(diào)用了Unsafe的write方法姻采,至此write操作從Pipeline中走完了雅采,接下來才是重頭戲。我們來看這個AbstractUnsafe實現(xiàn)的write方法:

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    // 1
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        // 2
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    // 3
    outboundBuffer.addMessage(msg, size, promise);
}

直接調(diào)用write寫數(shù)據(jù)的時候慨亲,并不是直接寫到channel中婚瓜,而是先寫到緩沖區(qū)里,也就是ChannelOutboundBuffer刑棵。當調(diào)用調(diào)用flushflush才開始向channel寫數(shù)據(jù)巴刻。ChannelOutboundBuffer是一個無界鏈表,如果不停的向緩沖區(qū)寫入數(shù)據(jù)可能會導致內(nèi)存溢出蛉签。因此ChannelOutboundBuffer檢測當newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()時胡陪,buffer便不可寫。同時通知Pipeline Writablity Changed:pipeline.fireChannelWritabilityChanged();碍舍。我們在實現(xiàn)自己的Handler時可以重新實現(xiàn)該方法督弓。

我們接著看下filterOutboundMessage這個方法:

@Override
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }

            return newDirectBuffer(buf);
        }

        if (msg instanceof FileRegion) {
            return msg;
        }

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }

在講解ByteBuf的文章中提到過關(guān)于使用直接內(nèi)存還是堆內(nèi)存的最佳實踐:在IO通信的線程中操作ByteBuf應(yīng)使用DirectBuffer(省去內(nèi)存拷貝的成本),在后端業(yè)務(wù)邏輯中操作ByteBuf應(yīng)使用HeapBuffer(不用擔心內(nèi)存泄露)乒验。

這個方法首先檢查msg類型是不是ByteBuf,如果是ByteBuf則檢查是不是使用了直接內(nèi)存蒂阱,如果沒有則把基于堆的內(nèi)存換成直接內(nèi)存锻全。

我們接著分析ChannelOutboundBuffer的addMessage方法:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    incrementPendingOutboundBytes(entry.pendingSize, false);
}

這個方法做了兩件事情:把msg加入到鏈表中;調(diào)用incrementPendingOutboundBytes方法录煤。我們來看這個方法:

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);

    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

首先鳄厌,先更新緩沖區(qū)大小,接著判斷緩沖區(qū)是否大于最高水位妈踊,如果是則設(shè)置buffer為unWritable(默認的高水位線大小是64K)了嚎。setUnwritable方法內(nèi)通知了Pipeline Writablity Changed,這里不貼代碼了自己看去吧。

到這的話write方法是執(zhí)行完了歪泳,但是數(shù)據(jù)仍然留在內(nèi)存中萝勤。接下來我們看flush方法。跳過Pipeline直接來看AbstractUnsafe的flush:

@Override
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

這個方法首先調(diào)用了outboundBuffer的addFlush方法呐伞,我們先看下addFlush做了什么:

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        unflushedEntry = null;
    }
}

unflushedEntry表示緩沖區(qū)鏈表中第一個未被flush的元素敌卓。如果這個變量為null的話,表示當前緩沖區(qū)已被flush伶氢。for循環(huán)中首先把entry的promise設(shè)置為Uncancellable(Promise繼承自Future趟径,F(xiàn)uture是可以cancel的),然后增加flushed計數(shù)癣防。decrementPendingOutboundBytes方法中有邏輯檢查目前Buffer是否低于低水位蜗巧,如果是則重置Buffer為可寫。

我們接著看flush0方法蕾盯,flush0主要是調(diào)用NioSocketChannel的doWrite方法:

protected void flush0() {
    ///...
    doWrite(outboundBuffer);
    ///...
}

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;

        // 1
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();

        switch (nioBufferCnt) {
            case 0:
                super.doWrite(in);
                return;
            case 1:
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    // 2
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                ///...
        }

        in.removeBytes(writtenBytes);

        if (!done) {
            // Did not write all buffers completely.
            incompleteWrite(setOpWrite);
            break;
        }
    }
}

doWrite方法首先獲取Buffer中緩存的消息幕屹,并將Netty ByteBuf轉(zhuǎn)成Nio ByteBuffer;然后把數(shù)據(jù)寫入Nio SocketChannel刑枝。寫數(shù)據(jù)的操作放到了一個for循環(huán)中香嗓。因為Netty的一個緩沖數(shù)據(jù)可能不會一次性的刷到Channel中,如果只作一次write操作就返回装畅,那么很有可能余下的數(shù)據(jù)要等到下次OP_WRITE Selection Key可用才能全部寫完靠娱。這期間經(jīng)歷了一次昂貴的IO操作,所以這個for循環(huán)又是CPU時間和IO時間的一個精心分配掠兄。

至此像云,Netty寫數(shù)據(jù)的流程分析完畢。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蚂夕,一起剝皮案震驚了整個濱河市迅诬,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌婿牍,老刑警劉巖侈贷,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異等脂,居然都是意外死亡俏蛮,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門上遥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來搏屑,“玉大人,你說我怎么就攤上這事粉楚±绷担” “怎么了亮垫?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長伟骨。 經(jīng)常有香客問我饮潦,道長,這世上最難降的妖魔是什么底靠? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任害晦,我火速辦了婚禮,結(jié)果婚禮上暑中,老公的妹妹穿的比我還像新娘壹瘟。我一直安慰自己,他們只是感情好鳄逾,可當我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布稻轨。 她就那樣靜靜地躺著,像睡著了一般雕凹。 火紅的嫁衣襯著肌膚如雪殴俱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天枚抵,我揣著相機與錄音线欲,去河邊找鬼。 笑死汽摹,一個胖子當著我的面吹牛李丰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播逼泣,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼趴泌,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了拉庶?” 一聲冷哼從身側(cè)響起嗜憔,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎氏仗,沒想到半個月后吉捶,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡皆尔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年帚稠,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片床佳。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖榄审,靈堂內(nèi)的尸體忽然破棺而出砌们,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布浪感,位于F島的核電站昔头,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏影兽。R本人自食惡果不足惜揭斧,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望峻堰。 院中可真熱鬧讹开,春花似錦、人聲如沸捐名。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽镶蹋。三九已至成艘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間贺归,已是汗流浹背淆两。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留拂酣,地道東北人秋冰。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像踱葛,于是被迫代替她去往敵國和親丹莲。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容