Flink Streaming File Sink

背景

Flink 支持將流數(shù)據(jù)以文件形式存儲(chǔ)到外部系統(tǒng)聘裁,典型使用場景是將數(shù)據(jù)寫入Hive表所在 HDFS存儲(chǔ)路徑,通過Hive 做查詢分析肠牲。隨著Flink文件寫入被業(yè)務(wù)廣泛使用土辩,暴露出很多問題遭赂,因此需要了解 Flink Streaming File sink 的實(shí)現(xiàn)邏輯。

案例

從Kafka消費(fèi)JSON數(shù)據(jù)臭增,轉(zhuǎn)換為 UserInfo 實(shí)體類數(shù)據(jù)流懂酱,最終以Parquet 格式寫入Hive表對應(yīng)的HDFS路徑。使用 Flink 1.12.1誊抛,Hadoop 2.8.0列牺, hive 2.3.8。

-----------------------------
- hive 建表語句
-----------------------------

create table userinfo(
    userid int,
    username string
) stored as parquet;

-----------------------------
- java 實(shí)體類
-----------------------------
public class UserInfo {
    private int userId;
    private String userName;

    public int getUserId() {
        return userId;
    }

    public void setUserId(int userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }


    @Override
    public String toString() {
        return "UserInfo{" +
                "userId=" + userId +
                ", userName='" + userName + '\'' +
                '}';
    }
}

-----------------------------
- Flink 文件寫入程序
-----------------------------
public class Kafka2Parquet {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test_001");

        env.setParallelism(1);
        env.enableCheckpointing(30000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
        env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/user/todd/checkpoint"));

        FlinkKafkaConsumer<String> dataStream = new FlinkKafkaConsumer("mqTest02", new SimpleStringSchema(), props);
        dataStream.setStartFromLatest();

        DataStream<UserInfo> userInfoDataStream = env.addSource(dataStream)
                .map(value -> JsonUtils.parseJson(value, UserInfo.class));
        
        // 1. 設(shè)置BulkFormat Builder  2.使用 CheckpointRollingPolicy
        StreamingFileSink<UserInfo> parquetSink = StreamingFileSink
                .forBulkFormat(new Path("hdfs://localhost:9000/user/hive/warehouse/userinfo"),  ParquetAvroWriters.forReflectRecord(UserInfo.class))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

        userInfoDataStream.addSink(parquetSink);

        env.execute();
    }
}

-----------------------------
- 生成的parquet 文件名稱及路徑
-----------------------------
inprogress 臨時(shí)文件
hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/.part-0-0.inprogress.18296793-9fde-4376-b6fc-7c47512bd108

part 最終文件
hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/part-0-0


