Apache IoTDB 查詢引擎源碼閱讀——數(shù)據(jù)異步傳輸模塊

本文參考了 Apache IoTDB 社區(qū)成員田原和王中的設(shè)計(jì)文檔,由于飛書(shū)鏈接限制晚吞,本文沒(méi)有貼出參考鏈接。

背景

Apache IoTDB 查詢引擎目前采用 MPP 架構(gòu),一條查詢 SQL 大致會(huì)經(jīng)歷下圖幾個(gè)階段:

img

FragmentInstance 是分布式計(jì)劃被拆分后實(shí)際分發(fā)到各個(gè)節(jié)點(diǎn)進(jìn)行執(zhí)行的實(shí)例熙含。這些被拆分出的 FragmentInstance 邏輯上仍然構(gòu)成一個(gè)樹(shù)形結(jié)構(gòu),父親結(jié)點(diǎn)需要子結(jié)點(diǎn)的輸出作為輸入(即下游 FragmentInstance 需要接收上游 FragmentInstance 的輸出作為輸入)來(lái)完成相應(yīng)的邏輯艇纺。

由于 FragmentInstance 可能被分發(fā)到不同節(jié)點(diǎn)怎静,數(shù)據(jù)傳輸需要進(jìn)行網(wǎng)絡(luò)通信并且具有依賴關(guān)系,需要對(duì) FragmentInstance 之間的數(shù)據(jù)傳輸進(jìn)行管理黔衡,因此引入了數(shù)據(jù)異步傳輸模塊蚓聘。該模塊可以看成是火山模型中 ExchangeOperator 的一種實(shí)現(xiàn)方式,使用了生產(chǎn)者消費(fèi)者模型员帮。

重要概念

  • ISinkHandle:通常每一個(gè) FragmentInstance 持有一個(gè) ISinkHandle或粮,用于向上游 FragmentInstance 異步傳輸計(jì)算結(jié)果。
  • ExchangeOperator:FragmentInstance 間數(shù)據(jù)傳輸邏輯和 FragmentInstance 的執(zhí)行邏輯是解耦的捞高,F(xiàn)ragmentInstance 的算子樹(shù)結(jié)點(diǎn)可能存在 ExchangeOperator氯材,ExchangeOperator 持有 ISourceHandle,可以從上游獲取輸入硝岗。
  • ISourceHandle:與 ISinkHandle 一一對(duì)應(yīng)氢哮,接收 ISinkHandle 的計(jì)算結(jié)果,傳給 ExchangeOperator型檀。
  • MPPDataExchangeManager:當(dāng)前節(jié)點(diǎn)數(shù)據(jù)傳輸模塊的管理中心冗尤,持有線程資源,是 ISinkHandle 和 ISourceHandle 交互的中間站胀溺。

具體實(shí)現(xiàn)

MPPDataExchangeManager

MppDataExchangeManager 是當(dāng)前節(jié)點(diǎn)數(shù)據(jù)傳輸模塊的管理中心裂七,有下述職責(zé):

  • 負(fù)責(zé)創(chuàng)建 ISinkHandle 和 ISourceHandle。FragmentInstance 需要通過(guò) MPPDataExchangeManager 創(chuàng)建 ISinkHandle 和 ISourceHandle仓坞,MPPDataExchangeManager 維護(hù)了兩個(gè) Map背零。
// FragmentInstance 可能有多個(gè) ExchangeOperator,進(jìn)而有多個(gè) ISourceHandle
// 因此這里的 Map 是一個(gè)兩層 Map无埃,即 FragmentInstance -> PlanNodeID -> ISourceHandle
private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;

// FragmentInstance -> SinkHandle
private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
  • 定義了 SinkHandleListener 和 SourceHandleListener徙瓶。ISinkHandle 和 ISourceHandle 定義了 abort(), close() 等方法, SinkHandleListener 和 SourceHandleListener 會(huì)在這些方法里被使用嫉称,用于通知相應(yīng) FragmentInstance 以及更新前述兩個(gè) Map侦镇。
  • 實(shí)現(xiàn)了 MPPDataExchangeService.Iface。不同節(jié)點(diǎn)間通過(guò) Thrift RPC 通信织阅,MPPDataExchangeService.Iface 定義了 SinkHandle 和 SourceHandle 的交互接口壳繁,接口具體邏輯將在下文分析。

