Flink 源碼之節(jié)點(diǎn)間通信

Flink源碼分析系列文檔目錄

請(qǐng)點(diǎn)擊:Flink 源碼分析系列文檔目錄

從collector到buffer

下面我們從數(shù)據(jù)源出開(kāi)始分析數(shù)據(jù)是如何寫(xiě)入到Flink緩存中的拧篮。

NoTimestampContext.collect方法儡嘶。該方法位于數(shù)據(jù)源(SourceFunction)中。

@Override
public void collect(T element) {
    synchronized (lock) {
        output.collect(reuse.replace(element));
    }
}

這里調(diào)用的是output對(duì)象的collect方法胁编。Output對(duì)象是Output<StreamRecord<T>>類(lèi)型。經(jīng)過(guò)debug我們發(fā)現(xiàn)這里的output真實(shí)類(lèi)型為CountingOutput類(lèi)型务冕。
CountingOutput僅僅是一個(gè)包裝類(lèi)型届良,包裝了一個(gè)Output。相比于其他Output而言多出了收集元素?cái)?shù)量的監(jiān)控答捕。CountingOutput維護(hù)了一個(gè)計(jì)數(shù)器類(lèi)型監(jiān)控變量:

private final Counter numRecordsOut;

在collect元素的時(shí)候調(diào)用了numRecordsOut.inc()方法,實(shí)現(xiàn)了對(duì)收集元素?cái)?shù)量的監(jiān)控屑那。
NoTimestampContext的CountingOuput封裝的output是什么類(lèi)型的呢拱镐?我們通過(guò)debug查看發(fā)現(xiàn)內(nèi)層的類(lèi)型為RecordWriterOutput

RecordWriterOutputcollect方法如下所示:

@Override
public void collect(StreamRecord<OUT> record) {
    // outputTag使用旁路輸出的時(shí)候會(huì)用到持际,這里只支持輸出到main input
    if (this.outputTag != null) {
        // we are only responsible for emitting to the main input
        return;
    }

    pushToRecordWriter(record);
}

pushToRecordWriter方法使用序列化代理沃琅,將record傳遞給recordWriter。代碼如下:

private <X> void pushToRecordWriter(StreamRecord<X> record) {
    serializationDelegate.setInstance(record);

    try {
        recordWriter.emit(serializationDelegate);
    }
    catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}

RecordWriter負(fù)責(zé)把數(shù)據(jù)序列化蜘欲,然后寫(xiě)入到緩存中益眉。它有兩個(gè)實(shí)現(xiàn)類(lèi):

  • BroadcastRecordWriter: 維護(hù)了多個(gè)下游channel,發(fā)送數(shù)據(jù)到下游所有的channel中。
  • ChannelSelectorRecordWriter: 通過(guò)channelSelector對(duì)象判斷數(shù)據(jù)需要發(fā)往下游的哪個(gè)channel郭脂。keyBy算子用的正是這個(gè)RecordWriter年碘。

這里我們分析下ChannelSelectorRecordWriteremit方法:

@Override
public void emit(T record) throws IOException, InterruptedException {
    emit(record, channelSelector.selectChannel(record));
}

很明顯這里使用了channelSelector.selectChannel方法。該方法為record和對(duì)應(yīng)下游channel id的函數(shù)關(guān)系展鸡。

接下來(lái)我們又回到了父類(lèi)RecordWriter屿衅。

protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
    checkErroneous();

    serializer.serializeRecord(record);

    // Make sure we don't hold onto the large intermediate serialization buffer for too long
    if (copyFromSerializerToTargetChannel(targetChannel)) {
        // 壓縮序列化器中間數(shù)據(jù)緩存大小
        serializer.prune();
    }
}

關(guān)鍵的邏輯在于copyFromSerializerToTargetChannel。此方法從序列化器中復(fù)制數(shù)據(jù)到目標(biāo)channel莹弊。

protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
    // We should reset the initial position of the intermediate serialization buffer before
    // copying, so the serialization results can be copied to multiple target buffers.
    // 此處Serializer為SpanningRecordSerializer
    // reset方法將serializer內(nèi)部的databuffer position重置為0
    serializer.reset();

    boolean pruneTriggered = false;
    // 獲取目標(biāo)channel的bufferBuilder
    // bufferBuilder內(nèi)維護(hù)了MemorySegment涤久,即內(nèi)存片段
    // Flink的內(nèi)存管理依賴MemorySegment,可實(shí)現(xiàn)堆內(nèi)堆外內(nèi)存的管理
    // RecordWriter內(nèi)有一個(gè)bufferBuilder數(shù)組忍弛,長(zhǎng)度和下游channel數(shù)目相同
    // 該數(shù)組以channel ID為下標(biāo)拴竹,存儲(chǔ)和channel對(duì)應(yīng)的bufferBuilder
    // 如果對(duì)應(yīng)channel的bufferBuilder尚未創(chuàng)建,調(diào)用requestNewBufferBuilder申請(qǐng)一個(gè)新的bufferBuilder
    BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
    // 復(fù)制serializer的數(shù)據(jù)到bufferBuilder中
    SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);

    // 循環(huán)直到result完全被寫(xiě)入到buffer
    // 一條數(shù)據(jù)可能會(huì)被寫(xiě)入到多個(gè)緩存中
    // 如果緩存不夠用剧罩,會(huì)申請(qǐng)新的緩存
    // 數(shù)據(jù)完全寫(xiě)入完畢之時(shí),當(dāng)前正在操作的緩存是沒(méi)有寫(xiě)滿的
    // 因此返回true座泳,表明需要壓縮該buffer的空間
    while (result.isFullBuffer()) {
        finishBufferBuilder(bufferBuilder);

        // If this was a full record, we are done. Not breaking out of the loop at this point
        // will lead to another buffer request before breaking out (that would not be a
        // problem per se, but it can lead to stalls in the pipeline).
        if (result.isFullRecord()) {
            pruneTriggered = true;
            emptyCurrentBufferBuilder(targetChannel);
            break;
        }

        bufferBuilder = requestNewBufferBuilder(targetChannel);
        result = serializer.copyToBufferBuilder(bufferBuilder);
    }
    checkState(!serializer.hasSerializedData(), "All data should be written at once");

    // 如果buffer超時(shí)時(shí)間為0惠昔,需要flush目標(biāo)channel的數(shù)據(jù)
    if (flushAlways) {
        flushTargetPartition(targetChannel);
    }
    return pruneTriggered;
}