核心類

  1. BulkWriter:用于不同格式的數(shù)據(jù)文件批量寫入拗窃,主要實(shí)現(xiàn)類ParquetBulkWriter瞎领、AvroBulkWriter泌辫、OrcBulkWriter、SequenceFileWriter代表了數(shù)據(jù)寫入的壓縮格式九默。
  2. RecoverableWriter: 具有失敗恢復(fù)能力的外部文件系統(tǒng)寫入器震放,主要實(shí)現(xiàn)類HadoopRecoverableWriter、S3RecoverableWriter驼修、LocalRecoverableWriter殿遂,代表對不同類型文件系統(tǒng)的操作。
  3. BulkPartWriter:InProgressFileWriter 實(shí)現(xiàn)類用來向inprogress文件寫數(shù)據(jù)乙各,持有BulkWriter墨礁。
  4. OutputStreamBasedPartFileWriter:Part Writer 基類,使用 RecoverableFsDataOutputStream 寫出數(shù)據(jù)觅丰。
  5. RecoverableFsDataOutputStream:文件系統(tǒng)輸出流,能夠從文件系統(tǒng)的指定偏移量進(jìn)行數(shù)據(jù)寫入妨退。
  6. BucketAssigner:負(fù)責(zé)將數(shù)據(jù)劃分到不同的Bucket妇萄,可以根據(jù)數(shù)據(jù)格式自定義Assigner。內(nèi)部集成 SimpleVersionedSerializer咬荷,用來對BucketID 做序列化/反序列化操作冠句。
    a. BucketAssigner 子類 DateTimeBucketAssigner 根據(jù)數(shù)據(jù)的ProcessTime 生成 yyyy-MM-dd--HH 格式的 Bucket 名稱。同時(shí)使用 SimpleVersionedStringSerializer 對Bucket 名稱序列化幸乒。
  7. Bucket:StreamingFileSink數(shù)據(jù)輸出的目錄懦底,每一條處理的數(shù)據(jù)根據(jù)BucketAssigner被分配到某個(gè)Bucket。主要功能:
    1. 維護(hù)一份 InProgressFile 文件罕扎,負(fù)責(zé)該文件的創(chuàng)建聚唐、數(shù)據(jù)寫入、提交寫入腔召。
    2. 在 StreamingFileSink 執(zhí)行Checkpoint時(shí)杆查,負(fù)責(zé)構(gòu)建 BucketState 進(jìn)而進(jìn)行該狀態(tài)序列化 。
    3. 在 StreamingFileSink Checkpoint 完成后臀蛛,重命名 InProgressFile 文件亲桦,
    4. 在 StreamingFileSink 從savepoint 啟動(dòng)時(shí),從 BucketState 恢復(fù) InProgressFile 相關(guān)信息浊仆。
  8. BucketState: Bucket的狀態(tài)信息客峭。通過BucketState能夠恢復(fù)Bucket inprogress 文件及當(dāng)前寫入偏移量,從而繼續(xù)向該 inprogress文件中追加內(nèi)容抡柿,同時(shí)能夠恢復(fù) Pending狀態(tài)文件信息舔琅,從而繼續(xù)執(zhí)行后續(xù)重名邏輯。
  9. Buckets:負(fù)責(zé)管理 StreamingFileSink 中所有活躍狀態(tài)的 Bucket洲劣。包括數(shù)據(jù)所在Bucket 分配搏明,Active Bucket 快照狀態(tài)存儲(chǔ)鼠锈。
  10. RollingPolicy:定義了buckt 生成新的in-progress文件、及將in-progress 文件變更為最終part文件的策略星著。 最常用的策略是CheckpointRollingPolicy购笆,在每次Checkpoint完成時(shí),根據(jù)in-progress文件生成part文件虚循。
  11. StreamingFileSinkHelper:StreamingFileSink 調(diào)用 StreamingFileSinkHelper 方法完成對Buckets數(shù)據(jù)的寫入及狀態(tài)存儲(chǔ)同欠。
  12. StreamingFileSink:根據(jù)BucketsBuilder構(gòu)造器創(chuàng)建Buckets,初始化時(shí)創(chuàng)建StreamingFileSinkHelper横缔,在Sink铺遂、checkpoint 方法中調(diào)用 StreamingFileSinkHelper 接口。

數(shù)據(jù)寫入

Flink寫文件流程為茎刚,先將數(shù)據(jù)寫入inprogress臨時(shí)文件襟锐,在滿足RollingPolicy時(shí),將inprogress臨時(shí)文件重命名為最終的part文件膛锭。
參考Flink1.12.1版本的代碼粮坞,學(xué)習(xí)下 Flink 將數(shù)據(jù)寫入文件的具體流程。


Flink 文件寫入.png
  1. StreamingFileSink 執(zhí)行 invoke() 方法處理數(shù)據(jù)初狰,是通過調(diào)用 StreamingFileSinkHelper onElement()方法對 Buckets 進(jìn)行操作莫杈。
functions.sink.filesystem.Buckets#onElement

public Bucket<IN, BucketID> onElement(
        final IN value,
        final long currentProcessingTime,
        @Nullable final Long elementTimestamp,
        final long currentWatermark)
        throws Exception {
    // note: 獲取當(dāng)前數(shù)據(jù)所在的 BucketID, 即被被分桶后的子文件夾名稱
    final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
    // note: 從已緩存的集合中獲取Bucket 或者 新建Bucket并緩存
    final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
    // note: 將數(shù)據(jù)寫入具體 bucket
    bucket.write(value, currentProcessingTime);
    return bucket;
}
  1. Buckets 處理數(shù)據(jù)時(shí),需要根據(jù)定義的 BucketAssigner 獲取數(shù)據(jù)所在的 Bucket 標(biāo)識奢入。上述案例使用了 DateTimeBucketAssigner 了解下它如何根據(jù)ProcessTime 獲取 BucketID筝闹。