SinkHandle 和 SourceHandle

SinkHandle 和 SourceHandle 是 ISinkHandle 和 ISourceHandle 的一組實(shí)現(xiàn)類(lèi),用于不同節(jié)點(diǎn)間 FragmentInstance 的數(shù)據(jù)通信氮趋。

SinkHandle 和 SourceHandle 的數(shù)據(jù)通信主要分為三步:

  1. 每產(chǎn)生一個(gè) TsBlock伍派,SinkHandle 向 SourceHandle 發(fā)送一個(gè) NewDataBlockEvent,包含該 TsBlock 的sequenceId 以及所占內(nèi)存大惺P病(如果再無(wú)新的數(shù)據(jù)產(chǎn)生诉植,則發(fā)送一個(gè)EndOfDataBlockEvent)。在接收到 SourceHandle 對(duì)該 TsBlock 的 ack 之前昵观,保存該 TsBlock晾腔。
  2. SourceHandle 收到 NewDataBlockEvent后,在內(nèi)存中選取一段連續(xù)區(qū)間的 sequenceId啊犬,向 SinkHandle 發(fā)起拉取數(shù)據(jù)的請(qǐng)求灼擂。
  3. SourceHandle 拉取到數(shù)據(jù)后,向 SinkHandle 發(fā)送 ack 消息觉至,SinkHandle 收到 ack 消息后剔应,便可以將對(duì)應(yīng)的TsBlock 釋放。
    1. img

首先來(lái)看 SinkHandle 發(fā)送數(shù)據(jù)的邏輯(只有 SinkHandle 的 isFull() 返回的 Future 被 complete 后语御,send 方法才會(huì)被調(diào)用峻贮,具體可以參考 Driver#processInternal):

@Override
public synchronized void send(TsBlock tsBlock) {
  long startTime = System.nanoTime();
  try {
    Validate.notNull(tsBlock, "tsBlocks is null");
    checkState();
    if (!blocked.isDone()) {
      throw new IllegalStateException("Sink handle is blocked.");
    }
    if (noMoreTsBlocks) {
      return;
    }
    long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
    int startSequenceId;
    startSequenceId = nextSequenceId;
    blocked =
        localMemoryManager
            .getQueryPool()
            .reserve(
                localFragmentInstanceId.getQueryId(),
                localFragmentInstanceId.getInstanceId(),
                localPlanNodeId,
                retainedSizeInBytes,
                maxBytesCanReserve)
            .left;
    bufferRetainedSizeInBytes += retainedSizeInBytes;

    sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
    nextSequenceId += 1;
    currentTsBlockSize = retainedSizeInBytes;

    // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
    submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
  } finally {
    QUERY_METRICS.recordDataExchangeCost(
        SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
  }
}
  • SinkHandle 在初始化的時(shí)候就會(huì)向內(nèi)存池申請(qǐng)內(nèi)存,此時(shí)會(huì)初始化 blocked 這個(gè) Future应闯。
  • 進(jìn)入 send 方法說(shuō)明 blocked.isDone() == true纤控,send 并不會(huì)直接發(fā)送 TsBlock,而是發(fā)送 NewDataBlockEventTask碉纺,SourceHandle 后續(xù)會(huì)通過(guò) sequenceId 拉取指定的 TsBlock船万。
  • send() 用這次發(fā)送的 TsBlock 的大小來(lái)估計(jì)下一次要發(fā)送的 TsBlock 的大小,所以 16 -25 行更新 blocked 時(shí)使用的是當(dāng)前 TsBlock 的 retainedSizeInBytes骨田。

下面來(lái)看 SourceHandle 拉取 TsBlock 的邏輯耿导,可以直接參考注釋?zhuān)?/p>

private synchronized void trySubmitGetDataBlocksTask() {
  if (aborted || closed) {
    return;
  }
  if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
    return;
  }

  final int startSequenceId = nextSequenceId;
  int endSequenceId = nextSequenceId;
  long reservedBytes = 0L;
  Pair<ListenableFuture<Void>, Boolean> pair = null;
  long blockedSize = 0L;
  
  // 選取一段連續(xù)的 sequenceId
  while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) {
    Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId);
    if (bytesToReserve == null) {
      throw new IllegalStateException("Data block size is null.");
    }
    // 從內(nèi)存池申請(qǐng)內(nèi)存
    pair =
        localMemoryManager
            .getQueryPool()
            .reserve(
                localFragmentInstanceId.getQueryId(),
                localFragmentInstanceId.getInstanceId(),
                localPlanNodeId,
                bytesToReserve,
                maxBytesCanReserve);
    bufferRetainedSizeInBytes += bytesToReserve;
    endSequenceId += 1;
    reservedBytes += bytesToReserve;
    // 沒(méi)有申請(qǐng)到內(nèi)存,跳出循環(huán)
    if (!pair.right) {
      blockedSize = bytesToReserve;
      break;
    }
  }

  if (pair == null) {
    // Next data block not generated yet. Do nothing.
    return;
  }
  nextSequenceId = endSequenceId;

  // 注冊(cè)回調(diào)函數(shù)态贤,在申請(qǐng)內(nèi)存的 future 被 complete(表明內(nèi)存被申請(qǐng)到了)時(shí)拉取指定 sequenceId 的 TsBlock
  if (!pair.right) {
    endSequenceId--;
    reservedBytes -= blockedSize;
    // The future being not completed indicates,
    //   1. Memory has been reserved for blocks in [startSequenceId, endSequenceId).
    //   2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked.
    //   3. Have not reserve memory for the rest of blocks.
    //
    //  startSequenceId          endSequenceId - 1  endSequenceId
    //         |-------- reserved --------|--- blocked ---|--- not reserved ---|

    // Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks.
    blockedOnMemory = pair.left;
    final int blockedSequenceId = endSequenceId;
    final long blockedRetainedSize = blockedSize;
    blockedOnMemory.addListener(
        () ->
            executorService.submit(
                new GetDataBlocksTask(
                    blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)),
        executorService);
  }

  if (endSequenceId > startSequenceId) {
    executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
  }
}