接下來(lái)分析下getBufferBuilder方法。以ChannelSelectorRecordWriter的此方法為例說(shuō)明挑势。

@Override
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    if (bufferBuilders[targetChannel] != null) {
        return bufferBuilders[targetChannel];
    } else {
        return requestNewBufferBuilder(targetChannel);
    }
}

如果bufferBuilders數(shù)組中targetChannel下標(biāo)不存在镇防,申請(qǐng)一個(gè)新的BufferBuilder。此處我們發(fā)現(xiàn)各個(gè)channel對(duì)應(yīng)的bufferBuilder是懶加載的潮饱,只有第一次用到的時(shí)候才創(chuàng)建来氧。

我們跟蹤到requestNewBufferBuilder方法。

@Override
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    // 首先需要檢查targetChannel對(duì)應(yīng)的buffer必須為null或數(shù)據(jù)已寫(xiě)入完畢
    checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());

    // 獲取目標(biāo)分區(qū)的bufferBuilder
    BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
    // 創(chuàng)建一個(gè)bufferConsumer香拉,bufferConsumer保存了bufferBuilder的memorySegment啦扬,當(dāng)前寫(xiě)入指針和當(dāng)前讀取指針
    // BufferBuilder用于寫(xiě)入數(shù)據(jù),BufferConsumer用于讀取數(shù)據(jù)
    targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
    bufferBuilders[targetChannel] = bufferBuilder;
    return bufferBuilder;
}

addBufferConsumer方法

@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
    checkNotNull(bufferConsumer);

    ResultSubpartition subpartition;
    try {
        checkInProduceState();
        // 獲取subPartition
        // 此處subpartitionIndex為targetChannel
        subpartition = subpartitions[subpartitionIndex];
    }
    catch (Exception ex) {
        bufferConsumer.close();
        throw ex;
    }

    // 將bufferConsumer添加入subpartition
    return subpartition.add(bufferConsumer);
}

ReultSubPartition有兩個(gè)實(shí)現(xiàn)類(lèi)凫碌,PipelinedSubpartitionBoundedBlockingSubpartition扑毡。
其中PipelinedSubpartition用于流處理場(chǎng)景下的數(shù)據(jù)消費(fèi)。它內(nèi)部維護(hù)了一個(gè)BufferBuilder隊(duì)列盛险。消費(fèi)者通過(guò)調(diào)用createReadView創(chuàng)建一個(gè)PipelinedSubpartitionView來(lái)消費(fèi)數(shù)據(jù)瞄摊。創(chuàng)建view的時(shí)候需要提供一個(gè)BufferAvailabilityListener對(duì)象,用于作為buffer中有數(shù)據(jù)可用時(shí)候的回調(diào)苦掘。因此PipelinedSubpartition可以做到一旦有數(shù)據(jù)就及時(shí)提醒下游去消費(fèi)换帜。

BoundedBlockingSubpartition適合批處理場(chǎng)景下的數(shù)據(jù)消費(fèi)。和PipelinedSubpartition不同的是鹤啡,BoundedBlockingSubpartition數(shù)據(jù)是先寫(xiě)入后消費(fèi)的惯驼,可以一次寫(xiě)入,多次消費(fèi)递瑰。它的數(shù)據(jù)寫(xiě)入到BoundedData中跳座。數(shù)據(jù)落地的方式隨著BoundedData實(shí)現(xiàn)的不同而不同端铛。數(shù)據(jù)可以保存在文件系統(tǒng)(FileChannelBoundedData),內(nèi)存(MemoryMappedBoundedData)或者同時(shí)在文件系統(tǒng)和內(nèi)存(FileChannelMemoryMappedBoundedData)疲眷。

下游請(qǐng)求SubPartition

上一段分析過(guò)數(shù)據(jù)的消費(fèi)是通過(guò)ResultSubPartition調(diào)用createReadView方法禾蚕。
PipelinedSubpartitioncreateReadView代碼如下:

@Override
public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
    final boolean notifyDataAvailable;
    synchronized (buffers) {
        // 檢查該SubPartition的緩存不能被釋放
        checkState(!isReleased);
        // 檢查之前不能創(chuàng)建過(guò)read view
        checkState(readView == null,
                "Subpartition %s of is being (or already has been) consumed, " +
                "but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());

        LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
            parent.getOwningTaskName(), index, parent.getPartitionId());

        // 創(chuàng)建view,同時(shí)轉(zhuǎn)入availabilityListener
        readView = new PipelinedSubpartitionView(this, availabilityListener);
        // 如果buffer不為空狂丝,需要調(diào)用listener通知數(shù)據(jù)已準(zhǔn)備好换淆,可供消費(fèi)
        notifyDataAvailable = !buffers.isEmpty();
    }
    if (notifyDataAvailable) {
        notifyDataAvailable();
    }

    return readView;
}

上述方法在ResultPartitionManager中調(diào)用。
ResultPartitionManager負(fù)責(zé)維護(hù)當(dāng)前創(chuàng)建和消費(fèi)的分區(qū)几颜。
ResultPartitionManagercreateSubpartitionView方法:

@Override
public ResultSubpartitionView createSubpartitionView(
        ResultPartitionID partitionId,
        int subpartitionIndex,
        BufferAvailabilityListener availabilityListener) throws IOException {

    synchronized (registeredPartitions) {
        final ResultPartition partition = registeredPartitions.get(partitionId);

        if (partition == null) {
            throw new PartitionNotFoundException(partitionId);
        }

        LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition);

        return partition.createSubpartitionView(subpartitionIndex, availabilityListener);
    }
}