bucketassigners.DateTimeBucketAssigner#getBucketId

public String getBucketId(IN element, BucketAssigner.Context context) {
    if (dateTimeFormatter == null) {
        // note: 將Processing Time 轉(zhuǎn)換為 yyyy-MM-dd--HH 格式
        dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }
    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
}
  1. 根據(jù) BucketID 從Buckets 中拿到有效的Bucket。
filesystem.Buckets#getOrCreateBucketForBucketId  

private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId)
        throws IOException {
    Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
    if (bucket == null) {
        // note: 構(gòu)建buckt所在完整路徑腥光,例如 hdfs://localhost:9000/user/hive/warehouse/userinfo/2020-08-08--14
        final Path bucketPath = assembleBucketPath(bucketId);
        // note: 創(chuàng)建 Bucket 并由activeBuckets緩存
        bucket =
                bucketFactory.getNewBucket(
                        subtaskIndex,
                        bucketId,
                        bucketPath,
                        maxPartCounter,
                        bucketWriter,
                        rollingPolicy,
                        fileLifeCycleListener,
                        outputFileConfig);
        activeBuckets.put(bucketId, bucket);

        notifyBucketCreate(bucket);
    }
    return bucket;
}
  1. 初次向Bucket寫入數(shù)據(jù)关顷,需要?jiǎng)?chuàng)建part的臨時(shí)文件及用來向文件寫數(shù)據(jù)的InProgressFileWriter對象,同時(shí)創(chuàng)建BulkWriter武福,用來進(jìn)行數(shù)據(jù)寫入解寝。
    1. 創(chuàng)建臨時(shí)文件:根據(jù)inprocess文件生成規(guī)則,在HadoopRecoverableFsDataOutputStream初始化時(shí)創(chuàng)建并返回針對該文件的DataOutputStream艘儒。
    2. 創(chuàng)建BulkWriter:當(dāng)前案例中由ParquetWriterFactory工廠類創(chuàng)建ParquetBulkWriter聋伦,并傳遞臨時(shí)文件對應(yīng)的DataOutputStream。
sinkfilesystem.Bucket#write 
void write(IN element, long currentTime) throws IOException {
    if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
        inProgressPart = rollPartFile(currentTime);
    }
    //note: BulkPartWriter使用 BulkWriter 寫入數(shù)據(jù)界睁。
    inProgressPart.write(element, currentTime);
}

private InProgressFileWriter<IN, BucketID> rollPartFile(final long currentTime)
        throws IOException {
    // note: 關(guān)閉part文件觉增。
    closePartFile();
    final Path partFilePath = assembleNewPartPath();
    // note: 創(chuàng)建InProgressFileWriter
    return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
}

// 創(chuàng)建BulkPartWriter 使用BulkWriter進(jìn)行數(shù)據(jù)寫入,
OutputStreamBasedBucketWriter#openNewInProgressFile
public InProgressFileWriter<IN, BucketID> openNewInProgressFile(
        final BucketID bucketID, final Path path, final long creationTime)
        throws IOException {
    // note: 根據(jù)的inprocess文件路徑翻斟,由recoverableWriter創(chuàng)建逾礁。
    return openNew(bucketID, recoverableWriter.open(path), path, creationTime);
}
public InProgressFileWriter<IN, BucketID> openNew(
         final BucketID bucketId,
         final RecoverableFsDataOutputStream stream,
         final Path path,
         final long creationTime)
         throws IOException {

     final BulkWriter<IN> writer = writerFactory.create(stream);
     return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
 }
}

// 創(chuàng)建hdfs文件系統(tǒng)臨時(shí)文件,針對該文件創(chuàng)建 RecoverableFsDataOutputStream
HadoopRecoverableWriter#open
public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
   final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(filePath);
   final org.apache.hadoop.fs.Path tempFile = generateStagingTempFilePath(fs, targetFile);
   return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile);
}

HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
        throws IOException {
    // note: 確保hadoop 支持 truncate方法
    ensureTruncateInitialized();

    this.fs = checkNotNull(fs);
    this.targetFile = checkNotNull(targetFile);
    this.tempFile = checkNotNull(tempFile);
    // 創(chuàng)建臨時(shí)文件
    this.out = fs.create(tempFile);
}
  1. 將數(shù)據(jù)以parquet格式寫入臨時(shí)文件,調(diào)用鏈路比較長中間涉及不少工廠類及代理類嘹履,最終調(diào)用的還是parquet框架本身的API腻扇。
filesystem.BulkPartWriter#write
public void write(IN element, long currentTime) throws IOException {
   writer.addElement(element);
   markWrite(currentTime);
}

parquet.ParquetBulkWriter#addElement
public void addElement(T datum) throws IOException {
   // note: org.apache.parquet.hadoop.ParquetWriter
   parquetWriter.write(datum); 
}

checkpoint 過程

在生產(chǎn)環(huán)境中大多使用 OnCheckpointRollingPolicy 策略,即在執(zhí)行Checkpoint時(shí)存儲(chǔ)BucketState砾嫉,提交已寫入的數(shù)據(jù)記錄已寫入數(shù)據(jù)的偏移量幼苛,在CK完成后將 inprogress 文件重命名為最終 part 文件。
根據(jù)Checkpoint生命周期方法焕刮,了解執(zhí)行過程舶沿。


StreamingFileSink 處理流程
  1. initializeState 創(chuàng)建StreamingFileSinkHelper,做一些初始化工作配并。如果從已有的狀態(tài)快照啟動(dòng)括荡,會(huì)對BucketStates進(jìn)行恢復(fù),稍后詳細(xì)介紹快照恢復(fù)的邏輯溉旋,先看狀態(tài)快照中存儲(chǔ)了什么信息畸冲,及后續(xù)邏輯。
  2. snapshotState 狀態(tài)快照存儲(chǔ)观腊。Buckets 的 snapshotState() 會(huì)保存序列化后的 BucketState 及當(dāng)前子任務(wù)處理的最大part文件個(gè)數(shù)邑闲。
public void snapshotState(
    final long checkpointId,
    final ListState<byte[]> bucketStatesContainer,
    final ListState<Long> partCounterStateContainer)
    throws Exception {
// note: 清理歷史狀態(tài)信息
bucketStatesContainer.clear();
partCounterStateContainer.clear();
    
// note: 將 BucketState 以二進(jìn)制格式存儲(chǔ)到 bucketStatesContainer
snapshotActiveBuckets(checkpointId, bucketStatesContainer);
// note: 存儲(chǔ)當(dāng)前任務(wù)處理的最大文件數(shù)    
partCounterStateContainer.add(maxPartCounter);
}

private void snapshotActiveBuckets(
        final long checkpointId, final ListState<byte[]> bucketStatesContainer)
        throws Exception {
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
        //note: 每個(gè)正在使用的Bucket會(huì)生成BucketState
        final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);
        // note: 將BucketState 序列化后存儲(chǔ)到 ListState
        final byte[] serializedBucketState =
                SimpleVersionedSerialization.writeVersionAndSerialize(
                        bucketStateSerializer, bucketState);

        bucketStatesContainer.add(serializedBucketState);
    }
}

BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
    //note: 關(guān)閉 inProgressPart, 填充BucketState 使用的屬性信息
    prepareBucketForCheckpointing(checkpointId);
    
    //note: ck 期間有數(shù)據(jù)寫入。
    if (inProgressPart != null) {
        inProgressFileRecoverable = inProgressPart.persist();
        inProgressFileCreationTime = inProgressPart.getCreationTime();
        this.inProgressFileRecoverablesPerCheckpoint.put(
                checkpointId, inProgressFileRecoverable);
    }
    // note: 構(gòu)建出BucketState
    return new BucketState<>(
            bucketId,
            bucketPath,
            inProgressFileCreationTime,
            inProgressFileRecoverable,
            pendingFileRecoverablesPerCheckpoint);
}

private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
    if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
        closePartFile();
    }
    //  note: closePartFile()會(huì)將生成的 pendingFileRecoverable 寫入pendingFileRecoverablesForCurrentCheckpoint
    if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
        pendingFileRecoverablesPerCheckpoint.put(
                checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
        pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
    }
}

