Flink Data Transfer and Data Exchange: Source Code and Principles

Preface

The content below assumes some knowledge of Netty: everything that touches the network in Flink is built on Netty, so without a basic understanding of Netty parts of the logic may be hard to follow.

Let's first introduce a few components that will come up again and again later:

ResultPartition: an operator's output is written into a ResultPartition, which contains multiple ResultSubPartitions.

ResultSubPartition: each ResultSubPartition is consumed by one downstream task; a task, however, may consume more than one ResultSubPartition.

InputGate: the abstraction over the data input side. An InputGate is essentially a wrapper around InputChannels; one InputGate contains multiple InputChannels.

InputChannel: the component that does the actual receiving work. It comes in a local and a remote flavor, and each InputChannel receives the output of exactly one ResultSubPartition.

A Quick Look at Task Construction

We start from the point where the Task has been constructed and trace how it reads and sends data. This post looks at how a Task transfers data: first the construction of the components required before any data exchange happens, then the exchange itself.

Within an operator chain, an operator simply calls the downstream operator's processElement method. When data crosses tasks within a TaskManager or crosses the network, it is serialized (and later deserialized) and written into a buffer (a buffer wraps a MemorySegment). A BufferBuilder writes the data into the MemorySegment; its counterpart, the BufferConsumer, sits on the consuming side and reads the data out of the MemorySegment. One BufferBuilder corresponds to exactly one BufferConsumer.
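As a rough mental model of that BufferBuilder/BufferConsumer pairing, here is a minimal sketch over a plain ByteBuffer. The class names SimpleBufferBuilder and SimpleBufferConsumer are made up for illustration and are not Flink's actual MemorySegment API.

import java.nio.ByteBuffer;

// Simplified model: one writer fills a fixed-size "segment", one reader drains what was written so far.
public class BufferPairSketch {
    static final int SEGMENT_SIZE = 16;

    // stands in for BufferBuilder: appends bytes into the segment
    static class SimpleBufferBuilder {
        final ByteBuffer segment = ByteBuffer.allocate(SEGMENT_SIZE);
        int append(ByteBuffer record) {
            int n = Math.min(record.remaining(), segment.remaining());
            for (int i = 0; i < n; i++) segment.put(record.get());
            return n;                                   // bytes actually written
        }
        boolean isFull() { return !segment.hasRemaining(); }
    }

    // stands in for BufferConsumer: reads whatever the builder has committed so far
    static class SimpleBufferConsumer {
        final SimpleBufferBuilder builder;
        int readPos = 0;
        SimpleBufferConsumer(SimpleBufferBuilder b) { this.builder = b; }
        byte[] poll() {
            int writePos = builder.segment.position();
            byte[] out = new byte[writePos - readPos];
            for (int i = 0; i < out.length; i++) out[i] = builder.segment.get(readPos + i);
            readPos = writePos;
            return out;
        }
    }

    public static void main(String[] args) {
        SimpleBufferBuilder builder = new SimpleBufferBuilder();
        SimpleBufferConsumer consumer = new SimpleBufferConsumer(builder); // 1:1 pairing
        builder.append(ByteBuffer.wrap("hello".getBytes()));
        System.out.println(new String(consumer.poll()));                  // prints "hello"
    }
}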

// Task is the concrete execution class for a scheduled Flink task; it implements Runnable, i.e. one task is executed by one thread
public class Task
        implements Runnable,
                TaskSlotPayload,
                TaskActions,
                PartitionProducerStateProvider,
                CheckpointListener,
                BackPressureSampleableTask {
}

    // Task.run: it simply delegates to doRun(), which is where the main logic lives
  public void run() {
        try {
            doRun();
        } finally {
            terminationFuture.complete(executionState);
        }
    }

  // Only the most important lines are kept here, everything else has been removed; the task is really started by calling invoke() on the invokable
    // Some implementations of AbstractInvokable: StreamTask, DataSourceTask, etc.
    // With the DataStream API a source becomes a SourceStreamTask; other operators become different StreamTask subclasses depending on their inputs
private void doRun() {
                    AbstractInvokable invokable = null;
            // Build the AbstractInvokable - in this example it is a StreamTask. Different modes may create different AbstractInvokable implementations; the concrete object is created via reflection from the fully qualified class name nameOfInvokableClass, here an OneInputStreamTask
            invokable =loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
            this.invokable = invokable;
            // call invoke() to start execution
            invokable.invoke();
    }


// Before stepping into invoke(), let's see how the AbstractInvokable is constructed, taking OneInputStreamTask as an example
// this constructor is invoked via reflection, with the Environment passed in
  public OneInputStreamTask(Environment env) throws Exception {
            // call the parent constructor
        super(env);
    }
// parent constructor
protected StreamTask(Environment env) throws Exception {
        this(env, null);
    }
// the calls chain further down; note one thing here
protected StreamTask(
            Environment environment,
            @Nullable TimerService timerService,
            Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
            StreamTaskActionExecutor actionExecutor)
            throws Exception {
        this(
                environment,
                timerService,
                uncaughtExceptionHandler,
                actionExecutor,
                    // create a TaskMailbox; the task's data receiving and sending later all rely on it, and the current thread is passed in as a parameter,
                    // meaning only this thread may take mails from the mailbox - after all, it is your mail
                new TaskMailboxImpl(Thread.currentThread()));
    }

// Eventually we end up in this constructor
// only the important parts are annotated
  protected StreamTask(
            Environment environment,
            @Nullable TimerService timerService,
            Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
            StreamTaskActionExecutor actionExecutor,
            TaskMailbox mailbox)
            throws Exception {
                // call the parent constructor, i.e. AbstractInvokable
        super(environment);

        this.configuration = new StreamConfig(getTaskConfiguration());
    
            // IMPORTANT - needed later
            // private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
            // build the recordWriter; what is actually constructed is a RecordWriterDelegate, the concrete RecordWriter is obtained via getRecordWriter()
            // how the recordWriter is built is shown further below
        this.recordWriter = createRecordWriterDelegate(configuration, environment);
    
            // executes actions outside the mailbox model, e.g. sending events
        this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
    
            // IMPORTANT
            // build the mailbox processor, which processes the mails the mailbox receives
            // arg 1: the MailboxDefaultAction, given as a lambda that calls processInput of the current class (here OneInputStreamTask)
            // arg 2: the mailbox - to process mail you first need a mailbox
            // arg 3: the action executor used while processing the mailbox
        this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
        this.mailboxProcessor.initMetric(environment.getMetricGroup());
            // the executor used to run mails; similar to a Java thread pool, but not quite the same
        this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
        this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
            // thread pool that handles snapshots asynchronously
        this.asyncOperationsThreadPool =
                Executors.newCachedThreadPool(
                        new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));

        this.stateBackend = createStateBackend();
                // the coordinator that handles checkpoints for this task
        this.subtaskCheckpointCoordinator =
                new SubtaskCheckpointCoordinatorImpl(
                        stateBackend.createCheckpointStorage(getEnvironment().getJobID()),
                        getName(),
                        actionExecutor,
                        getCancelables(),
                        getAsyncOperationsThreadPool(),
                        getEnvironment(),
                        this,
                        configuration.isUnalignedCheckpointsEnabled(),
                        this::prepareInputSnapshot);

        if (timerService == null) {
            ThreadFactory timerThreadFactory =
                    new DispatcherThreadFactory(
                            TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
            this.timerService =
                    new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory);
        } else {
            this.timerService = timerService;
        }
                
            // thread used for I/O work, e.g. restoring state
        this.channelIOExecutor =
                Executors.newSingleThreadExecutor(
                        new ExecutorThreadFactory("channel-state-unspilling"));

        injectChannelStateWriterIntoChannels();
    }

// We have now seen the StreamTask constructor; next, how the recordWriter is built
// StreamTask.createRecordWriterDelegate
    public static <OUT>
            RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
                    createRecordWriterDelegate(
                            StreamConfig configuration, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites =
                // build the writers for this task, usually just one
                createRecordWriters(configuration, environment);
        // check the number of writers; in the common case it is 1, so a SingleRecordWriter is built
        if (recordWrites.size() == 1) {
            // just wrap the single writer
            return new SingleRecordWriter<>(recordWrites.get(0));
        } else if (recordWrites.size() == 0) {
            return new NonRecordWriter<>();
        } else {
            // For example, in the topology below the source has two outgoing edges,
            // so its task gets a MultipleRecordWriters:
            // DataStreamSource<Object> source = ...;
            // SingleOutputStreamOperator<U> map = source.map(...);
            // source.connect(map);
            return new MultipleRecordWriters<>(recordWrites);
        }
    }

// build the recordWriters for this task
// StreamTask.createRecordWriters
private static <OUT>
  List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
  StreamConfig configuration, Environment environment) {
  List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
    new ArrayList<>();
  // find all outgoing edges of the operator
  List<StreamEdge> outEdgesInOrder =
    configuration.getOutEdgesInOrder(
    environment.getUserCodeClassLoader().asClassLoader());
    
  // iterate over the out edges and build one recordWriter per edge
  for (int i = 0; i < outEdgesInOrder.size(); i++) {
    StreamEdge edge = outEdgesInOrder.get(i);
    recordWriters.add(
      // build the recordWriter
      // we won't step through the details, in short:
      // it fetches the partitioner for this edge and the corresponding ResultPartitionWriter, then assembles the RecordWriter via its builder
      // internally it checks whether the output is broadcast: if so a BroadcastRecordWriter is built, otherwise a ChannelSelectorRecordWriter (the usual case)
      // the two most important constructor arguments of ChannelSelectorRecordWriter are the ResultPartitionWriter and the ChannelSelector
      createRecordWriter(
        edge,
        i,
        environment,
        environment.getTaskInfo().getTaskName(),
        edge.getBufferTimeout()));
  }
  return recordWriters;
}


So far we have seen how the Task and its recordWriter are constructed. Now we step into the invoke() method to see how execution actually proceeds. When invoke() is called, the Task does some preparation work and then really starts calling the userFunction to read and emit data. Let's look at how StreamTask implements this.

// StreamTask is an abstract class
// StreamTask.invoke
@Override
public final void invoke() throws Exception {
    try {
        // the two most important calls

        // initializes input and output, i.e. where the data comes from and where it goes
        // the concrete input/output types determine how data is exchanged: within a thread, between tasks in one TaskManager, or across the network
        beforeInvoke();
        
        // run the mailbox loop and start continuously reading data and sending it downstream; covered later
        runMailboxLoop();
    }
  
    // unimportant code removed
  protected void beforeInvoke() throws Exception {
            // IMPORTANT
            // build the operatorChain - we will look into this in depth; the recordWriter was built in the constructor, as explained above
        operatorChain = new OperatorChain<>(this, recordWriter);
            // the concrete operator that runs inside this task
        mainOperator = operatorChain.getMainOperator();
                
            // some task initialization work
        actionExecutor.runThrowing(() -> {
                    // the reader for channel state
                    SequentialChannelStateReader reader =
                            getEnvironment().getTaskStateManager()
                                    .getSequentialChannelStateReader();
                    reader.readOutputData(getEnvironment().getAllWriters(), false);
                                        // operator initialization; this calls the operators' open() methods
                    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
                                        // read channel state data
                    channelIOExecutor.execute(() -> {
                                try {
                                    reader.readInputData(getEnvironment().getAllInputGates());
                                } catch (Exception e) {
                                    asyncExceptionHandler.handleAsyncException(
                                            "Unable to read channel state", e);
                                }
                            });
                });
        isRunning = true;
    }
  

  // The most important part of beforeInvoke is building the OperatorChain; let's dive into its constructor
  // There is a lot of code; since we only care about data exchange, the other parts are not explained
  public OperatorChain(
    StreamTask<OUT, OP> containingTask,
    RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {

    this.operatorEventDispatcher =
      new OperatorEventDispatcherImpl(
      containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
      containingTask.getEnvironment().getOperatorCoordinatorEventGateway());

    final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
    final StreamConfig configuration = containingTask.getConfiguration();

    StreamOperatorFactory<OUT> operatorFactory =
      configuration.getStreamOperatorFactory(userCodeClassloader);

    // we read the chained configs, and the order of record writer registrations by output name
    Map<Integer, StreamConfig> chainedConfigs =
      configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);

    // create the final output stream writers
    // we iterate through all the out edges from this job vertex and create a stream output
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
    Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
      new HashMap<>(outEdgesInOrder.size());
    this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];

    // from here on, we need to make sure that the output writers are shut down again on failure
    boolean success = false;
    try {
      createChainOutputs(
        outEdgesInOrder,
        recordWriterDelegate,
        chainedConfigs,
        containingTask,
        streamOutputMap);

      // we create the chain of operators and grab the collector that leads into the chain
      List<StreamOperatorWrapper<?, ?>> allOpWrappers =
        new ArrayList<>(chainedConfigs.size());
      this.mainOperatorOutput =
            // IMPORTANT
            // build the output, i.e. where the data is emitted
        createOutputCollector(
        containingTask,
        configuration,
        chainedConfigs,
        userCodeClassloader,
        streamOutputMap,
        allOpWrappers,
        containingTask.getMailboxExecutorFactory());

      if (operatorFactory != null) {
        Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
          StreamOperatorFactoryUtil.createOperator(
          operatorFactory,
          containingTask,
          configuration,
          mainOperatorOutput,
          operatorEventDispatcher);

        OP mainOperator = mainOperatorAndTimeService.f0;
        mainOperator
          .getMetricGroup()
          .gauge(
          MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
          mainOperatorOutput.getWatermarkGauge());
        this.mainOperatorWrapper =
          createOperatorWrapper(
          mainOperator,
          containingTask,
          configuration,
          mainOperatorAndTimeService.f1,
          true);

        // add main operator to end of chain
        allOpWrappers.add(mainOperatorWrapper);

        this.tailOperatorWrapper = allOpWrappers.get(0);
      } else {
        checkState(allOpWrappers.size() == 0);
        this.mainOperatorWrapper = null;
        this.tailOperatorWrapper = null;
      }

      this.chainedSources =
        createChainedSources(
        containingTask,
        configuration.getInputs(userCodeClassloader),
        chainedConfigs,
        userCodeClassloader,
        allOpWrappers);

      this.numOperators = allOpWrappers.size();

      firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);

      success = true;
    } finally {
      // make sure we clean up after ourselves in case of a failure after acquiring
      // the first resources
      if (!success) {
        for (RecordWriterOutput<?> output : this.streamOutputs) {
          if (output != null) {
            output.close();
          }
        }
      }
    }
  }

 // the output is built in here
 // OperatorChain.createOutputCollector
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
            StreamTask<?, ?> containingTask,
            StreamConfig operatorConfig,
            Map<Integer, StreamConfig> chainedConfigs,
            ClassLoader userCodeClassloader,
            Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
            List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
            MailboxExecutorFactory mailboxExecutorFactory) {
        List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
                new ArrayList<>(4);

        // we get here for outputs that are not chained, e.g. after startNewChain() or disableChaining() was called
        // these outputs require transfer across the network or between threads
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

        // chaining is enabled by default, so we normally enter this loop and generate the corresponding outputs
        // all outputs built inside this loop stay within one task, i.e. are executed by one thread
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
                        /*
            A quick note on the recursion here.
                Suppose: chain1 => source->map1->map2   | chain2 => flatMap->sink
                and we follow the construction of chain1.
                When the operatorChain's outputs are built, the source's output is built first. Source has chained outputs,
                so we enter the chained loop and recurse into map1; map1 again enters the chained loop and recurses,
                building map2's output. map2's successor is not chained to it (it starts a new chain), so map2 goes
                through the non-chained loop and gets an output that transfers data over the network. Once map2's output
                is done, the stack unwinds and the pending recursive calls finish building the outputs of map1 and source.

 Result :
 call   :  input.processElement()        input.processElement()      (wraps a RecordWriterOutput) serializes and sends buffers
 chain1 :  source(CopyingChainingOutput)->map1(CopyingChainingOutput)->map2(CountingOutput(RecordWriterOutput))
            */
            WatermarkGaugeExposingOutput<StreamRecord<T>> output =
                        // this method recursively calls createOutputCollector (the current method),
                        // because every operator chained together needs its own output
                        // we skip the construction details; in short:
                        // it checks whether object reuse is enabled: if so a ChainingOutput is created, otherwise a CopyingChainingOutput
                        // CopyingChainingOutput creates a new record via StreamRecord.copy, ChainingOutput reuses the StreamRecord
                    createOperatorChain(
                            containingTask,
                            chainedOpConfig,
                            chainedConfigs,
                            userCodeClassloader,
                            streamOutputs,
                            allOperatorWrappers,
                            outputEdge.getOutputTag(),
                            mailboxExecutorFactory);
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }
                
      // the return value: if exactly one output was built above we just return it; for broadcast/sink cases we fall into the logic below
        if (allOutputs.size() == 1) {
            return allOutputs.get(0).f0;
        } else {
         
            Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
            for (int i = 0; i < allOutputs.size(); i++) {
                asArray[i] = allOutputs.get(i).f0;
            }
                        // for a sink, asArray has length 0, so the collector below will never emit anything
            if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
                return closer.register(new CopyingBroadcastingOutputCollector<>(asArray, this));
            } else {
                return closer.register(new BroadcastingOutputCollector<>(asArray, this));
            }
        }
    }




We have now seen all the components involved in data transfer. Next we follow the data along the different exchange paths.

I. Data Transfer Within a Thread

Above we saw how the outputs are built; sending data downstream is done through these outputs. As noted, with object reuse enabled the generated output is a ChainingOutput, otherwise a CopyingChainingOutput. Let's look at CopyingChainingOutput, the variant that does not reuse the StreamRecord.
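For reference, whether the chained output copies the record or reuses it is driven by the object-reuse flag on the ExecutionConfig. A minimal sketch using the public API (the rest of the job is omitted):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // with object reuse enabled, chained operators get a ChainingOutput (the StreamRecord is reused);
        // with the default (disabled), a CopyingChainingOutput copies the record before handing it on
        env.getConfig().enableObjectReuse();
        // env.getConfig().disableObjectReuse();  // the default behavior
    }
}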

// the output used when object reuse is disabled; the logic here is fairly simple
final class CopyingChainingOutput<T> extends ChainingOutput<T> {

    private final TypeSerializer<T> serializer;

    public CopyingChainingOutput(
            OneInputStreamOperator<T, ?> operator,
            TypeSerializer<T> serializer, // used to copy/serialize the record value
            OutputTag<T> outputTag,
            StreamStatusProvider streamStatusProvider) {
        // call the parent constructor (ChainingOutput); the operator is assigned to the field input
        // note: input is the downstream operator
        //    think of it as:   op1 -> output -> input -> op2 -> output
        super(operator, streamStatusProvider, outputTag);
        this.serializer = serializer;
    }
        
    // the Collector.collect method - recall that inside an operator, e.g. a process function, data is also sent downstream via out.collect
    @Override
    public void collect(StreamRecord<T> record) {
        if (this.outputTag != null) {
            // we are not responsible for emitting to the main output.
            return;
        }
                // the method that actually pushes the record downstream
        pushToOperator(record);
    }

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
            // we are not responsible for emitting to the side-output specified by this
            // OutputTag.
            return;
        }

        pushToOperator(record);
    }

    @Override
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            StreamRecord<T> castRecord = (StreamRecord<T>) record;
            // metrics
            numRecordsIn.inc();
            // call record.copy to obtain a new record (a new StreamRecord is created inside)
            // the serializer's copy method is used here as well
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            input.setKeyContextElement(copy);
            // directly call the downstream operator's processElement, so no serialization/deserialization is needed
            input.processElement(copy);
        } catch (ClassCastException e) {
            if (outputTag != null) {
                // Enrich error message
                ClassCastException replace =
                        new ClassCastException(
                                String.format(
                                        "%s. Failed to push OutputTag with id '%s' to operator. "
                                                + "This can occur when multiple OutputTags with different types "
                                                + "but identical names are being used.",
                                        e.getMessage(), outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {
                throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
}

// the wrapper operator for map; within the same chain its processElement is called directly
// StreamMap.processElement
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
  // on receiving a record, call the userFunction's map method and send the result downstream
  output.collect(element.replace(userFunction.map(element.getValue())));
}

So inside an operatorChain, passing a record downstream simply means calling the next operator's processElement. The last operator of the chain holds a RecordWriterOutput, which sends the data on to the next task. For the actual cross-task exchange we look at the output side and the input side separately.

II. Data Transfer Between Tasks Within a TaskManager

Even though the tasks run in the same JVM process, the data still has to be serialized and deserialized on the way.

Within a TaskManager, data transfer between tasks mainly involves the following components: ResultSubPartition, InputGate, InputChannel and BufferBuilder.
Flow:

The upstream operator keeps writing records into a ResultSubPartition. When the ResultSubPartition receives data it notifies the InputGate, which adds the corresponding InputChannel to the queue of available channels. The downstream task reads the data via pollNext(), deserializes it and hands it to the user code. The implementation of each of these steps is shown below.

1. Data Output

Continuing from above: inside a chain, processElement is called level by level until we reach the last operator of the chain, whose output is a RecordWriterOutput. That operator uses it to send the data downstream (not literally to the downstream task, but into the subpartition).

// straight into the source code to see how the data is actually sent

public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

  public RecordWriterOutput(
    RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
    TypeSerializer<OUT> outSerializer,
    OutputTag outputTag,
    StreamStatusProvider streamStatusProvider,
    boolean supportsUnalignedCheckpoints) {
    // the object doing the real work
    // in the streaming runtime, data is represented as records; this writer serializes them and writes them into the subpartition
    this.recordWriter =(RecordWriter<SerializationDelegate<StreamElement>>) (RecordWriter<?>) recordWriter;
    // serialization delegate that wraps the serializer for the record type
    TypeSerializer<StreamElement> outRecordSerializer = new StreamElementSerializer<>(outSerializer);
    if (outSerializer != null) {
      serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
    }
    // non-essential parts omitted ...
  }

  @Override
  public void collect(StreamRecord<OUT> record) {
    if (this.outputTag != null) {
      return;
    }
    // the method that does the real work
    pushToRecordWriter(record);
  }

  private <X> void pushToRecordWriter(StreamRecord<X> record) {
    // store the record as the delegate's instance
    serializationDelegate.setInstance(record);
    try {
      // emit the delegate through the recordWriter
      // the recordWriter implementation here is ChannelSelectorRecordWriter
      recordWriter.emit(serializationDelegate);
    } catch (Exception e) {
      throw new RuntimeException(e.getMessage(), e);
    }
  }
  // event-related emit logic omitted ...
}



// --------------------------------------------------------------------------------------------

// A quick look at ChannelSelector first: it returns an int, the index of the downstream channel the record will be sent to
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
  @Override
  public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    // a simple round-robin
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
  }
}
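Which StreamPartitioner, and therefore which selectChannel implementation, ends up being used is decided by the DataStream API calls. A small sketch, assuming a recent DataStream API:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a", "b", "c");

        // rebalance() -> RebalancePartitioner, the round-robin selectChannel shown above
        source.rebalance().print();

        // keyBy() -> KeyGroupStreamPartitioner: the channel index is derived from the key's key group
        source.keyBy(value -> value).print();

        env.execute("partitioner-example");
    }
}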

// So collect() eventually calls recordWriter.emit(); let's step into the concrete implementation

// As the name suggests, to emit a record we first select the channel it should be sent to
public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {

  @Override
  public void emit(T record) throws IOException {
    // overloaded method, delegates to the parent's emit
    // selectChannel computes the index of the target channel
    // this is the data distribution strategy (shuffle, keyBy, ...), i.e. which channel the record goes to
    emit(record, channelSelector.selectChannel(record));
  }

  // RecordWriter.emit()
  protected void emit(T record, int targetSubpartition) throws IOException {
    // targetPartition = PipelinedResultPartition
    // serializeRecord serializes the record with the given serializer and returns a Java ByteBuffer
    // emitRecord then sends the data
    targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);

    // sending in batches is obviously cheaper than record-by-record, so Flink batches here as well
    // RecordWriter has an inner class OutputFlusher (extends Thread, overrides run); it is created in the parent constructor
    // when the configured timeout > 0, otherwise no flusher thread is created and only the check below applies;
    // its run() loops, calling Thread.sleep(timeout) and then flushAll() after each sleep
    // flushAlways == (timeout == 0): every record is flushed as soon as it is emitted
    // the timeout is configured via DataStream.setBufferTimeout()
    if (flushAlways) {

      targetPartition.flush(targetSubpartition);
    }
  }
}
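The flush timeout mentioned in the comments above can be set from the public API; a minimal sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(100); // flush partially filled buffers every 100 ms (OutputFlusher thread)
        // env.setBufferTimeout(0);  // flushAlways: flush after every record, lowest latency
        // env.setBufferTimeout(-1); // only flush when a buffer is full, highest throughput
    }
}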


// Into BufferWritingResultPartition.emitRecord()
// This writes the data into buffers. The ResultPartition has a LocalBufferPool that caches available buffers;
// when none is available, it requests more from the global NetworkBufferPool
@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
  // appendUnicastDataForNewRecord requests a buffer; we won't step through it, in short:
  // buffers are requested from the LocalBufferPool; when the local pool is exhausted, network buffers are requested
  // if the network buffer quota is exceeded or no network buffer is available, the call blocks until one frees up - think `backpressure`
  // once a buffer is obtained, a BufferBuilder is created and the target subpartition's add() is called with a
  // BufferConsumer (created from that builder); add() calls notifyDataAvailable(), which eventually reaches
  // the channel's notifyDataAvailable() and triggers consumption
  // after all that, the record is written into the buffer
  // we come back to this method in the data input section
  BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);

  // After the record is written into the buffer, one of three situations can occur, each handled differently

  // the buffer is full but only part of the record fit
  while (record.hasRemaining()) {
    // full buffer, partial record
    // finish the buffer: it can no longer be written to
    finishUnicastBufferBuilder(targetSubpartition);
    // request a new buffer and write the remaining data, looping until the whole record is written
    buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
  }
    
  // the buffer is full and the whole record fit
  if (buffer.isFull()) {
    // full buffer, full record
    // finish the buffer: it can no longer be written to
    finishUnicastBufferBuilder(targetSubpartition);
  }
    
  // the buffer is not full but the whole record fit
  // nothing to do - as long as the record is stored completely we are fine
  // partial buffer, full record
}

// At this point the record is in a buffer, waiting to be consumed downstream
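The three cases above can be reproduced with a tiny standalone sketch that writes one record into fixed-size segments; the helper names only mimic the shape of emitRecord and are not Flink code.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class EmitRecordSketch {
    static final int SEGMENT_SIZE = 8;

    // mimics the emitRecord loop: keep requesting segments until the record fits completely
    static List<ByteBuffer> emit(ByteBuffer record) {
        List<ByteBuffer> segments = new ArrayList<>();
        ByteBuffer segment = ByteBuffer.allocate(SEGMENT_SIZE);        // "appendUnicastDataForNewRecord"
        copy(record, segment);
        while (record.hasRemaining()) {                                // full buffer, partial record
            segments.add(segment);                                     // "finishUnicastBufferBuilder"
            segment = ByteBuffer.allocate(SEGMENT_SIZE);               // "appendUnicastDataForRecordContinuation"
            copy(record, segment);
        }
        if (!segment.hasRemaining()) {
            segments.add(segment);                                     // full buffer, full record: finish it too
        } else {
            segments.add(segment);                                     // partial buffer, full record: stays open for the next record
        }
        return segments;
    }

    static void copy(ByteBuffer from, ByteBuffer to) {
        while (from.hasRemaining() && to.hasRemaining()) to.put(from.get());
    }

    public static void main(String[] args) {
        int n = emit(ByteBuffer.wrap(new byte[20])).size();
        System.out.println("segments used: " + n);  // 20 bytes over 8-byte segments -> 3
    }
}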

2. Data Input

// Above we saw how data is written into buffers (inside the SubPartition);

// next, how the downstream side is notified so that consumption is triggered

// this method is called from emitRecord()
// BufferWritingResultPartition.appendUnicastDataForNewRecord()
private BufferBuilder appendUnicastDataForNewRecord(
  final ByteBuffer record, final int targetSubpartition) throws IOException {
  // get the bufferBuilder kept from a previous record that did not fill the buffer
  BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];
  // null means the previous buffer was filled and shipped, or we have just been initialized
  if (buffer == null) {
    buffer = requestNewUnicastBufferBuilder(targetSubpartition);
    // notifyDataAvailable is invoked inside add()
    // add a buffer consumer; createBufferConsumerFromBeginning builds the consumer, 0 is the number of bytes to skip
    subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(), 0);
  }
  // append the data to the buffer
  buffer.appendAndCommit(record);
  return buffer;
}

// PipelinedSubpartition.add()
@Override
public boolean add(BufferConsumer bufferConsumer, int partialRecordLength) {
  return add(bufferConsumer, partialRecordLength, false);
}
// one level further down
private boolean add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {
  checkNotNull(bufferConsumer);
  synchronized (buffers) {
    // build a BufferConsumerWithPartialRecordLength and add it to buffers; it will be used later
    if (addBuffer(bufferConsumer, partialRecordLength)) {
                prioritySequenceNumber = sequenceNumber;
            }
  }
  // other parts omitted (not the focus); notifyDataAvailable() announces that data is available and can be consumed
  if (notifyDataAvailable) {
    notifyDataAvailable();
  }
  return true;
}
// what follows is a long chain of calls
private void notifyDataAvailable() {
  final PipelinedSubpartitionView readView = this.readView;
  if (readView != null) {
    readView.notifyDataAvailable();
  }
}

// PipelinedSubpartitionView
@Override
public void notifyDataAvailable() {
  availabilityListener.notifyDataAvailable();
}
// LocalInputChannel
@Override
public void notifyDataAvailable() {
  notifyChannelNonEmpty();
}
// InputChannel
protected void notifyChannelNonEmpty() {
  // pass this, meaning this channel now has data and can be consumed
  inputGate.notifyChannelNonEmpty(this);
}
// SingleInputGate
void notifyChannelNonEmpty(InputChannel channel) {
  queueChannel(checkNotNull(channel), null);
}
// eventually SingleInputGate.queueChannel is called
// from here on, the code is usually (though not always) executed by the OutputFlusher thread;
// see the flush thread inside RecordWriter mentioned above if you forgot
// once this completes, the channel that has data is in the queue, and the thread running the consuming task
// starts reading it via pollNext
private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber) {
  try (GateNotificationHelper notification =
       new GateNotificationHelper(this, inputChannelsWithData)) {
    synchronized (inputChannelsWithData) {
      boolean priority = prioritySequenceNumber != null;
      if (priority
          && isOutdated(
            prioritySequenceNumber,
            lastPrioritySequenceNumber[channel.getChannelIndex()])) {
        // priority event at the given offset already polled (notification is not atomic
        // in respect to
        // buffer enqueuing), so just ignore the notification
        return;
      }

      // try to enqueue the channel into inputChannelsWithData
      // if it could not be enqueued, just return
      if (!queueChannelUnsafe(channel, priority)) {
        return;
      }

      if (priority && inputChannelsWithData.getNumPriorityElements() == 1) {
        notification.notifyPriority();
      }
      // if this is the first channel in the queue, notify: internally inputChannelsWithData.notifyAll() wakes up waiting threads
      // inputChannelsWithData holds the available channels, i.e. the channels that currently have data
      if (inputChannelsWithData.size() == 1) {
        notification.notifyDataAvailable();
      }
    }
  }
}
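The notify/wait interplay around inputChannelsWithData can be modelled with a plain queue guarded by a lock. This is a conceptual sketch only, not Flink code:

import java.util.ArrayDeque;

public class ChannelQueueSketch {
    private final ArrayDeque<String> channelsWithData = new ArrayDeque<>();

    // what queueChannel does: enqueue the channel and wake up a waiting consumer
    public void notifyChannelNonEmpty(String channel) {
        synchronized (channelsWithData) {
            boolean wasEmpty = channelsWithData.isEmpty();
            if (!channelsWithData.contains(channel)) {
                channelsWithData.add(channel);
            }
            if (wasEmpty) {
                channelsWithData.notifyAll();   // like GateNotificationHelper.notifyDataAvailable()
            }
        }
    }

    // what getChannel(blocking = true) does: wait until some channel has data
    public String getChannel() throws InterruptedException {
        synchronized (channelsWithData) {
            while (channelsWithData.isEmpty()) {
                channelsWithData.wait();
            }
            return channelsWithData.poll();
        }
    }

    public static void main(String[] args) throws Exception {
        ChannelQueueSketch gate = new ChannelQueueSketch();
        Thread consumer = new Thread(() -> {
            try { System.out.println("consuming " + gate.getChannel()); }
            catch (InterruptedException ignored) { }
        });
        consumer.start();
        gate.notifyChannelNonEmpty("channel-0");   // producer side
        consumer.join();
    }
}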

// The channel with data is now in the queue; the consuming task just pulls channels from that queue and consumes their data
// Remember that when the task was constructed we built the mailbox and its processing code
// So how does the task actually run? Let's go through it:
// after invoke() is called, beforeInvoke() builds the operatorChain, calls open() and does the other initialization,
// then runMailboxLoop() is called and the task really starts running
// let's step into that method
public void runMailboxLoop() throws Exception {
  // loop the mailbox processor
  mailboxProcessor.runMailboxLoop();
}

//MailboxProcessor.runMailboxLoop()
public void runMailboxLoop() throws Exception {
  final TaskMailbox localMailbox = mailbox;
  Preconditions.checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");
  assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
  final MailboxController defaultActionContext = new MailboxController(this);
    
  // data is consumed under the mailbox threading model; not expanded here
  while (isMailboxLoopRunning()) {
    // blocks while the default action is unavailable
    processMail(localMailbox, false);
    if (isMailboxLoopRunning()) {
      // run the default action; when the task was built, this::processInput was passed in as a lambda

      // i.e. this executes StreamTask.processInput()
      mailboxDefaultAction.runDefaultAction(defaultActionContext); 
    }
  }
}
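To make the mailbox model concrete, here is a stripped-down sketch of the loop: drain the mails first, then run the default action (which in StreamTask is processInput). This is a simplification, not the real MailboxProcessor:

import java.util.concurrent.LinkedBlockingQueue;

public class MailboxLoopSketch {
    private final LinkedBlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    // other threads (checkpointing, timers, ...) drop work into the mailbox
    public void submitMail(Runnable mail) {
        mailbox.add(mail);
    }

    // the task thread: process mails first, then run the default action
    public void runMailboxLoop(Runnable defaultAction) {
        while (running) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {   // "processMail"
                mail.run();
            }
            defaultAction.run();                        // "mailboxDefaultAction.runDefaultAction"
        }
    }

    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        MailboxLoopSketch loop = new MailboxLoopSketch();
        final int[] processed = {0};
        loop.submitMail(() -> System.out.println("mail: trigger checkpoint"));
        loop.runMailboxLoop(() -> {                     // default action = process one record
            System.out.println("default action: process record " + processed[0]++);
            if (processed[0] == 3) loop.stop();
        });
    }
}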

// StreamTask.processInput()
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
      // call the input processor's processInput to read data; the inputProcessor was created during initialization
      // step into the method
        InputStatus status = inputProcessor.processInput();
        if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
            return;
        }
        if (status == InputStatus.END_OF_INPUT) {
            controller.allActionsCompleted();
            return;
        }
        CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
        MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
        assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
    }

//StreamOneInputProcessor.processInput()
   public InputStatus processInput() throws Exception {
     // input and output again, which can be confusing - picture Flink's physical execution graph

     // input = inputGate: the data input; the upstream operator writes into the subPartition and the inputGate's inputChannel consumes it
     //          what arrives here are serialized buffers
     // output = takes the deserialized record and hands it to the operator; the output holds a reference to the operator
     // data flow:
     // op1(map) --> subPartition --> inputGate --> output --> op2(flatMap) --> subPartition

     // pass the input's data on through the output
        InputStatus status = input.emitNext(output);

        if (status == InputStatus.END_OF_INPUT) {
            endOfInputAware.endInput(input.getInputIndex() + 1);
        }

        return status;
    }

// StreamTaskNetworkInput.emitNext()
  public InputStatus emitNext(DataOutput<T> output) throws Exception {
            
    // note this is a loop; a normal record read takes two iterations:
    // the first iteration goes into processBuffer and assigns currentRecordDeserializer
    // the second deserializes the record and emits it downstream
        while (true) {
            // get the stream element from the deserializer
            // 
            if (currentRecordDeserializer != null) {
                DeserializationResult result;
                try {
                    // deserialize the data into the deserializationDelegate
                    // the delegate deserializes the buffer contents and stores the result in its instance field
                    result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                } catch (IOException e) {
                    throw new IOException(
                            String.format("Can't get next record for channel %s", lastChannel), e);
                }
                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }
                                // if the result is a full record, emit it downstream right away - no need to wait for another buffer
                if (result.isFullRecord()) {
                    // send the record downstream
                    // effectively output.emitRecord(record) -> operator.processElement, assuming op = StreamMap
                    // at this point the record has been delivered
                    processElement(deserializationDelegate.getInstance(), output);
                    return InputStatus.MORE_AVAILABLE;
                }
            }
                        
          // fetch a buffer from the inputGate
            Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
            if (bufferOrEvent.isPresent()) {
                // return to the mailbox after receiving a checkpoint barrier to avoid processing of
                // data after the barrier before checkpoint is performed for unaligned checkpoint
                // mode
                if (bufferOrEvent.get().isBuffer()) {
                    // handle a record buffer
                    // and assign the matching deserializer to currentRecordDeserializer
                    processBuffer(bufferOrEvent.get());
                } else {
                  // handle an event buffer
                    processEvent(bufferOrEvent.get());
                    return InputStatus.MORE_AVAILABLE;
                }
            } else {
                if (checkpointedInputGate.isFinished()) {
                    checkState(
                            checkpointedInputGate.getAvailableFuture().isDone(),
                            "Finished BarrierHandler should be available");
                    return InputStatus.END_OF_INPUT;
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
    }

//CheckpointedInputGate.pollNext()
//CheckpointedInputGate[InputGateWithMetrics[SingleInputGate]] - think of the decorator pattern, every wrapper adds functionality
// we want to see how the buffer is obtained
   public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
     // go straight into SingleInputGate.pollNext
        Optional<BufferOrEvent> next = inputGate.pollNext();
                // irrelevant code omitted
        return next;
    }

//SingleInputGate.pollNext()
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
      // false = do not block
        return getNextBufferOrEvent(false);
    }


   private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking)
            throws IOException, InterruptedException {
     // irrelevant code omitted

        Optional<InputWithData<InputChannel, BufferAndAvailability>> next =
          // the method that actually fetches the buffer
                waitAndGetNextData(blocking);

        InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
        return Optional.of( transformToBufferOrEvent( // build the BufferOrEvent object
                        inputWithData.data.buffer(),
                        inputWithData.moreAvailable,
                        inputWithData.input,
                        inputWithData.morePriorityEvents));
    }

// if blocking == true and no channel is available, the call waits
// once the buffer is obtained it is wrapped layer by layer, and what is finally returned is a BufferOrEvent

// with the buffer in hand, parsing and deserialization begin
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(
            boolean blocking) throws IOException, InterruptedException {
        while (true) {
            synchronized (inputChannelsWithData) {
              // take an available channel from the inputChannelsWithData queue mentioned earlier; if blocking == true
              // and no channel is available, the thread waits and releases the lock
                Optional<InputChannel> inputChannelOpt = getChannel(blocking);
                if (!inputChannelOpt.isPresent()) {
                    return Optional.empty();
                }

                final InputChannel inputChannel = inputChannelOpt.get();
              // inputChannel = LocalInputChannel
              // LocalInputChannel holds a reference to the subpartitionView and can read buffers directly from its buffers queue
              // (the ones added by addBuffer above)
              // getNextBuffer rewraps the buffer as a BufferAndAvailability
                Optional<BufferAndAvailability> bufferAndAvailabilityOpt =
                        inputChannel.getNextBuffer();

                if (!bufferAndAvailabilityOpt.isPresent()) {
                    checkUnavailability();
                    continue;
                }

                final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
                if (bufferAndAvailability.moreAvailable()) {
                    // enqueue the inputChannel at the end to avoid starvation
                    queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
                }

                final boolean morePriorityEvents =
                        inputChannelsWithData.getNumPriorityElements() > 0;
                if (bufferAndAvailability.hasPriority()) {
                    lastPrioritySequenceNumber[inputChannel.getChannelIndex()] =
                            bufferAndAvailability.getSequenceNumber();
                    if (!morePriorityEvents) {
                        priorityAvailabilityHelper.resetUnavailable();
                    }
                }

                checkUnavailability();
                                
                return Optional.of(
                        new InputWithData<>(
                                inputChannel,
                                bufferAndAvailability,
                                !inputChannelsWithData.isEmpty(),
                                morePriorityEvents));
            }
        }
    }

At the end of StreamTaskNetworkInput.emitNext(), processElement hands the record to the current operator, which then runs the user code on it.

III. Data Transfer Between TaskManagers

TaskManagerRunner.runTaskManager() builds the TaskManagerRunner object. Its construction is fairly involved, wrapping layer upon layer, and finally start() is called to start the TaskManager. During construction a ShuffleEnvironment (NettyShuffleEnvironment) is created. The ShuffleEnvironment holds a ConnectionManager, which in turn holds the Netty client and server; whenever a network request is needed, the ConnectionManager establishes the connection and performs the request.

// -----------------  there is a lot of code here, so only the key places are shown  ----------------------------
// the creation flow:
TaskManagerRunner.runTaskManager() // the constructor path calls createTaskExecutorService()
TaskManagerRunner.createTaskExecutorService()  // builds the TaskExecutor
TaskManagerRunner.startTaskManager() // starts the TaskManager services
TaskManagerServices.fromConfiguration() // builds the TaskManager's internal components
TaskManagerServices.createShuffleEnvironment() // builds the ShuffleEnvironment and calls start() -- the connectionManager is built inside
NettyShuffleEnvironment.start() // eventually calls connectionManager.start(), initializing the Netty client and server


  // let's look at how the NettyShuffleEnvironment and its connectionManager are built

  static NettyShuffleEnvironment createNettyShuffleEnvironment(
  NettyShuffleEnvironmentConfiguration config,
  ResourceID taskExecutorResourceId,
  TaskEventPublisher taskEventPublisher,
  ResultPartitionManager resultPartitionManager,
  MetricGroup metricGroup,
  Executor ioExecutor) {
  checkNotNull(config);
  checkNotNull(taskExecutorResourceId);
  checkNotNull(taskEventPublisher);
  checkNotNull(resultPartitionManager);
  checkNotNull(metricGroup);
    
  NettyConfig nettyConfig = config.nettyConfig();

  FileChannelManager fileChannelManager =
    new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
    
  // for local tests a LocalConnectionManager is built; the same happens when a single TaskManager is enough to run the job,
  // otherwise everything is Netty based

  // we come back to this part shortly
  ConnectionManager connectionManager =
    nettyConfig != null? new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig)
                                    : new LocalConnectionManager();
    
  // the global buffer pool, backed by Netty memory
  // recall: each task has its own LocalBufferPool, and when it runs out of buffers it asks for more -
  // this is exactly the pool it asks
  NetworkBufferPool networkBufferPool =
    new NetworkBufferPool(
    config.numNetworkBuffers(),
    config.networkBufferSize(),
    config.getRequestSegmentsTimeout());

  registerShuffleMetrics(metricGroup, networkBufferPool);

  ResultPartitionFactory resultPartitionFactory =
    new ResultPartitionFactory(
    resultPartitionManager,
    fileChannelManager,
    networkBufferPool,
    config.getBlockingSubpartitionType(),
    config.networkBuffersPerChannel(),
    config.floatingNetworkBuffersPerGate(),
    config.networkBufferSize(),
    config.isBlockingShuffleCompressionEnabled(),
    config.getCompressionCodec(),
    config.getMaxBuffersPerChannel(),
    config.sortShuffleMinBuffers(),
    config.sortShuffleMinParallelism(),
    config.isSSLEnabled());
    
  // this is a factory for SingleInputGates: when a task is initialized it needs one inputGate per consumed partition,
  // and the factory simplifies that construction
  SingleInputGateFactory singleInputGateFactory =
    new SingleInputGateFactory(
    taskExecutorResourceId,
    config,
    connectionManager,
    resultPartitionManager,
    taskEventPublisher,
    networkBufferPool);
    
  return new NettyShuffleEnvironment(taskExecutorResourceId,config,networkBufferPool,connectionManager,
    resultPartitionManager, fileChannelManager,resultPartitionFactory,singleInputGateFactory,ioExecutor);
}

Above we saw how the NettyShuffleEnvironment is built. The NettyConnectionManager manages the Netty client and server, which form the TaskManager's TCP connections shared by all of its tasks.

// manages the Netty client and server

public class NettyConnectionManager implements ConnectionManager {
  public NettyConnectionManager(
    ResultPartitionProvider partitionProvider,
    TaskEventPublisher taskEventPublisher,
    NettyConfig nettyConfig) {
        // server: accepts connections from other clients; client: connects to other servers
        // both are thin wrappers around Netty: they build a bootstrap and configure options, handlers, etc.
    this.server = new NettyServer(nettyConfig);
    this.client = new NettyClient(nettyConfig);
    this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

    this.partitionRequestClientFactory =
      new PartitionRequestClientFactory(client, nettyConfig.getNetworkRetries());
        // provides two methods that return the server-side and the client-side handlers respectively
    this.nettyProtocol =
      new NettyProtocol(
      checkNotNull(partitionProvider), checkNotNull(taskEventPublisher));
  }

  @Override
  public int start() throws IOException {
    // initialize client and server
    client.init(nettyProtocol, bufferPool);
    return server.init(nettyProtocol, bufferPool);
  }
    
  // request a partition and create a client for it
  // it mainly wraps two things:
  // CreditBasedPartitionRequestClientHandler channelHandler
  // Channel tcpChannel - the channel the task uses to send TCP requests
  @Override
  public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
    throws IOException, InterruptedException {
    return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
  }
    
  // other code omitted ...
}

The TaskManager now has all of its network components in place, so what does the data exchange actually look like?

It is very similar to the exchange between tasks inside one TaskManager, with four extra components on top: PartitionRequestQueue, PartitionRequestServerHandler (the server-side handler), PartitionRequestClient, and CreditBasedPartitionRequestClientHandler (the client-side handler).

Flow:

The upstream operator writes data into the subpartition. The PartitionRequestQueue reads the buffers and writes them into the Netty channel; requests coming from downstream are handled on the server side by the PartitionRequestServerHandler. On the consuming side, the PartitionRequestClient receives the data sent by the server, and the CreditBasedPartitionRequestClientHandler (client-side handler) processes it and writes it into the InputChannel.

Note: Flink's data exchange follows the producer-consumer model.
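A minimal model of credit-based flow control, to keep in mind while reading the handlers below. This is conceptual only; in Flink the exchange happens over Netty between the PartitionRequestQueue and the RemoteInputChannel:

import java.util.ArrayDeque;
import java.util.Queue;

public class CreditSketch {
    // producer side: only sends as many buffers as the consumer has announced credit for
    static class Producer {
        int credit = 0;                               // buffers the consumer can still accept
        final Queue<String> pending = new ArrayDeque<>();

        void addCredit(int delta, Consumer consumer) { // an AddCredit message arrives
            credit += delta;
            while (credit > 0 && !pending.isEmpty()) {
                consumer.onBuffer(pending.poll());     // a BufferResponse over the wire
                credit--;
            }
        }
    }

    // consumer side: grants one credit per free local buffer
    static class Consumer {
        int freeBuffers;
        Consumer(int freeBuffers) { this.freeBuffers = freeBuffers; }
        void onBuffer(String buffer) {
            freeBuffers--;
            System.out.println("received " + buffer + ", free buffers left: " + freeBuffers);
        }
    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.pending.add("buffer-1");
        producer.pending.add("buffer-2");
        producer.pending.add("buffer-3");

        Consumer consumer = new Consumer(2);
        producer.addCredit(consumer.freeBuffers, consumer); // consumer announces 2 credits
        // buffer-3 stays pending until the consumer frees buffers and sends more credit
    }
}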

// For network transfer, writing data into buffers works exactly as in the in-TaskManager case: addBuffer adds the produced buffer to the buffers queue. The difference lies in who consumes the data, as we will see below.

// Because network communication is involved this can look messy at first; once the flow is clear it is actually quite simple.
// The transfer is based on Netty, so the following assumes some Netty knowledge; if Netty is new to you, read up on it first.


// extends Netty's inbound handler; see the Netty docs for when exactly it is invoked
class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {
  // other methods omitted

  // this inbound handler method is called when data arrives
  // timing: the downstream PartitionRequestClient issues a request; when this task receives it, this method handles it
  // note: before reaching this handler the bytes go through a decoder handler, which produces the NettyMessage object
  protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
    try {
      Class<?> msgClazz = msg.getClass();
      // different request types are handled differently; for data exchange the interesting ones are PartitionRequest and AddCredit
      if (msgClazz == PartitionRequest.class) {
        PartitionRequest request = (PartitionRequest) msg;
        try {
          NetworkSequenceViewReader reader;
          reader =
            new CreditBasedSequenceNumberingViewReader(
            request.receiverId, request.credit, outboundQueue);

          reader.requestSubpartitionView(
            partitionProvider, request.partitionId, request.queueIndex);

          outboundQueue.notifyReaderCreated(reader);
        } catch (PartitionNotFoundException notFound) {
          respondWithError(ctx, notFound, request.receiverId);
        }
      }else if (msgClazz == TaskEventRequest.class) {
        TaskEventRequest request = (TaskEventRequest) msg;

        if (!taskEventPublisher.publish(request.partitionId, request.event)) {
          respondWithError(
            ctx,
            new IllegalArgumentException("Task event receiver not found."),
            request.receiverId);
        }
      } else if (msgClazz == CancelPartitionRequest.class) {
        CancelPartitionRequest request = (CancelPartitionRequest) msg;
        outboundQueue.cancel(request.receiverId);
      } else if (msgClazz == CloseRequest.class) {
        outboundQueue.close();
      } 
      // credit received from the client; Flink's flow control is credit based
      // the credit tells us how much we may send at once; if the receiver's credit is low we only
      // send that much data, otherwise the whole TCP connection would back-pressure
      // credit = number of buffers available on the downstream consumer
      else if (msgClazz == AddCredit.class) {
        AddCredit request = (AddCredit) msg;
        // once credit is added (or consumption resumed) the consumer can consume data, i.e. we respond to its request
        // resume belongs to checkpoint barrier alignment: data after the current barrier must not be consumed yet
        outboundQueue.addCreditOrResumeConsumption(

          request.receiverId,
          // lambda that adds the new credit to the accumulated credit
          reader -> reader.addCredit(request.credit));
      } else if (msgClazz == ResumeConsumption.class) {
        ResumeConsumption request = (ResumeConsumption) msg;

        outboundQueue.addCreditOrResumeConsumption(
          request.receiverId, NetworkSequenceViewReader::resumeConsumption);
      } else {
        LOG.warn("Received unexpected client request: {}", msg);
      }
    } catch (Throwable t) {
      respondWithError(ctx, t);
    }
  }

}


//PartitionRequestQueue.addCreditOrResumeConsumption()
// above, the server received the credit request; now we need to respond to the consumer with data
void addCreditOrResumeConsumption(
  InputChannelID receiverId, Consumer<NetworkSequenceViewReader> operation)
  throws Exception {
  if (fatalError) {
    return;
  }
  // get the reader for this receiverId and read data through it
  // the receiverId identifies the inputChannel that made the request and will receive the data
  NetworkSequenceViewReader reader = allReaders.get(receiverId);
  if (reader != null) {
    // the lambda passed in: accumulate the credit
    operation.accept(reader);

    // try to enqueue the reader; once enqueued, it is repeatedly picked up to read data
    // further down the call chain, availableReaders.add(reader) puts it into the queue
    enqueueAvailableReader(reader);
  } else {
    throw new IllegalStateException(
      "No reader for receiverId = " + receiverId + " exists.");
  }
}


private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
  if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
    return;
  }

  boolean triggerWrite = availableReaders.isEmpty();
  registerAvailableReader(reader);
  // if the queue was empty before, trigger the write
  if (triggerWrite) {
    writeAndFlushNextMessageIfPossible(ctx.channel());
  }
}


// Looking closely, the following is very similar to data transfer inside a TaskManager.
// The difference: here a reader is obtained first and then subpartitionView.pollNext is called,
// while inside a TaskManager the subpartitionView is called directly.

// Note the class is also different: this is PartitionRequestQueue
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
  if (fatalError || !channel.isWritable()) {
    return;
  }
  BufferAndAvailability next = null;
  try {
    while (true) {
      // take a reader from the available readers; if there is one, process it
      NetworkSequenceViewReader reader = pollAvailableReader();
      if (reader == null) {
        return;
      }
            // inside, the reader calls subpartitionView.pollNext - similar to the task exchange within a TaskManager
      next = reader.getNextBuffer();
      if (next == null) {
        if (!reader.isReleased()) {
          continue;
        }

        Throwable cause = reader.getFailureCause();
        if (cause != null) {
          ErrorResponse msg =
            new ErrorResponse(
            new ProducerFailedException(cause), reader.getReceiverId());

          ctx.writeAndFlush(msg);
        }
      } else {
        // if there is more data, re-enqueue the reader so it is picked up again - note we are in a loop
        if (next.moreAvailable()) {
          registerAvailableReader(reader);
        }
                
        // build a response
        // the response carries a buffer, and the buffer contains the records - still serialized
        BufferResponse msg =
          new BufferResponse(
          next.buffer(),
          next.getSequenceNumber(),
          reader.getReceiverId(),
          next.buffersInBacklog());

       // send the data to the client side - we are on the server side here
        // keep in mind we are inside Netty now: this goes over the network
        channel.writeAndFlush(msg)
          // when the flush completes, try to process the next buffer
          // the listener added here does the same as the code above
          .addListener(writeListener);

        return;
      }
    }
  } catch (Throwable t) {
    if (next != null) {
      next.buffer().recycleBuffer();
    }

    throw new IOException(t.getMessage(), t);
  }
}


上面的內(nèi)容是下游算子發(fā)送請(qǐng)求到上游,netty接收到下游的請(qǐng)求,對(duì)消息進(jìn)行decode,最后封裝成nettyMessage,在PartitionRequestServerHandler判斷事件類型,進(jìn)行響應(yīng)的處理,由于我們關(guān)注的是數(shù)據(jù)交互內(nèi)容,所以我們關(guān)注的是AddCredit事件,收到該實(shí)際后找到請(qǐng)求的inpuCannel對(duì)應(yīng)的reader,然后通過(guò)reader調(diào)用pollNext讀取數(shù)據(jù)反饋給client響應(yīng)

下面我們需要對(duì)client的響應(yīng)內(nèi)容進(jìn)行分析

netty的server端即producer,收到client端即consumer發(fā)送的請(qǐng)求后,處理完成后會(huì)進(jìn)行響應(yīng),數(shù)據(jù)是通過(guò)tcp進(jìn)行發(fā)送的,在netty對(duì)數(shù)據(jù)處理抽象成了一個(gè)pipeline,pipeline是由多個(gè)handler組成的,當(dāng)netty接收的數(shù)據(jù)的時(shí)候會(huì)觸發(fā)inboundHander的計(jì)算,當(dāng)發(fā)送數(shù)據(jù)出去的時(shí)候會(huì)觸發(fā)outboundHander的計(jì)算,所以netty的client端接收到數(shù)據(jù)后對(duì)進(jìn)入decode進(jìn)行解碼響應(yīng),解析的response會(huì)向下游的handler發(fā)送,下游的handler就是CreditBasedPartitionRequestClientHandler,所以我們的入口就是這個(gè)類

Note: since a network connection is involved, the call relationships may feel somewhat confusing; crossing the network inevitably increases the code's complexity. Later on I will explain the underlying principle with diagrams.

class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter
  implements NetworkClientHandler {

  // Called when the netty client receives data.
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
      // decode the message
      decodeMsg(msg);
    } catch (Throwable t) {
      notifyAllChannelsOfErrorAndClose(t);
    }
  }

  private void decodeMsg(Object msg) throws Throwable {
    final Class<?> msgClazz = msg.getClass();

    // The actual decoding already happened when the data entered the pipeline;
    // here the msg is merely cast to its concrete type.
    if (msgClazz == NettyMessage.BufferResponse.class) {
      NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

      RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
      if (inputChannel == null || inputChannel.isReleased()) {
        bufferOrEvent.releaseBuffer();
        cancelRequestFor(bufferOrEvent.receiverId);
        return;
      }
      try {
        // just another level of delegation
        decodeBufferOrEvent(inputChannel, bufferOrEvent);
      } catch (Throwable t) {
        inputChannel.onError(t);
      }
    } else if (msgClazz == NettyMessage.ErrorResponse.class) {
      // omitted .....
    } else {
      throw new IllegalStateException( "Received unknown message from producer: " + msg.getClass());
    }
  }


  private void decodeBufferOrEvent(
    RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
    throws Throwable {
    if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
      inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
    } else if (bufferOrEvent.getBuffer() != null) {
      // If there is a buffer, call the inputChannel's onBuffer method.
      // At this point the inputChannel is a RemoteInputChannel;
      // essentially onBuffer just adds the buffer to a queue.
      inputChannel.onBuffer(
        bufferOrEvent.getBuffer(), bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
    } else {
      throw new IllegalStateException(
        "The read buffer is null in credit-based input channel.");
    }
  }

}


// RemoteInputChannel.onBuffer

   public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
        boolean recycleBuffer = true;

        try {
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }

            final boolean wasEmpty;
            boolean firstPriorityEvent = false;
            synchronized (receivedBuffers) {
                NetworkActionsLogger.traceInput(
                        "RemoteInputChannel#onBuffer",
                        buffer,
                        inputGate.getOwningTaskName(),
                        channelInfo,
                        channelStatePersister,
                        sequenceNumber);
           
                if (isReleased.get()) {
                    return;
                }

                wasEmpty = receivedBuffers.isEmpty();

                SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
                DataType dataType = buffer.getDataType();
                if (dataType.hasPriority()) {
                    firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
                    recycleBuffer = false;
                } else {
                    receivedBuffers.add(sequenceBuffer);
                    recycleBuffer = false;
                    channelStatePersister.maybePersist(buffer);
                    if (dataType.requiresAnnouncement()) {
                        firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                    }
                }
                ++expectedSequenceNumber;
            }

            if (firstPriorityEvent) {
                notifyPriorityEvent(sequenceNumber);
            }
            // We have seen this code several times already.
            if (wasEmpty) {
              // Notify that the channel now has data and add it to the set of available channels.
              // Just like the earlier code paths, this ends up calling the inputGate's
              // notifyChannelNonEmpty method and wakes up the thread waiting on inputChannelsWithData.
                notifyChannelNonEmpty();
            }

            if (backlog >= 0) {
                onSenderBacklog(backlog);
            }
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }
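
The wasEmpty / notifyChannelNonEmpty combination in onBuffer is the classic "add to a queue, and wake up the waiting consumer only when the queue goes from empty to non-empty" pattern. Below is a minimal self-contained sketch of that pattern (illustrative only, not the actual SingleInputGate / inputChannelsWithData code): the netty thread plays the producer role via add(), and the task thread blocks in take() until data arrives.

import java.util.ArrayDeque;

// Sketch of the queue-and-notify pattern described above.
class NonEmptyNotifyingQueue<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();

    // Producer side (netty thread): mirrors receivedBuffers.add(...) + notifyChannelNonEmpty().
    void add(T element) {
        synchronized (queue) {
            boolean wasEmpty = queue.isEmpty();
            queue.add(element);
            if (wasEmpty) {
                // only the empty -> non-empty transition needs to wake anyone up
                queue.notifyAll();
            }
        }
    }

    // Consumer side (task thread): mirrors waiting on inputChannelsWithData until data is available.
    T take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait();
            }
            return queue.poll();
        }
    }
}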


The steps above complete both "data entering the queue" and "the available channel entering the queue". What follows is essentially the same as intra-TM data exchange: the only difference is that the InputChannel being asked for data is a RemoteInputChannel instead of a LocalInputChannel; the rest of the logic is identical.
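
A tiny sketch of that point (illustrative only, with stand-in types): the reading side is written against the InputChannel abstraction, so whether the buffer was handed over directly inside the same TaskManager or arrived via netty and was queued by onBuffer makes no difference to the caller.

import java.util.Optional;

// Stand-in for the InputChannel abstraction the task-side code works against.
interface InputChannelSketch {
    Optional<String> getNextBuffer(); // stands in for Optional<BufferAndAvailability>
}

// Same-TM case: reads the producer's subpartition view directly.
class LocalInputChannelSketch implements InputChannelSketch {
    public Optional<String> getNextBuffer() {
        return Optional.of("buffer polled from the local ResultSubpartition view");
    }
}

// Cross-network case: polls the receivedBuffers queue that onBuffer(...) above has filled.
class RemoteInputChannelSketch implements InputChannelSketch {
    public Optional<String> getNextBuffer() {
        return Optional.of("buffer received over netty and queued in receivedBuffers");
    }
}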

At this point we have finished walking through the data exchange logic. Below is a diagram; everything in it is reflected in the code above.

Network data exchange flow (diagram)