LocalSinkHandle 和 LocalSourceHandle

LocalSinkHandle 和 LocalSourceHandle 是 ISinkHandle 和 ISourceHandle 的另一組實(shí)現(xiàn)類(lèi)舱呻,用于同一節(jié)點(diǎn)不同 FragmentInstance 的數(shù)據(jù)通信。不復(fù)用 SinkHandle 和 SourceHandle 是因?yàn)橥还?jié)點(diǎn)沒(méi)必要再使用 RPC 通信抵卫,可以節(jié)省網(wǎng)絡(luò)開(kāi)銷(xiāo)狮荔。

LocalSinkHandle 和 LocalSourceHandle 通過(guò)一個(gè)共享的阻塞隊(duì)列 SharedTsBlockQueue 進(jìn)行通信胎撇。

LocalSinkHandle 的發(fā)送邏輯(只有 LocalSinkHandle 的 isFull() 返回的 Future 被 complete 后才會(huì)發(fā)送介粘,直接往 queue 里放 TsBlock):

@Override
public void send(TsBlock tsBlock) {
  long startTime = System.nanoTime();
  try {
    Validate.notNull(tsBlock, "tsBlocks is null");
    synchronized (this) {
      checkState();
      if (!blocked.isDone()) {
        throw new IllegalStateException("Sink handle is blocked.");
      }
    }

    synchronized (queue) {
      if (queue.hasNoMoreTsBlocks()) {
        return;
      }
      logger.debug("[StartSendTsBlockOnLocal]");
      synchronized (this) {
        blocked = queue.add(tsBlock);
      }
    }
  } finally {
    QUERY_METRICS.recordDataExchangeCost(
        SINK_HANDLE_SEND_TSBLOCK_LOCAL, System.nanoTime() - startTime);
  }
}

LocalSourceHandle 的拉取邏輯(只有 isBlocked() 返回的 Future complete 時(shí)才會(huì)被調(diào)用):

@Override
public TsBlock receive() {
  long startTime = System.nanoTime();
  try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
    checkState();

    if (!queue.isBlocked().isDone()) {
      throw new IllegalStateException("Source handle is blocked.");
    }
    TsBlock tsBlock;
    synchronized (queue) {
      tsBlock = queue.remove();
    }
    if (tsBlock != null) {
      logger.debug(
          "[GetTsBlockFromQueue] TsBlock:{} size:{}",
          currSequenceId,
          tsBlock.getRetainedSizeInBytes());
      currSequenceId++;
    }
    checkAndInvokeOnFinished();
    return tsBlock;
  } finally {
    QUERY_METRICS.recordDataExchangeCost(
        SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
  }
}