closePartFile() 主要處理工作包含:

  1. 使用 BulkWriter 將數(shù)據(jù) flush 到外部文件恕沫。
  2. 基于當(dāng)前 inprogressPart 創(chuàng)建出PendingFileRecoverable對象监憎,其中封裝了 HadoopFsRecoverable 對包含了targetFile(part文件)纱意、tempFile(inprogress文件)婶溯、offset(數(shù)據(jù)當(dāng)前寫入的偏移量)屬性,是最重要的狀態(tài)信息偷霉。在checkpoint完成后迄委,會(huì)將tempFile命名為targetFile。
  3. 關(guān)閉inprogressPart类少,填充 pendingFileRecoverablesForCurrentCheckpoint信息叙身,代表當(dāng)前CK正在處理的 inprogressPart文件。

private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
    InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
    if (inProgressPart != null) {
        //note: 和inProgressPart文件一一對應(yīng)
        pendingFileRecoverable = inProgressPart.closeForCommit();
        //note: 存儲(chǔ)到LIST
        pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
        inProgressPart = null;
    }
    return pendingFileRecoverable;
}

filesystem.BulkPartWriter#closeForCommit
public PendingFileRecoverable closeForCommit() throws IOException {
      // note: BulkWriter   
      writer.flush();
      writer.finish();
      // note: OutputStreamBasedPartFileWriter#closeForCommit
      return super.closeForCommit();
  }
  
OutputStreamBasedPartFileWriter#closeForCommit  
public PendingFileRecoverable closeForCommit() throws IOException {
    // note: 創(chuàng)建OutputStreamBasedPendingFileRecoverable封裝HadoopRecoverableFsDataOutputStream
    return new OutputStreamBasedPendingFileRecoverable(
            currentPartStream.closeForCommit().getRecoverable());
}

HadoopRecoverableFsDataOutputStream#closeForCommit  
public Committer closeForCommit() throws IOException {
      final long pos = getPos();
      close();
      // note: 構(gòu)建HadoopFsRecoverable硫狞,最終會(huì)調(diào)用commit方法完成文件rename
      return new HadoopFsCommitter(fs, new HadoopFsRecoverable(targetFile, tempFile, pos));
}
  1. notifyCheckpointComplete checkpoint完成后回調(diào)該方法完成對inprogress文件的rename信轿,如果該方法執(zhí)行失敗不會(huì)撤銷已生成的checkpoint。

注意:HadoopFsCommitter執(zhí)行commit對文件進(jìn)行重命名時(shí)残吩,并不會(huì)覆蓋已有的part文件财忽,此時(shí)數(shù)據(jù)準(zhǔn)確性沒辦法保障。

void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
    Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
            pendingFileRecoverablesPerCheckpoint
                    .headMap(checkpointId, true)
                    .entrySet()
                    .iterator();
    while (it.hasNext()) {
        Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
        for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
                entry.getValue()) {
            // note: 從pendingFileRecoverable 生成PendingFile 執(zhí)行commit(), 對progress文件重命名泣侮。
            bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
        }
        it.remove();
    }
}