該方法邏輯比較簡(jiǎn)單倍试,ResultPartitionsetup的時(shí)候會(huì)將該分區(qū)注冊(cè)到ResultPartitionManager中。創(chuàng)建view的時(shí)候會(huì)根據(jù)partition id從已注冊(cè)的分區(qū)列表中獲取到指定的ResultPartition蛋哭,然后創(chuàng)建一個(gè)subpartition view县习。

繼續(xù)跟蹤該方法的調(diào)用鏈,我們可以發(fā)現(xiàn)該方法在兩個(gè)類(lèi)中調(diào)用:LocalInputChannelCreditBasedSequenceNumberingViewReader谆趾。

LocalInputChannel負(fù)責(zé)從本地請(qǐng)求一個(gè)subPartition view躁愿。
CreditBasedSequenceNumberingViewReader負(fù)責(zé)通過(guò)網(wǎng)絡(luò)從其他節(jié)點(diǎn)獲取subPartition view。同時(shí)提供了credit based反壓機(jī)制的支持沪蓬。

我們跟蹤下CreditBasedSequenceNumberingViewReaderrequestSubpartitionView方法:

@Override
public void requestSubpartitionView(
    ResultPartitionProvider partitionProvider,
    ResultPartitionID resultPartitionId,
    int subPartitionIndex) throws IOException {

    synchronized (requestLock) {
        if (subpartitionView == null) {
            // This this call can trigger a notification we have to
            // schedule a separate task at the event loop that will
            // start consuming this. Otherwise the reference to the
            // view cannot be available in getNextBuffer().
            this.subpartitionView = partitionProvider.createSubpartitionView(
                resultPartitionId,
                subPartitionIndex,
                this);
        } else {
            throw new IllegalStateException("Subpartition already requested");
        }
    }
}

方法邏輯僅為創(chuàng)建一個(gè)subpartitionView彤钟。繼續(xù)向上跟蹤該方法的調(diào)用位置,我們找到了PartitionRequestServerHandlerchannelRead0方法:

@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
    try {
        // 獲取接收到消息的類(lèi)型
        Class<?> msgClazz = msg.getClass();

        // ----------------------------------------------------------------
        // Intermediate result partition requests
        // ----------------------------------------------------------------
        // 如果是分區(qū)請(qǐng)求消息
        if (msgClazz == PartitionRequest.class) {
            PartitionRequest request = (PartitionRequest) msg;

            LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);

            try {
                NetworkSequenceViewReader reader;
                // 創(chuàng)建一個(gè)reader
                reader = new CreditBasedSequenceNumberingViewReader(
                    request.receiverId,
                    request.credit,
                    outboundQueue);

                // 為該reader分配一個(gè)subpartitionView
                reader.requestSubpartitionView(
                    partitionProvider,
                    request.partitionId,
                    request.queueIndex);

                // 注冊(cè)reader到outboundQueue中
                // outboundQueue中存放了多個(gè)reader跷叉,這些reader在隊(duì)列中排隊(duì)逸雹,等待數(shù)據(jù)發(fā)送
                outboundQueue.notifyReaderCreated(reader);
            } catch (PartitionNotFoundException notFound) {
                respondWithError(ctx, notFound, request.receiverId);
            }
        }
        // ----------------------------------------------------------------
        // Task events
        // ----------------------------------------------------------------
        else if (msgClazz == TaskEventRequest.class) {
            TaskEventRequest request = (TaskEventRequest) msg;

            if (!taskEventPublisher.publish(request.partitionId, request.event)) {
                respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
            }
        } else if (msgClazz == CancelPartitionRequest.class) {
            CancelPartitionRequest request = (CancelPartitionRequest) msg;

            outboundQueue.cancel(request.receiverId);
        } else if (msgClazz == CloseRequest.class) {
            outboundQueue.close();
        } else if (msgClazz == AddCredit.class) {
            AddCredit request = (AddCredit) msg;

            outboundQueue.addCredit(request.receiverId, request.credit);
        } else {
            LOG.warn("Received unexpected client request: {}", msg);
        }
    } catch (Throwable t) {
        respondWithError(ctx, t);
    }
}

此方法在上游數(shù)據(jù)發(fā)送端執(zhí)行,數(shù)據(jù)發(fā)送端對(duì)應(yīng)的netty角色為server云挟。
這里我們根據(jù)netty接收到的消息的類(lèi)型梆砸,來(lái)做出對(duì)應(yīng)的響應(yīng)。如果接收到的消息類(lèi)型為PartitionRequest园欣,需要?jiǎng)?chuàng)建一個(gè)CreditBasedSequenceNumberingViewReader并將該reader加入到outboundQueue中辫樱。

outboundQueue是一個(gè)PartitionRequestQueue類(lèi)型對(duì)象。該對(duì)象負(fù)責(zé)處理partition request俊庇。每次partition request會(huì)在PartitionRequestServerHandler中創(chuàng)建一個(gè)NetworkSequenceViewReader對(duì)象狮暑。然后給每個(gè)reader分配SubPartitionView(調(diào)用requestSubpartitionView)。最后調(diào)用notifyReaderCreated把reader加入到PartitionRequestQueueallReaders中辉饱。PartitionRequestQueue監(jiān)聽(tīng)下游的channel是否可寫(xiě)(writability)搬男。下游channel變?yōu)榭蓪?xiě)的時(shí)候會(huì)調(diào)用channelWritabilityChanged方法,將allReaders中排隊(duì)的reader逐個(gè)取出彭沼,發(fā)往下游缔逛。

channelWritabilityChanged方法代碼如下:

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    writeAndFlushNextMessageIfPossible(ctx.channel());
}

writeAndFlushNextMessageIfPossible方法代碼如下:

