Flink Source Code Analysis Series: Table of Contents
See: Flink Source Code Analysis Series Table of Contents
From collector to buffer
Below we start at the data source and trace how data is written into Flink's buffers.
The starting point is the NoTimestampContext.collect method, which lives inside the data source (SourceFunction).
@Override
public void collect(T element) {
synchronized (lock) {
output.collect(reuse.replace(element));
}
}
Here the collect method of the output object is called. The output field is declared as Output<StreamRecord<T>>. Debugging shows that its runtime type is CountingOutput.
CountingOutput is merely a wrapper around another Output. Compared with other Output implementations, it additionally tracks how many elements have been collected. It maintains a counter metric:
private final Counter numRecordsOut;
Each call to collect invokes numRecordsOut.inc(), which is how the number of collected elements is monitored.
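The wrapper idea can be illustrated without any Flink classes. The following is a minimal sketch, assuming a plain java.util.function.Consumer as the wrapped output; the class name CountingCollectorSketch and its fields are hypothetical and are not Flink's CountingOutput.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the wrapper pattern: counts records, then delegates.
final class CountingCollectorSketch<T> implements Consumer<T> {
    private final Consumer<T> inner; // the wrapped output
    private long numRecordsOut;      // plays the role of the Counter metric

    CountingCollectorSketch(Consumer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void accept(T record) {
        numRecordsOut++;      // update the metric first
        inner.accept(record); // then forward the record to the wrapped output
    }

    long getCount() {
        return numRecordsOut;
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        CountingCollectorSketch<String> out = new CountingCollectorSketch<>(sink::add);
        out.accept("a");
        out.accept("b");
        System.out.println(out.getCount() + " records, sink=" + sink);
    }
}
```

The wrapped output never needs to know that it is being counted, which is why this kind of monitoring can be layered on top of any output.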
What type of output does the CountingOutput held by NoTimestampContext wrap? Debugging shows that the inner type is RecordWriterOutput.
The collect method of RecordWriterOutput looks like this:
@Override
public void collect(StreamRecord<OUT> record) {
// outputTag is used for side outputs; this output only emits to the main input
if (this.outputTag != null) {
// we are only responsible for emitting to the main input
return;
}
pushToRecordWriter(record);
}
pushToRecordWriter uses a serialization delegate to pass the record to recordWriter. The code is as follows:
private <X> void pushToRecordWriter(StreamRecord<X> record) {
serializationDelegate.setInstance(record);
try {
recordWriter.emit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
RecordWriter is responsible for serializing records and writing them into buffers. It has two implementations:
- BroadcastRecordWriter: maintains multiple downstream channels and sends every record to all of them.
- ChannelSelectorRecordWriter: uses a channelSelector object to decide which downstream channel a record goes to. The keyBy operator uses this RecordWriter.
Here we look at the emit method of ChannelSelectorRecordWriter:
@Override
public void emit(T record) throws IOException, InterruptedException {
emit(record, channelSelector.selectChannel(record));
}
Clearly, this relies on channelSelector.selectChannel, which is a function that maps a record to the id of its downstream channel.
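To make "a function from record to channel id" concrete, here is a minimal sketch that hashes a key onto a channel index. It is a simplification under stated assumptions: the class and method below are hypothetical, and the partitioner that keyBy uses in Flink is more involved than a plain hash modulo.

```java
// Hypothetical selector: maps a key to a channel index in [0, numChannels).
final class ChannelSelectorSketch {
    static int selectChannel(Object key, int numChannels) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numChannels);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> channel " + selectChannel(key, 4));
        }
    }
}
```

The property that matters is determinism: the same key always lands on the same channel, so all records of one key reach the same downstream subtask.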
From here we return to the parent class RecordWriter.
protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
checkErroneous();
serializer.serializeRecord(record);
// Make sure we don't hold onto the large intermediate serialization buffer for too long
if (copyFromSerializerToTargetChannel(targetChannel)) {
// shrink the serializer's intermediate data buffer
serializer.prune();
}
}
The key logic is in copyFromSerializerToTargetChannel, which copies data from the serializer to the target channel.
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
// We should reset the initial position of the intermediate serialization buffer before
// copying, so the serialization results can be copied to multiple target buffers.
// the serializer here is a SpanningRecordSerializer
// reset() rewinds the position of the serializer's internal data buffer to 0
serializer.reset();
boolean pruneTriggered = false;
// get the bufferBuilder of the target channel
// a bufferBuilder holds a MemorySegment, i.e. a chunk of memory
// Flink's memory management is built on MemorySegment, which can manage both on-heap and off-heap memory
// the writer keeps an array of bufferBuilders whose length equals the number of downstream channels
// the array is indexed by channel id and stores the bufferBuilder that belongs to each channel
// if the channel's bufferBuilder has not been created yet, requestNewBufferBuilder is called to obtain one
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
// copy the serializer's data into the bufferBuilder
SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
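// loop while the current buffer has been filled up
// a single record may span several buffers
// whenever a buffer is full and the record is not complete, a new buffer is requested
// if the record is complete at the moment a buffer fills up, pruneTriggered is set to true
// so that the caller shrinks the serializer's intermediate buffer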
while (result.isFullBuffer()) {
finishBufferBuilder(bufferBuilder);
// If this was a full record, we are done. Not breaking out of the loop at this point
// will lead to another buffer request before breaking out (that would not be a
// problem per se, but it can lead to stalls in the pipeline).
if (result.isFullRecord()) {
pruneTriggered = true;
emptyCurrentBufferBuilder(targetChannel);
break;
}
bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.copyToBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
// if the buffer timeout is 0 (flushAlways), flush the target channel's data right away
if (flushAlways) {
flushTargetPartition(targetChannel);
}
return pruneTriggered;
}
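The core of the loop above is that one serialized record may be spread over several fixed-size buffers. The following is a minimal sketch of that idea only, assuming plain byte arrays; FixedBuffer and write are hypothetical names, and the control flow is simplified compared with Flink's SerializationResult handling.

```java
import java.util.ArrayList;
import java.util.List;

final class SpanningCopySketch {
    /** Fixed-capacity buffer, a stand-in for a BufferBuilder backed by one memory segment. */
    static final class FixedBuffer {
        final byte[] data;
        int written;

        FixedBuffer(int capacity) {
            data = new byte[capacity];
        }

        boolean isFull() {
            return written == data.length;
        }

        /** Copies as many bytes as fit and returns how many were copied. */
        int append(byte[] src, int offset) {
            int n = Math.min(src.length - offset, data.length - written);
            System.arraycopy(src, offset, data, written, n);
            written += n;
            return n;
        }
    }

    /** Writes one record starting in 'current', requesting new buffers as needed. */
    static List<FixedBuffer> write(byte[] record, FixedBuffer current, int capacity) {
        List<FixedBuffer> used = new ArrayList<>();
        used.add(current);
        int offset = current.append(record, 0);
        while (offset < record.length) {         // record not fully written yet
            current = new FixedBuffer(capacity); // comparable to requesting a new buffer builder
            used.add(current);
            offset += current.append(record, offset);
        }
        return used;
    }

    public static void main(String[] args) {
        List<FixedBuffer> used = write(new byte[10], new FixedBuffer(4), 4);
        System.out.println("buffers used: " + used.size());
    }
}
```

A 10-byte record written into 4-byte buffers occupies three buffers, and the last one stays partially filled so that the next record can continue in it.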
Next, let us look at getBufferBuilder, using the ChannelSelectorRecordWriter implementation as the example.
@Override
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
if (bufferBuilders[targetChannel] != null) {
return bufferBuilders[targetChannel];
} else {
return requestNewBufferBuilder(targetChannel);
}
}
If the bufferBuilders array has no entry at index targetChannel, a new BufferBuilder is requested. This shows that the bufferBuilder of each channel is created lazily, the first time it is needed.
We follow the call into requestNewBufferBuilder.
@Override
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
// first check that the buffer for targetChannel is either null or already finished
checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());
// obtain a bufferBuilder from the target partition
BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
// create a bufferConsumer, which holds the bufferBuilder's memorySegment plus the current write and read positions
// BufferBuilder is used for writing data, BufferConsumer for reading it
targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
bufferBuilders[targetChannel] = bufferBuilder;
return bufferBuilder;
}
The addBufferConsumer method:
@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
checkNotNull(bufferConsumer);
ResultSubpartition subpartition;
try {
checkInProduceState();
// look up the subpartition
// subpartitionIndex here is the targetChannel
subpartition = subpartitions[subpartitionIndex];
}
catch (Exception ex) {
bufferConsumer.close();
throw ex;
}
// add the bufferConsumer to the subpartition
return subpartition.add(bufferConsumer);
}
ResultSubpartition has two implementations, PipelinedSubpartition and BoundedBlockingSubpartition.
PipelinedSubpartition serves data consumption in streaming scenarios. Internally it maintains a queue of BufferConsumer objects. A consumer calls createReadView to create a PipelinedSubpartitionView through which it reads the data. Creating the view requires a BufferAvailabilityListener, which is used as the callback when data becomes available in the buffers. As a result, PipelinedSubpartition can notify the downstream consumer as soon as data arrives.
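The notify-on-data behaviour described above can be sketched with a queue and a callback. This is an illustration under assumptions, not Flink's PipelinedSubpartition: the class PipelinedQueueSketch and its nested AvailabilityListener interface are hypothetical.

```java
import java.util.ArrayDeque;

final class PipelinedQueueSketch<T> {
    /** Hypothetical callback, comparable in spirit to a buffer availability listener. */
    interface AvailabilityListener {
        void notifyDataAvailable();
    }

    private final ArrayDeque<T> buffers = new ArrayDeque<>();
    private AvailabilityListener listener; // guarded by "buffers"

    /** Producer side: enqueue, and notify the reader if the queue was empty before. */
    void add(T buffer) {
        AvailabilityListener toNotify = null;
        synchronized (buffers) {
            if (buffers.isEmpty()) {
                toNotify = listener; // still null if no reader has attached yet
            }
            buffers.add(buffer);
        }
        if (toNotify != null) {
            toNotify.notifyDataAvailable(); // called outside the lock
        }
    }

    /** Consumer side: attach exactly one reader; notify at once if data is already queued. */
    void createReadView(AvailabilityListener newListener) {
        boolean notifyDataAvailable;
        synchronized (buffers) {
            if (listener != null) {
                throw new IllegalStateException("can only be consumed once");
            }
            listener = newListener;
            notifyDataAvailable = !buffers.isEmpty();
        }
        if (notifyDataAvailable) {
            newListener.notifyDataAvailable();
        }
    }

    T poll() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }

    public static void main(String[] args) {
        PipelinedQueueSketch<String> sub = new PipelinedQueueSketch<>();
        sub.add("buffer-1");
        sub.createReadView(() -> System.out.println("data available"));
        System.out.println(sub.poll());
    }
}
```

Notifying only on the empty to non-empty transition keeps the number of callbacks low while still guaranteeing that a waiting reader is woken up.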
BoundedBlockingSubpartition suits data consumption in batch scenarios. Unlike PipelinedSubpartition, its data is written first and consumed afterwards, and it can be written once and consumed many times. The data is written into a BoundedData, and how it is persisted depends on the BoundedData implementation: it can be kept in the file system (FileChannelBoundedData), in memory (MemoryMappedBoundedData), or in both the file system and memory (FileChannelMemoryMappedBoundedData).
Downstream requests for a subpartition
As the previous section showed, data is consumed through a view that ResultSubpartition creates in createReadView.
The createReadView code of PipelinedSubpartition is as follows:
@Override
public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
final boolean notifyDataAvailable;
synchronized (buffers) {
// the subpartition must not have been released
checkState(!isReleased);
// a read view must not have been created before
checkState(readView == null,
"Subpartition %s of is being (or already has been) consumed, " +
"but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());
LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
parent.getOwningTaskName(), index, parent.getPartitionId());
// create the view, passing in the availabilityListener
readView = new PipelinedSubpartitionView(this, availabilityListener);
// if the buffer queue is not empty, the listener must be told that data is ready to be consumed
notifyDataAvailable = !buffers.isEmpty();
}
if (notifyDataAvailable) {
notifyDataAvailable();
}
return readView;
}
The method above is called from ResultPartitionManager, which keeps track of the partitions that are currently being produced and consumed.
The createSubpartitionView method of ResultPartitionManager:
@Override
public ResultSubpartitionView createSubpartitionView(
ResultPartitionID partitionId,
int subpartitionIndex,
BufferAvailabilityListener availabilityListener) throws IOException {
synchronized (registeredPartitions) {
final ResultPartition partition = registeredPartitions.get(partitionId);
if (partition == null) {
throw new PartitionNotFoundException(partitionId);
}
LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition);
return partition.createSubpartitionView(subpartitionIndex, availabilityListener);
}
}
The logic is straightforward. During setup, a ResultPartition registers itself with the ResultPartitionManager. When a view is created, the ResultPartition is looked up by partition id among the registered partitions, and a subpartition view is then created from it.
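The register-then-look-up pattern can be sketched in a few lines. This is an illustration only: PartitionRegistrySketch is a hypothetical class, strings stand in for partitions and views, and a plain IllegalArgumentException stands in for PartitionNotFoundException.

```java
import java.util.HashMap;
import java.util.Map;

final class PartitionRegistrySketch {
    // partition id -> its subpartitions (strings stand in for real subpartition objects)
    private final Map<String, String[]> registeredPartitions = new HashMap<>();

    /** Called once per partition during setup. */
    void register(String partitionId, String[] subpartitions) {
        synchronized (registeredPartitions) {
            if (registeredPartitions.putIfAbsent(partitionId, subpartitions) != null) {
                throw new IllegalStateException("Partition already registered: " + partitionId);
            }
        }
    }

    /** Looks the partition up by id, then creates a "view" of one subpartition. */
    String createSubpartitionView(String partitionId, int subpartitionIndex) {
        synchronized (registeredPartitions) {
            String[] partition = registeredPartitions.get(partitionId);
            if (partition == null) {
                throw new IllegalArgumentException("Partition not found: " + partitionId);
            }
            return "view of " + partition[subpartitionIndex];
        }
    }

    public static void main(String[] args) {
        PartitionRegistrySketch registry = new PartitionRegistrySketch();
        registry.register("p1", new String[] {"sub0", "sub1"});
        System.out.println(registry.createSubpartitionView("p1", 1));
    }
}
```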
Following the call chain further, we find that this method is called from two classes: LocalInputChannel and CreditBasedSequenceNumberingViewReader.
LocalInputChannel requests a subpartition view locally.
CreditBasedSequenceNumberingViewReader handles subpartition views that are requested over the network by other nodes, and also supports the credit-based backpressure mechanism.
Let us trace the requestSubpartitionView method of CreditBasedSequenceNumberingViewReader:
@Override
public void requestSubpartitionView(
ResultPartitionProvider partitionProvider,
ResultPartitionID resultPartitionId,
int subPartitionIndex) throws IOException {
synchronized (requestLock) {
if (subpartitionView == null) {
// This this call can trigger a notification we have to
// schedule a separate task at the event loop that will
// start consuming this. Otherwise the reference to the
// view cannot be available in getNextBuffer().
this.subpartitionView = partitionProvider.createSubpartitionView(
resultPartitionId,
subPartitionIndex,
this);
} else {
throw new IllegalStateException("Subpartition already requested");
}
}
}
All this method does is create a subpartitionView. Tracing its callers further up, we arrive at the channelRead0 method of PartitionRequestServerHandler:
@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
try {
// get the type of the received message
Class<?> msgClazz = msg.getClass();
// ----------------------------------------------------------------
// Intermediate result partition requests
// ----------------------------------------------------------------
// if it is a partition request
if (msgClazz == PartitionRequest.class) {
PartitionRequest request = (PartitionRequest) msg;
LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
try {
NetworkSequenceViewReader reader;
// create a reader
reader = new CreditBasedSequenceNumberingViewReader(
request.receiverId,
request.credit,
outboundQueue);
// assign a subpartitionView to this reader
reader.requestSubpartitionView(
partitionProvider,
request.partitionId,
request.queueIndex);
// register the reader with outboundQueue
// outboundQueue holds multiple readers, which queue up and wait for their data to be sent
outboundQueue.notifyReaderCreated(reader);
} catch (PartitionNotFoundException notFound) {
respondWithError(ctx, notFound, request.receiverId);
}
}
// ----------------------------------------------------------------
// Task events
// ----------------------------------------------------------------
else if (msgClazz == TaskEventRequest.class) {
TaskEventRequest request = (TaskEventRequest) msg;
if (!taskEventPublisher.publish(request.partitionId, request.event)) {
respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
}
} else if (msgClazz == CancelPartitionRequest.class) {
CancelPartitionRequest request = (CancelPartitionRequest) msg;
outboundQueue.cancel(request.receiverId);
} else if (msgClazz == CloseRequest.class) {
outboundQueue.close();
} else if (msgClazz == AddCredit.class) {
AddCredit request = (AddCredit) msg;
outboundQueue.addCredit(request.receiverId, request.credit);
} else {
LOG.warn("Received unexpected client request: {}", msg);
}
} catch (Throwable t) {
respondWithError(ctx, t);
}
}
This method runs on the upstream sending side, which plays the server role in netty.
It reacts according to the type of the message netty has received. If the message is a PartitionRequest, a CreditBasedSequenceNumberingViewReader is created and added to outboundQueue.
outboundQueue is a PartitionRequestQueue, the object that handles partition requests. For every partition request, PartitionRequestServerHandler creates a NetworkSequenceViewReader, assigns it a subpartition view (by calling requestSubpartitionView), and finally calls notifyReaderCreated to add the reader to the allReaders collection of PartitionRequestQueue. PartitionRequestQueue listens for changes in the writability of the downstream channel. When the channel becomes writable, channelWritabilityChanged is called, which takes the queued readers that have data available one at a time and sends their buffers downstream.
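The queueing behaviour can be sketched without netty. The following is a minimal sketch under assumptions: ReaderQueueSketch, Reader and writeNextIfPossible are hypothetical names, strings stand in for buffers, and a boolean parameter stands in for the channel's writability.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class ReaderQueueSketch {
    /** A reader holding the buffers destined for one downstream channel. */
    static final class Reader {
        final String id;
        final ArrayDeque<String> buffers = new ArrayDeque<>();

        Reader(String id, String... bufs) {
            this.id = id;
            for (String b : bufs) {
                buffers.add(b);
            }
        }
    }

    private final ArrayDeque<Reader> availableReaders = new ArrayDeque<>();

    void registerAvailableReader(Reader reader) {
        availableReaders.add(reader);
    }

    /** Sends at most one buffer; returns false if not writable or nothing is queued. */
    boolean writeNextIfPossible(boolean channelWritable, List<String> wire) {
        if (!channelWritable) {
            return false;
        }
        while (true) {
            Reader reader = availableReaders.poll();
            if (reader == null) {
                return false; // no reader with data
            }
            String next = reader.buffers.poll();
            if (next == null) {
                continue; // this reader had nothing, try the next one
            }
            if (!reader.buffers.isEmpty()) {
                availableReaders.add(reader); // still has data: go to the back of the queue
            }
            wire.add(reader.id + ":" + next);
            return true;
        }
    }

    public static void main(String[] args) {
        ReaderQueueSketch queue = new ReaderQueueSketch();
        queue.registerAvailableReader(new Reader("A", "a1", "a2"));
        queue.registerAvailableReader(new Reader("B", "b1"));
        List<String> wire = new ArrayList<>();
        while (queue.writeNextIfPossible(true, wire)) {
            // each iteration models one completed write
        }
        System.out.println(wire);
    }
}
```

Because a reader with remaining data is re-added at the tail, the readers take turns instead of one reader monopolising the connection.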
The channelWritabilityChanged method is as follows:
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
The writeAndFlushNextMessageIfPossible method is as follows:
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
// return if a fatal error occurred or the channel is not writable
if (fatalError || !channel.isWritable()) {
return;
}
// The logic here is very similar to the combined input gate and local
// input channel logic. You can think of this class acting as the input
// gate and the consumed views as the local input channels.
BufferAndAvailability next = null;
try {
while (true) {
// take one reader from the queue
NetworkSequenceViewReader reader = pollAvailableReader();
// No queue with available data. We allow this here, because
// of the write callbacks that are executed after each write.
if (reader == null) {
return;
}
// get the next buffer from the reader
next = reader.getNextBuffer();
if (next == null) {
if (!reader.isReleased()) {
continue;
}
Throwable cause = reader.getFailureCause();
if (cause != null) {
ErrorResponse msg = new ErrorResponse(
new ProducerFailedException(cause),
reader.getReceiverId());
ctx.writeAndFlush(msg);
}
} else {
// This channel was now removed from the available reader queue.
// We re-add it into the queue if it is still available
if (next.moreAvailable()) {
registerAvailableReader(reader);
}
// wrap the buffer in a response message
BufferResponse msg = new BufferResponse(
next.buffer(),
reader.getSequenceNumber(),
reader.getReceiverId(),
next.buffersInBacklog());
// Write and flush and wait until this is done before
// trying to continue with the next buffer.
// send msg downstream
channel.writeAndFlush(msg).addListener(writeListener);
return;
}
}
} catch (Throwable t) {
if (next != null) {
next.buffer().recycleBuffer();
}
throw new IOException(t.getMessage(), t);
}
}
Let us return to the PartitionRequest message. Where is it sent from? We trace it to the requestSubpartition method of NettyPartitionRequestClient:
@Override
public void requestSubpartition(
final ResultPartitionID partitionId,
final int subpartitionIndex,
final RemoteInputChannel inputChannel,
int delayMs) throws IOException {
checkNotClosed();
LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
subpartitionIndex, partitionId, delayMs);
// clientHandler is a CreditBasedPartitionRequestClientHandler
// it keeps a map from input channel id to the channel object
// when a message is read, this map is used to find the channel object for a channel id
clientHandler.addInputChannel(inputChannel);
// create the PartitionRequest object
final PartitionRequest request = new PartitionRequest(
partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());
// callback that runs once sending the PartitionRequest has completed
final ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// if sending failed
if (!future.isSuccess()) {
// remove this channel from the map
clientHandler.removeInputChannel(inputChannel);
SocketAddress remoteAddr = future.channel().remoteAddress();
// record the error in the inputChannel by setting its internal cause field
inputChannel.onError(
new LocalTransportException(
String.format("Sending the partition request to '%s' failed.", remoteAddr),
future.channel().localAddress(), future.cause()
));
}
}
};
// send immediately if no delay is requested
if (delayMs == 0) {
ChannelFuture f = tcpChannel.writeAndFlush(request);
f.addListener(listener);
} else {
// otherwise schedule the send on the eventLoop after the delay
final ChannelFuture[] f = new ChannelFuture[1];
tcpChannel.eventLoop().schedule(new Runnable() {
@Override
public void run() {
f[0] = tcpChannel.writeAndFlush(request);
f[0].addListener(listener);
}
}, delayMs, TimeUnit.MILLISECONDS);
}
}
Continuing up the call chain, we reach the requestSubpartition method of RemoteInputChannel:
@VisibleForTesting
@Override
public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
if (partitionRequestClient == null) {
// Create a client and request the partition
try {
partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
} catch (IOException e) {
// IOExceptions indicate that we could not open a connection to the remote TaskExecutor
throw new PartitionConnectionException(partitionId, e);
}
partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
}
}
In requestSubpartition of RemoteInputChannel, if partitionRequestClient is null, a client is first created through connectionManager and its requestSubpartition method is then called.
Tracing further, we find the requestPartitions method of SingleInputGate:
@VisibleForTesting
void requestPartitions() throws IOException, InterruptedException {
synchronized (requestLock) {
// partitions may only be requested once; the flag is set to true after the first call
if (!requestedPartitionsFlag) {
if (closeFuture.isDone()) {
throw new IllegalStateException("Already released.");
}
// Sanity checks
if (numberOfInputChannels != inputChannels.size()) {
throw new IllegalStateException(String.format(
"Bug in input gate setup logic: mismatch between " +
"number of total input channels [%s] and the currently set number of input " +
"channels [%s].",
inputChannels.size(),
numberOfInputChannels));
}
// iterate over all inputChannels and request the subpartition each of them consumes
for (InputChannel inputChannel : inputChannels.values()) {
inputChannel.requestSubpartition(consumedSubpartitionIndex);
}
}
// set the flag to true once done, to prevent repeated requests
requestedPartitionsFlag = true;
}
}
SingleInputGate extends InputGate. The role of an InputGate is to read data from an intermediate result into a task.
From the analysis of JobGraph (see Flink Source Code: JobGraph Generation), we know that operator chains exchange data through intermediate results, which act as buffers for the in-between output. At runtime, the data of an intermediate result is carried by ResultPartition objects (one or more, depending on the partitioning). A ResultPartition is divided into one or more ResultSubpartitions. Each ResultSubpartition corresponds to one downstream InputGate, and a downstream InputGate reads the content of all upstream ResultSubpartitions that correspond to it.
Reading data
Let us look at the processInput method of StreamTask:
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
// delegate to the inputProcessor's processInput method
InputStatus status = inputProcessor.processInput();
if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
return;
}
if (status == InputStatus.END_OF_INPUT) {
// when the input has ended, mailboxLoopRunning is set to false and the loop stops
controller.allActionsCompleted();
return;
}
// resume the default action asynchronously once the input (inputProcessor) or the output (recordWriter) is available again
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
jointFuture.thenRun(suspendedDefaultAction::resume);
}
Here we focus on the inputProcessor.processInput() call.
inputProcessor has two implementations: StreamOneInputProcessor and StreamTwoInputProcessor. Let us look at the processInput method of StreamOneInputProcessor:
@Override
public InputStatus processInput() throws Exception {
InputStatus status = input.emitNext(output);
if (status == InputStatus.END_OF_INPUT) {
synchronized (lock) {
operatorChain.endHeadOperatorInput(1);
}
}
return status;
}
The input here has two implementations, StreamTaskSourceInput and StreamTaskNetworkInput. If the StreamTask runs a data source, the implementation is StreamTaskSourceInput. In all other cases it is StreamTaskNetworkInput, which reads its data over the network.
Let us analyze the emitNext method of StreamTaskNetworkInput:
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
while (true) {
// get the stream element from the deserializer
if (currentRecordDeserializer != null) {
// deserialize a record from the buffer's memorySegment
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
// once the buffer has been fully consumed, it can be recycled
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
// if a complete record has been read
if (result.isFullRecord()) {
// process the record deserialized from the buffer (covered in a later post)
processElement(deserializationDelegate.getInstance(), output);
return InputStatus.MORE_AVAILABLE;
}
}
// read from the CheckpointedInputGate
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
if (bufferOrEvent.isPresent()) {
// handle the buffer and hand its memory segment to currentRecordDeserializer for deserialization (analyzed below)
processBufferOrEvent(bufferOrEvent.get());
} else {
// if the checkpointedInputGate input has finished, return END_OF_INPUT
if (checkpointedInputGate.isFinished()) {
checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
if (!checkpointedInputGate.isEmpty()) {
throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
return InputStatus.END_OF_INPUT;
}
return InputStatus.NOTHING_AVAILABLE;
}
}
}
Now let us look at the source of the processBufferOrEvent method:
private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOException {
// if it is a buffer
if (bufferOrEvent.isBuffer()) {
// read the index of the channel the buffer came from
lastChannel = bufferOrEvent.getChannelIndex();
checkState(lastChannel != StreamTaskInput.UNSPECIFIED);
// get the record deserializer that belongs to this channel
currentRecordDeserializer = recordDeserializers[lastChannel];
checkState(currentRecordDeserializer != null,
"currentRecordDeserializer has already been released");
// the key step: point the deserializer at the buffer obtained from the inputGate
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
// Event received
// the received element is an event rather than a buffer
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
// release the record deserializer immediately,
// which is very valuable in case of bounded stream
// release the deserializer of this channel
// and set the recordDeserializers[channelIndex] reference to null
releaseDeserializer(bufferOrEvent.getChannelIndex());
}
}
Next we trace how the buffer is obtained from the inputGate. Debugging shows that this inputGate is wrapped in two layers: CheckpointedInputGate wraps InputGateWithMetrics, which in turn wraps SingleInputGate. CheckpointedInputGate inspects the stream for checkpoint barriers and calls the corresponding barrierHandler to decide whether to trigger a checkpoint (see Flink Source Code: Distributed Snapshots). InputGateWithMetrics monitors the received data and counts the total number of bytes flowing in.
After passing through the two wrappers, execution reaches the pollNext method of SingleInputGate.
SingleInputGate has two methods, pollNext and getNext. They are essentially the same. The only difference is that pollNext is non-blocking while getNext blocks.
@Override
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
return getNextBufferOrEvent(false);
}
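The difference between the non-blocking and the blocking variant can be shown with a standard java.util.concurrent.BlockingQueue. This is a sketch only: PollVsGetSketch is a hypothetical class, and SingleInputGate does not use a BlockingQueue in this way.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical gate-like class; only the blocking vs non-blocking contrast mirrors the text.
final class PollVsGetSketch {
    private final BlockingQueue<String> buffers = new LinkedBlockingQueue<>();

    void add(String buffer) {
        buffers.add(buffer);
    }

    /** Non-blocking: returns Optional.empty() immediately when nothing is queued. */
    Optional<String> pollNext() {
        return Optional.ofNullable(buffers.poll());
    }

    /** Blocking: waits until a buffer is available. */
    Optional<String> getNext() throws InterruptedException {
        return Optional.of(buffers.take());
    }

    public static void main(String[] args) throws InterruptedException {
        PollVsGetSketch gate = new PollVsGetSketch();
        System.out.println(gate.pollNext().isPresent()); // nothing queued yet
        gate.add("buffer-1");
        System.out.println(gate.getNext().get());
    }
}
```

A non-blocking poll lets the calling thread do other work when no data is there, which is what a mailbox-style task loop needs.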
The getNextBufferOrEvent method:
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
// if end-of-partition events have been received from all partitions, return empty
if (hasReceivedAllEndOfPartitionEvents) {
return Optional.empty();
}
// if the input gate has been closed
if (closeFuture.isDone()) {
throw new CancelTaskException("Input gate is already closed.");
}
// read the next data; this only blocks when blocking is true
Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
if (!next.isPresent()) {
return Optional.empty();
}
InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
return Optional.of(transformToBufferOrEvent(
inputWithData.data.buffer(),
inputWithData.moreAvailable,
inputWithData.input));
}
The waitAndGetNextData method:
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
throws IOException, InterruptedException {
while (true) {
// get a channel; the blocking parameter decides whether to wait for one
Optional<InputChannel> inputChannel = getChannel(blocking);
if (!inputChannel.isPresent()) {
return Optional.empty();
}
// Do not query inputChannel under the lock, to avoid potential deadlocks coming from
// notifications.
// get the buffered data of the input channel
Optional<BufferAndAvailability> result = inputChannel.get().getNextBuffer();
synchronized (inputChannelsWithData) {
// data was obtained and the channel has more
if (result.isPresent() && result.get().moreAvailable()) {
// enqueue the inputChannel at the end to avoid starvation
// add the channel back to the inputChannelsWithData queue
inputChannelsWithData.add(inputChannel.get());
// this BitSet records which channels are already in the inputChannelsWithData queue
enqueuedInputChannelsWithData.set(inputChannel.get().getChannelIndex());
}
// if inputChannelsWithData is empty, mark the gate as unavailable
if (inputChannelsWithData.isEmpty()) {
availabilityHelper.resetUnavailable();
}
// return the wrapped result
if (result.isPresent()) {
return Optional.of(new InputWithData<>(
inputChannel.get(),
result.get(),
!inputChannelsWithData.isEmpty()));
}
}
}
}
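The re-enqueue step and the BitSet bookkeeping can be sketched as follows. This is a simplified, single-threaded illustration under assumptions: FairChannelQueueSketch and Channel are hypothetical names, and strings stand in for buffers.

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Optional;

final class FairChannelQueueSketch {
    static final class Channel {
        final int index;
        final ArrayDeque<String> buffers = new ArrayDeque<>();

        Channel(int index, String... bufs) {
            this.index = index;
            for (String b : bufs) {
                buffers.add(b);
            }
        }
    }

    private final ArrayDeque<Channel> channelsWithData = new ArrayDeque<>();
    private final BitSet enqueued = new BitSet(); // which channel indexes are already queued

    /** Called when a channel has data; the BitSet prevents enqueueing it twice. */
    void notifyChannelNonEmpty(Channel channel) {
        if (!enqueued.get(channel.index)) {
            channelsWithData.add(channel);
            enqueued.set(channel.index);
        }
    }

    /** Takes one buffer from the channel at the head of the queue. */
    Optional<String> pollNext() {
        Channel channel = channelsWithData.poll();
        if (channel == null) {
            return Optional.empty();
        }
        enqueued.clear(channel.index);
        String buffer = channel.buffers.poll();
        if (buffer != null && !channel.buffers.isEmpty()) {
            // more data left: re-enqueue at the end to avoid starving other channels
            channelsWithData.add(channel);
            enqueued.set(channel.index);
        }
        return Optional.ofNullable(buffer);
    }

    public static void main(String[] args) {
        FairChannelQueueSketch gate = new FairChannelQueueSketch();
        Channel c0 = new Channel(0, "a", "b");
        Channel c1 = new Channel(1, "x");
        gate.notifyChannelNonEmpty(c0);
        gate.notifyChannelNonEmpty(c1);
        Optional<String> next;
        while ((next = gate.pollNext()).isPresent()) {
            System.out.println(next.get());
        }
    }
}
```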
Every InputGate contains one or more InputChannels. There are two kinds of InputChannel: LocalInputChannel reads from a local subpartition, while RemoteInputChannel reads from a remote subpartition (on another node).
The getNextBuffer method of LocalInputChannel:
@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
checkError();
// the subpartitionView obtained by requestSubpartition
ResultSubpartitionView subpartitionView = this.subpartitionView;
// if there is no subpartitionView yet, it has to be checked again
// if another thread is currently inside requestSubpartition, checkAndWaitForSubpartitionView blocks
// until requestSubpartition has finished
if (subpartitionView == null) {
// There is a possible race condition between writing a EndOfPartitionEvent (1) and flushing (3) the Local
// channel on the sender side, and reading EndOfPartitionEvent (2) and processing flush notification (4). When
// they happen in that order (1 - 2 - 3 - 4), flush notification can re-enqueue LocalInputChannel after (or
// during) it was released during reading the EndOfPartitionEvent (2).
if (isReleased) {
return Optional.empty();
}
// this can happen if the request for the partition was triggered asynchronously
// by the time trigger
// would be good to avoid that, by guaranteeing that the requestPartition() and
// getNextBuffer() always come from the same thread
// we could do that by letting the timer insert a special "requesting channel" into the input gate's queue
subpartitionView = checkAndWaitForSubpartitionView();
}
// get the next buffer
BufferAndBacklog next = subpartitionView.getNextBuffer();
if (next == null) {
if (subpartitionView.isReleased()) {
throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
} else {
return Optional.empty();
}
}
// update the number of bytes read
numBytesIn.inc(next.buffer().getSize());
// update the number of buffers read
numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
}
That was getNextBuffer of LocalInputChannel. Now let us look at getNextBuffer of RemoteInputChannel. Unlike LocalInputChannel, it takes buffers from the receivedBuffers queue rather than directly from the subpartitionView.
@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
checkError();
final Buffer next;
final boolean moreAvailable;
// take a buffer from the receivedBuffers queue
synchronized (receivedBuffers) {
next = receivedBuffers.poll();
moreAvailable = !receivedBuffers.isEmpty();
}
numBytesIn.inc(next.getSize());
numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
}
The natural next question is: when are buffers added to receivedBuffers? The answer lies in the onBuffer method:
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
// whether this buffer has to be recycled
boolean recycleBuffer = true;
try {
final boolean wasEmpty;
synchronized (receivedBuffers) {
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after releaseAllResources() released all buffers from receivedBuffers
// (see above for details).
// no new buffers are accepted after releaseAllResources() has been called
if (isReleased.get()) {
return;
}
// verify the sequenceNumber
if (expectedSequenceNumber != sequenceNumber) {
onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
return;
}
// remember whether the queue was empty before adding the buffer
wasEmpty = receivedBuffers.isEmpty();
// add the buffer to the queue
receivedBuffers.add(buffer);
// the buffer has been handed over, so it must not be recycled here
recycleBuffer = false;
}
// advance the expected sequence number
++expectedSequenceNumber;
// if the queue was empty before the add, notify the inputGate that this channel now has data
if (wasEmpty) {
notifyChannelNonEmpty();
}
if (backlog >= 0) {
// allocate buffers ahead of time for the sender's backlog
onSenderBacklog(backlog);
}
} finally {
// recycle the buffer if it was not enqueued
if (recycleBuffer) {
buffer.recycleBuffer();
}
}
}
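Two ideas in onBuffer are worth isolating: rejecting out-of-order buffers by sequence number, and notifying only when the queue goes from empty to non-empty. The following is a minimal sketch of just those two ideas, with hypothetical names (SequencedReceiverSketch, nonEmptyNotifications, error) and strings in place of buffers.

```java
import java.util.ArrayDeque;

final class SequencedReceiverSketch {
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private int expectedSequenceNumber = 0;

    int nonEmptyNotifications = 0; // how often the "gate" would have been notified
    String error;                  // set when a buffer arrives out of order

    void onBuffer(String buffer, int sequenceNumber) {
        boolean wasEmpty;
        synchronized (receivedBuffers) {
            if (sequenceNumber != expectedSequenceNumber) {
                error = "expected " + expectedSequenceNumber + " but got " + sequenceNumber;
                return; // the buffer is dropped
            }
            wasEmpty = receivedBuffers.isEmpty();
            receivedBuffers.add(buffer);
        }
        expectedSequenceNumber++;
        if (wasEmpty) {
            nonEmptyNotifications++; // only the empty to non-empty transition notifies
        }
    }

    int size() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }

    public static void main(String[] args) {
        SequencedReceiverSketch channel = new SequencedReceiverSketch();
        channel.onBuffer("a", 0);
        channel.onBuffer("b", 1);
        channel.onBuffer("d", 3); // sequence number 2 is missing
        System.out.println(channel.size() + " buffers, error: " + channel.error);
    }
}
```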
Let us first look at how buffers are allocated ahead of time:
void onSenderBacklog(int backlog) throws IOException {
int numRequestedBuffers = 0;
// lock bufferQueue
synchronized (bufferQueue) {
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after releaseAllResources() released all buffers (see above for details).
// do nothing after releaseAllResources()
if (isReleased.get()) {
return;
}
// backlog is the number of buffers still queued up on the sender side
// initialCredit is the number of buffers initially reserved for this channel
numRequiredBuffers = backlog + initialCredit;
// while fewer buffers are available than numRequiredBuffers
// and the channel is not already waiting for floating buffers,
// add floating buffers to bufferQueue
while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
// request one buffer
Buffer buffer = inputGate.getBufferPool().requestBuffer();
if (buffer != null) {
// add it to the floating buffer queue
bufferQueue.addFloatingBuffer(buffer);
numRequestedBuffers++;
} else if (inputGate.getBufferProvider().addBufferListener(this)) {
// If the channel has not got enough buffers, register it as listener to wait for more floating buffers.
// no buffer could be obtained (the channel does not have enough buffers):
// register a listener and mark the channel as waiting for floating buffers
isWaitingForFloatingBuffers = true;
break;
}
}
}
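// if this call obtained at least one buffer:
// unannouncedCredit is the credit not yet announced to the upstream producer, used for backpressure
// if unannouncedCredit was 0 before numRequestedBuffers was added,
// the upstream has to be told that credit is available and data can be received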
if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
notifyCreditAvailable();
}
}
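The credit arithmetic can be reduced to a few integers. The sketch below is an illustration under assumptions, not Flink's RemoteInputChannel: CreditSketch is hypothetical, a simple counter stands in for the shared buffer pool, and the listener registration is reduced to a boolean flag.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CreditSketch {
    private final int initialCredit;
    private int availableBuffers; // buffers this channel currently holds
    private int poolFreeBuffers;  // free buffers left in the (hypothetical) shared pool
    private final AtomicInteger unannouncedCredit = new AtomicInteger();

    int creditNotifications = 0;               // how often upstream would be notified
    boolean waitingForFloatingBuffers = false; // set when the pool ran dry

    CreditSketch(int initialCredit, int poolFreeBuffers) {
        this.initialCredit = initialCredit;
        this.availableBuffers = initialCredit;
        this.poolFreeBuffers = poolFreeBuffers;
    }

    void onSenderBacklog(int backlog) {
        int required = backlog + initialCredit;
        int requested = 0;
        while (availableBuffers < required && !waitingForFloatingBuffers) {
            if (poolFreeBuffers > 0) {
                poolFreeBuffers--;   // take one floating buffer from the pool
                availableBuffers++;
                requested++;
            } else {
                waitingForFloatingBuffers = true; // would register a listener here
            }
        }
        // notify upstream only when the unannounced credit rises from 0
        if (requested > 0 && unannouncedCredit.getAndAdd(requested) == 0) {
            creditNotifications++;
        }
    }

    int getUnannouncedCredit() {
        return unannouncedCredit.get();
    }

    public static void main(String[] args) {
        CreditSketch channel = new CreditSketch(2, 3);
        channel.onSenderBacklog(4); // needs 6 buffers, holds 2, pool can give 3
        System.out.println("unannounced credit: " + channel.getUnannouncedCredit()
                + ", waiting: " + channel.waitingForFloatingBuffers);
    }
}
```

With an initial credit of 2 and a backlog of 4, the channel wants 6 buffers. The pool can only supply 3, so the channel announces 3 credits and then waits for more floating buffers.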
As shown above, a listener is registered when a buffer request fails. What runs when the listener is later told that a buffer has become available? Let us look at the notifyBufferAvailable method.
@Override
public NotificationResult notifyBufferAvailable(Buffer buffer) {
NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
try {
synchronized (bufferQueue) {
// the channel must be in the waiting-for-floating-buffers state
checkState(isWaitingForFloatingBuffers,
"This channel should be waiting for floating buffers.");
// Important: make sure that we never add a buffer after releaseAllResources()
// released all buffers. Following scenarios exist:
// 1) releaseAllResources() already released buffers inside bufferQueue
// -> then isReleased is set correctly
// 2) releaseAllResources() did not yet release buffers from bufferQueue
// -> we may or may not have set isReleased yet but will always wait for the
// lock on bufferQueue to release buffers
// Continue only if the channel is not released and fewer buffers are available than required
if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
isWaitingForFloatingBuffers = false;
return notificationResult;
}
// Add the floating buffer
bufferQueue.addFloatingBuffer(buffer);
// If the available buffers now equal the required number, report that no more buffers are needed
if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
isWaitingForFloatingBuffers = false;
notificationResult = NotificationResult.BUFFER_USED_NO_NEED_MORE;
} else {
// Otherwise report that more buffers are still needed
notificationResult = NotificationResult.BUFFER_USED_NEED_MORE;
}
}
// If unannouncedCredit was 0 before adding 1, notify the upstream that the downstream can receive data
if (unannouncedCredit.getAndAdd(1) == 0) {
notifyCreditAvailable();
}
} catch (Throwable t) {
setError(t);
}
return notificationResult;
}
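The interplay between the backlog handling and notifyBufferAvailable can be modelled with plain counters. The sketch below is a simplification, not Flink code: `Pool`, `Channel`, and `FloatingBufferSketch` are hypothetical names, buffers are reduced to integers, locking and the released check are left out, and the channel is assumed to start with all `initialCredit` buffers available.

```java
final class FloatingBufferSketch {
    enum Result { BUFFER_NOT_USED, BUFFER_USED_NO_NEED_MORE, BUFFER_USED_NEED_MORE }

    /** Hypothetical stand-in for the shared pool of floating buffers. */
    static final class Pool {
        private int free;
        Pool(int free) { this.free = free; }
        boolean tryTake() {
            if (free == 0) {
                return false;
            }
            free--;
            return true;
        }
    }

    /** Hypothetical, single-threaded model of one input channel. */
    static final class Channel {
        private final int initialCredit;
        private final Pool pool;
        int available;   // buffers the channel currently holds
        int required;    // backlog + initialCredit
        boolean waiting; // true while registered as a listener

        Channel(int initialCredit, Pool pool) {
            this.initialCredit = initialCredit;
            this.pool = pool;
            this.available = initialCredit; // assumption: all reserved buffers are free
        }

        /** Returns how many floating buffers were taken by this call. */
        int onSenderBacklog(int backlog) {
            int requested = 0;
            required = backlog + initialCredit;
            while (available < required && !waiting) {
                if (pool.tryTake()) {
                    available++;
                    requested++;
                } else {
                    waiting = true; // pool is empty: wait to be notified
                }
            }
            return requested;
        }

        /** Called when the pool hands a recycled buffer to a waiting channel. */
        Result notifyBufferAvailable() {
            if (!waiting) {
                throw new IllegalStateException("channel is not waiting for floating buffers");
            }
            if (available >= required) {
                waiting = false;
                return Result.BUFFER_NOT_USED;
            }
            available++;
            if (available == required) {
                waiting = false;
                return Result.BUFFER_USED_NO_NEED_MORE;
            }
            return Result.BUFFER_USED_NEED_MORE;
        }
    }

    public static void main(String[] args) {
        Channel ch = new Channel(2, new Pool(1));
        System.out.println(ch.onSenderBacklog(3));      // 1
        System.out.println(ch.notifyBufferAvailable()); // BUFFER_USED_NEED_MORE
        System.out.println(ch.notifyBufferAvailable()); // BUFFER_USED_NO_NEED_MORE
    }
}
```

With an initial credit of 2 and a backlog of 3 the channel needs 5 buffers. It takes the single free floating buffer, starts waiting, and then needs two notifications before it stops asking for more.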
Now let's return to the onBuffer method. Tracing its callers, we find that onBuffer is invoked from the decodeBufferOrEvent method of CreditBasedPartitionRequestClientHandler. This method handles the data received from the network, which may be either a buffer or an event. The code is as follows:
private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
try {
ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
final int receivedSize = nettyBuffer.readableBytes();
// The message carries a buffer
if (bufferOrEvent.isBuffer()) {
// ---- Buffer ------------------------------------------------
// Early return for empty buffers. Otherwise Netty's readBytes() throws an
// IndexOutOfBoundsException.
// If zero bytes were received, call RemoteInputChannel's onEmptyBuffer method
if (receivedSize == 0) {
inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
return;
}
// Request an empty buffer
Buffer buffer = inputChannel.requestBuffer();
if (buffer != null) {
// Copy the data read from the network into the buffer
nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
// Set the compression flag
buffer.setCompressed(bufferOrEvent.isCompressed);
// Hand the buffer to onBuffer
inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
} else if (inputChannel.isReleased()) {
// If the channel has already been released, cancel the request
cancelRequestFor(bufferOrEvent.receiverId);
} else {
throw new IllegalStateException("No buffer available in credit-based input channel.");
}
} else {
// The message carries an event: create a MemorySegment holding the event content,
// wrap it in a NetworkBuffer, and hand it to RemoteInputChannel through onBuffer
// ---- Event -------------------------------------------------
// TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
byte[] byteArray = new byte[receivedSize];
nettyBuffer.readBytes(byteArray);
MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
}
} finally {
// Release the Netty buffer
bufferOrEvent.releaseBuffer();
}
}
Following the call chain of this method further up, we reach the decodeMsg method. Part of its source code is shown below:
private void decodeMsg(Object msg) throws Throwable {
final Class<?> msgClazz = msg.getClass();
// ---- Buffer --------------------------------------------------------
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
// Look up the input channel that should receive this buffer
RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
if (inputChannel == null) {
bufferOrEvent.releaseBuffer();
cancelRequestFor(bufferOrEvent.receiverId);
return;
}
// Call decodeBufferOrEvent
decodeBufferOrEvent(inputChannel, bufferOrEvent);
} else if (msgClazz == NettyMessage.ErrorResponse.class) {
// ---- Error ---------------------------------------------------------
// Remaining code omitted
} else {
throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
}
}
Tracing one step further, we arrive at the channelRead method, the Netty callback invoked for each inbound message. The code is as follows:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
decodeMsg(msg);
} catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
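The two layers above, dispatch on the exact message class in decodeMsg and a catch-all around it in channelRead, can be sketched without Netty. Everything in this example is hypothetical: the message classes are empty stand-ins, `channelRead` takes no `ChannelHandlerContext`, and a log list replaces the real buffer handling and the error notification.

```java
import java.util.ArrayList;
import java.util.List;

final class DispatchSketch {
    /** Hypothetical stand-ins for the two message types. */
    static final class BufferResponse {
        final int receiverId;
        BufferResponse(int receiverId) { this.receiverId = receiverId; }
    }

    static final class ErrorResponse {
        final String cause;
        ErrorResponse(String cause) { this.cause = cause; }
    }

    /** Records what happened, in place of real side effects. */
    final List<String> log = new ArrayList<>();

    /** Dispatches on the exact runtime class, as decodeMsg does. */
    void decodeMsg(Object msg) {
        Class<?> msgClazz = msg.getClass();
        if (msgClazz == BufferResponse.class) {
            log.add("buffer for channel " + ((BufferResponse) msg).receiverId);
        } else if (msgClazz == ErrorResponse.class) {
            log.add("error: " + ((ErrorResponse) msg).cause);
        } else {
            throw new IllegalStateException("Received unknown message from producer: " + msgClazz);
        }
    }

    /** Entry point: any failure while decoding is routed to one error path. */
    void channelRead(Object msg) {
        try {
            decodeMsg(msg);
        } catch (Throwable t) {
            log.add("closed: " + t.getMessage());
        }
    }

    public static void main(String[] args) {
        DispatchSketch handler = new DispatchSketch();
        handler.channelRead(new BufferResponse(7));
        handler.channelRead(new ErrorResponse("producer failed"));
        handler.channelRead("not a known message");
        handler.log.forEach(System.out::println);
    }
}
```

Comparing with `==` on the class object matches only that exact class, so a subclass of a known message type would fall through to the unknown-message branch; `instanceof` would accept it.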
This completes the analysis of the whole data-reading flow.
This blog is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.