OutputStreamBasedBucketWriter#recoverPendingFile
public PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable)
        throws IOException {
    final RecoverableWriter.CommitRecoverable commitRecoverable;

    if (pendingFileRecoverable instanceof OutputStreamBasedPendingFileRecoverable) {
        commitRecoverable =
                ((OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable)
                        .getCommitRecoverable();
    } else if (pendingFileRecoverable
            instanceof OutputStreamBasedInProgressFileRecoverable) {
        commitRecoverable =
                ((OutputStreamBasedInProgressFileRecoverable) pendingFileRecoverable)
                        .getResumeRecoverable();
    } else {
        throw new IllegalArgumentException(
                "can not recover from the pendingFileRecoverable");
    }
    return new OutputStreamBasedPendingFile(
        // note: 最終調(diào)用HadoopFsCommitter的commit方法即彪。
        recoverableWriter.recoverForCommit(commitRecoverable));
}

HadoopFsCommitter#commit
public void commit() throws IOException {
    final Path src = recoverable.tempFile();
    final Path dest = recoverable.targetFile();
    final long expectedLength = recoverable.offset();

    final FileStatus srcStatus;
    try {
        srcStatus = fs.getFileStatus(src);
    } catch (IOException e) {
        throw new IOException("Cannot clean commit: Staging file does not exist.");
    }

    if (srcStatus.getLen() != expectedLength) {
        // something was done to this file since the committer was created.
        // this is not the "clean" case
        throw new IOException("Cannot clean commit: File has trailing junk data.");
    }

    try {
        // note: 文件被重命名為最終的part文件。
        fs.rename(src, dest);
    } catch (IOException e) {
        throw new IOException(
                "Committing file by rename failed: " + src + " to " + dest, e);
    }
}

  1. initializeState 時(shí)狀態(tài)恢復(fù)主要包含以下幾個(gè)主要流程:
    1. restoreInProgressFile 恢復(fù)正在處理的inprogress文件活尊。從 inProgressFileRecoverable 獲取inprogress文件名稱及已寫入數(shù)據(jù)的偏移量隶校,重新構(gòu)建BulkWriter漏益。
    2. commitRecoveredPendingFiles 提交pending狀態(tài)文件。pending狀態(tài)文件數(shù)據(jù)已經(jīng)寫入文件系統(tǒng)深胳,只是還未執(zhí)行最終的commit操作對文件執(zhí)行重命名绰疤,則繼續(xù)執(zhí)行后續(xù)重命名操作。
    3. updateActiveBucketId 如果activeBuckets 包含 restoredBucket 提交該restoredBucket稠屠,否則存儲(chǔ)到activeBuckets峦睡。
/** Constructor to restore a bucket from checkpointed state. */
private Bucket(
        final int subtaskIndex,
        final long initialPartCounter,
        final BucketWriter<IN, BucketID> partFileFactory,
        final RollingPolicy<IN, BucketID> rollingPolicy,
        final BucketState<BucketID> bucketState,
        @Nullable final FileLifeCycleListener<BucketID> fileListener,
        final OutputFileConfig outputFileConfig)
        throws IOException {

    this(
            subtaskIndex,
            bucketState.getBucketId(),
            bucketState.getBucketPath(),
            initialPartCounter,
            partFileFactory,
            rollingPolicy,
            fileListener,
            outputFileConfig);
    restoreInProgressFile(bucketState);
    commitRecoveredPendingFiles(bucketState);
}


private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
    if (!state.hasInProgressFileRecoverable()) {
        return;
    }
    // we try to resume the previous in-progress file
    final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
            state.getInProgressFileRecoverable();
   
    //  note: bucketWriter 是否具有恢復(fù)能力
    if (bucketWriter.getProperties().supportsResume()) {
        // note: 恢復(fù)inProgressPart
        inProgressPart =
                bucketWriter.resumeInProgressFileFrom(
                        bucketId,
                        inProgressFileRecoverable,
                        state.getInProgressFileCreationTime());
    } else {
        // if the writer does not support resume, then we close the
        // in-progress part and commit it, as done in the case of pending files.
        bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
    }
}

private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {

    // we commit pending files for checkpoints that precess the last successful one, from which
    // we are recovering
    for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables :
            state.getPendingFileRecoverablesPerCheckpoint().values()) {
        for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
                pendingFileRecoverables) {
            bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
        }
    }
}

Buckets#updateActiveBucketId
private void updateActiveBucketId(
        final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
    // note: 當(dāng)前流程沒有 bucketLifeCycleListener,沒有要處理的狀態(tài)信息权埠,直接返回榨了。
    if (!restoredBucket.isActive()) {
        notifyBucketInactive(restoredBucket);
        return;
    }
    // note: 當(dāng)前 activeBuckets 已經(jīng)包含restoredBucket所屬的Bucket,則將restoredBucket 進(jìn)行提交攘蔽。否則存儲(chǔ)到activeBuckets 
    final Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
    if (bucket != null) {
        bucket.merge(restoredBucket);
    } else {
        activeBuckets.put(bucketId, restoredBucket);
    }
}

void merge(final Bucket<IN, BucketID> bucket) throws IOException {
    InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = bucket.closePartFile();
    if (pendingFileRecoverable != null) {
        pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
    }
}

常見問題

part文件格式

