Flink Source Code Analysis Series
Please see: Flink Source Code Analysis Series (document index)
Source branch
release-0.9.0
Hudi source code on GitHub: apache/hudi: Upserts, Deletes And Incremental Processing on Big Data. (github.com)
HoodieTableFactory
Flink discovers implementations of the org.apache.flink.table.factories.Factory interface through the SPI mechanism. Hudi's hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory file contains the following entry:
org.apache.hudi.table.HoodieTableFactory
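At the SQL level the factory is selected by the 'connector' option, whose value matches HoodieTableFactory's factory identifier ("hudi"). As a quick illustration, a Hudi sink table could be declared as below; the table name, columns, and path are made up, the option keys are the ones discussed later in this post, and the exact set of required options depends on the Hudi version, so treat this as a sketch rather than a reference DDL.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiSinkExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // streaming writes to Hudi are committed on checkpoints
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 'connector' = 'hudi' is what makes Flink's SPI lookup resolve to HoodieTableFactory
    tableEnv.executeSql(
        "CREATE TABLE hudi_sink (\n"
            + "  id STRING,\n"
            + "  name STRING,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` STRING\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs:///tmp/hudi/hudi_sink',\n" // assumed path, adjust to your environment
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'hoodie.datasource.write.recordkey.field' = 'id',\n"
            + "  'write.precombine.field' = 'ts'\n"
            + ")");
  }
}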
HoodieTableFactory is the entry point for Flink SQL to create table sinks and sources. This post starts from this class and analyzes how the HoodieTableSink is created. The entry method for creating the table sink is as follows:
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    // collect the options from the CREATE TABLE ... WITH (...) clause
    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
    // physical schema of the table, i.e. without computed columns and metadata columns
    TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
    // sanity-check the options: verify that hoodie.datasource.write.recordkey.field and
    // write.precombine.field refer to columns that exist in the table schema, otherwise throw an exception
    sanityCheck(conf, schema);
    // derive and attach additional options from the table definition, primary key, etc.
    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
    // return the HoodieTableSink
    return new HoodieTableSink(conf, schema);
}
HoodieTableSink
Flink SQL在執(zhí)行過程中最終被解析轉(zhuǎn)換為Flink的TableSink
或者TableSource
。本篇我們關注數(shù)據(jù)寫入Hudi的過程对室。HoodieTableSink
寫入數(shù)據(jù)的邏輯位于getSinkRuntimeProvider
方法模燥。它的內(nèi)容和解析如下所示:
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return (DataStreamSinkProvider) dataStream -> {

      // setup configuration
      // read the checkpoint timeout
      long ckpTimeout = dataStream.getExecutionEnvironment()
          .getCheckpointConfig().getCheckpointTimeout();
      // use Flink's checkpoint timeout as Hudi's instant commit ack timeout
      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

      // logical row type describing the data type of every column in the schema
      RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();

      // bulk_insert mode
      // write operation type, "upsert" by default
      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
      // if the write operation is configured as bulk_insert, take this branch
      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
        // operator factory for the bulk insert write operator
        BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);

        // partition fields
        final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf);
        if (partitionFields.length > 0) {
          // key generator, used by the keyBy operator to group records
          RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
          // if write.bulk_insert.shuffle_by_partition is enabled
          if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {

            // shuffle by partition keys
            // key the stream by the partition path values
            dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
          }
          // if the data should also be sorted by partition
          if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
            // build a sort operator
            SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
            // sort by partition keys
            // append the sort operator to the data stream
            dataStream = dataStream
                .transform("partition_key_sorter",
                    TypeInformation.of(RowData.class),
                    sortOperatorGen.createSortOperator())
                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
            ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(),
                conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
          }
        }
        // append the bulk insert write operator and return
        return dataStream
            .transform("hoodie_bulk_insert_write",
                TypeInformation.of(Object.class),
                operatorFactory)
            // follow the parallelism of upstream operators to avoid shuffle
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
            .addSink(new CleanFunction<>(conf))
            .setParallelism(1)
            .name("clean_commits");
      }

      // for the non bulk-insert modes, use streaming write
      // stream write
      int parallelism = dataStream.getExecutionConfig().getParallelism();
      // stream write operator factory
      StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);

      // convert the records from RowData to HoodieRecord
      DataStream<HoodieRecord> dataStream1 = dataStream
          .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));

      // bootstrap index
      // TODO: This is a very time-consuming operation, will optimization
      // whether to load the index on startup
      if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
        // if enabled, the existing index is loaded on startup,
        // wrapped as IndexRecord elements and emitted downstream
        dataStream1 = dataStream1.rebalance()
            .transform(
                "index_bootstrap",
                TypeInformation.of(HoodieRecord.class),
                new ProcessOperator<>(new BootstrapFunction<>(conf)))
            .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
            .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
      }

      // key by record key, assign buckets with BucketAssignFunction,
      // then key by bucket id (fileId) and write with StreamWriteFunction
      DataStream<Object> pipeline = dataStream1
          // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
          .keyBy(HoodieRecord::getRecordKey)
          .transform(
              "bucket_assigner",
              TypeInformation.of(HoodieRecord.class),
              new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
          .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
          // shuffle by fileId(bucket id)
          .keyBy(record -> record.getCurrentLocation().getFileId())
          .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
          .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));

      // compaction
      // if compaction is needed (MERGE_ON_READ table with async compaction enabled)
      if (StreamerUtil.needsAsyncCompaction(conf)) {
        // the compaction plan is generated when the coordinator is notified that a
        // checkpoint has completed, then CompactFunction compacts the hudi table data
        return pipeline.transform("compact_plan_generate",
            TypeInformation.of(CompactionPlanEvent.class),
            new CompactionPlanOperator(conf))
            .setParallelism(1) // plan generate must be singleton
            .rebalance()
            .transform("compact_task",
                TypeInformation.of(CompactionCommitEvent.class),
                new ProcessOperator<>(new CompactFunction(conf)))
            .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
            .addSink(new CompactionCommitSink(conf))
            .name("compact_commit")
            .setParallelism(1); // compaction commit should be singleton
      } else {
        return pipeline.addSink(new CleanFunction<>(conf))
            .setParallelism(1)
            .name("clean_commits");
      }
    };
}
From the source code above we can sketch the overall flow of writing data into a Hudi table:
- If bulk insert is configured, the data is written in batch by BulkInsertWriteOperator; depending on whether sorting is required, a SortOperator is inserted in front of it.
- Otherwise, the RowData records are converted into Hudi's HoodieRecord format.
- Depending on the configuration, BootstrapFunction loads the existing index first; this step can take a long time.
- BucketAssignFunction assigns each record its storage location (bucket) based on its partition.
- StreamWriteFunction writes the records out as a stream.
- For a MERGE_ON_READ table with async compaction enabled, a compaction is scheduled (CompactionPlanOperator and CompactFunction).
Bulk insert
BulkInsertWriteOperator
BulkInsertWriteOperator delegates the bulk insert work to BulkInsertWriteFunction. The initialization logic of BulkInsertWriteFunction lives in its open method:
@Override
public void open(Configuration parameters) throws IOException {
    // index of this subtask of the bulk insert job
    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
    // create the writeClient, which is responsible for creating the index,
    // committing and rolling back data, and the actual read/write operations
    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
    // infer the commit action type from the table type and the write operation type
    this.actionType = CommitUtils.getCommitActionType(
        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
    // timestamp of the last pending instant
    this.initInstant = this.writeClient.getLastPendingInstant(this.actionType);
    // send a WriteMetadataEvent to the coordinator to finish the previous write batch
    sendBootstrapEvent();
    // initialize the writerHelper that assists with the bulk insert
    initWriterHelper();
}
For each incoming element, the function writes it into a Parquet file through the writerHelper.
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
    this.writerHelper.write((RowData) value);
}
每一批數(shù)據(jù)結束后掷酗,會調(diào)用endInput
方法调违。執(zhí)行writeHelper
關閉和通知coordinator批量插入完畢。
public void endInput() {
    final List<WriteStatus> writeStatus;
    try {
      // close the writerHelper
      this.writerHelper.close();
      // collect the WriteStatus of every HoodieRowDataCreateHandle;
      // there is one handle per partitionPath written
      writeStatus = this.writerHelper.getWriteStatuses().stream()
          .map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList());
    } catch (IOException e) {
      throw new HoodieException("Error collect the write status for task [" + this.taskID + "]");
    }
    // tell the coordinator that this batch has been completely written
    final WriteMetadataEvent event = WriteMetadataEvent.builder()
        .taskID(taskID)
        .instantTime(this.writerHelper.getInstantTime())
        .writeStatus(writeStatus)
        .lastBatch(true)
        .endInput(true)
        .build();
    this.eventGateway.sendEventToCoordinator(event);
}
SortOperator
SortOperator sorts a batch of records before they are written. It is enabled by the write.bulk_insert.sort_by_partition option. Its initialization logic lives in the open method:
@Override
public void open() throws Exception {
    super.open();
    LOG.info("Opening SortOperator");

    // classloader of the user code
    ClassLoader cl = getContainingTask().getUserCodeClassLoader();

    // serializer of the incoming RowData records
    AbstractRowDataSerializer inputSerializer =
        (AbstractRowDataSerializer)
            getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
    // binary serializer used by the sorter, parameterized with the arity of the RowData
    this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

    NormalizedKeyComputer computer = gComputer.newInstance(cl);
    RecordComparator comparator = gComparator.newInstance(cl);
    gComputer = null;
    gComparator = null;

    // memory manager of the task
    MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
    // sort the RowData records with Flink's binary external merge-sort utility
    this.sorter =
        new BinaryExternalSorter(
            this.getContainingTask(),
            memManager,
            computeMemorySize(),
            this.getContainingTask().getEnvironment().getIOManager(),
            inputSerializer,
            binarySerializer,
            computer,
            comparator,
            getContainingTask().getJobConfiguration());
    // the sorter owns the sort, merge and spill threads; start them here
    this.sorter.startThreads();

    // collector used to emit the results downstream
    collector = new StreamRecordCollector<>(output);

    // register the metrics:
    // used memory in bytes, number of spill files and spilled bytes
    getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
    getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
    getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}
Every RowData record that SortOperator receives is added to the BinaryExternalSorter's buffer.
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
    this.sorter.write(element.getValue());
}
當一批數(shù)據(jù)插入過程結束時,SortOperator
將sorter
中以排序的二進制RowData數(shù)據(jù)順序取出唆香,發(fā)往下游粤蝎。
@Override
public void endInput() throws Exception {
    BinaryRowData row = binarySerializer.createInstance();
    MutableObjectIterator<BinaryRowData> iterator = sorter.getIterator();
    while ((row = iterator.next(row)) != null) {
        collector.collect(row);
    }
}
RowDataToHoodieFunction
RowDataToHoodieFunction maps RowData records to HoodieRecord. The conversion logic lives in the toHoodieRecord method.
private HoodieRecord toHoodieRecord(I record) throws Exception {
    // convert the RowData into an Avro record according to the Avro schema
    GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
    // build the HoodieKey, which is determined by the record key field value(s)
    // together with the partitionPath (partition directory)
    final HoodieKey hoodieKey = keyGenerator.getKey(gr);
    // create the payload that carries the row data
    HoodieRecordPayload payload = payloadCreation.createPayload(gr);
    // derive the operation type (insert / update / delete) from the RowKind
    HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue());
    // assemble the HoodieRecord
    return new HoodieRecord<>(hoodieKey, payload, operation);
}
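To make the HoodieKey construction concrete, here is a hypothetical, pure-Java sketch of how a record key and a partition path could be derived from configured field names. The field names, the Map-based "row" and the helper itself are made up for illustration; Hudi's real key generators work on Avro records and support more layouts (complex keys, hive-style partition paths, etc.).
import java.util.Map;
import java.util.StringJoiner;

public class KeyGenSketch {

  /** recordKey = comma-joined key field values, partitionPath = slash-joined partition field values. */
  static String[] buildHoodieKey(Map<String, Object> row, String[] keyFields, String[] partitionFields) {
    StringJoiner recordKey = new StringJoiner(",");
    for (String f : keyFields) {
      recordKey.add(String.valueOf(row.get(f)));
    }
    StringJoiner partitionPath = new StringJoiner("/");
    for (String f : partitionFields) {
      partitionPath.add(String.valueOf(row.get(f)));
    }
    return new String[] {recordKey.toString(), partitionPath.toString()};
  }

  public static void main(String[] args) {
    Map<String, Object> row = Map.of("id", "r1", "name", "alice", "dt", "2021-09-01");
    String[] key = buildHoodieKey(row, new String[] {"id"}, new String[] {"dt"});
    // prints: recordKey=r1, partitionPath=2021-09-01
    System.out.println("recordKey=" + key[0] + ", partitionPath=" + key[1]);
  }
}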
BootstrapFunction
Its purpose is to build the index when the job starts. The feature is enabled by the index.bootstrap.enabled option. Index loading starts when the first record arrives, and only partitions whose partition path matches the regular expression in index.partition.regex are loaded. Once loading has finished, the operator does nothing else and simply forwards records downstream.
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
    // alreadyBootstrap marks whether the index has been loaded, initially false
    if (!alreadyBootstrap) {
      // base path of the hoodie table metadata
      String basePath = hoodieTable.getMetaClient().getBasePath();
      int taskID = getRuntimeContext().getIndexOfThisSubtask();
      LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
      // iterate over all partition paths of the table
      for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
        // pattern holds the index.partition.regex option and decides which
        // partitions to load; by default all partitions are loaded
        if (pattern.matcher(partitionPath).matches()) {
          // load the index of this partition
          loadRecords(partitionPath, out);
        }
      }

      // wait for others bootstrap task send bootstrap complete.
      // wait until the other bootstrap tasks are ready
      waitForBootstrapReady(taskID);

      // mark the bootstrap as finished
      alreadyBootstrap = true;
      LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
    }

    // send the trigger record
    // forward the record unchanged; this operator does not modify the data,
    // incoming records merely trigger the index loading
    out.collect((O) value);
}
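The partition filter used above is an ordinary regular-expression match on the partition path. A small standalone illustration follows; the paths and the pattern are made up, and by default the configured pattern matches everything, so all partitions are loaded.
import java.util.List;
import java.util.regex.Pattern;

public class PartitionRegexSketch {
  public static void main(String[] args) {
    // e.g. index.partition.regex = '2021-09-.*' to only bootstrap the September 2021 partitions
    Pattern pattern = Pattern.compile("2021-09-.*");
    List<String> partitionPaths = List.of("2021-08-31", "2021-09-01", "2021-09-02");
    for (String partitionPath : partitionPaths) {
      if (pattern.matcher(partitionPath).matches()) {
        System.out.println("bootstrap index for partition " + partitionPath);
      }
    }
  }
}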
The loadRecords method loads the index of one partition. The index entries are IndexRecord objects that map a record key and partition path (together the HoodieKey) to the file slice that contains the record.
private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
    long start = System.currentTimeMillis();

    // pick the utility class matching the base file format, currently Parquet or ORC
    BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
    // Avro schema of the table
    Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();

    // parallelism, max parallelism and the index of this subtask
    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    final int taskID = getRuntimeContext().getIndexOfThisSubtask();

    // last completed instant on the commit timeline
    Option<HoodieInstant> latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline()
        .filterCompletedInstants().lastInstant();

    // if such an instant exists
    if (latestCommitTime.isPresent()) {
      // all file slices committed before or on that instant
      List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
          .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
          .collect(toList());

      for (FileSlice fileSlice : fileSlices) {
        // skip the file slice if it is not assigned to this subtask
        if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
          continue;
        }
        LOG.info("Load records from {}.", fileSlice);

        // load parquet records
        // load the base (columnar) data file of the file slice
        fileSlice.getBaseFile().ifPresent(baseFile -> {
          // filter out crushed files
          // validate the file according to its format
          if (!isValidFile(baseFile.getFileStatus())) {
            return;
          }

          final List<HoodieKey> hoodieKeys;
          try {
            // read the record keys and partition paths stored in the base file
            hoodieKeys =
                fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
          } catch (Exception e) {
            throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
          }

          // emit an IndexRecord (HoodieKey -> fileSlice mapping) for every key;
          // this is the index of the columnar base file
          for (HoodieKey hoodieKey : hoodieKeys) {
            out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)));
          }
        });

        // load avro log records
        // collect the paths of all Avro log files of the file slice
        List<String> logPaths = fileSlice.getLogFiles()
            // filter out crushed files
            .filter(logFile -> isValidFile(logFile.getFileStatus()))
            .map(logFile -> logFile.getPath().toString())
            .collect(toList());
        // scan the log files, merging records that share the same record key
        HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
            writeConfig, hadoopConf);

        try {
          // iterate over the record keys of the merged records and emit an
          // IndexRecord for each; this is the index of the log file data
          for (String recordKey : scanner.getRecords().keySet()) {
            out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)));
          }
        } catch (Exception e) {
          throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
        } finally {
          scanner.close();
        }
      }
    }
}
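The shouldLoadFile check splits the file slices among the bootstrap subtasks so that each file is loaded exactly once. A simplified, hypothetical version of such an assignment is sketched below; Hudi's actual implementation relies on Flink's key-group assignment so that the result stays consistent with the downstream keyBy, whereas the hash-modulo scheme here only illustrates the idea.
public class FileAssignmentSketch {

  /** Returns true if the file with the given id should be loaded by this subtask. */
  static boolean shouldLoadFile(String fileId, int parallelism, int taskId) {
    // stable hash of the fileId, mapped onto the range [0, parallelism)
    int bucket = (fileId.hashCode() & Integer.MAX_VALUE) % parallelism;
    return bucket == taskId;
  }

  public static void main(String[] args) {
    String[] fileIds = {"file-0001", "file-0002", "file-0003"}; // made-up file ids
    int parallelism = 2;
    for (int task = 0; task < parallelism; task++) {
      for (String fileId : fileIds) {
        if (shouldLoadFile(fileId, parallelism, task)) {
          System.out.println("task " + task + " loads " + fileId);
        }
      }
    }
  }
}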
BucketAssignFunction
執(zhí)行數(shù)據(jù)分桶操作蝉绷。為每一條數(shù)據(jù)分配它的存儲位置鸭廷。如果開啟了索引加載(BootstrapFunction
),BucketAssignFunction
會把索引數(shù)據(jù)(IndexRecord
)加載入operator狀態(tài)緩存中熔吗。
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
    // if the element is index data:
    // when index bootstrapping is enabled, the BootstrapFunction of the previous section
    // emits IndexRecord elements; here the mapping between record key and storage
    // location is refreshed from the index
    if (value instanceof IndexRecord) {
      IndexRecord<?> indexRecord = (IndexRecord<?>) value;
      // set the current key of the operator state handler to the record key
      this.context.setCurrentKey(indexRecord.getRecordKey());
      // store the location carried by the IndexRecord into indexState, keyed by the record key
      this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
    } else {
      // otherwise the element is a HoodieRecord, process the data record
      processRecord((HoodieRecord<?>) value, out);
    }
}
數(shù)據(jù)處理過程位于processRecord
方法讼载,邏輯如下所示:
private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
    // 1. put the record into the BucketAssigner;
    // 2. look up the state for location, if the record has a location, just send it out;
    // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.

    // take the record key and partition path out of the HoodieKey
    final HoodieKey hoodieKey = record.getKey();
    final String recordKey = hoodieKey.getRecordKey();
    final String partitionPath = hoodieKey.getPartitionPath();
    // the location describes which file this HoodieRecord is stored in
    final HoodieRecordLocation location;

    // Only changing records need looking up the index for the location,
    // append only records are always recognized as INSERT.
    // location previously stored in the index state
    HoodieRecordGlobalLocation oldLoc = indexState.value();
    // isChangingRecords is true for the UPSERT, DELETE and UPSERT_PREPPED operations
    if (isChangingRecords && oldLoc != null) {
      // Set up the instant time as "U" to mark the bucket as an update bucket.
      // the partition path stored in the index differs from the current record's
      if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
        // globalIndex is controlled by the index.global.enabled option; it decides whether
        // a record that re-appears with a different partitionPath should replace the old one
        if (globalIndex) {
          // if partition path changes, emit a delete record for old partition path,
          // then update the index state using location with new partition path.
          // emit a delete record downstream to remove the data under the old partition path
          HoodieRecord<?> deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
              payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
          deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
          deleteRecord.seal();
          out.collect((O) deleteRecord);
        }
        // get a new location from the BucketAssigner
        location = getNewRecordLocation(partitionPath);
        // refresh the index state with the new partition path and location
        updateIndexState(partitionPath, location);
      } else {
        location = oldLoc.toLocal("U");
        // register the update with the bucketAssigner
        this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
      }
    } else {
      // not an update: assign a fresh location
      location = getNewRecordLocation(partitionPath);
      this.context.setCurrentKey(recordKey);
    }
    // always refresh the index
    // make sure changing records refresh the index state
    if (isChangingRecords) {
      updateIndexState(partitionPath, location);
    }
    // attach the location to the record and emit it downstream
    record.setCurrentLocation(location);
    out.collect((O) record);
}
StreamWriteFunction
用于寫入HoodieRecord到文件系統(tǒng)中嗜暴。
@Override
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
    bufferRecord((HoodieRecord<?>) value);
}
processElement in turn calls bufferRecord. Before a record is stored into the buffer, the function checks whether a bucket or the whole buffer needs to be flushed: if adding the record would push its bucket over the bucket size limit, that bucket is flushed first; the buffer size is the total memory occupied by all buckets, and when the buffer budget is used up, Hudi picks the bucket that currently holds the most data and flushes it. The code is as follows (a simplified sketch of the size-threshold bookkeeping follows the listing):
private void bufferRecord(HoodieRecord<?> value) {
    // build the bucketID from the record's partitionPath and fileId
    final String bucketID = getBucketID(value);

    // buckets caches a DataBucket per bucketID; if none exists yet, create one whose
    // batch size is write.batch.size and whose partitionPath/fileID match the HoodieRecord
    DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
    // convert the HoodieRecord into a DataItem, the format records are buffered in;
    // DataItems are converted back to HoodieRecords right before flushing
    final DataItem item = DataItem.fromHoodieRecord(value);

    // would the bucket exceed the batch size if this item were added? then flush the bucket
    boolean flushBucket = bucket.detector.detect(item);
    // does the buffer exceed its maximum capacity?
    // the budget is write.task.max.size - 100MB - write.merge.max_memory
    boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
    // flush the bucket if needed
    if (flushBucket) {
      // the bucket data was successfully written by the writeClient
      if (flushBucket(bucket)) {
        // subtract the bucket size from the tracer's buffer usage
        this.tracer.countDown(bucket.detector.totalSize);
        // clear the bucket
        bucket.reset();
      }
    } else if (flushBuffer) {
      // the buffer is full: pick the bucket with the most data and flush it
      // find the max size bucket and flush it out
      // sort all buckets by totalSize in descending order
      List<DataBucket> sortedBuckets = this.buckets.values().stream()
          .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
          .collect(Collectors.toList());
      // the first bucket has the largest totalSize
      final DataBucket bucketToFlush = sortedBuckets.get(0);
      // flush it
      if (flushBucket(bucketToFlush)) {
        this.tracer.countDown(bucketToFlush.detector.totalSize);
        bucketToFlush.reset();
      } else {
        LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
      }
    }
    // finally add the record to its bucket
    bucket.records.add(item);
}
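As promised above, here is a minimal, self-contained sketch of the size-threshold idea behind the detector and tracer. The class below is made up for illustration and is not Hudi's DataBucket/size detector; record sizes are passed in directly instead of being sampled from the actual records.
import java.util.ArrayList;
import java.util.List;

public class FlushPolicySketch {

  /** Accumulates estimated record sizes and signals a flush when the batch budget is exceeded. */
  static class SizeDetector {
    final long batchSizeBytes;
    long totalSize;

    SizeDetector(long batchSizeBytes) {
      this.batchSizeBytes = batchSizeBytes;
    }

    /** Returns true if adding a record of the given size pushes the total over the budget. */
    boolean detect(long recordSizeBytes) {
      totalSize += recordSizeBytes;
      return totalSize > batchSizeBytes;
    }

    void reset() {
      totalSize = 0;
    }
  }

  public static void main(String[] args) {
    SizeDetector detector = new SizeDetector(1024); // 1 KB batch budget for the demo
    List<String> buffer = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      String record = "record-" + i;
      if (detector.detect(200)) { // pretend every record is ~200 bytes
        System.out.println("flushing " + buffer.size() + " records");
        buffer.clear();
        detector.reset();
      }
      buffer.add(record);
    }
  }
}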
CompactionPlanOperator
如果符合數(shù)據(jù)壓縮的條件(Merge on Read表又活,并且啟用異步壓縮),CompactionPlanOperator
將會生成數(shù)據(jù)壓縮計劃锰悼。CompactionPlanOperator
不處理數(shù)據(jù)柳骄,只在checkpoint完成之后,schedule一個compact操作箕般。
@Override
public void notifyCheckpointComplete(long checkpointId) {
    try {
      // the hoodie table
      HoodieFlinkTable hoodieTable = writeClient.getHoodieTable();
      // roll back compactions that did not finish
      CompactionUtil.rollbackCompaction(hoodieTable, writeClient, conf);
      // schedule a new compaction
      scheduleCompaction(hoodieTable, checkpointId);
    } catch (Throwable throwable) {
      // make it fail safe
      LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
    }
}
The scheduleCompaction method:
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
    // the last instant takes the highest priority.
    // latest pending compaction instant that is still in the REQUESTED state
    Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
    if (!lastRequested.isPresent()) {
      // do nothing.
      LOG.info("No compaction plan for checkpoint " + checkpointId);
      return;
    }

    // timestamp of that instant
    String compactionInstantTime = lastRequested.get().getTimestamp();

    // if the instant currently being compacted has the same timestamp,
    // this compaction has already been scheduled, so skip it
    if (this.compactionInstantTime != null
        && Objects.equals(this.compactionInstantTime, compactionInstantTime)) {
      // do nothing
      LOG.info("Duplicate scheduling for compaction instant: " + compactionInstantTime + ", ignore");
      return;
    }

    // generate compaction plan
    // should support configurable commit metadata
    // build the HoodieCompactionPlan
    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
        table.getMetaClient(), compactionInstantTime);

    if (compactionPlan == null || (compactionPlan.getOperations() == null)
        || (compactionPlan.getOperations().isEmpty())) {
      // do nothing.
      LOG.info("No compaction plan for checkpoint " + checkpointId + " and instant " + compactionInstantTime);
    } else {
      this.compactionInstantTime = compactionInstantTime;
      // the instant to compact
      HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
      // Mark instant as compaction inflight
      // transition the instant state to inflight (being processed)
      table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
      table.getMetaClient().reloadActiveTimeline();

      // build the compaction operations
      List<CompactionOperation> operations = compactionPlan.getOperations().stream()
          .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
      LOG.info("CompactionPlanOperator compacting " + operations + " files");
      // emit the compaction operations downstream one by one
      for (CompactionOperation operation : operations) {
        output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
      }
    }
}
CompactFunction
CompactFunction takes the compaction plan generated in the previous step and performs the actual compaction.
@Override
public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
    // instant to compact
    final String instantTime = event.getCompactionInstantTime();
    // the compaction operation
    final CompactionOperation compactionOperation = event.getOperation();
    // for async compaction, run doCompaction in a thread pool
    if (asyncCompaction) {
      // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
      executor.execute(
          () -> doCompaction(instantTime, compactionOperation, collector),
          "Execute compaction for instant %s from task %d", instantTime, taskID);
    } else {
      // executes the compaction task synchronously for batch mode.
      LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
      doCompaction(instantTime, compactionOperation, collector);
    }
}
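The point of the asynchronous branch is to hand the potentially long-running compaction off to a background thread so the checkpoint barrier is not blocked. Hudi uses its own executor helper here; the same hand-off can be illustrated with a plain ExecutorService (a sketch; the instant timestamp and the task body are stand-ins for the real doCompaction call).
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncCompactionSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    String instantTime = "20210901120000"; // made-up instant timestamp
    // submit the compaction work to a background thread; the caller
    // (the operator's processElement) returns immediately
    executor.execute(() -> {
      System.out.println("compacting instant " + instantTime + " in the background");
      // ... the real code would call doCompaction(...) here and emit a CompactionCommitEvent
    });

    System.out.println("processElement returned, checkpoint barrier can continue");
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES);
  }
}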
The doCompaction method:
private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
    // run the compaction through FlinkCompactHelpers
    List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation);
    // emit the compaction result downstream
    collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}
This concludes the analysis of how Flink writes data into a Hudi table.
This post is the author's original work; comments and corrections are welcome. Please credit the source when reposting.