private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
    // 如果channel不可寫(xiě),返回
    if (fatalError || !channel.isWritable()) {
        return;
    }

    // The logic here is very similar to the combined input gate and local
    // input channel logic. You can think of this class acting as the input
    // gate and the consumed views as the local input channels.

    BufferAndAvailability next = null;
    try {
        while (true) {
            // 隊(duì)列中取出一個(gè)reader
            NetworkSequenceViewReader reader = pollAvailableReader();

            // No queue with available data. We allow this here, because
            // of the write callbacks that are executed after each write.
            if (reader == null) {
                return;
            }

            // 獲取buffer
            next = reader.getNextBuffer();
            if (next == null) {
                if (!reader.isReleased()) {
                    continue;
                }

                Throwable cause = reader.getFailureCause();
                if (cause != null) {
                    ErrorResponse msg = new ErrorResponse(
                        new ProducerFailedException(cause),
                        reader.getReceiverId());

                    ctx.writeAndFlush(msg);
                }
            } else {
                // This channel was now removed from the available reader queue.
                // We re-add it into the queue if it is still available
                if (next.moreAvailable()) {
                    registerAvailableReader(reader);
                }

                // 包裝buffer
                BufferResponse msg = new BufferResponse(
                    next.buffer(),
                    reader.getSequenceNumber(),
                    reader.getReceiverId(),
                    next.buffersInBacklog());

                // Write and flush and wait until this is done before
                // trying to continue with the next buffer.
                // 將msg發(fā)送到下游
                channel.writeAndFlush(msg).addListener(writeListener);

                return;
            }
        }
    } catch (Throwable t) {
        if (next != null) {
            next.buffer().recycleBuffer();
        }

        throw new IOException(t.getMessage(), t);
    }
}

接下來(lái)我們回到PartitionRequest這個(gè)請(qǐng)求。PartitionRequest是在哪里發(fā)送的呢褐奴?我們跟蹤到NettyPartitionRequestClientrequestSubpartition方法:

@Override
public void requestSubpartition(
        final ResultPartitionID partitionId,
        final int subpartitionIndex,
        final RemoteInputChannel inputChannel,
        int delayMs) throws IOException {

    checkNotClosed();

    LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
            subpartitionIndex, partitionId, delayMs);

    // clientHandler為CreditBasedPartitionRequestClientHandler
    // 它內(nèi)部維護(hù)了input channel ID和channel的對(duì)應(yīng)關(guān)系按脚,是一個(gè)map類(lèi)型變量
    // 在讀取消息的時(shí)候,需要依賴該map從channel ID獲取到channel對(duì)象本身
    clientHandler.addInputChannel(inputChannel);

    // 創(chuàng)建PartitionRequest對(duì)象
    final PartitionRequest request = new PartitionRequest(
            partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());

    // 發(fā)送PartitionRequest請(qǐng)求發(fā)送成功之后的回調(diào)函數(shù)
    final ChannelFutureListener listener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // 如果遇到了錯(cuò)誤
            if (!future.isSuccess()) {
                // map中移除這個(gè)channel
                clientHandler.removeInputChannel(inputChannel);
                SocketAddress remoteAddr = future.channel().remoteAddress();
                // 為inputChannel內(nèi)部的cause變量賦值敦冬,設(shè)置一個(gè)error
                inputChannel.onError(
                        new LocalTransportException(
                            String.format("Sending the partition request to '%s' failed.", remoteAddr),
                            future.channel().localAddress(), future.cause()
                        ));
            }
        }
    };

    // 如果不需要延遲發(fā)送
    if (delayMs == 0) {
        ChannelFuture f = tcpChannel.writeAndFlush(request);
        f.addListener(listener);
    } else {
    // 如果需要延遲發(fā)送辅搬,調(diào)用eventLoop的schedule方法
        final ChannelFuture[] f = new ChannelFuture[1];
        tcpChannel.eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                f[0] = tcpChannel.writeAndFlush(request);
                f[0].addListener(listener);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}

繼續(xù)跟蹤調(diào)用鏈,到RemoteInputChannelrequestSubpartition方法脖旱。代碼如下所示:

@VisibleForTesting
@Override
public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
    if (partitionRequestClient == null) {
        // Create a client and request the partition
        try {
            partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
        } catch (IOException e) {
            // IOExceptions indicate that we could not open a connection to the remote TaskExecutor
            throw new PartitionConnectionException(partitionId, e);
        }

        partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
    }
}

RemoteInputChannelrequestSubpartition方法中堪遂,如果partitionRequestClient,會(huì)預(yù)先通過(guò)connectionManager創(chuàng)建一個(gè)client萌庆,再調(diào)用requestSubpartition方法溶褪。

繼續(xù)跟蹤,我們找到SingleInputGaterequestPartitions方法践险。代碼如下:

@VisibleForTesting
void requestPartitions() throws IOException, InterruptedException {
    synchronized (requestLock) {
        // 只能請(qǐng)求一次partition猿妈,第一次調(diào)用該方法后此flag會(huì)被設(shè)置為true
        if (!requestedPartitionsFlag) {
            if (closeFuture.isDone()) {
                throw new IllegalStateException("Already released.");
            }

            // Sanity checks
            if (numberOfInputChannels != inputChannels.size()) {
                throw new IllegalStateException(String.format(
                    "Bug in input gate setup logic: mismatch between " +
                    "number of total input channels [%s] and the currently set number of input " +
                    "channels [%s].",
                    inputChannels.size(),
                    numberOfInputChannels));
            }

            // 循環(huán)所有的inputChannels,請(qǐng)求他們對(duì)應(yīng)的subPartition
            for (InputChannel inputChannel : inputChannels.values()) {
                inputChannel.requestSubpartition(consumedSubpartitionIndex);
            }
        }
        // 方法調(diào)用完畢設(shè)置flag為true巍虫,防止重復(fù)調(diào)用
        requestedPartitionsFlag = true;
    }
}