part 文件名稱生成規(guī)則: bucketPath_partPrefix_subtaskIndex_currentPartCounter_partSuffix龙屉。

  • bucketPath:bucket路徑,例如:hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19满俗。
  • partPrefix:part文件前綴转捕,通過OutputFileConfig設(shè)置,默認(rèn)為part唆垃。
  • subtaskIndex: Sink Operator并行寫入時(shí)五芝,某一子任務(wù)當(dāng)前索引,從0開始辕万。
  • currentPartCounter:子任務(wù)生成的part文件數(shù)量枢步,從0開始。

例如:part-0-0 代表第一個(gè)task生成的第一個(gè)完整的part文件渐尿。

inprogress 臨時(shí)文件名稱規(guī)則:.part 文件名稱.inprogress.UUID

  • inprogress: 正在寫入的臨時(shí)文件的標(biāo)識醉途。
  • UUID: 隨機(jī)生成的UUID。

例如: hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/.part-0-0.inprogress.18296793-9fde-4376-b6fc-7c47512bd108 代表 形成part-0-0產(chǎn)生的臨時(shí)文件砖茸。

數(shù)據(jù)準(zhǔn)確性保障

  1. 非checkpoint/savepoint啟動(dòng)隘擎,當(dāng)文件系統(tǒng)已經(jīng)存在部分part文件,從kafka起始位置重新消費(fèi)數(shù)據(jù)可能會(huì)導(dǎo)致數(shù)據(jù)缺失或者增多凉夯,因?yàn)閞ename操作并不會(huì)覆蓋已有的part文件(應(yīng)該是個(gè)BUG货葬,提了個(gè)jira還沒回復(fù)HadoopFsCommitter, file rename failure)。

    假設(shè)存在part-0-0歷史文件劲够,從起始位置消費(fèi)數(shù)據(jù)會(huì)生成新的part-0-0文件震桶,新文件存儲(chǔ)的數(shù)據(jù)條數(shù)無法保證和歷史文件一致,可能多也可能少再沧。

  2. notifyCheckpointComplete執(zhí)行時(shí)尼夺,F(xiàn)link程序被Kill,從最新狀態(tài)快照啟動(dòng),不會(huì)丟失數(shù)據(jù)淤堵。

    假設(shè).part-0-0.inprogress.xxxx 對應(yīng)的bucket已經(jīng)執(zhí)行完snapshotState方法寝衫,則數(shù)據(jù)已經(jīng)被flush到文件系統(tǒng),在notifyCheckpointComplete階段將完成對該inprogress文件的重命名拐邪,如果此時(shí)程序突然被kill慰毅,該inprocess文件是有數(shù)據(jù)的,只是文件沒有被最終重命名扎阶。此時(shí)汹胃,從CK啟動(dòng),則會(huì)先完成會(huì) inprocess 文件的重命名东臀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末着饥,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子惰赋,更是在濱河造成了極大的恐慌宰掉,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赁濒,死亡現(xiàn)場離奇詭異轨奄,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)拒炎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進(jìn)店門挪拟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人击你,你說我怎么就攤上這事玉组。” “怎么了果漾?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵球切,是天一觀的道長谷誓。 經(jīng)常有香客問我绒障,道長,這世上最難降的妖魔是什么捍歪? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任户辱,我火速辦了婚禮,結(jié)果婚禮上糙臼,老公的妹妹穿的比我還像新娘庐镐。我一直安慰自己,他們只是感情好变逃,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布必逆。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪名眉。 梳的紋絲不亂的頭發(fā)上粟矿,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天,我揣著相機(jī)與錄音损拢,去河邊找鬼陌粹。 笑死,一個(gè)胖子當(dāng)著我的面吹牛福压,可吹牛的內(nèi)容都是我干的掏秩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼荆姆,長吁一口氣:“原來是場噩夢啊……” “哼蒙幻!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起胆筒,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤杆煞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后腐泻,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體决乎,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年派桩,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了构诚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,727評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡铆惑,死狀恐怖范嘱,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情员魏,我是刑警寧澤丑蛤,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站撕阎,受9級特大地震影響受裹,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜虏束,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一棉饶、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧镇匀,春花似錦照藻、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽群发。三九已至,卻和暖如春发乔,著一層夾襖步出監(jiān)牢的瞬間也物,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工列疗, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留滑蚯,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓抵栈,卻偏偏與公主長得像告材,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子古劲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容