Background
Flink can write streaming data to external systems as files. A typical use case is writing data to the HDFS path backing a Hive table and then querying and analyzing it with Hive. As file writing with Flink became widely used by business teams, a number of problems surfaced, so it is worth understanding how the Flink Streaming File Sink is actually implemented.
Example
Consume JSON data from Kafka, convert it into a stream of UserInfo entities, and finally write it in Parquet format to the HDFS path of a Hive table. Versions used: Flink 1.12.1, Hadoop 2.8.0, Hive 2.3.8.
-----------------------------
- Hive table DDL
-----------------------------
create table userinfo(
userid int,
username string
) stored as parquet;
-----------------------------
- Java entity class
-----------------------------
public class UserInfo {
private int userId;
private String userName;
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
@Override
public String toString() {
return "UserInfo{" +
"userId=" + userId +
", userName='" + userName + '\'' +
'}';
}
}
-----------------------------
- Flink file sink program
-----------------------------
public class Kafka2Parquet {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test_001");
env.setParallelism(1);
env.enableCheckpointing(30000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000));
env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/user/todd/checkpoint"));
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("mqTest02", new SimpleStringSchema(), props);
kafkaConsumer.setStartFromLatest();
DataStream<UserInfo> userInfoDataStream = env.addSource(kafkaConsumer)
.map(value -> JsonUtils.parseJson(value, UserInfo.class)); // JsonUtils: JSON parsing helper, sketched below
// 1. Use the BulkFormat builder  2. Use the checkpoint rolling policy (OnCheckpointRollingPolicy)
StreamingFileSink<UserInfo> parquetSink = StreamingFileSink
.forBulkFormat(new Path("hdfs://localhost:9000/user/hive/warehouse/userinfo"), ParquetAvroWriters.forReflectRecord(UserInfo.class))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
userInfoDataStream.addSink(parquetSink);
env.execute();
}
}
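The program above calls JsonUtils.parseJson, a small JSON helper from the original project that is not shown here. Below is a minimal sketch of what such a helper could look like, assuming Jackson is available on the classpath; the class name JsonUtils and the parseJson signature simply mirror the call site above and are not part of any Flink API.
-----------------------------
- JsonUtils helper (illustrative sketch)
-----------------------------
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonUtils {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parse a JSON string into an instance of the given class.
    // Checked exceptions are wrapped so the helper can be used inside a lambda.
    public static <T> T parseJson(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON: " + json, e);
        }
    }
}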
-----------------------------
- Generated parquet file names and paths
-----------------------------
in-progress temporary file
hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/.part-0-0.inprogress.18296793-9fde-4376-b6fc-7c47512bd108
final part file
hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/part-0-0
Core Classes
- BulkWriter: bulk-writes records to data files of a specific format. The main implementations (ParquetBulkWriter, AvroBulkWriter, OrcBulkWriter, SequenceFileWriter) correspond to the different storage formats data can be written in.
- RecoverableWriter: a writer for external file systems that can recover from failures. The main implementations (HadoopRecoverableWriter, S3RecoverableWriter, LocalRecoverableWriter) handle the different file system types.
- BulkPartWriter: an InProgressFileWriter implementation that writes data into the in-progress file; it holds a BulkWriter.
- OutputStreamBasedPartFileWriter: base class for part writers; writes data out through a RecoverableFsDataOutputStream.
- RecoverableFsDataOutputStream: a file system output stream that can resume writing from a given offset in the file.
- BucketAssigner: assigns each record to a bucket; a custom assigner can be implemented for specific data formats (see the sketch after this list). It embeds a SimpleVersionedSerializer used to serialize/deserialize the BucketID.
a. The BucketAssigner subclass DateTimeBucketAssigner derives a bucket name in yyyy-MM-dd--HH format from the record's processing time and serializes the bucket name with SimpleVersionedStringSerializer.
- Bucket: the directory StreamingFileSink writes output into; every processed record is assigned to a bucket by the BucketAssigner. Its main responsibilities are:
- Maintain one in-progress file: create it, write data into it, and commit what has been written.
- When StreamingFileSink takes a checkpoint, build the BucketState so it can be serialized.
- After the StreamingFileSink checkpoint completes, rename the in-progress file.
- When StreamingFileSink starts from a savepoint, restore the in-progress file information from the BucketState.
- BucketState: the state of a Bucket. From the BucketState the in-progress file and its current write offset can be restored, so writing can continue appending to that in-progress file; pending files can also be restored so that the remaining rename logic is carried out.
- Buckets: manages all active Buckets in the StreamingFileSink, including assigning records to their bucket and snapshotting the state of the active buckets.
- RollingPolicy: defines when a bucket rolls over to a new in-progress file and when an in-progress file becomes the final part file. The most common policy is CheckpointRollingPolicy, which produces a part file from the in-progress file each time a checkpoint completes.
- StreamingFileSinkHelper: StreamingFileSink delegates to StreamingFileSinkHelper to write data into the Buckets and to store their state.
- StreamingFileSink: creates the Buckets from a BucketsBuilder, creates the StreamingFileSinkHelper during initialization, and calls the StreamingFileSinkHelper interface from its sink and checkpoint methods.
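As noted for BucketAssigner above, a custom assigner can be supplied when the default date-based bucketing does not fit. Below is a minimal sketch that buckets UserInfo records by userId; the class name UserIdBucketAssigner is made up for illustration, while the BucketAssigner interface and SimpleVersionedStringSerializer are the same Flink 1.12 classes used by DateTimeBucketAssigner itself.
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class UserIdBucketAssigner implements BucketAssigner<UserInfo, String> {

    // Every record is routed to a sub-directory named after its userId.
    @Override
    public String getBucketId(UserInfo element, Context context) {
        return "userid=" + element.getUserId();
    }

    // Reuse Flink's versioned string serializer for the bucket id,
    // the same one DateTimeBucketAssigner uses.
    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
It would be plugged into the sink builder with .withBucketAssigner(new UserIdBucketAssigner()) before build().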
Data Writing
Flink writes files in two steps: records are first written to an in-progress temporary file, and when the RollingPolicy is satisfied the in-progress file is renamed to the final part file.
Using the Flink 1.12.1 code as a reference, let's walk through how Flink actually writes data to files.
- StreamingFileSink processes records in its invoke() method, which calls StreamingFileSinkHelper.onElement() to operate on the Buckets.
functions.sink.filesystem.Buckets#onElement
public Bucket<IN, BucketID> onElement(
final IN value,
final long currentProcessingTime,
@Nullable final Long elementTimestamp,
final long currentWatermark)
throws Exception {
// note: get the BucketID for this record, i.e. the name of the sub-directory it is bucketed into
final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
// note: get the Bucket from the cache, or create a new Bucket and cache it
final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
// note: write the record into that bucket
bucket.write(value, currentProcessingTime);
return bucket;
}
- When Buckets processes a record, it asks the configured BucketAssigner for the bucket identifier of that record. The example above uses DateTimeBucketAssigner, so let's see how it derives the BucketID from the processing time.
bucketassigners.DateTimeBucketAssigner#getBucketId
public String getBucketId(IN element, BucketAssigner.Context context) {
if (dateTimeFormatter == null) {
// note: format the processing time as yyyy-MM-dd--HH
dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
}
return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
}
- Fetch a usable Bucket from Buckets by BucketID.
filesystem.Buckets#getOrCreateBucketForBucketId
private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId)
throws IOException {
Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
if (bucket == null) {
// note: assemble the full bucket path, e.g. hdfs://localhost:9000/user/hive/warehouse/userinfo/2020-08-08--14
final Path bucketPath = assembleBucketPath(bucketId);
// note: create the Bucket and cache it in activeBuckets
bucket =
bucketFactory.getNewBucket(
subtaskIndex,
bucketId,
bucketPath,
maxPartCounter,
bucketWriter,
rollingPolicy,
fileLifeCycleListener,
outputFileConfig);
activeBuckets.put(bucketId, bucket);
notifyBucketCreate(bucket);
}
return bucket;
}
- The first time data is written to a Bucket, the temporary part file and the InProgressFileWriter that writes to it must be created, together with the BulkWriter that performs the actual writes.
- Creating the temporary file: following the in-progress file naming rule, HadoopRecoverableFsDataOutputStream creates the file during initialization and returns a DataOutputStream for it.
- Creating the BulkWriter: in this example the ParquetWriterFactory creates a ParquetBulkWriter and hands it the DataOutputStream of the temporary file.
sinkfilesystem.Bucket#write
void write(IN element, long currentTime) throws IOException {
if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
inProgressPart = rollPartFile(currentTime);
}
//note: BulkPartWriter writes the record through its BulkWriter.
inProgressPart.write(element, currentTime);
}
private InProgressFileWriter<IN, BucketID> rollPartFile(final long currentTime)
throws IOException {
// note: close the current part file.
closePartFile();
final Path partFilePath = assembleNewPartPath();
// note: create the InProgressFileWriter
return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
}
// Create a BulkPartWriter, which writes data through a BulkWriter
OutputStreamBasedBucketWriter#openNewInProgressFile
public InProgressFileWriter<IN, BucketID> openNewInProgressFile(
final BucketID bucketID, final Path path, final long creationTime)
throws IOException {
// note: created by the recoverableWriter from the in-progress file path.
return openNew(bucketID, recoverableWriter.open(path), path, creationTime);
}
public InProgressFileWriter<IN, BucketID> openNew(
final BucketID bucketId,
final RecoverableFsDataOutputStream stream,
final Path path,
final long creationTime)
throws IOException {
final BulkWriter<IN> writer = writerFactory.create(stream);
return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
}
}
// Create the temporary file on HDFS and a RecoverableFsDataOutputStream for it
HadoopRecoverableWriter#open
public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(filePath);
final org.apache.hadoop.fs.Path tempFile = generateStagingTempFilePath(fs, targetFile);
return new HadoopRecoverableFsDataOutputStream(fs, targetFile, tempFile);
}
HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
throws IOException {
// note: make sure the Hadoop version supports truncate
ensureTruncateInitialized();
this.fs = checkNotNull(fs);
this.targetFile = checkNotNull(targetFile);
this.tempFile = checkNotNull(tempFile);
// create the temporary file
this.out = fs.create(tempFile);
}
- The data is then written to the temporary file in Parquet format. The call chain is fairly long and goes through several factory and delegate classes, but in the end it is the Parquet framework's own API doing the work.
filesystem.BulkPartWriter#write
public void write(IN element, long currentTime) throws IOException {
writer.addElement(element);
markWrite(currentTime);
}
parquet.ParquetBulkWriter#addElement
public void addElement(T datum) throws IOException {
// note: org.apache.parquet.hadoop.ParquetWriter
parquetWriter.write(datum);
}
Checkpoint Process
In production the OnCheckpointRollingPolicy is used in most cases: when a checkpoint runs, the BucketState is stored and the offset of the data already written is recorded; after the checkpoint completes, the in-progress file is renamed to the final part file.
Let's follow the checkpoint lifecycle methods to see how this works.
- initializeState creates the StreamingFileSinkHelper and does some initialization. When starting from an existing state snapshot it also restores the BucketStates; the restore logic is described in detail later, so let's first look at what the snapshot contains and what happens afterwards.
- snapshotState stores the state snapshot. Buckets.snapshotState() saves the serialized BucketStates plus the maximum part file counter handled by the current subtask.
public void snapshotState(
final long checkpointId,
final ListState<byte[]> bucketStatesContainer,
final ListState<Long> partCounterStateContainer)
throws Exception {
// note: clear the previous state
bucketStatesContainer.clear();
partCounterStateContainer.clear();
// note: store the BucketStates in binary form into bucketStatesContainer
snapshotActiveBuckets(checkpointId, bucketStatesContainer);
// note: store the maximum part file counter of this subtask
partCounterStateContainer.add(maxPartCounter);
}
private void snapshotActiveBuckets(
final long checkpointId, final ListState<byte[]> bucketStatesContainer)
throws Exception {
for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
//note: every active Bucket produces a BucketState
final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);
// note: serialize the BucketState and add it to the ListState
final byte[] serializedBucketState =
SimpleVersionedSerialization.writeVersionAndSerialize(
bucketStateSerializer, bucketState);
bucketStatesContainer.add(serializedBucketState);
}
}
BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
//note: close the inProgressPart and fill in the fields used by the BucketState
prepareBucketForCheckpointing(checkpointId);
//note: data was written during this checkpoint interval
if (inProgressPart != null) {
inProgressFileRecoverable = inProgressPart.persist();
inProgressFileCreationTime = inProgressPart.getCreationTime();
this.inProgressFileRecoverablesPerCheckpoint.put(
checkpointId, inProgressFileRecoverable);
}
// note: build the BucketState
return new BucketState<>(
bucketId,
bucketPath,
inProgressFileCreationTime,
inProgressFileRecoverable,
pendingFileRecoverablesPerCheckpoint);
}
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
closePartFile();
}
// note: closePartFile() adds the resulting pendingFileRecoverable to pendingFileRecoverablesForCurrentCheckpoint
if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
pendingFileRecoverablesPerCheckpoint.put(
checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
}
}
closePartFile() does the following:
- Uses the BulkWriter to flush the data to the external file.
- Builds a PendingFileRecoverable from the current inProgressPart. It wraps a HadoopFsRecoverable holding targetFile (the part file), tempFile (the in-progress file) and offset (the current write offset), which is the most important piece of state. After the checkpoint completes, tempFile is renamed to targetFile.
- Closes the inProgressPart and fills in pendingFileRecoverablesForCurrentCheckpoint, which represents the in-progress files handled by the current checkpoint.
private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
if (inProgressPart != null) {
//note: corresponds one-to-one to the inProgressPart file
pendingFileRecoverable = inProgressPart.closeForCommit();
//note: add it to the list
pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
inProgressPart = null;
}
return pendingFileRecoverable;
}
filesystem.BulkPartWriter#closeForCommit
public PendingFileRecoverable closeForCommit() throws IOException {
// note: BulkWriter
writer.flush();
writer.finish();
// note: OutputStreamBasedPartFileWriter#closeForCommit
return super.closeForCommit();
}
OutputStreamBasedPartFileWriter#closeForCommit
public PendingFileRecoverable closeForCommit() throws IOException {
// note: create an OutputStreamBasedPendingFileRecoverable wrapping the HadoopRecoverableFsDataOutputStream
return new OutputStreamBasedPendingFileRecoverable(
currentPartStream.closeForCommit().getRecoverable());
}
HadoopRecoverableFsDataOutputStream#closeForCommit
public Committer closeForCommit() throws IOException {
final long pos = getPos();
close();
// note: build a HadoopFsRecoverable; its commit method will eventually rename the file
return new HadoopFsCommitter(fs, new HadoopFsRecoverable(targetFile, tempFile, pos));
}
- notifyCheckpointComplete is called back once the checkpoint has completed and renames the in-progress files; if this method fails, the checkpoint that was already produced is not rolled back.
Note: when HadoopFsCommitter performs the commit and renames the file, it does not overwrite an existing part file; in that case data correctness cannot be guaranteed.
void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
pendingFileRecoverablesPerCheckpoint
.headMap(checkpointId, true)
.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
entry.getValue()) {
// note: build a PendingFile from the pendingFileRecoverable and call commit() to rename the in-progress file.
bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
}
it.remove();
}
}
OutputStreamBasedBucketWriter#recoverPendingFile
public PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable)
throws IOException {
final RecoverableWriter.CommitRecoverable commitRecoverable;
if (pendingFileRecoverable instanceof OutputStreamBasedPendingFileRecoverable) {
commitRecoverable =
((OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable)
.getCommitRecoverable();
} else if (pendingFileRecoverable
instanceof OutputStreamBasedInProgressFileRecoverable) {
commitRecoverable =
((OutputStreamBasedInProgressFileRecoverable) pendingFileRecoverable)
.getResumeRecoverable();
} else {
throw new IllegalArgumentException(
"can not recover from the pendingFileRecoverable");
}
return new OutputStreamBasedPendingFile(
// note: eventually calls HadoopFsCommitter's commit method.
recoverableWriter.recoverForCommit(commitRecoverable));
}
HadoopFsCommitter#commit
public void commit() throws IOException {
final Path src = recoverable.tempFile();
final Path dest = recoverable.targetFile();
final long expectedLength = recoverable.offset();
final FileStatus srcStatus;
try {
srcStatus = fs.getFileStatus(src);
} catch (IOException e) {
throw new IOException("Cannot clean commit: Staging file does not exist.");
}
if (srcStatus.getLen() != expectedLength) {
// something was done to this file since the committer was created.
// this is not the "clean" case
throw new IOException("Cannot clean commit: File has trailing junk data.");
}
try {
// note: the file is renamed to the final part file.
fs.rename(src, dest);
} catch (IOException e) {
throw new IOException(
"Committing file by rename failed: " + src + " to " + dest, e);
}
}
- State restore during initializeState consists of these main steps:
- restoreInProgressFile restores the in-progress file being written: it gets the in-progress file name and the offset of the data already written from the inProgressFileRecoverable and rebuilds the BulkWriter.
- commitRecoveredPendingFiles commits the pending files: their data has already been written to the file system and only the final commit (the rename) is missing, so the remaining rename is carried out.
- updateActiveBucketId: if activeBuckets already contains the restored bucket, the restoredBucket is committed into it; otherwise it is put into activeBuckets.
/** Constructor to restore a bucket from checkpointed state. */
private Bucket(
final int subtaskIndex,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
@Nullable final FileLifeCycleListener<BucketID> fileListener,
final OutputFileConfig outputFileConfig)
throws IOException {
this(
subtaskIndex,
bucketState.getBucketId(),
bucketState.getBucketPath(),
initialPartCounter,
partFileFactory,
rollingPolicy,
fileListener,
outputFileConfig);
restoreInProgressFile(bucketState);
commitRecoveredPendingFiles(bucketState);
}
private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
if (!state.hasInProgressFileRecoverable()) {
return;
}
// we try to resume the previous in-progress file
final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
state.getInProgressFileRecoverable();
// note: does the bucketWriter support resuming?
if (bucketWriter.getProperties().supportsResume()) {
// note: restore the inProgressPart
inProgressPart =
bucketWriter.resumeInProgressFileFrom(
bucketId,
inProgressFileRecoverable,
state.getInProgressFileCreationTime());
} else {
// if the writer does not support resume, then we close the
// in-progress part and commit it, as done in the case of pending files.
bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
}
}
private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
// we commit pending files for checkpoints that precess the last successful one, from which
// we are recovering
for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables :
state.getPendingFileRecoverablesPerCheckpoint().values()) {
for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable :
pendingFileRecoverables) {
bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
}
}
}
Buckets#updateActiveBucketId
private void updateActiveBucketId(
final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
// note: in this flow there is no bucketLifeCycleListener and nothing to handle, so just return.
if (!restoredBucket.isActive()) {
notifyBucketInactive(restoredBucket);
return;
}
// note: if activeBuckets already contains the bucket that restoredBucket belongs to, commit the restoredBucket into it; otherwise put it into activeBuckets
final Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
if (bucket != null) {
bucket.merge(restoredBucket);
} else {
activeBuckets.put(bucketId, restoredBucket);
}
}
void merge(final Bucket<IN, BucketID> bucket) throws IOException {
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = bucket.closePartFile();
if (pendingFileRecoverable != null) {
pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
}
}
Common Issues
Part file format
Part file names follow the pattern bucketPath/partPrefix-subtaskIndex-currentPartCounter + partSuffix.
- bucketPath: the bucket path, e.g. hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19.
- partPrefix: the part file prefix, configurable through OutputFileConfig (see the sketch at the end of this subsection), default part.
- subtaskIndex: the index of the subtask when the sink operator writes in parallel, starting from 0.
- currentPartCounter: the number of part files produced by that subtask, starting from 0.
For example, part-0-0 is the first completed part file produced by the first subtask.
In-progress temporary file names follow the pattern . + part file name + .inprogress. + UUID
- inprogress: marks a temporary file that is currently being written.
- UUID: a randomly generated UUID.
For example, hdfs://localhost:9000/user/hive/warehouse/userinfo/2021-08-09--19/.part-0-0.inprogress.18296793-9fde-4376-b6fc-7c47512bd108 is the temporary file from which part-0-0 is produced.
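As referenced above, the partPrefix and partSuffix can be customized through OutputFileConfig. Below is a minimal sketch reusing the sink builder from the example program; the prefix and suffix values are arbitrary choices for illustration.
OutputFileConfig fileConfig = OutputFileConfig.builder()
        .withPartPrefix("userinfo")    // used as the part file prefix instead of "part"
        .withPartSuffix(".parquet")    // appended to the part file name, default is empty
        .build();

StreamingFileSink<UserInfo> parquetSink = StreamingFileSink
        .forBulkFormat(new Path("hdfs://localhost:9000/user/hive/warehouse/userinfo"),
                ParquetAvroWriters.forReflectRecord(UserInfo.class))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(fileConfig)
        .build();
With this configuration the final files would be named like userinfo-0-0.parquet instead of part-0-0.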
Data accuracy guarantees
- When starting without a checkpoint/savepoint while part files already exist in the file system, re-consuming the Kafka topic from the beginning can lead to missing or extra data, because the rename operation does not overwrite existing part files (this looks like a bug; a JIRA was filed but has not been answered yet: HadoopFsCommitter, file rename failure).
Suppose a historical part-0-0 file already exists: consuming from the beginning produces a new part-0-0, and there is no guarantee that the new file contains the same number of records as the old one; it may hold more or fewer.
- If the Flink job is killed while notifyCheckpointComplete is executing, restarting from the latest state snapshot does not lose data.
Suppose the bucket owning .part-0-0.inprogress.xxxx has already executed snapshotState: the data has been flushed to the file system, and the in-progress file would be renamed during notifyCheckpointComplete. If the job is killed at that moment, the in-progress file still contains the data; it simply has not been renamed yet. When the job is then started from that checkpoint, the rename of the in-progress file is completed first.