flush

DefaultChannelPipeline.flush()

調(diào)用unsafe

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
     unsafe.flush();
 }

AbstractChannel.flush()

調(diào)用outboundbuffer.addFlush 然后flush0,

@Override
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

flush0()

調(diào)用NioSocketChannel.flush0()

@Override
protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}
//???
    private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

ChannelOutboundBuffer.addFlush()

對unflushedEntry繼續(xù)計數(shù)flushed++
若已經(jīng)cancel撩满,進行其他處理盅弛??洲胖?


/**
* Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
* and so you will be able to handle them.
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
    if (flushedEntry == null) {
        // there is no flushedEntry yet, so start with the entry
        flushedEntry = entry;
    }
    do {
        flushed ++;
        if (!entry.promise.setUncancellable()) {
            // Was cancelled so make sure we free up memory and notify about the freed bytes
            int pending = entry.cancel();
            decrementPendingOutboundBytes(pending, false, true);
        }
        entry = entry.next;
    } while (entry != null);

    // All flushed so reset unflushedEntry
    unflushedEntry = null;
}
}

AbstractChannel

主要調(diào)用 doWrite(outboundBuffer) 還有其他異常處理

@SuppressWarnings("deprecation")
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            try {
                shutdownOutput(voidPromise(), t);
            } catch (Throwable t2) {
                close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        }
    } finally {
        inFlush0 = false;
    }
}
 @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

NioSocketChannel.doWrite()

真正寫据某。對不同的nioBufferCnt用不同的寫方法
寫完后會調(diào)用ChannelOutboundBuffer.removeBytes(writtenBytes)

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
    int size = in.size();
    if (size == 0) {
        // All written so clear OP_WRITE
        clearOpWrite();
        break;
    }
    long writtenBytes = 0;
    boolean done = false;
    boolean setOpWrite = false;

    // Ensure the pending writes are made of ByteBufs only.
    ByteBuffer[] nioBuffers = in.nioBuffers();
    int nioBufferCnt = in.nioBufferCount();
    long expectedWrittenBytes = in.nioBufferSize();
    SocketChannel ch = javaChannel();

    // Always us nioBuffers() to workaround data-corruption.
    // See https://github.com/netty/netty/issues/2761
    switch (nioBufferCnt) {
        case 0:
            // We have something else beside ByteBuffers to write so fallback to normal writes.
            super.doWrite(in);
            return;
        case 1:
            // Only one ByteBuf so use non-gathering write
            ByteBuffer nioBuffer = nioBuffers[0];
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                final int localWrittenBytes = ch.write(nioBuffer);
                if (localWrittenBytes == 0) {
                    setOpWrite = true;
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {
                    done = true;
                    break;
                }
            }
            break;
        default:
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes == 0) {
                    setOpWrite = true;
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {
                    done = true;
                    break;
                }
            }
            break;
    }

    // Release the fully written buffers, and update the indexes of the partially written buffer.
    in.removeBytes(writtenBytes);

    if (!done) {
        // Did not write all buffers completely.
        incompleteWrite(setOpWrite);
        break;
    }
}
}

ChannelOutboundBuffer.removeBytes()

記錄寫的progress,若current()的buffer寫完,會調(diào)用remove();
最后調(diào)用clearNioBuffers();

/**
* Removes the fully written entries and update the reader index of the partially written entry.
* This operation assumes all messages in this buffer is {@link ByteBuf}.
*/
public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }
    
        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
    
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}

remove()

調(diào)用removeEntry(e)

/**
* Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
* flushed message exists at the time this method is called it will return {@code false} to signal that no more
* messages are ready to be handled.
*/
public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;
    
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    
    removeEntry(e);
    
    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);//回調(diào)
        decrementPendingOutboundBytes(size, false, true);//小于低水位蒋困,設(shè)置write快速再寫
    }
    
    // recycle the entry 回收
    e.recycle();
    
    return true;
}

removeEntry()

--flushed
若flushed為0flushedEntity設(shè)置null
否則把next設(shè)置給flushed

private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
    }
//小于低水位,設(shè)置write快速再寫
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}


private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}        

clearNioBuffers()

把線程中分配的BUF設(shè)置為null

 // Clear all ByteBuffer from the array so these can be GC'ed.
    // See https://github.com/netty/netty/issues/3837
    private void clearNioBuffers() {
        int count = nioBufferCount;
        if (count > 0) {
            nioBufferCount = 0;
            Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
        }
    }
  private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
        @Override
        protected ByteBuffer[] initialValue() throws Exception {
            return new ByteBuffer[1024];
        }
    };

AbstractNioByteChannel.incompleteWrite()

網(wǎng)咯擁塞的時候(writeBytes數(shù)量為0)設(shè)置write,加速寫

protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        setOpWrite();
    } else {
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}

setOpWrite()

  protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }

clearOpWrite()

protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末敬辣,一起剝皮案震驚了整個濱河市雪标,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌溉跃,老刑警劉巖村刨,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異撰茎,居然都是意外死亡嵌牺,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來逆粹,“玉大人募疮,你說我怎么就攤上這事∑У” “怎么了阿浓?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長奢方。 經(jīng)常有香客問我搔扁,道長,這世上最難降的妖魔是什么蟋字? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任稿蹲,我火速辦了婚禮,結(jié)果婚禮上鹊奖,老公的妹妹穿的比我還像新娘苛聘。我一直安慰自己,他們只是感情好忠聚,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布设哗。 她就那樣靜靜地躺著,像睡著了一般两蟀。 火紅的嫁衣襯著肌膚如雪网梢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天赂毯,我揣著相機與錄音战虏,去河邊找鬼。 笑死党涕,一個胖子當著我的面吹牛烦感,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播膛堤,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼手趣,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了肥荔?” 一聲冷哼從身側(cè)響起绿渣,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎次企,沒想到半個月后怯晕,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡舟茶,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吧凉。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡阀捅,死狀恐怖饲鄙,靈堂內(nèi)的尸體忽然破棺而出忍级,到底是詐尸還是另有隱情,我是刑警寧澤汛蝙,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布历帚,位于F島的核電站睹栖,受9級特大地震影響硫惕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜野来,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一恼除、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧曼氛,春花似錦豁辉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至构舟,卻和暖如春灰追,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背狗超。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工弹澎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人努咐。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓苦蒿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親渗稍。 傳聞我的和親對象是個殘疾皇子佩迟,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容