SingleInputGate繼承了InputGate接口彭则。InputGate的作用為從intermediate result讀取數(shù)據(jù)到task中。
根據(jù)JobGraph(參見(jiàn)Flink 源碼之JobGraph生成
)的分析我們可以得知operatorChain之間使用的intermediate result來(lái)作為中間結(jié)果緩存。Intermediate result在執(zhí)行時(shí)的真實(shí)數(shù)據(jù)承載對(duì)象為ResultPartition(一個(gè)或多個(gè),視分區(qū)條件而定)抄伍。ResultPartition分為一個(gè)或多個(gè)ResultSubPartition熏挎。每一個(gè)ResultSubPartition和下游的某一個(gè)InputGate有對(duì)應(yīng)關(guān)系。下游的InputGate讀取上游所有對(duì)應(yīng)ResultSubPartition的內(nèi)容前痘。

讀取數(shù)據(jù)

我們分析下StreamTaskprocessInput方法凛捏。代碼如下所示:

protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    // 調(diào)用inputProcessor的processInput方法
    InputStatus status = inputProcessor.processInput();
    if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
        return;
    }
    if (status == InputStatus.END_OF_INPUT) {
        // 如果輸入結(jié)束,將mailboxLoopRunning設(shè)置為false芹缔,停止運(yùn)行
        controller.allActionsCompleted();
        return;
    }
    // 在inputGate recordWriter或inputProcessor恢復(fù)可用之后異步調(diào)用default action的恢復(fù)操作
    CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
    MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
    jointFuture.thenRun(suspendedDefaultAction::resume);
}

這里我們重點(diǎn)關(guān)注下inputProcessor.processInput()調(diào)用坯癣。
inputProcessor有兩個(gè)實(shí)現(xiàn)類(lèi):StreamOneInputProcessorStreamTwoInputProcessor。我們看一下StreamOneInputProcessorprocessInput方法最欠。代碼如下:

@Override
public InputStatus processInput() throws Exception {
    InputStatus status = input.emitNext(output);

    if (status == InputStatus.END_OF_INPUT) {
        synchronized (lock) {
            operatorChain.endHeadOperatorInput(1);
        }
    }

    return status;
}

這里的input具有兩個(gè)實(shí)現(xiàn)類(lèi)StreamTaskSourceInputStreamTaskNetworkInput示罗。如果該StreamTask運(yùn)行的是數(shù)據(jù)源,則實(shí)現(xiàn)類(lèi)為StreamTaskSourceInput芝硬。其他情況使用的實(shí)現(xiàn)類(lèi)為StreamTaskNetworkInput蚜点,需要通過(guò)網(wǎng)絡(luò)讀取數(shù)據(jù)。

我們分析下StreamTaskNetworkInputemitNext方法拌阴。代碼如下:

@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {

    while (true) {
        // get the stream element from the deserializer
        if (currentRecordDeserializer != null) {
            // 從buffer的memorySegment中反序列化數(shù)據(jù)
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
            // 如果buffer已經(jīng)消費(fèi)了绍绘,可以回收buffer
            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            // 如果已經(jīng)讀取到完整記錄
            if (result.isFullRecord()) {
                // 處理從buffer中反序列化出的數(shù)據(jù),在后續(xù)博客中分析
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
            }
        }

        // 從CheckpointInputGate讀取數(shù)據(jù)
        Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
        if (bufferOrEvent.isPresent()) {
            // 處理獲取到的緩存,并將緩存中的memory segment提供給currentRecordDeserializer陪拘,供反序列化出消息厂镇,代碼稍后分析
            processBufferOrEvent(bufferOrEvent.get());
        } else {
            // 如果checkpointedInputGate 輸入流結(jié)束,返回END_OF_INPUT
            if (checkpointedInputGate.isFinished()) {
                checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                if (!checkpointedInputGate.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }
}

我們?cè)倏聪?code>processBufferOrEvent方法的源代碼:

private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOException {
    // 如果是buffer的話
    if (bufferOrEvent.isBuffer()) {
        // 讀取buffer對(duì)應(yīng)的channel id
        lastChannel = bufferOrEvent.getChannelIndex();
        checkState(lastChannel != StreamTaskInput.UNSPECIFIED);
        // 獲取channel對(duì)應(yīng)的record反序列化器
        currentRecordDeserializer = recordDeserializers[lastChannel];
        checkState(currentRecordDeserializer != null,
            "currentRecordDeserializer has already been released");

        // 此處是關(guān)鍵左刽,設(shè)置反序列化器要讀取的buffer為inputGate獲取到的buffer
        currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    }
    else {
        // Event received
        // 如果接收到的是event
        final AbstractEvent event = bufferOrEvent.getEvent();
        
        if (event.getClass() != EndOfPartitionEvent.class) {
            throw new IOException("Unexpected event: " + event);
        }

        // release the record deserializer immediately,
        // which is very valuable in case of bounded stream
        // 清除channel對(duì)應(yīng)的反序列化器
        // 并將recordDeserializers[channelIndex] 引用置空
        releaseDeserializer(bufferOrEvent.getChannelIndex());
    }
}

我們繼續(xù)跟蹤buffer是如何從inputGate中獲取的捺信。經(jīng)debug我們發(fā)現(xiàn)這個(gè)inputGate使用的2層包裝。CheckpointedInputGate包裝了InputGateWithMetrics悠反,又包裝了SingleInputGate残黑。其中CheckpointedInputGate負(fù)責(zé)檢查數(shù)據(jù)流中的checkpoint barrier,調(diào)用對(duì)應(yīng)的barrierHandler決定是否觸發(fā)checkpoint操作斋否。參見(jiàn)Flink 源碼之分布式快照
梨水。InputGateWithMetrics負(fù)責(zé)監(jiān)控接收的數(shù)據(jù),統(tǒng)計(jì)所有流入數(shù)據(jù)的總字節(jié)數(shù)茵臭。
經(jīng)歷2層包裝之后疫诽,程序邏輯進(jìn)行到SingleInputGatepollNext方法
SingleInputGatepollNextgetNext兩個(gè)方法。這兩個(gè)方法基本相同旦委,唯一的區(qū)別是pollNext為非阻塞方式奇徒,getNext為阻塞方式。

@Override
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
    return getNextBufferOrEvent(false);
}

getNextBufferOrEvent方法:

private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
    // 如果接收到所有分區(qū)終止的事件缨硝,則返回空
    if (hasReceivedAllEndOfPartitionEvents) {
        return Optional.empty();
    }

    // 如果input gate被關(guān)閉
    if (closeFuture.isDone()) {
        throw new CancelTaskException("Input gate is already closed.");
    }

    // 以阻塞方式讀取數(shù)據(jù)
    Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
    if (!next.isPresent()) {
        return Optional.empty();
    }

    InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
    return Optional.of(transformToBufferOrEvent(
        inputWithData.data.buffer(),
        inputWithData.moreAvailable,
        inputWithData.input));
}

