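This article draws on design documents written by Apache IoTDB community members Tian Yuan (田原) and Wang Zhong (王中). Because the Feishu links are access-restricted, no reference links are included.
Background
The Apache IoTDB query engine currently uses an MPP architecture. A query SQL statement goes through roughly the stages shown in the figure below:
A FragmentInstance is the unit into which the distributed plan is split and which is actually dispatched to individual nodes for execution. The FragmentInstances still logically form a tree: a parent node needs its children's output as input (that is, a downstream FragmentInstance consumes the output of its upstream FragmentInstances) to carry out its logic.
Because FragmentInstances may be dispatched to different nodes, data transfer involves network communication and carries dependencies, so the transfer between FragmentInstances has to be managed. The asynchronous data exchange module was introduced for this purpose. It can be viewed as one implementation of the ExchangeOperator in the Volcano model, and it follows the producer-consumer pattern.
Key Concepts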
- ISinkHandle: each FragmentInstance usually holds one ISinkHandle, which asynchronously transfers its computed results to the downstream FragmentInstance.
- ExchangeOperator: the data transfer logic between FragmentInstances is decoupled from the execution logic of a FragmentInstance. The operator tree of a FragmentInstance may contain ExchangeOperator nodes; each ExchangeOperator holds an ISourceHandle, through which it obtains input from upstream.
- ISourceHandle: paired one-to-one with an ISinkHandle. It receives the ISinkHandle's results and passes them to the ExchangeOperator.
- MPPDataExchangeManager: the management hub of the data exchange module on the current node. It owns the thread resources and is the intermediary through which ISinkHandle and ISourceHandle interact.
Implementation
MPPDataExchangeManager
MPPDataExchangeManager is the management hub of the data exchange module on the current node. It has the following responsibilities:
- Creating ISinkHandle and ISourceHandle instances. A FragmentInstance must create its ISinkHandle and ISourceHandle through MPPDataExchangeManager, which maintains two maps:
// A FragmentInstance may have several ExchangeOperators and therefore several ISourceHandles,
// so this is a two-level map: FragmentInstance -> PlanNodeID -> ISourceHandle
private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;
// FragmentInstance -> SinkHandle
private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
- Defining SinkHandleListener and SourceHandleListener. ISinkHandle and ISourceHandle define methods such as abort() and close(); the listeners are invoked inside these methods to notify the corresponding FragmentInstance and to update the two maps above.
- Implementing MPPDataExchangeService.Iface. Nodes communicate through Thrift RPC, and MPPDataExchangeService.Iface defines the interface through which SinkHandle and SourceHandle interact. The logic of each interface method is analyzed below.
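The bookkeeping in the list above (a two-level map plus a listener that removes entries on close) can be sketched as follows. This is a minimal illustration under stated assumptions, not IoTDB's code: HandleRegistrySketch, SourceHandleSketch, createSourceHandle, and handleCount are hypothetical names, plain strings stand in for TFragmentInstanceId, and the sink-side map is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ISourceHandle; only close() matters for this sketch.
interface SourceHandleSketch {
  void close();
}

// Hypothetical registry mirroring the two-level sourceHandles map.
class HandleRegistrySketch {
  // fragmentInstanceId -> planNodeId -> source handle
  private final Map<String, Map<String, SourceHandleSketch>> sourceHandles = new HashMap<>();

  // Creates a handle and registers it. Closing the handle deregisters it,
  // which plays the role of the listener callback described above.
  synchronized SourceHandleSketch createSourceHandle(String fragmentInstanceId, String planNodeId) {
    Map<String, SourceHandleSketch> perInstance =
        sourceHandles.computeIfAbsent(fragmentInstanceId, id -> new HashMap<>());
    if (perInstance.containsKey(planNodeId)) {
      throw new IllegalStateException("Source handle already exists for " + planNodeId);
    }
    SourceHandleSketch handle = () -> onClosed(fragmentInstanceId, planNodeId);
    perInstance.put(planNodeId, handle);
    return handle;
  }

  // Removes the handle and drops the inner map once it becomes empty.
  private synchronized void onClosed(String fragmentInstanceId, String planNodeId) {
    Map<String, SourceHandleSketch> perInstance = sourceHandles.get(fragmentInstanceId);
    if (perInstance == null) {
      return;
    }
    perInstance.remove(planNodeId);
    if (perInstance.isEmpty()) {
      sourceHandles.remove(fragmentInstanceId);
    }
  }

  // Number of source handles currently registered for one FragmentInstance.
  synchronized int handleCount(String fragmentInstanceId) {
    Map<String, SourceHandleSketch> perInstance = sourceHandles.get(fragmentInstanceId);
    return perInstance == null ? 0 : perInstance.size();
  }
}
```

Keying the inner map by plan node ID is what lets one FragmentInstance own several source handles, one per ExchangeOperator.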
SinkHandle and SourceHandle
SinkHandle and SourceHandle are one pair of implementations of ISinkHandle and ISourceHandle, used for data exchange between FragmentInstances on different nodes.
Data exchange between a SinkHandle and a SourceHandle takes three main steps:
- For each TsBlock produced, the SinkHandle sends the SourceHandle a NewDataBlockEvent containing the TsBlock's sequenceId and its memory size (if no more data will be produced, it sends an EndOfDataBlockEvent instead). The SinkHandle retains the TsBlock until it receives the SourceHandle's ack for it.
- After receiving NewDataBlockEvents, the SourceHandle picks a contiguous range of sequenceIds in memory and sends the SinkHandle a request to pull the data.
- After pulling the data, the SourceHandle sends an ack to the SinkHandle. Once the SinkHandle receives the ack, it can release the corresponding TsBlocks.
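The three steps can be illustrated from the sink's point of view with a small in-memory model. It is only a sketch: SinkSketch and its method names are hypothetical, strings stand in for TsBlocks, and the Thrift RPCs are replaced by direct method calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sink that keeps every block until it is acknowledged.
class SinkSketch {
  // sequenceId -> block that has been announced but not yet acknowledged
  private final Map<Integer, String> retained = new TreeMap<>();
  private int nextSequenceId = 0;

  // Step 1: buffer the block and return the sequenceId announced to the source.
  int send(String block) {
    int sequenceId = nextSequenceId++;
    retained.put(sequenceId, block);
    return sequenceId;
  }

  // Step 2: serve the blocks in [startSequenceId, endSequenceId) without releasing them.
  List<String> getDataBlocks(int startSequenceId, int endSequenceId) {
    List<String> result = new ArrayList<>();
    for (int id = startSequenceId; id < endSequenceId; id++) {
      String block = retained.get(id);
      if (block == null) {
        throw new IllegalStateException("Block " + id + " is not retained");
      }
      result.add(block);
    }
    return result;
  }

  // Step 3: release the blocks in [startSequenceId, endSequenceId) once they are acknowledged.
  void acknowledge(int startSequenceId, int endSequenceId) {
    for (int id = startSequenceId; id < endSequenceId; id++) {
      retained.remove(id);
    }
  }

  // Number of blocks still held in memory.
  int retainedCount() {
    return retained.size();
  }
}
```

A pull leaves retainedCount() unchanged; only acknowledge shrinks it, which is the property the protocol relies on.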
First, the SinkHandle's send logic (send is called only after the Future returned by the SinkHandle's isFull() has completed; see Driver#processInternal for details):
@Override
public synchronized void send(TsBlock tsBlock) {
long startTime = System.nanoTime();
try {
Validate.notNull(tsBlock, "tsBlocks is null");
checkState();
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
}
if (noMoreTsBlocks) {
return;
}
long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
int startSequenceId;
startSequenceId = nextSequenceId;
blocked =
localMemoryManager
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
retainedSizeInBytes,
maxBytesCanReserve)
.left;
bufferRetainedSizeInBytes += retainedSizeInBytes;
sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
nextSequenceId += 1;
currentTsBlockSize = retainedSizeInBytes;
// TODO: consider merge multiple NewDataBlockEvent for less network traffic.
submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
} finally {
QUERY_METRICS.recordDataExchangeCost(
SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
}
}
- The SinkHandle requests memory from the memory pool when it is initialized, which is when the blocked Future is first set.
- Entering send implies blocked.isDone() == true. send does not transmit the TsBlock directly; it submits a task that sends a NewDataBlockEvent, and the SourceHandle later pulls the specified TsBlock by sequenceId.
- send() uses the size of the TsBlock being sent as an estimate of the size of the next TsBlock, which is why the reserve(...) call that updates blocked passes the current TsBlock's retainedSizeInBytes.
Next, the SourceHandle's logic for pulling TsBlocks; the comments explain each step:
private synchronized void trySubmitGetDataBlocksTask() {
if (aborted || closed) {
return;
}
if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
return;
}
final int startSequenceId = nextSequenceId;
int endSequenceId = nextSequenceId;
long reservedBytes = 0L;
Pair<ListenableFuture<Void>, Boolean> pair = null;
long blockedSize = 0L;
// Pick a contiguous range of sequenceIds
while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) {
Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId);
if (bytesToReserve == null) {
throw new IllegalStateException("Data block size is null.");
}
// Reserve memory from the memory pool
pair =
localMemoryManager
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
bytesToReserve,
maxBytesCanReserve);
bufferRetainedSizeInBytes += bytesToReserve;
endSequenceId += 1;
reservedBytes += bytesToReserve;
// The reservation did not succeed; leave the loop
if (!pair.right) {
blockedSize = bytesToReserve;
break;
}
}
if (pair == null) {
// Next data block not generated yet. Do nothing.
return;
}
nextSequenceId = endSequenceId;
// Register a callback that pulls the TsBlock with the given sequenceId once the
// memory reservation future completes (that is, once the memory has been reserved)
if (!pair.right) {
endSequenceId--;
reservedBytes -= blockedSize;
// The future being not completed indicates,
// 1. Memory has been reserved for blocks in [startSequenceId, endSequenceId).
// 2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked.
// 3. Have not reserve memory for the rest of blocks.
//
// startSequenceId endSequenceId - 1 endSequenceId
// |-------- reserved --------|--- blocked ---|--- not reserved ---|
// Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks.
blockedOnMemory = pair.left;
final int blockedSequenceId = endSequenceId;
final long blockedRetainedSize = blockedSize;
blockedOnMemory.addListener(
() ->
executorService.submit(
new GetDataBlocksTask(
blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)),
executorService);
}
if (endSequenceId > startSequenceId) {
executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
}
}
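The core idea of the method above, choosing the longest contiguous run of announced blocks that fits into the available memory, can be isolated in a simplified sketch. PullRangePlannerSketch and its members are hypothetical. A plain byte counter replaces MemoryPool, and a block that does not fit is simply retried on the next call instead of being scheduled through a reservation future as in the real code.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical planner that decides which sequenceIds can be pulled right now.
class PullRangePlannerSketch {
  // sequenceId -> announced block size in bytes (filled from NewDataBlockEvents)
  private final Map<Integer, Long> announcedSizes = new TreeMap<>();
  private int nextSequenceId = 0;
  private long availableBytes;

  PullRangePlannerSketch(long availableBytes) {
    this.availableBytes = availableBytes;
  }

  // Records the size carried by a NewDataBlockEvent; events may arrive out of order.
  void onNewDataBlockEvent(int sequenceId, long sizeInBytes) {
    announcedSizes.put(sequenceId, sizeInBytes);
  }

  // Returns {start, end} describing the range [start, end) to pull.
  // start == end means nothing can be pulled at the moment.
  int[] nextRange() {
    int start = nextSequenceId;
    int end = start;
    // Stop at the first gap in the sequence or at the first block that does not fit.
    while (announcedSizes.containsKey(end)) {
      long size = announcedSizes.get(end);
      if (size > availableBytes) {
        break;
      }
      availableBytes -= size;
      end++;
    }
    nextSequenceId = end;
    return new int[] {start, end};
  }

  // Gives memory back after pulled blocks have been consumed.
  void release(long bytes) {
    availableBytes += bytes;
  }
}
```

With a budget of 100 bytes and announced sizes 40, 50, and 30 for sequenceIds 0 to 2, the first call returns the range [0, 2), because the third block no longer fits.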
LocalSinkHandle and LocalSourceHandle
LocalSinkHandle and LocalSourceHandle are another pair of implementations of ISinkHandle and ISourceHandle, used for data exchange between different FragmentInstances on the same node. SinkHandle and SourceHandle are not reused here because RPC is unnecessary within a single node, and avoiding it saves network overhead.
LocalSinkHandle and LocalSourceHandle communicate through a shared blocking queue, SharedTsBlockQueue.
LocalSinkHandle's send logic (it sends only after the Future returned by its isFull() has completed, and it puts the TsBlock directly into the queue):
@Override
public void send(TsBlock tsBlock) {
long startTime = System.nanoTime();
try {
Validate.notNull(tsBlock, "tsBlocks is null");
synchronized (this) {
checkState();
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
}
}
synchronized (queue) {
if (queue.hasNoMoreTsBlocks()) {
return;
}
logger.debug("[StartSendTsBlockOnLocal]");
synchronized (this) {
blocked = queue.add(tsBlock);
}
}
} finally {
QUERY_METRICS.recordDataExchangeCost(
SINK_HANDLE_SEND_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
}
LocalSourceHandle's receive logic (called only after the Future returned by isBlocked() has completed):
@Override
public TsBlock receive() {
long startTime = System.nanoTime();
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
checkState();
if (!queue.isBlocked().isDone()) {
throw new IllegalStateException("Source handle is blocked.");
}
TsBlock tsBlock;
synchronized (queue) {
tsBlock = queue.remove();
}
if (tsBlock != null) {
logger.debug(
"[GetTsBlockFromQueue] TsBlock:{} size:{}",
currSequenceId,
tsBlock.getRetainedSizeInBytes());
currSequenceId++;
}
checkAndInvokeOnFinished();
return tsBlock;
} finally {
QUERY_METRICS.recordDataExchangeCost(
SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
}
Both LocalSinkHandle's send method and LocalSourceHandle's receive method are fairly simple and interact mainly through SharedTsBlockQueue. Below are SharedTsBlockQueue's remove and add methods:
/**
* Remove a tsblock from the head of the queue and return. Should be invoked only when the future
* returned by {@link #isBlocked()} completes.
*/
public TsBlock remove() {
if (closed) {
throw new IllegalStateException("queue has been destroyed");
}
TsBlock tsBlock = queue.remove();
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
// corresponding LocalSinkHandle.
if (sinkHandle != null) {
sinkHandle.checkAndInvokeOnFinished();
}
// Free the memory this TsBlock occupies in the MemoryPool
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
tsBlock.getRetainedSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
blocked = SettableFuture.create();
}
return tsBlock;
}
/**
* Add tsblocks to the queue. Except the first invocation, this method should be invoked only when
* the returned future of last invocation completes.
*/
public ListenableFuture<Void> add(TsBlock tsBlock) {
if (closed) {
logger.warn("queue has been destroyed");
return immediateVoidFuture();
}
Validate.notNull(tsBlock, "TsBlock cannot be null");
Validate.isTrue(blockedOnMemory == null || blockedOnMemory.isDone(), "queue is full");
Pair<ListenableFuture<Void>, Boolean> pair =
localMemoryManager
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getInstanceId(),
localPlanNodeId,
tsBlock.getRetainedSizeInBytes(),
maxBytesCanReserve);
blockedOnMemory = pair.left;
bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
// reserve memory failed, we should wait until there is enough memory
if (!pair.right) {
blockedOnMemory.addListener(
() -> {
synchronized (this) {
queue.add(tsBlock);
if (!blocked.isDone()) {
blocked.set(null);
}
}
},
directExecutor());
} else { // reserve memory succeeded, add the TsBlock directly
queue.add(tsBlock);
if (!blocked.isDone()) {
blocked.set(null);
}
}
return blockedOnMemory;
}
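The consumer-side signalling in remove and add (a future that completes when the queue has data and is replaced by a fresh one when the queue drains) can be shown in isolation. The sketch below is an assumption-laden simplification: SharedQueueSketch is a hypothetical name, java.util.concurrent.CompletableFuture stands in for Guava's SettableFuture, and memory accounting, the closed flag, and noMoreTsBlocks are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical shared queue whose future tells the consumer when data is available.
class SharedQueueSketch<T> {
  private final Queue<T> queue = new ArrayDeque<>();
  // Not done while the queue is empty; done while at least one element is queued.
  private CompletableFuture<Void> blocked = new CompletableFuture<>();

  // The consumer waits on this future before calling remove().
  synchronized CompletableFuture<Void> isBlocked() {
    return blocked;
  }

  // Enqueues an element and wakes up a waiting consumer.
  synchronized void add(T element) {
    queue.add(element);
    blocked.complete(null); // no effect if the future is already complete
  }

  // Dequeues the head; must be called only after isBlocked() has completed.
  synchronized T remove() {
    if (!blocked.isDone()) {
      throw new IllegalStateException("queue is blocked");
    }
    T element = queue.remove();
    if (queue.isEmpty()) {
      // The queue drained: hand out a fresh future for the next wait.
      blocked = new CompletableFuture<>();
    }
    return element;
  }
}
```

Replacing the future when the queue drains, rather than resetting a flag, means a consumer that already holds the old completed future never observes it flipping back to incomplete.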
MemoryPool
Because transfer is asynchronous, a SinkHandle must keep computed TsBlocks in memory before they are actually sent, and a SourceHandle must reserve memory before it receives TsBlocks. To manage the memory used by the data exchange module, SinkHandle and SourceHandle must request memory through MemoryPool.
Each node holds one MemoryPool, whose size is determined by configuration parameters:
public class LocalMemoryManager {
private final MemoryPool queryPool;
public LocalMemoryManager() {
queryPool =
new MemoryPool(
"query",
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForDataExchange(),
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance());
}
public MemoryPool getQueryPool() {
return queryPool;
}
}
public MemoryPool(String id, long maxBytes, long maxBytesPerFragmentInstance) {
this.id = Validate.notNull(id);
Validate.isTrue(maxBytes > 0L, "max bytes should be greater than zero: %d", maxBytes);
// maxBytes is the maximum amount of memory the whole pool may use
this.maxBytes = maxBytes;
Validate.isTrue(
maxBytesPerFragmentInstance > 0L && maxBytesPerFragmentInstance <= maxBytes,
"max bytes per query should be greater than zero while less than or equal to max bytes. maxBytesPerQuery: %d, maxBytes: %d",
maxBytesPerFragmentInstance,
maxBytes);
// maxBytesPerFragmentInstance is the upper bound on the combined memory used by
// the ISinkHandle and ISourceHandles of a single FragmentInstance
this.maxBytesPerFragmentInstance = maxBytesPerFragmentInstance;
}
ISinkHandle and ISourceHandle request memory through MemoryPool's reserve method, which applies two checks to decide whether a reservation succeeds:
- First, whether the request would exceed the MemoryPool's overall limit: maxBytes - reservedBytes < bytesToReserve means that it would.
- Each ISinkHandle/ISourceHandle also has its own limit. The second check is whether the request would exceed the limit of the ISinkHandle/ISourceHandle that called reserve; this is the condition that compares maxBytesCanReserve minus the handle's entry in queryMemoryReservations against bytesToReserve.
If the reservation succeeds, reserve updates the memory used by the MemoryPool and by that ISinkHandle/ISourceHandle (in queryMemoryReservations), then returns Futures.immediateFuture(null).
If the reservation fails, reserve creates a MemoryReservationFuture and adds it to a list that the pool maintains. When MemoryPool#free is called to release memory, it picks futures from this list to complete.
Below is the source of the reserve method:
/** @return if reserve succeed, pair.right will be true, otherwise false */
public Pair<ListenableFuture<Void>, Boolean> reserve(
String queryId,
String fragmentInstanceId,
String planNodeId,
long bytesToReserve,
long maxBytesCanReserve) {
Validate.notNull(queryId);
Validate.notNull(fragmentInstanceId);
Validate.notNull(planNodeId);
Validate.isTrue(
bytesToReserve > 0L && bytesToReserve <= maxBytesPerFragmentInstance,
"bytes should be greater than zero while less than or equal to max bytes per fragment instance: %d",
bytesToReserve);
if (bytesToReserve > maxBytesCanReserve) {
LOGGER.warn(
"Cannot reserve {} bytes memory from MemoryPool for planNodeId{}",
bytesToReserve,
planNodeId);
throw new IllegalArgumentException(
"Query is aborted since it requests more memory than can be allocated.");
}
ListenableFuture<Void> result;
synchronized (this) {
if (maxBytes - reservedBytes < bytesToReserve
|| maxBytesCanReserve
- queryMemoryReservations
.getOrDefault(queryId, Collections.emptyMap())
.getOrDefault(fragmentInstanceId, Collections.emptyMap())
.getOrDefault(planNodeId, 0L)
< bytesToReserve) {
LOGGER.debug(
"Blocked reserve request: {} bytes memory for planNodeId{}",
bytesToReserve,
planNodeId);
result =
MemoryReservationFuture.create(
queryId, fragmentInstanceId, planNodeId, bytesToReserve, maxBytesCanReserve);
memoryReservationFutures.add((MemoryReservationFuture<Void>) result);
return new Pair<>(result, Boolean.FALSE);
} else {
reservedBytes += bytesToReserve;
queryMemoryReservations
.computeIfAbsent(queryId, x -> new HashMap<>())
.computeIfAbsent(fragmentInstanceId, x -> new HashMap<>())
.merge(planNodeId, bytesToReserve, Long::sum);
result = Futures.immediateFuture(null);
return new Pair<>(result, Boolean.TRUE);
}
}
}
When MemoryPool#free is called, it first updates the memory used by the MemoryPool and by the ISinkHandle/ISourceHandle.
It then iterates over memoryReservationFutures to find Futures that can be completed:
public void free(String queryId, String fragmentInstanceId, String planNodeId, long bytes) {
List<MemoryReservationFuture<Void>> futureList = new ArrayList<>();
synchronized (this) {
Validate.notNull(queryId);
Validate.isTrue(bytes > 0L);
Long queryReservedBytes =
queryMemoryReservations
.getOrDefault(queryId, Collections.emptyMap())
.getOrDefault(fragmentInstanceId, Collections.emptyMap())
.get(planNodeId);
Validate.notNull(queryReservedBytes);
Validate.isTrue(bytes <= queryReservedBytes);
queryReservedBytes -= bytes;
if (queryReservedBytes == 0) {
queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
} else {
queryMemoryReservations
.get(queryId)
.get(fragmentInstanceId)
.put(planNodeId, queryReservedBytes);
}
reservedBytes -= bytes;
if (memoryReservationFutures.isEmpty()) {
return;
}
Iterator<MemoryReservationFuture<Void>> iterator = memoryReservationFutures.iterator();
while (iterator.hasNext()) {
MemoryReservationFuture<Void> future = iterator.next();
if (future.isCancelled() || future.isDone()) {
continue;
}
long bytesToReserve = future.getBytesToReserve();
String curQueryId = future.getQueryId();
String curFragmentInstanceId = future.getFragmentInstanceId();
String curPlanNodeId = future.getPlanNodeId();
// check total reserved bytes in memory pool
if (maxBytes - reservedBytes < bytesToReserve) {
continue;
}
// check total reserved bytes of one Sink/Source handle
if (future.getMaxBytesCanReserve()
- queryMemoryReservations
.getOrDefault(curQueryId, Collections.emptyMap())
.getOrDefault(curFragmentInstanceId, Collections.emptyMap())
.getOrDefault(curPlanNodeId, 0L)
>= bytesToReserve) {
reservedBytes += bytesToReserve;
queryMemoryReservations
.computeIfAbsent(curQueryId, x -> new HashMap<>())
.computeIfAbsent(curFragmentInstanceId, x -> new HashMap<>())
.merge(curPlanNodeId, bytesToReserve, Long::sum);
futureList.add(future);
iterator.remove();
}
}
}
// why we need to put this outside MemoryPool's lock?
// If we put this block inside the MemoryPool's lock, we will get deadlock case like the
// following:
// Assuming that thread-A: LocalSourceHandle.receive() -> A-SharedTsBlockQueue.remove() ->
// MemoryPool.free() (hold MemoryPool's lock) -> future.set(null) -> try to get
// B-SharedTsBlockQueue's lock
// thread-B: LocalSourceHandle.receive() -> B-SharedTsBlockQueue.remove() (hold
// B-SharedTsBlockQueue's lock) -> try to get MemoryPool's lock
for (MemoryReservationFuture<Void> future : futureList) {
try {
future.set(null);
} catch (Throwable t) {
// ignore it, because we still need to notify other future
LOGGER.error("error happened while trying to free memory: ", t);
}
}
}
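The interplay of reserve and free can be condensed into a short, self-contained sketch. It rests on several assumptions: MemoryPoolSketch and Pending are hypothetical names, a single string key replaces the queryId / fragmentInstanceId / planNodeId triple, one per-handle limit replaces maxBytesCanReserve, CompletableFuture replaces MemoryReservationFuture, and argument validation is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified memory pool with a pool-wide and a per-handle limit.
class MemoryPoolSketch {
  // A reservation that could not be granted immediately.
  private static final class Pending {
    final String handle;
    final long bytes;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    Pending(String handle, long bytes) {
      this.handle = handle;
      this.bytes = bytes;
    }
  }

  private final long maxBytes;
  private final long maxBytesPerHandle;
  private long reservedBytes = 0L;
  private final Map<String, Long> reservedPerHandle = new HashMap<>();
  private final List<Pending> pending = new ArrayList<>();

  MemoryPoolSketch(long maxBytes, long maxBytesPerHandle) {
    this.maxBytes = maxBytes;
    this.maxBytesPerHandle = maxBytesPerHandle;
  }

  // The two checks: pool-wide limit first, then the limit of the calling handle.
  private boolean fits(String handle, long bytes) {
    return maxBytes - reservedBytes >= bytes
        && maxBytesPerHandle - reservedPerHandle.getOrDefault(handle, 0L) >= bytes;
  }

  private void grant(String handle, long bytes) {
    reservedBytes += bytes;
    reservedPerHandle.merge(handle, bytes, Long::sum);
  }

  // Returns a completed future on success, otherwise a future completed by a later free().
  synchronized CompletableFuture<Void> reserve(String handle, long bytes) {
    if (fits(handle, bytes)) {
      grant(handle, bytes);
      return CompletableFuture.completedFuture(null);
    }
    Pending request = new Pending(handle, bytes);
    pending.add(request);
    return request.future;
  }

  // Releases memory, then grants every pending reservation that now fits.
  void free(String handle, long bytes) {
    List<Pending> granted = new ArrayList<>();
    synchronized (this) {
      reservedBytes -= bytes;
      reservedPerHandle.merge(handle, -bytes, Long::sum);
      Iterator<Pending> iterator = pending.iterator();
      while (iterator.hasNext()) {
        Pending request = iterator.next();
        if (fits(request.handle, request.bytes)) {
          grant(request.handle, request.bytes);
          granted.add(request);
          iterator.remove();
        }
      }
    }
    // Complete outside the lock so that callbacks never run while holding the pool's monitor.
    for (Pending request : granted) {
      request.future.complete(null);
    }
  }

  synchronized long reservedBytes() {
    return reservedBytes;
  }
}
```

As in the original free method, the futures are completed only after the synchronized block ends, so a callback that needs another lock cannot deadlock against a thread that holds that lock and is waiting for the pool.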
Summary
The flow described above has several problems:
- The SourceHandle cannot make any assumption about the order in which events arrive, which complicates its implementation: it has to handle events that arrive out of order.
- Even if NewDataBlockEvents arrive in order and the SourceHandle consumes as fast as the SinkHandle produces, transferring one TsBlock takes three RPCs, which is a considerable network overhead:
  - The SinkHandle sends a NewDataBlockEvent.
  - The SourceHandle pulls the TsBlock for the sequenceId carried by the NewDataBlockEvent.
  - After successfully pulling the TsBlock, the SourceHandle sends an ack.
The design rationale for using three RPCs per TsBlock:
- Limit the number of data transfer threads, and keep any single SourceHandle from holding a thread so long that other SourceHandles cannot issue transfer requests:
  - Thrift RPC is synchronous. Without NewDataBlockEvent, a SourceHandle that pulled blindly might find that the SinkHandle's data is not ready yet. If the call blocked until the SinkHandle produced data and only then returned, one SourceHandle would hold a data transfer thread for too long; with a fixed number of transfer threads, requests from other SourceHandles would not be served in time.
  - Hence the SinkHandle's NewDataBlockEvent notification: when its data is ready, the SinkHandle sends an RPC telling the SourceHandle to pull. The pull RPC is also synchronous, but the data on the SinkHandle side is guaranteed to be ready, so the RPC holds a transfer thread only briefly and does not make other SourceHandle requests wait long.
- Prevent TsBlock loss by using ack messages for fault tolerance:
  - If the SinkHandle released a TsBlock as soon as it received the SourceHandle's pull RPC, data could be lost: if a network problem makes that RPC fail, the SourceHandle retries, but the SinkHandle already released the TsBlock while handling the previous request, so the retry is futile.
- Memory control is simple to implement: the size of each TsBlock is known before it is pulled, so the SourceHandle can check whether it has enough memory and pull only when it does. There is no need to pre-allocate memory for a query; memory is reserved on demand each time.
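The fault-tolerance argument for the ack can be made concrete by comparing the two release policies under a lost pull response. The sketch is purely illustrative: RetainingSinkSketch and pullWithOneLostResponse are hypothetical names, strings stand in for TsBlocks, and the lost response is simulated by discarding the first return value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink used to compare "release on pull" with "release on ack".
class RetainingSinkSketch {
  private final Map<Integer, String> retained = new HashMap<>();
  private final boolean releaseOnPull;

  RetainingSinkSketch(boolean releaseOnPull) {
    this.releaseOnPull = releaseOnPull;
  }

  void put(int sequenceId, String block) {
    retained.put(sequenceId, block);
  }

  // Serves a pull request; returns null if the block has already been released.
  String pull(int sequenceId) {
    return releaseOnPull ? retained.remove(sequenceId) : retained.get(sequenceId);
  }

  // Releases the block once the source confirms that it received it.
  void ack(int sequenceId) {
    retained.remove(sequenceId);
  }

  // Simulates a pull whose first response is lost in the network, followed by one retry.
  static String pullWithOneLostResponse(RetainingSinkSketch sink, int sequenceId) {
    sink.pull(sequenceId); // the sink handled the request, but the source never saw the result
    String block = sink.pull(sequenceId); // retry
    if (block != null) {
      sink.ack(sequenceId);
    }
    return block;
  }
}
```

With releaseOnPull set to true the retry returns null because the first, lost request already dropped the block; with it set to false the retry returns the block, and the ack then releases it.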