Flink Hudi Source Code Analysis: HoodieTableSink

Flink Source Code Analysis Series Index

Please see: Flink Source Code Analysis Series Index

Source code branch

release-0.9.0

Hudi source code on GitHub: apache/hudi: Upserts, Deletes And Incremental Processing on Big Data. (github.com)

HoodieTableFactory

Flink loads implementations of the org.apache.flink.table.factories.Factory interface through the SPI mechanism. The content of Hudi's hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory file is:

org.apache.hudi.table.HoodieTableFactory
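
The factory identifier of HoodieTableFactory is hudi, so this factory (and with it the HoodieTableSink analyzed below) is resolved whenever a table is declared with 'connector' = 'hudi'. Here is a minimal sketch of such a job; the table name, path and columns are made up, and the record key and precombine options are the ones checked by sanityCheck below:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiSinkQuickstart {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // declaring 'connector' = 'hudi' makes Flink resolve HoodieTableFactory via SPI
        tEnv.executeSql(
            "CREATE TABLE hudi_sink (\n"
            + "  uuid VARCHAR(20),\n"
            + "  name VARCHAR(20),\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` VARCHAR(20)\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs:///tmp/hudi_sink',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'hoodie.datasource.write.recordkey.field' = 'uuid',\n"
            + "  'write.precombine.field' = 'ts'\n"
            + ")");
        // an INSERT INTO hudi_sink ... statement would then be planned with HoodieTableSink as its sink
    }
}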

This class is the entry point through which Flink SQL creates table sinks and sources. In this post we start from this class and analyze how a HoodieTableSink is created. The entry method that creates the TableSink looks like this:

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    // get the options attached in the WITH clause of the CREATE TABLE statement
    Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
    // get the physical schema of the table, i.e. without computed columns and metadata columns
    TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
    // sanity-check the options:
    // verify that hoodie.datasource.write.recordkey.field and write.precombine.field refer to columns of the table, otherwise an exception is thrown
    sanityCheck(conf, schema);
    // let Hudi attach additional derived options based on the table definition, primary key, etc.
    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
    // return the HoodieTableSink
    return new HoodieTableSink(conf, schema);
}

HoodieTableSink

During execution, Flink SQL is eventually translated into Flink TableSink or TableSource instances. In this post we focus on how data is written into Hudi. The write logic of HoodieTableSink lives in the getSinkRuntimeProvider method, shown and annotated below:

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return (DataStreamSinkProvider) dataStream -> {

        // setup configuration
        // get the checkpoint timeout
        long ckpTimeout = dataStream.getExecutionEnvironment()
            .getCheckpointConfig().getCheckpointTimeout();
        // use Flink's checkpoint timeout as Hudi's instant commit ack timeout
        conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

        // get the data type of each column of the schema
        RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();

        // bulk_insert mode
        // get the write operation type, the default is upsert
        final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
        // if the configured write operation is bulk_insert, take this branch
        if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
            // create the operator factory for bulk insert
            BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
            // get the partition fields
            final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf);
            if (partitionFields.length > 0) {
                // create the key generator used to group the data for the keyBy operator
                RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
                // if write.bulk_insert.shuffle_by_partition is enabled
                if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {

                    // shuffle by partition keys
                    // keyBy the data stream by the partition field values
                    dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
                }
                // if sorting by partition is required
                if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
                    // create a sort operator
                    SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
                    // sort by partition keys
                    // add the sort operator to the datastream
                    dataStream = dataStream
                        .transform("partition_key_sorter",
                                   TypeInformation.of(RowData.class),
                                   sortOperatorGen.createSortOperator())
                        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
                    ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(),
                                                             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
                }
            }
            // add the bulk-insert write operator to the dataStream and return
            return dataStream
                .transform("hoodie_bulk_insert_write",
                           TypeInformation.of(Object.class),
                           operatorFactory)
                // follow the parallelism of upstream operators to avoid shuffle
                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
                .addSink(new CleanFunction<>(conf))
                .setParallelism(1)
                .name("clean_commits");
        }
        // for non bulk-insert modes, streaming write is used
        // stream write
        int parallelism = dataStream.getExecutionConfig().getParallelism();
        // create the streaming write operator
        StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);

        // convert the data from RowData to HoodieRecord
        DataStream<HoodieRecord> dataStream1 = dataStream
            .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));

        // bootstrap index
        // TODO: This is a very time-consuming operation, will optimization
        // whether to load the index at startup
        if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
            // if enabled, the index is loaded automatically at startup, wrapped as IndexRecord and sent downstream
            dataStream1 = dataStream1.rebalance()
                .transform(
                "index_bootstrap",
                TypeInformation.of(HoodieRecord.class),
                new ProcessOperator<>(new BootstrapFunction<>(conf)))
                .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
                .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
        }

        // partition by record key, then assign buckets with BucketAssignFunction
        // then partition by bucket id and write the data with StreamWriteFunction
        DataStream<Object> pipeline = dataStream1
            // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
            .keyBy(HoodieRecord::getRecordKey)
            .transform(
            "bucket_assigner",
            TypeInformation.of(HoodieRecord.class),
            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
            .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
            .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
            // shuffle by fileId(bucket id)
            .keyBy(record -> record.getCurrentLocation().getFileId())
            .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
            .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
        // compaction
        // if compaction is needed (the table type is MERGE_ON_READ and async compaction is enabled)
        if (StreamerUtil.needsAsyncCompaction(conf)) {
            // first generate the compaction plan when the coordinator is notified that a checkpoint completed
            // then compact the hudi table data with CompactFunction
            return pipeline.transform("compact_plan_generate",
                                      TypeInformation.of(CompactionPlanEvent.class),
                                      new CompactionPlanOperator(conf))
                .setParallelism(1) // plan generate must be singleton
                .rebalance()
                .transform("compact_task",
                           TypeInformation.of(CompactionCommitEvent.class),
                           new ProcessOperator<>(new CompactFunction(conf)))
                .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
                .addSink(new CompactionCommitSink(conf))
                .name("compact_commit")
                .setParallelism(1); // compaction commit should be singleton
        } else {
            return pipeline.addSink(new CleanFunction<>(conf))
                .setParallelism(1)
                .name("clean_commits");
        }
    };
}

From the source code above we can roughly outline how data flows into a Hudi table (summarized as an operator chain after the list):

  1. If bulk insert is configured, data is written in batches with BulkInsertWriteOperator; a SortOperator is added depending on whether sorting is required.
  2. The RowData records are converted into Hudi's own HoodieRecord format.
  3. Depending on the configuration, BootstrapFunction may be used to load the index first; this step takes a long time.
  4. Each record is assigned its storage location based on its partition (BucketAssignFunction).
  5. The records are written out in a streaming fashion (StreamWriteFunction).
  6. For MOR tables with async compaction enabled, a compaction is scheduled (CompactionPlanOperator and CompactFunction).
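
For the streaming write path this corresponds to the operator chain below. The names are the ones passed to transform()/uid() in getSinkRuntimeProvider above; the index_bootstrap step is optional, and the compaction operators are only appended for MERGE_ON_READ tables with async compaction enabled (otherwise the pipeline ends in the clean_commits sink):

map(RowDataToHoodieFunction) -> [index_bootstrap] -> keyBy(record key) -> bucket_assigner -> keyBy(fileId) -> hoodie_stream_write -> compact_plan_generate -> compact_task -> compact_commit / clean_commits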

Bulk insert

BulkInsertWriteOperator

BulkInsertWriteOperator uses BulkInsertWriteFunction to perform the bulk insert.
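
Before looking at the function, here is a minimal sketch of the configuration that selects this code path. The FlinkOptions constants are the ones read in getSinkRuntimeProvider above; the values and the wrapper class are made up for illustration (import paths as in release-0.9.0 to the best of my knowledge), and in a SQL job these options would normally be set as strings in the WITH clause:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class BulkInsertConfSketch {
    public static Configuration bulkInsertConf() {
        Configuration conf = new Configuration();
        // switch the write operation from the default upsert to bulk_insert
        conf.setString(FlinkOptions.OPERATION, "bulk_insert");
        // shuffle the records by partition path before writing
        conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true);
        // sort the records by partition keys (SortOperator) before writing
        conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION, true);
        // parallelism of the hoodie_bulk_insert_write operator (hypothetical value)
        conf.setInteger(FlinkOptions.WRITE_TASKS, 4);
        return conf;
    }
}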

The initialization logic of BulkInsertWriteFunction lives in the open method, shown below:

@Override
public void open(Configuration parameters) throws IOException {
    // get the task ID of this bulk-insert subtask
    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
    // create the writeClient, which is responsible for creating the index, committing and rolling back data, and the actual create/read/update/delete operations
    this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
    // derive the commit action type from the table type and the write operation type
    this.actionType = CommitUtils.getCommitActionType(
        WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
        HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

    // get the timestamp of the last pending instant
    this.initInstant = this.writeClient.getLastPendingInstant(this.actionType);
    // send a WriteMetadataEvent to the coordinator to finish the previous batch of writes
    sendBootstrapEvent();
    // initialize the writerHelper, which assists with the bulk insert
    initWriterHelper();
}

For every incoming element, the function writes it into a Parquet file through the writerHelper.

@Override
public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
    this.writerHelper.write((RowData) value);
}

When a batch of input ends, the endInput method is called. It closes the writerHelper and notifies the coordinator that the bulk insert has completed.

public void endInput() {
    final List<WriteStatus> writeStatus;
    try {
        // close the writerHelper
        this.writerHelper.close();
        // collect the writeStatus of every HoodieRowDataCreateHandle; there is one handle per partitionPath written
        writeStatus = this.writerHelper.getWriteStatuses().stream()
            .map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList());
    } catch (IOException e) {
        throw new HoodieException("Error collect the write status for task [" + this.taskID + "]");
    }
    // send an event to the coordinator indicating that this batch has been fully written
    final WriteMetadataEvent event = WriteMetadataEvent.builder()
        .taskID(taskID)
        .instantTime(this.writerHelper.getInstantTime())
        .writeStatus(writeStatus)
        .lastBatch(true)
        .endInput(true)
        .build();
    this.eventGateway.sendEventToCoordinator(event);
}

SortOperator

SortOperator sorts a batch of records before they are written. It is enabled by the write.bulk_insert.sort_by_partition option.

Its initialization logic is in the open method, shown and annotated below:

@Override
public void open() throws Exception {
    super.open();
    LOG.info("Opening SortOperator");

    // get the user code classloader
    ClassLoader cl = getContainingTask().getUserCodeClassLoader();

    // get the RowData serializer
    AbstractRowDataSerializer inputSerializer =
        (AbstractRowDataSerializer)
        getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
    // create the binary row serializer, parameterized with the arity (field count) of the RowData
    this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

    NormalizedKeyComputer computer = gComputer.newInstance(cl);
    RecordComparator comparator = gComparator.newInstance(cl);
    gComputer = null;
    gComparator = null;

    // get the task's memory manager
    MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
    // sort the RowData with Flink's binary external merge-sort utility
    this.sorter =
        new BinaryExternalSorter(
        this.getContainingTask(),
        memManager,
        computeMemorySize(),
        this.getContainingTask().getEnvironment().getIOManager(),
        inputSerializer,
        binarySerializer,
        computer,
        comparator,
        getContainingTask().getJobConfiguration());
    // the sorter owns the sorting, merging and spilling threads; this call starts them
    this.sorter.startThreads();

    // create the result collector used to send results downstream
    collector = new StreamRecordCollector<>(output);

    // register the the metrics.
    // register gauges for used memory in bytes, the number of spill files and the spilled bytes
    getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
    getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
    getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}

Every time SortOperator receives a RowData record, it puts it into the BinaryExternalSorter's buffer.

@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
    this.sorter.write(element.getValue());
}

When the batch ends, SortOperator drains the sorted binary RowData from the sorter in order and sends it downstream.

@Override
public void endInput() throws Exception {
    BinaryRowData row = binarySerializer.createInstance();
    MutableObjectIterator<BinaryRowData> iterator = sorter.getIterator();
    while ((row = iterator.next(row)) != null) {
        collector.collect(row);
    }
}

RowDataToHoodieFunction

It maps RowData records into HoodieRecord; the conversion logic lives in the toHoodieRecord method.

private HoodieRecord toHoodieRecord(I record) throws Exception {
    // convert the RowData into Avro format according to the Avro schema
    GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
    // build the HoodieKey, which is determined by the record key field value together with the partitionPath (partition path)
    final HoodieKey hoodieKey = keyGenerator.getKey(gr);

    // create the payload, the object that carries the row data
    HoodieRecordPayload payload = payloadCreation.createPayload(gr);
    // get the operation type (insert/update/delete, etc.)
    HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue());
    // build the HoodieRecord
    return new HoodieRecord<>(hoodieKey, payload, operation);
}

BootstrapFunction

Its purpose is to load the index when the job starts. The feature is enabled by the index.bootstrap.enabled option. Loading begins when the first record arrives, and only the partitions whose partition path matches the regular expression in index.partition.regex are loaded. Once loading has finished, the operator does nothing else and simply forwards records downstream. A configuration sketch is shown below, followed by the operator's processElement method.
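
For reference, a minimal sketch (hypothetical values, illustrative wrapper class) of how these two options could be set. FlinkOptions.INDEX_BOOTSTRAP_ENABLED is the constant read in getSinkRuntimeProvider; the regex option is set by the key string referenced above:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class IndexBootstrapConfSketch {
    public static void enableIndexBootstrap(Configuration conf) {
        // load the existing index into operator state when the job starts
        conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
        // only bootstrap partitions whose partition path matches this regex (hypothetical pattern)
        conf.setString("index.partition.regex", "par1|par2");
    }
}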

public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
    // alreadyBootstrap marks whether the index has been loaded; its initial value is false
    if (!alreadyBootstrap) {
        // get the base path where the hoodie table metadata lives
        String basePath = hoodieTable.getMetaClient().getBasePath();
        int taskID = getRuntimeContext().getIndexOfThisSubtask();
        LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
        // iterate over all partition paths contained in the table
        for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
            // pattern is the value of index.partition.regex and decides which partitions' indexes are loaded; the default loads all of them
            if (pattern.matcher(partitionPath).matches()) {
                // load the index of this partition
                loadRecords(partitionPath, out);
            }
        }

        // wait for others bootstrap task send bootstrap complete.
        // wait until the other bootstrap tasks have finished
        waitForBootstrapReady(taskID);

        // mark bootstrapping as finished
        alreadyBootstrap = true;
        LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
    }

    // send the trigger record
    // forward the record downstream unchanged
    // this operator does not modify the data; the record merely triggers the index loading
    out.collect((O) value);
}

The loadRecords method loads the index of a partition. The index consists of IndexRecords, which store the mapping between record key, partition path (together they form the HoodieKey) and the FileSlice the record lives in.

private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
    long start = System.currentTimeMillis();

    // create the file-format utility matching the base file format; Parquet and ORC are currently supported
    BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
    // get the Avro schema of the table
    Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();

    // get the parallelism, max parallelism and taskID
    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    final int taskID = getRuntimeContext().getIndexOfThisSubtask();

    // get the last completed instant on the timeline
    Option<HoodieInstant> latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline()
        .filterCompletedInstants().lastInstant();

    // if such an instant exists
    if (latestCommitTime.isPresent()) {
        // get all FileSlices before or on that commit time
        List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
            .collect(toList());

        for (FileSlice fileSlice : fileSlices) {
            // decide whether this fileSlice should be loaded by this task
            // if not, skip it
            if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
                continue;
            }
            LOG.info("Load records from {}.", fileSlice);

            // load parquet records
            // load the base (data) file of the FileSlice
            fileSlice.getBaseFile().ifPresent(baseFile -> {
                // filter out crushed files
                // check that the file is valid for its format
                if (!isValidFile(baseFile.getFileStatus())) {
                    return;
                }

                final List<HoodieKey> hoodieKeys;
                try {
                    // fetch the HoodieKeys (record key and partition path) contained in this base file
                    hoodieKeys =
                        fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
                } catch (Exception e) {
                    throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
                }

                // send IndexRecords (the mapping between each HoodieKey and the fileSlice) downstream; this is the index for the columnar base file
                for (HoodieKey hoodieKey : hoodieKeys) {
                    out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)));
                }
            });

            // load avro log records
            // collect the paths of all Avro log files
            List<String> logPaths = fileSlice.getLogFiles()
                // filter out crushed files
                .filter(logFile -> isValidFile(logFile.getFileStatus()))
                .map(logFile -> logFile.getPath().toString())
                .collect(toList());
            // scan the log files and merge records that share the same record key
            HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
                                                                          writeConfig, hadoopConf);

            try {
                // iterate over the record keys of the merged records
                // and send IndexRecords downstream; this handles the index of the data in the log files
                for (String recordKey : scanner.getRecords().keySet()) {
                    out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)));
                }
            } catch (Exception e) {
                throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
            } finally {
                scanner.close();
            }
        }
    }
}

BucketAssignFunction

It assigns each record to a bucket, i.e. decides where the record is stored. If index bootstrapping is enabled (BootstrapFunction), BucketAssignFunction loads the index data (IndexRecord) into its operator state.

@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
    // if the element received is index data:
    // when index loading is enabled, the BootstrapFunction from the previous section emits IndexRecords
    // here the mapping between recordKey and storage location must be updated from that index
    if (value instanceof IndexRecord) {
        IndexRecord<?> indexRecord = (IndexRecord<?>) value;
        // set the current key of the operator state handler to the record key
        this.context.setCurrentKey(indexRecord.getRecordKey());
        // update indexState with the location of the index data,
        // i.e. store the recordKey-to-location mapping carried by the IndexRecord into indexState
        this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
    } else {
        // otherwise the element is a HoodieRecord, so start the data processing path
        processRecord((HoodieRecord<?>) value, out);
    }
}

The data processing logic is in the processRecord method:

private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
    // 1. put the record into the BucketAssigner;
    // 2. look up the state for location, if the record has a location, just send it out;
    // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
    
    // get the HoodieKey and extract the recordKey and partitionPath from it
    final HoodieKey hoodieKey = record.getKey();
    final String recordKey = hoodieKey.getRecordKey();
    final String partitionPath = hoodieKey.getPartitionPath();
    // the storage location of the HoodieRecord, i.e. which file this record belongs to
    final HoodieRecordLocation location;

    // Only changing records need looking up the index for the location,
    // append only records are always recognized as INSERT.
    // get the location stored in the index
    HoodieRecordGlobalLocation oldLoc = indexState.value();
    // isChangingRecords is true when the operation type is UPSERT, DELETE or UPSERT_PREPPED
    if (isChangingRecords && oldLoc != null) {
        // Set up the instant time as "U" to mark the bucket as an update bucket.
        // if the partitionPath in the index differs from the partitionPath of the current HoodieRecord
        if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
            // controlled by the index.global.enabled option:
            // when a record with the same key arrives with a different partitionPath, it decides whether the old partitionPath should be updated
            if (globalIndex) {
                // if partition path changes, emit a delete record for old partition path,
                // then update the index state using location with new partition path.
                // emit a delete record downstream to remove the data under the old partitionPath
                HoodieRecord<?> deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
                                                                  payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
                deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
                deleteRecord.seal();
                out.collect((O) deleteRecord);
            }
            // get a new storage location from the BucketAssigner
            location = getNewRecordLocation(partitionPath);
            // update the indexState with the new partitionPath and location
            updateIndexState(partitionPath, location);
        } else {
            location = oldLoc.toLocal("U");
            // register the update location with the bucketAssigner
            this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
        }
    } else {
        // if this is not a changing (update/delete) record
        location = getNewRecordLocation(partitionPath);
        this.context.setCurrentKey(recordKey);
    }
    // always refresh the index
    // make sure changing records refresh the index (indexState)
    if (isChangingRecords) {
        updateIndexState(partitionPath, location);
    }
    // set the record's location and send it downstream
    record.setCurrentLocation(location);
    out.collect((O) record);
}

StreamWriteFunction

It writes HoodieRecords to the file system.

@Override
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
    bufferRecord((HoodieRecord<?>) value);
}

processElement delegates to the bufferRecord method. Before a record is put into the buffer, the method checks whether a bucket or the whole buffer needs to be flushed: if adding the record to its bucket would exceed the bucket size limit, that bucket is flushed first; the buffer limit is the total memory that all buckets together may occupy, and when the buffer runs out of free capacity, Hudi picks the bucket that currently holds the most data and flushes it. The code is shown below, and a small worked example of the thresholds follows it:

private void bufferRecord(HoodieRecord<?> value) {
    // build the bucketID from the partitionPath and fileId of the HoodieRecord
    final String bucketID = getBucketID(value);

    // a DataBucket is cached per bucketID in the buckets map
    // if no DataBucket exists for this bucketID yet, create one and put it into buckets
    // the bucket batch size is write.batch.size
    // the partitionPath and fileID are taken from the HoodieRecord
    DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
                                                     k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
    // convert the HoodieRecord into a DataItem
    // DataItem is the format the data is buffered in; before flushing it is converted back into HoodieRecord
    final DataItem item = DataItem.fromHoodieRecord(value);

    // check whether the size already buffered in this bucket plus the current item exceeds the batch size; if so the bucket must be flushed
    boolean flushBucket = bucket.detector.detect(item);
    // check whether the buffer size exceeds the maximum buffer capacity
    // the maximum buffer capacity is write.task.max.size - 100MB - write.merge.max_memory
    boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
    // if the bucket needs to be flushed
    if (flushBucket) {
        // if the bucket's data is successfully written by the writeClient
        if (flushBucket(bucket)) {
            // subtract the bucket size from the buffer usage held by the tracer
            this.tracer.countDown(bucket.detector.totalSize);
            // reset the bucket
            bucket.reset();
        }
    } else if (flushBuffer) {
        // if the buffer needs to be freed, find the largest bucket and flush it
        // find the max size bucket and flush it out
        // collect all buckets and sort them by totalSize in descending order
        List<DataBucket> sortedBuckets = this.buckets.values().stream()
            .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
            .collect(Collectors.toList());
        // take the first bucket, i.e. the one with the largest totalSize
        final DataBucket bucketToFlush = sortedBuckets.get(0);
        // flush that bucket
        if (flushBucket(bucketToFlush)) {
            this.tracer.countDown(bucketToFlush.detector.totalSize);
            bucketToFlush.reset();
        } else {
            LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
        }
    }
    // add the record to the bucket
    bucket.records.add(item);
}
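
To make the thresholds concrete, a small worked example with purely hypothetical values (not the defaults): with write.batch.size = 64MB, write.task.max.size = 1024MB and write.merge.max_memory = 100MB, the buffer limit is 1024MB - 100MB - 100MB = 824MB. A single bucket is flushed as soon as buffering the next record would push it past 64MB, and once the total across all buckets reaches 824MB, the bucket holding the most data is flushed even if it has not reached 64MB itself.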

CompactionPlanOperator

If the compaction conditions are met (a Merge On Read table with async compaction enabled), CompactionPlanOperator generates the compaction plan. CompactionPlanOperator does not process any data; it only schedules a compaction after each checkpoint completes. A configuration sketch and the notifyCheckpointComplete method follow.
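
For reference, a minimal sketch (hypothetical values, illustrative wrapper class) of the configuration that makes StreamerUtil.needsAsyncCompaction return true. TABLE_TYPE and COMPACTION_TASKS appear in the code quoted in this post; the COMPACTION_ASYNC_ENABLED constant name is assumed for release-0.9.0:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class AsyncCompactionConfSketch {
    public static void enableAsyncCompaction(Configuration conf) {
        // only MERGE_ON_READ tables produce log files that need compaction
        conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
        // enable asynchronous compaction (constant name assumed; the option key is compaction.async.enabled)
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        // parallelism of the compact_task operator (hypothetical value)
        conf.setInteger(FlinkOptions.COMPACTION_TASKS, 4);
    }
}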

@Override
public void notifyCheckpointComplete(long checkpointId) {
    try {
        // get the Hoodie table
        HoodieFlinkTable hoodieTable = writeClient.getHoodieTable();
        // roll back any compaction that did not finish earlier
        CompactionUtil.rollbackCompaction(hoodieTable, writeClient, conf);
        // schedule a new compaction
        scheduleCompaction(hoodieTable, checkpointId);
    } catch (Throwable throwable) {
        // make it fail safe
        LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
    }
}

The scheduleCompaction method:

private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
    // the last instant takes the highest priority.
    // get the most recent pending compaction instant that is still in REQUESTED state
    Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
    if (!lastRequested.isPresent()) {
        // do nothing.
        LOG.info("No compaction plan for checkpoint " + checkpointId);
        return;
    }

    // get the timestamp of that instant
    String compactionInstantTime = lastRequested.get().getTimestamp();
    // if the instant currently being compacted has the same timestamp as that instant,
    // the scheduled compaction would be a duplicate
    if (this.compactionInstantTime != null
        && Objects.equals(this.compactionInstantTime, compactionInstantTime)) {
        // do nothing
        LOG.info("Duplicate scheduling for compaction instant: " + compactionInstantTime + ", ignore");
        return;
    }

    // generate compaction plan
    // should support configurable commit metadata
    // build the HoodieCompactionPlan
    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
        table.getMetaClient(), compactionInstantTime);

    if (compactionPlan == null || (compactionPlan.getOperations() == null)
        || (compactionPlan.getOperations().isEmpty())) {
        // do nothing.
        LOG.info("No compaction plan for checkpoint " + checkpointId + " and instant " + compactionInstantTime);
    } else {
        this.compactionInstantTime = compactionInstantTime;
        // get the instant to compact
        HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
        // Mark instant as compaction inflight
        // transition the instant to the inflight (in progress) state
        table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
        table.getMetaClient().reloadActiveTimeline();

        // build the compaction operations
        List<CompactionOperation> operations = compactionPlan.getOperations().stream()
            .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
        LOG.info("CompactionPlanOperator compacting " + operations + " files");
        // send the compaction operations downstream one by one
        for (CompactionOperation operation : operations) {
            output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
        }
    }
}

CompactFunction

It takes the compaction plan generated in the previous step and performs the actual compaction.

@Override
public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
    // get the instant to compact
    final String instantTime = event.getCompactionInstantTime();
    // get the compaction operation
    final CompactionOperation compactionOperation = event.getOperation();
    // for async compaction, run doCompaction through the executor (thread pool)
    if (asyncCompaction) {
        // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
        executor.execute(
            () -> doCompaction(instantTime, compactionOperation, collector),
            "Execute compaction for instant %s from task %d", instantTime, taskID);
    } else {
        // executes the compaction task synchronously for batch mode.
        LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
        doCompaction(instantTime, compactionOperation, collector);
    }
}

The doCompaction method:

private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
    // run the actual compaction through FlinkCompactHelpers
    List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation);
    // emit the compaction result downstream
    collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}

This concludes our analysis of how Flink writes data into a Hudi table.

This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.