LocalSinkHandle 的 send 方法和 LocalSourceHandle 的 receive 方法實(shí)現(xiàn)都較為簡(jiǎn)單,主要通過(guò) SharedTsBlockQueue 進(jìn)行交互晚树,下面是 SharedTsBlockQueue 的 remove 和 add 方法:

/**
 * Remove a tsblock from the head of the queue and return. Should be invoked only when the future
 * returned by {@link #isBlocked()} completes.
 */
public TsBlock remove() {
  if (closed) {
    throw new IllegalStateException("queue has been destroyed");
  }
  TsBlock tsBlock = queue.remove();
  // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
  // corresponding LocalSinkHandle.
  if (sinkHandle != null) {
    sinkHandle.checkAndInvokeOnFinished();
  }
  
  // 釋放當(dāng)前 TsBlock 在 MemoryPool 中占用的內(nèi)存
  localMemoryManager
      .getQueryPool()
      .free(
          localFragmentInstanceId.getQueryId(),
          localFragmentInstanceId.getInstanceId(),
          localPlanNodeId,
          tsBlock.getRetainedSizeInBytes());
  bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
  if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
    blocked = SettableFuture.create();
  }
  return tsBlock;
}

/**
 * Add tsblocks to the queue. Except the first invocation, this method should be invoked only when
 * the returned future of last invocation completes.
 */