waitAndGetNextData方法:

private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
        throws IOException, InterruptedException {
    while (true) {
        // 獲取channel摩钙,根據(jù)blocking參數(shù)決定是否是阻塞方式
        Optional<InputChannel> inputChannel = getChannel(blocking);
        if (!inputChannel.isPresent()) {
            return Optional.empty();
        }

        // Do not query inputChannel under the lock, to avoid potential deadlocks coming from
        // notifications.
        // 獲取input channel的緩存數(shù)據(jù)
        Optional<BufferAndAvailability> result = inputChannel.get().getNextBuffer();

        synchronized (inputChannelsWithData) {
            // 能獲取到數(shù)據(jù),并且還有更多數(shù)據(jù)
            if (result.isPresent() && result.get().moreAvailable()) {
                // enqueue the inputChannel at the end to avoid starvation
                // channel加入到inputChannelsWithData隊(duì)列中
                inputChannelsWithData.add(inputChannel.get());

                // 下面這個(gè)BitSet負(fù)責(zé)記錄哪些channel已經(jīng)加入到了inputChannelsWithData隊(duì)列
                enqueuedInputChannelsWithData.set(inputChannel.get().getChannelIndex());
            }

            // 如果inputChannelsWithData為空查辩,設(shè)置為不可用狀態(tài)
            if (inputChannelsWithData.isEmpty()) {
                availabilityHelper.resetUnavailable();
            }

            // 返回包裝后的結(jié)果
            if (result.isPresent()) {
                return Optional.of(new InputWithData<>(
                    inputChannel.get(),
                    result.get(),
                    !inputChannelsWithData.isEmpty()));
            }
        }
    }
}

每一個(gè)InputGate包含一個(gè)或多個(gè)InputChannel胖笛。其中InputChannel分為2種。LocalInputChannel負(fù)責(zé)從本地的SubPartition讀取數(shù)據(jù)宜岛,RemoteInputChannel負(fù)責(zé)從遠(yuǎn)程(其他節(jié)點(diǎn))的Subpartition讀取數(shù)據(jù)长踊。

LocalInputChannelgetNextBuffer方法:

@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
    checkError();

    // 獲取requestSubpartition方法得到的subpartitionView 
    ResultSubpartitionView subpartitionView = this.subpartitionView;
    // 如果沒(méi)有獲取到subpartitionView,需要再次檢查subpartitionView
    // 如果此時(shí)另一線程正在調(diào)用requestSubpartition方法萍倡,checkAndWaitForSubpartitionView方法會(huì)被阻塞
    // 等待requestSubpartition執(zhí)行完畢
    if (subpartitionView == null) {
        // There is a possible race condition between writing a EndOfPartitionEvent (1) and flushing (3) the Local
        // channel on the sender side, and reading EndOfPartitionEvent (2) and processing flush notification (4). When
        // they happen in that order (1 - 2 - 3 - 4), flush notification can re-enqueue LocalInputChannel after (or
        // during) it was released during reading the EndOfPartitionEvent (2).
        if (isReleased) {
            return Optional.empty();
        }

        // this can happen if the request for the partition was triggered asynchronously
        // by the time trigger
        // would be good to avoid that, by guaranteeing that the requestPartition() and
        // getNextBuffer() always come from the same thread
        // we could do that by letting the timer insert a special "requesting channel" into the input gate's queue
        subpartitionView = checkAndWaitForSubpartitionView();
    }

    // 獲取緩存數(shù)據(jù)
    BufferAndBacklog next = subpartitionView.getNextBuffer();

    if (next == null) {
        if (subpartitionView.isReleased()) {
            throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
        } else {
            return Optional.empty();
        }
    }

    // 更新已讀取字節(jié)數(shù)
    numBytesIn.inc(next.buffer().getSize());
    // 更新以讀取緩存數(shù)
    numBuffersIn.inc();
    return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
}

以上是LocalInputChannelgetNextBuffer方法身弊。下面我們分析下RemoteInputChannelgetNextBuffer方法。該方法和LocalInputChannel不同的是它從receivedBuffers隊(duì)列中獲取buffer列敲,而不是直接從subpartitionView獲取阱佛。

@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
    checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
    checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");

    checkError();

    final Buffer next;
    final boolean moreAvailable;

    // 從receivedBuffers隊(duì)列中獲取buffer
    synchronized (receivedBuffers) {
        next = receivedBuffers.poll();
        moreAvailable = !receivedBuffers.isEmpty();
    }

    numBytesIn.inc(next.getSize());
    numBuffersIn.inc();
    return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
}

接下來(lái)大家會(huì)問(wèn),receivedBuffers中的緩存數(shù)據(jù)是什么時(shí)候被加入的呢戴而?答案在onBuffer方法瘫絮。代碼如下:

public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
    // 是否需要回收此buffer
    boolean recycleBuffer = true;

    try {

        final boolean wasEmpty;
        synchronized (receivedBuffers) {
            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
            // after releaseAllResources() released all buffers from receivedBuffers
            // (see above for details).
            // 在releaseAllResources()調(diào)用之后無(wú)法在接收新的buffer
            if (isReleased.get()) {
                return;
            }

            // 檢查sequenceNumber
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }

            // 判斷添加buffer之前的隊(duì)列是否為空
            wasEmpty = receivedBuffers.isEmpty();
            // 添加緩存數(shù)據(jù)到隊(duì)列中
            receivedBuffers.add(buffer);
            // 已接收到數(shù)據(jù),緩存不需要回收
            recycleBuffer = false;
        }

         // 增加SequenceNumber
        ++expectedSequenceNumber;

        // 如果添加buffer之前的隊(duì)列為空填硕,需要通知對(duì)應(yīng)的inputGate麦萤,現(xiàn)在已經(jīng)有數(shù)據(jù)了(不為空)
        if (wasEmpty) {
            notifyChannelNonEmpty();
        }

        if (backlog >= 0) {
            // 負(fù)責(zé)提前分配buffer
            onSenderBacklog(backlog);
        }
    } finally {
        // 回收buffer
        if (recycleBuffer) {
            buffer.recycleBuffer();
        }
    }
}

這里我們先看一下如何提前分配buffer的邏輯鹿鳖。代碼如下:

void onSenderBacklog(int backlog) throws IOException {
    int numRequestedBuffers = 0;

    // 鎖定bufferQueue
    synchronized (bufferQueue) {
        // Similar to notifyBufferAvailable(), make sure that we never add a buffer
        // after releaseAllResources() released all buffers (see above for details).
        // 避免在releaseAllResources()之后執(zhí)行
        if (isReleased.get()) {
            return;
        }

        // backlog為后續(xù)所需的buffer(積壓數(shù)量)
        // initialCredit為初始預(yù)留的buffer數(shù)量
        numRequiredBuffers = backlog + initialCredit;
        // 如果可用buffer數(shù)小于numRequiredBuffers 
        // 并且不在等待請(qǐng)求浮動(dòng)Buffers的狀態(tài)
        // 需要為bufferQueue增加浮動(dòng)buffer
        while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
            // 申請(qǐng)一個(gè)buffer
            Buffer buffer = inputGate.getBufferPool().requestBuffer();
            if (buffer != null) {
                // 加入浮動(dòng)buffer隊(duì)列
                bufferQueue.addFloatingBuffer(buffer);
                numRequestedBuffers++;
            } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                // If the channel has not got enough buffers, register it as listener to wait for more floating buffers.
                // 如果請(qǐng)求不到buffer(channel沒(méi)有足夠的buffer)
                // 注冊(cè)一個(gè)監(jiān)聽(tīng)器,并且標(biāo)記等待請(qǐng)求浮動(dòng)Buffers的狀態(tài)為true
                isWaitingForFloatingBuffers = true;
                break;
            }
        }
    }

    // 如果本次操作請(qǐng)求的buffer數(shù)量大于0
    // unannouncedCredit為未告知上游生產(chǎn)者的credit壮莹,用于數(shù)據(jù)反壓
    // 如果unannouncedCredit在增加numRequestedBuffers之前的值為0
    // 需要通知上游這里有credit翅帜,可以接收數(shù)據(jù)
    if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
        notifyCreditAvailable();
    }
}

上面分析到如果請(qǐng)求buffer失敗,會(huì)注冊(cè)一個(gè)監(jiān)聽(tīng)器命满。那么當(dāng)監(jiān)聽(tīng)器執(zhí)行到buffer創(chuàng)建成功的時(shí)候執(zhí)行什么方法呢涝滴?我們分析下notifyBufferAvailable方法。

@Override
public NotificationResult notifyBufferAvailable(Buffer buffer) {
    NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
    try {
        synchronized (bufferQueue) {
            // 保證必須在等待浮動(dòng)buffer狀態(tài)
            checkState(isWaitingForFloatingBuffers,
                "This channel should be waiting for floating buffers.");

            // Important: make sure that we never add a buffer after releaseAllResources()
            // released all buffers. Following scenarios exist:
            // 1) releaseAllResources() already released buffers inside bufferQueue
            // -> then isReleased is set correctly
            // 2) releaseAllResources() did not yet release buffers from bufferQueue
            // -> we may or may not have set isReleased yet but will always wait for the
            // lock on bufferQueue to release buffers
            // 確保沒(méi)有release胶台,并且可用buffer數(shù)量小于所需的buffer才執(zhí)行后續(xù)流程
            if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                isWaitingForFloatingBuffers = false;
                return notificationResult;
            }
            // 添加浮動(dòng)buffer
            bufferQueue.addFloatingBuffer(buffer);

            // 如果可用buffer數(shù)量和所需buffer數(shù)量一致歼疮,返回不再需要新的buffer
            if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
                isWaitingForFloatingBuffers = false;
                notificationResult = NotificationResult.BUFFER_USED_NO_NEED_MORE;
            } else {
                // 否則返回仍需新的buffer
                notificationResult = NotificationResult.BUFFER_USED_NEED_MORE;
            }
        }

        // 如果unannouncedCredit在加1之前為0,通知上游诈唬,下游可以接收數(shù)據(jù)
        if (unannouncedCredit.getAndAdd(1) == 0) {
            notifyCreditAvailable();
        }
    } catch (Throwable t) {
        setError(t);
    }
    return notificationResult;
}

下面我們回到onBuffer方法韩脏。繼續(xù)跟蹤,我們發(fā)現(xiàn)onBuffer方法在CreditBasedPartitionRequestClientHandlerdecodeBufferOrEvent方法中調(diào)用铸磅。這個(gè)方法負(fù)責(zé)處理接收到的數(shù)據(jù)赡矢。數(shù)據(jù)的類(lèi)型可能為buffer或者event。代碼如下:

private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
    try {
        ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
        final int receivedSize = nettyBuffer.readableBytes();
        // 如果是buffer
        if (bufferOrEvent.isBuffer()) {
            // ---- Buffer ------------------------------------------------

            // Early return for empty buffers. Otherwise Netty's readBytes() throws an
            // IndexOutOfBoundsException.
            // 如果收到字節(jié)數(shù)為0阅仔,調(diào)用RemoteInputChannel的onEmptyBuffer方法
            if (receivedSize == 0) {
                inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                return;
            }

            // 請(qǐng)求一個(gè)空的buffer
            Buffer buffer = inputChannel.requestBuffer();
            if (buffer != null) {
                // 寫(xiě)入網(wǎng)絡(luò)讀取到的數(shù)據(jù)至buffer中
                nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
                // 設(shè)置壓縮
                buffer.setCompressed(bufferOrEvent.isCompressed);

                // 調(diào)用onBuffer處理方法
                inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
            } else if (inputChannel.isReleased()) {
                // 如果channel已經(jīng)release吹散,調(diào)用取消請(qǐng)求方法
                cancelRequestFor(bufferOrEvent.receiverId);
            } else {
                throw new IllegalStateException("No buffer available in credit-based input channel.");
            }
        } else {
            // 如果是事件(event),創(chuàng)建一個(gè)memSeg 八酒,數(shù)據(jù)為event內(nèi)容
            // 再把它包裹進(jìn)networkBuffer對(duì)象空民,通過(guò)onBuffer方法教給RemoteInputChannel處理
            // ---- Event -------------------------------------------------
            // TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
            byte[] byteArray = new byte[receivedSize];
            nettyBuffer.readBytes(byteArray);

            MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
            Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);

            inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
        }
    } finally {
        // 釋放netty的buffer
        bufferOrEvent.releaseBuffer();
    }
}

繼續(xù)追蹤此方法的調(diào)用鏈到decodeMsg方法。該方法的部分源代碼如下:

private void decodeMsg(Object msg) throws Throwable {
    final Class<?> msgClazz = msg.getClass();

    // ---- Buffer --------------------------------------------------------
    if (msgClazz == NettyMessage.BufferResponse.class) {
        NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

        // 獲取接收此buffer的input channel
        RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
        if (inputChannel == null) {
            bufferOrEvent.releaseBuffer();

            cancelRequestFor(bufferOrEvent.receiverId);

            return;
        }

        // 調(diào)用decodeBufferOrEvent方法
        decodeBufferOrEvent(inputChannel, bufferOrEvent);

    } else if (msgClazz == NettyMessage.ErrorResponse.class) {
        // ---- Error ---------------------------------------------------------
        // 剩余代碼省略
    } else {
        throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
    }
}

繼續(xù)追蹤羞迷,我們到netty框架的channelRead方法界轩。代碼如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        decodeMsg(msg);
    } catch (Throwable t) {
        notifyAllChannelsOfErrorAndClose(t);
    }
}

到此,整個(gè)數(shù)據(jù)的讀取流程分析完畢闭树。

本博客為作者原創(chuàng)耸棒,歡迎大家參與討論和批評(píng)指正荒澡。如需轉(zhuǎn)載請(qǐng)注明出處报辱。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市单山,隨后出現(xiàn)的幾起案子碍现,更是在濱河造成了極大的恐慌,老刑警劉巖米奸,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件昼接,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡悴晰,警方通過(guò)查閱死者的電腦和手機(jī)慢睡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)逐工,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人漂辐,你說(shuō)我怎么就攤上這事泪喊。” “怎么了髓涯?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵袒啼,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我纬纪,道長(zhǎng)蚓再,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任包各,我火速辦了婚禮摘仅,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘髓棋。我一直安慰自己实檀,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布按声。 她就那樣靜靜地躺著膳犹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪签则。 梳的紋絲不亂的頭發(fā)上须床,一...
    開(kāi)封第一講書(shū)人閱讀 49,764評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音渐裂,去河邊找鬼豺旬。 笑死,一個(gè)胖子當(dāng)著我的面吹牛柒凉,可吹牛的內(nèi)容都是我干的族阅。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼膝捞,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼坦刀!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起蔬咬,我...
    開(kāi)封第一講書(shū)人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤鲤遥,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后林艘,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體盖奈,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年狐援,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了钢坦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片究孕。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖爹凹,靈堂內(nèi)的尸體忽然破棺而出蚊俺,到底是詐尸還是另有隱情,我是刑警寧澤逛万,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布泳猬,位于F島的核電站,受9級(jí)特大地震影響宇植,放射性物質(zhì)發(fā)生泄漏得封。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一指郁、第九天 我趴在偏房一處隱蔽的房頂上張望忙上。 院中可真熱鬧,春花似錦闲坎、人聲如沸疫粥。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)梗逮。三九已至,卻和暖如春绣溜,著一層夾襖步出監(jiān)牢的瞬間慷彤,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工怖喻, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留底哗,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓锚沸,卻偏偏與公主長(zhǎng)得像跋选,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子哗蜈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 簡(jiǎn)介 Java NIO 是由 Java 1.4 引進(jìn)的異步 IO.Java NIO 由以下幾個(gè)核心部分組成: Ch...
    永順閱讀 1,786評(píng)論 0 15
  • flink內(nèi)部通信機(jī)制 Operator間的數(shù)據(jù)傳遞本地線程數(shù)據(jù)傳遞遠(yuǎn)程線程數(shù)據(jù)傳遞同一線程的Operator數(shù)據(jù)...
    余楚倩閱讀 9,114評(píng)論 0 24
  • Java NIO(New IO)是從Java 1.4版本開(kāi)始引入的一個(gè)新的IO API前标,可以替代標(biāo)準(zhǔn)的Java I...
    JackChen1024閱讀 7,546評(píng)論 1 143
  • Flink源碼分析系列文檔目錄 請(qǐng)點(diǎn)擊:Flink 源碼分析系列文檔目錄[https://www.jianshu....
    AlienPaul閱讀 2,897評(píng)論 0 1
  • 本文是先介紹 Flink,再說(shuō) Flink的過(guò)去和現(xiàn)在 一恬叹、Flink介紹 Flink是一款分布式的計(jì)算引擎候生,它可...
    生活的探路者閱讀 1,272評(píng)論 0 22