public ListenableFuture<Void> add(TsBlock tsBlock) {
  if (closed) {
    logger.warn("queue has been destroyed");
    return immediateVoidFuture();
  }

  Validate.notNull(tsBlock, "TsBlock cannot be null");
  Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
  Pair<ListenableFuture<Void>, Boolean> pair =
      localMemoryManager
          .getQueryPool()
          .reserve(
              localFragmentInstanceId.getQueryId(),
              localFragmentInstanceId.getInstanceId(),
              localPlanNodeId,
              tsBlock.getRetainedSizeInBytes(),
              maxBytesCanReserve);
  blockedOnMemory = pair.left;
  bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();

  // reserve memory failed, we should wait until there is enough memory
  if (!pair.right) {
    blockedOnMemory.addListener(
        () -> {
          synchronized (this) {
            queue.add(tsBlock);
            if (!blocked.isDone()) {
              blocked.set(null);
            }
          }
        },
        directExecutor());
  } else { // reserve memory succeeded, add the TsBlock directly
    queue.add(tsBlock);
    if (!blocked.isDone()) {
      blocked.set(null);
    }
  }

  return blockedOnMemory;

MemoryPool

由于采用異步傳輸機(jī)制姻采,SinkHandle 在實(shí)際發(fā)送數(shù)據(jù)前需先將計(jì)算好的 TsBlock 保留在內(nèi)存中,SourceHandle 在接收 TsBlock 前也需要先預(yù)留內(nèi)存爵憎,為了對(duì)數(shù)據(jù)傳輸模塊占用的內(nèi)存進(jìn)行管理慨亲,SinkHandle 和 SourceHandle 需要通過(guò) MemoryPool 申請(qǐng)內(nèi)存婚瓜。

每個(gè)節(jié)點(diǎn)持有一個(gè) MemoryPool,大小由配置參數(shù)決定:

public class LocalMemoryManager {

  private final MemoryPool queryPool;

  public LocalMemoryManager() {
    queryPool =
        new MemoryPool(
            "query",
            IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange(),
            IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance());
  }

  public MemoryPool getQueryPool() {
    return queryPool;
  }
public MemoryPool(String id, long maxBytes, long maxBytesPerFragmentInstance) {
  this.id = Validate.notNull(id);
  Validate.isTrue(maxBytes > 0L, "max bytes should be greater than zero: %d", maxBytes);
  // maxBytes 是整個(gè) pool 最大可以使用的內(nèi)存容量
  this.maxBytes = maxBytes;
  Validate.isTrue(
      maxBytesPerFragmentInstance > 0L && maxBytesPerFragmentInstance <= maxBytes,
      "max bytes per query should be greater than zero while less than or equal to max bytes. maxBytesPerQuery: %d, maxBytes: %d",
      maxBytesPerFragmentInstance,
      maxBytes);
  // maxBytesPerFragmentInstance 是單個(gè) FragmentInstance 的 ISinkHandle 和 ISourceHandle
  // 占用內(nèi)存之和的最大值
  this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
}

ISinkHandle 和 ISourceHandle 通過(guò) MemoryPool 的 reserve 方法申請(qǐng)內(nèi)存刑棵,reserve 在判斷是否申請(qǐng)成功時(shí)進(jìn)行兩層判斷:

  • 首先判斷申請(qǐng)的內(nèi)存會(huì)不會(huì)超過(guò) MemoryPool 最大限制巴刻,maxBytes - reservedBytes < bytesToReserve 表明超過(guò)限制。
  • 每一個(gè) ISinkHandle/ISourceHandle 能申請(qǐng)的內(nèi)存也有限制蛉签,第二層判斷申請(qǐng)的內(nèi)存會(huì)不會(huì)超過(guò)調(diào)用 reserve 方法的 ISinkHandle/ISourceHandle 的限制胡陪,即 27 - 32 行邏輯。

如果申請(qǐng)成功碍舍,則更新 MemoryPool 已使用的內(nèi)存以及該 ISinkHandle/ISourceHandle 占用的內(nèi)存(更新 queryMemoryReservations)柠座,然后返回 Futures.immediateFuture(null);

如果申請(qǐng)失敗,則創(chuàng)建一個(gè) MemoryReservationFuture片橡,加入維持的 list妈经,當(dāng)調(diào)用 MemoryPool#free 釋放內(nèi)存的時(shí)候,會(huì)選取 list 中的 future 進(jìn)行 complete捧书。

下面是 reserve 方法的源碼:

/** @return if reserve succeed, pair.right will be true, otherwise false */
public Pair<ListenableFuture<Void>, Boolean> reserve(
    String queryId,
    String fragmentInstanceId,
    String planNodeId,
    long bytesToReserve,
    long maxBytesCanReserve) {
  Validate.notNull(queryId);
  Validate.notNull(fragmentInstanceId);
  Validate.notNull(planNodeId);
  Validate.isTrue(
      bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
      "bytes should be greater than zero while less than or equal to max bytes per fragment instance: %d",
      bytesToReserve);
  if (bytesToReserve > maxBytesCanReserve) {
    LOGGER.warn(
        "Cannot reserve {} bytes memory from MemoryPool for planNodeId{}",
        bytesToReserve,
        planNodeId);
    throw new IllegalArgumentException(
        "Query is aborted since it requests more memory than can be allocated.");
  }

  ListenableFuture<Void> result;
  synchronized (this) {
    if (maxBytes - reservedBytes < bytesToReserve
        || maxBytesCanReserve
                - queryMemoryReservations
                    .getOrDefault(queryId, Collections.emptyMap())
                    .getOrDefault(fragmentInstanceId, Collections.emptyMap())
                    .getOrDefault(planNodeId, 0L)
            < bytesToReserve) {
      LOGGER.debug(
          "Blocked reserve request: {} bytes memory for planNodeId{}",
          bytesToReserve,
          planNodeId);
      result =
          MemoryReservationFuture.create(
              queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
      memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
      return new Pair<>(result, Boolean.FALSE);
    } else {
      reservedBytes += bytesToReserve;
      queryMemoryReservations
          .computeIfAbsent(queryId, x -> new HashMap<>())
          .computeIfAbsent(fragmentInstanceId, x -> new HashMap<>())
          .merge(planNodeId, bytesToReserve, Long::sum);
      result = Futures.immediateFuture(null);
      return new Pair<>(result, Boolean.TRUE);
    }
  }
}

調(diào)用 MemoryPool#free 時(shí)吹泡,首先會(huì)更新 MemoryPool 占用的內(nèi)存和 ISinkHanlde/ISourceHandle 占用的內(nèi)存。

然后會(huì)遍歷 memoryReservationFutures 查看可以 complete 的 Future:

public void free(String queryId, String fragmentInstanceId, String planNodeId, long bytes) {
  List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
  synchronized (this) {
    Validate.notNull(queryId);
    Validate.isTrue(bytes > 0L);

    Long queryReservedBytes =
        queryMemoryReservations
            .getOrDefault(queryId, Collections.emptyMap())
            .getOrDefault(fragmentInstanceId, Collections.emptyMap())
            .get(planNodeId);
    Validate.notNull(queryReservedBytes);
    Validate.isTrue(bytes <= queryReservedBytes);

    queryReservedBytes -= bytes;
    if (queryReservedBytes == 0) {
      queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
    } else {
      queryMemoryReservations
          .get(queryId)
          .get(fragmentInstanceId)
          .put(planNodeId, queryReservedBytes);
    }
    reservedBytes -= bytes;

    if (memoryReservationFutures.isEmpty()) {
      return;
    }
    Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
    while (iterator.hasNext()) {
      MemoryReservationFuture<Void> future = iterator.next();
      if (future.isCancelled() || future.isDone()) {
        continue;
      }
      long bytesToReserve = future.getBytesToReserve();
      String curQueryId = future.getQueryId();
      String curFragmentInstanceId = future.getFragmentInstanceId();
      String curPlanNodeId = future.getPlanNodeId();
      // check total reserved bytes in memory pool
      if (maxBytes - reservedBytes < bytesToReserve) {
        continue;
      }
      // check total reserved bytes of one Sink/Source handle
      if (future.getMaxBytesCanReserve()
              - queryMemoryReservations
                  .getOrDefault(curQueryId, Collections.emptyMap())
                  .getOrDefault(curFragmentInstanceId, Collections.emptyMap())
                  .getOrDefault(curPlanNodeId, 0L)
          >= bytesToReserve) {
        reservedBytes += bytesToReserve;
        queryMemoryReservations
            .computeIfAbsent(curQueryId, x -> new HashMap<>())
            .computeIfAbsent(curFragmentInstanceId, x -> new HashMap<>())
            .merge(curPlanNodeId, bytesToReserve, Long::sum);
        futureList.add(future);
        iterator.remove();
      }
    }
  }

  // why we need to put this outside MemoryPool's lock?
  // If we put this block inside the MemoryPool's lock, we will get deadlock case like the
  // following:
  // Assuming that thread-A: LocalSourceHandle.receive() -> A-SharedTsBlockQueue.remove() ->
  // MemoryPool.free() (hold MemoryPool's lock) -> future.set(null) -> try to get
  // B-SharedTsBlockQueue's lock
  // thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove() (hold
  // B-SharedTsBlockQueue's lock) -> try to get MemoryPool's lock
  for (MemoryReservationFuture<Void> future : futureList) {
    try {
      future.set(null);
    } catch (Throwable t) {
      // ignore it, because we still need to notify other future
      LOGGER.error("error happened while trying to free memory: ", t);
    }
  }
}

總結(jié)

上述流程存在幾點(diǎn)問(wèn)題:

  1. SourceHandle 無(wú)法對(duì)事件到達(dá)做任務(wù)順序的假設(shè)鳄厌,導(dǎo)致 SourceHandle 的編寫(xiě)有些復(fù)雜荞胡,需要考慮事件亂序到達(dá)的情況。
  2. 假設(shè) NewDataBlockEvent 事件順序到達(dá)了嚎,且 SourceHandle 的消費(fèi)速度與 SinkHandle 的生成速度一致泪漂,則傳輸一個(gè) TsBlock,需3次 RPC歪泳,網(wǎng)絡(luò)開(kāi)銷(xiāo)較大:
    1. SinkHandle 發(fā)送 NewDataBlockEvent
    2. SourceHandle 拉取 NewDataBlockEvent 對(duì)應(yīng) sequenceId的 TsBlock
    3. SourceHandle 成功拉取到 TsBlock 后萝勤,發(fā)送 ack 消息

傳輸一個(gè) TsBlock 需要三次 rpc 通信的設(shè)計(jì)初衷:

  • 控制數(shù)據(jù)傳輸線程數(shù)量,并且不讓一個(gè) SourceHandle 占據(jù)一個(gè)線程過(guò)久呐伞,以致其他 SourceHandle 無(wú)法發(fā)起數(shù)據(jù)傳輸請(qǐng)求:
    • 因?yàn)?thrift rpc 是同步的敌卓,所以如果沒(méi)有 NewDataBlockEvent,SourceHandle 盲目去拉取數(shù)據(jù)伶氢,那么可能SinkHandle 端數(shù)據(jù)還未準(zhǔn)備好趟径。如果阻塞等待 SinkHandle 產(chǎn)生數(shù)據(jù)后,返回此次 rpc 結(jié)果癣防,會(huì)導(dǎo)致一個(gè)SourceHandle 占據(jù)一個(gè)數(shù)據(jù)傳輸線程過(guò)久蜗巧,在數(shù)據(jù)傳輸線程總數(shù)一定的情況下,其他 SourceHandle 的請(qǐng)求會(huì)得不到及時(shí)處理蕾盯。
    • 所以需要依賴 SinkHandle 的 NewDataBlockEvent 通知幕屹,在 SinkHandle 數(shù)據(jù)準(zhǔn)備好的時(shí)候,發(fā)送一個(gè) rpc 去通知 SourceHandle 拉取數(shù)據(jù)。此時(shí) SourceHandle 拉取數(shù)據(jù)的 rpc 雖然也是同步的望拖,但是 SinkHandle 端的數(shù)據(jù)一定是準(zhǔn)備好的渺尘,所以該 rpc 同步阻塞占用數(shù)據(jù)傳輸線程的時(shí)間很短,不會(huì)導(dǎo)致 SourceHandle的請(qǐng)求過(guò)多等待
  • 防止 TsBlock 丟失说敏,啟用 ack 消息去做容錯(cuò)
    • 如果 SinkHandle 在收到 SourceHandle 拉取請(qǐng)求的 rpc 后鸥跟,就將對(duì)應(yīng)的 TsBlock 釋放,那么會(huì)存在數(shù)據(jù)丟失的風(fēng)險(xiǎn):網(wǎng)絡(luò)問(wèn)題導(dǎo)致此次 rpc 失敗盔沫,雖然 SourceHandle 那邊會(huì)重試锌雀,但是 SinkHandle 在處理上次 rpc 請(qǐng)求時(shí),已經(jīng)把對(duì)應(yīng)的 TsBlock 釋放掉了迅诬,導(dǎo)致 SourceHandle 的重試也是徒勞無(wú)功腋逆。
  • 內(nèi)存控制實(shí)現(xiàn)較為簡(jiǎn)單,因?yàn)樵诶?shù)據(jù)之前就已經(jīng)得知每個(gè) TsBlock 的大小侈贷,所以可以判斷 SourceHandle 端內(nèi)存是否足夠惩歉,足夠了才去拉取,無(wú)需提前預(yù)分配內(nèi)存給查詢俏蛮,每次即時(shí)申請(qǐng)即可撑蚌。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市搏屑,隨后出現(xiàn)的幾起案子争涌,更是在濱河造成了極大的恐慌,老刑警劉巖辣恋,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件亮垫,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡伟骨,警方通過(guò)查閱死者的電腦和手機(jī)饮潦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)携狭,“玉大人继蜡,你說(shuō)我怎么就攤上這事」渫龋” “怎么了稀并?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)单默。 經(jīng)常有香客問(wèn)我碘举,道長(zhǎng),這世上最難降的妖魔是什么雕凹? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任殴俱,我火速辦了婚禮,結(jié)果婚禮上枚抵,老公的妹妹穿的比我還像新娘线欲。我一直安慰自己,他們只是感情好汽摹,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布李丰。 她就那樣靜靜地躺著,像睡著了一般逼泣。 火紅的嫁衣襯著肌膚如雪趴泌。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,255評(píng)論 1 308
  • 那天拉庶,我揣著相機(jī)與錄音嗜憔,去河邊找鬼。 笑死氏仗,一個(gè)胖子當(dāng)著我的面吹牛吉捶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播皆尔,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼呐舔,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了慷蠕?” 一聲冷哼從身側(cè)響起珊拼,我...
    開(kāi)封第一講書(shū)人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎流炕,沒(méi)想到半個(gè)月后澎现,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡每辟,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年昔头,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片影兽。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡揭斧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出峻堰,到底是詐尸還是另有隱情讹开,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布捐名,位于F島的核電站旦万,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏镶蹋。R本人自食惡果不足惜成艘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一赏半、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧淆两,春花似錦断箫、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至剑勾,卻和暖如春埃撵,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背虽另。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工暂刘, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人捂刺。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓鸳惯,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親叠萍。 傳聞我的和親對(duì)象是個(gè)殘疾皇子芝发,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容