Background
I have recently been using the filesystem connector to write to HDFS; under the hood it is implemented on top of StreamingFileSink. The official documentation lists a few caveats, the first of which reads:
When using Hadoop < 2.7, please use the OnCheckpointRollingPolicy which rolls part files on every checkpoint. The reason is that if part files “traverse” the checkpoint interval, then, upon recovery from a failure the StreamingFileSink may use the truncate() method of the filesystem to discard uncommitted data from the in-progress file. This method is not supported by pre-2.7 Hadoop versions and Flink will throw an exception.
In other words: when running on a Hadoop version below 2.7, use the OnCheckpointRollingPolicy to roll part files. The reason is that a part file may span multiple checkpoints, and on recovery from a failure the StreamingFileSink may use the truncate() method to discard the uncommitted portion of an in-progress file; truncate() is only supported from Hadoop 2.7 onward.
In exactly which scenarios does a pre-2.7 Hadoop version actually break? I ran some experiments to find out.
Verification
SQL job
I tested by building the flink-hadoop-shaded package against different Hadoop versions (how the package is built deserves a separate post).
The same SQL job, running on both Hadoop 2.6 and Hadoop 2.7, recovered from its checkpoints without any problem.
That is a bit odd: doesn't the documentation describe exactly this failure scenario? Why are SQL jobs unaffected? The reason is explained below.
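For context, the SQL job was roughly of the following shape. This is an illustrative sketch only, not the exact job used in the test: the table names, topic, servers, path and formats are placeholders, and it assumes the Flink 1.11 Kafka and filesystem SQL connectors are on the classpath.
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Kafka source table (placeholder topic and servers)
    tEnv.executeSql(
        "CREATE TABLE kafka_src (msg STRING) WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'topic'," +
        " 'properties.bootstrap.servers' = 'xxx:9092'," +
        " 'properties.group.id' = 'test'," +
        " 'format' = 'csv')");

    // HDFS sink table through the SQL filesystem connector (placeholder path)
    tEnv.executeSql(
        "CREATE TABLE hdfs_sink (msg STRING) WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'hdfs://xxx/zs_test_sql'," +
        " 'format' = 'csv')");

    tEnv.executeSql("INSERT INTO hdfs_sink SELECT msg FROM kafka_src");
}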
Streaming job
I wrote a demo job; the code is as follows:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "xxx:9092");
    properties.setProperty("group.id", "test");
    DataStream<String> src = env
            .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

    // Default rolling policy
    src.addSink(StreamingFileSink
            .forRowFormat(
                    new Path("hdfs://xxx/zs_test"),
                    new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(DefaultRollingPolicy.builder().build()).build());

    /* OnCheckpoint rolling policy
    src.addSink(StreamingFileSink
            .forRowFormat(
                    new Path("hdfs://xxx/zs_test"),
                    new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(OnCheckpointRollingPolicy.build()).build());
    */

    env.execute("sink to hdfs");
}
The rolling policy decides when a file is promoted from a temporary to a final, readable file (in-progress → finished); there are two built-in policies, Default and OnCheckpoint.
StreamingFileSink also supports two formats: row format and bulk format.
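For completeness, the Default policy's thresholds can also be tuned through its builder. A minimal sketch, assuming the Flink 1.11 builder API; the values below are illustrative, not the ones used in this test:
// Illustrative thresholds only.
DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
        .withMaxPartSize(128 * 1024 * 1024)     // roll once the part file reaches 128 MB
        .withRolloverInterval(10 * 60 * 1000L)  // or once it has been open for 10 minutes
        .withInactivityInterval(60 * 1000L)     // or after 1 minute without new records
        .build();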
I first tested the row format under both policies against both Hadoop versions. With the OnCheckpoint policy, jobs recover normally on both 2.6 and 2.7; with the Default policy, recovery works on 2.7 but fails on 2.6 with the following error:
2020-10-22 16:59:11
java.io.IOException: Problem while truncating file: hdfs://xxxx/zs_test/2020-10-22--16/.part-2-5.inprogress.2848fb32-b428-45ab-8b85-f44f41f56e5d
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:167)
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:90)
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83)
at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter ...
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Truncation is not available in hadoop version < 2.7 , You are on Hadoop 2.6.0
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:197)
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:165)
... 25 more
The error message speaks for itself: Hadoop 2.7+ is required.
That basically settles it, but I verified the bulk-format scenario as well: it turns out bulk format only supports the OnCheckpoint policy, and with it recovery works on both Hadoop versions.
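As an aside, a bulk-format sink looks roughly like this. The sketch is illustrative only: it assumes a DataStream<MyEvent> named events and the flink-parquet/parquet-avro dependencies, none of which were part of the original test.
// Bulk format only accepts a CheckpointRollingPolicy; OnCheckpointRollingPolicy is also the default.
events.addSink(StreamingFileSink
        .forBulkFormat(
                new Path("hdfs://xxx/zs_test_parquet"),
                ParquetAvroWriters.forReflectRecord(MyEvent.class))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build());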
Why
Why does only the combination of row format and the Default policy impose a Hadoop version requirement, while the other scenarios do not? Let's look at the source code.
SQL
As noted above, SQL jobs do not have this problem. Why? The source code gives the answer, and it also serves as a supplement to the previous post on the FileSystem connector.
FileSystemTableSink#consumeDataStream
public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
RowDataPartitionComputer computer = new RowDataPartitionComputer(
defaultPartName,
schema.getFieldNames(),
schema.getFieldDataTypes(),
partitionKeys.toArray(new String[0]));
EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID().toString())
.build();
FileSystemFactory fsFactory = FileSystem::get;
FileSystemWithUserFactory fsWithUserFactory = FileSystem::getWithUser;
if (isBounded) {
FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(computer);
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
builder.setFormatFactory(createOutputFormatFactory());
builder.setMetaStoreFactory(metaStoreFactory);
builder.setFileSystemFactory(fsFactory);
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitions);
builder.setTempPath(toStagingPath());
builder.setOutputFileConfig(outputFileConfig);
return dataStream.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
} else {
// streaming jobs are unbounded, so execution takes this branch
Configuration conf = new Configuration();
properties.forEach(conf::setString);
Object writer = createWriter(); // the configured format determines the writer type; e.g. parquet and orc produce a bulk (bucket) writer
TableBucketAssigner assigner = new TableBucketAssigner(computer);
TableRollingPolicy rollingPolicy = new TableRollingPolicy( // note: TableRollingPolicy extends CheckpointRollingPolicy, so SQL jobs always use a checkpoint-based rolling policy
!(writer instanceof Encoder),
conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
if (writer instanceof Encoder) {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forRowFormat(
path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
} else {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forBulkFormat(
path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
}
return createStreamingSink(
conf,
path,
partitionKeys,
tableIdentifier,
overwrite,
dataStream,
bucketsBuilder,
metaStoreFactory,
fsFactory,
fsWithUserFactory,
conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
}
}
As the code shows, the SQL filesystem connector always uses a checkpoint-based rolling policy, which renames every in-progress file to a final, readable file at each checkpoint. In other words, with this policy every file covered by a successful checkpoint is already in the finished state; the checkpoint contains no in-progress files.
The Default policy, by contrast, rolls a file only once a size or time threshold is met and renames it at some later checkpoint, so a single in-progress file can span multiple checkpoints and checkpointed state can therefore include in-progress files.
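To make the contrast concrete, here is a minimal sketch of a checkpoint-based policy, assuming Flink 1.11's RollingPolicy interfaces. It is my own illustrative class, not Flink's TableRollingPolicy: any subclass of CheckpointRollingPolicy is forced to roll on every checkpoint and can only add extra roll conditions on top.
import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

// Illustrative policy: always rolls on checkpoints (inherited behaviour), plus rolls on a size limit.
public class RollOnCheckpointAndSizePolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {

    private final long maxPartSize;

    public RollOnCheckpointAndSizePolicy(long maxPartSize) {
        this.maxPartSize = maxPartSize;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        // roll early once the in-progress file grows beyond the size limit
        return partFileState.getSize() >= maxPartSize;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        // no time-based rolling in this sketch
        return false;
    }
}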
Streaming
A streaming job, on the other hand, can freely combine format and rolling policy, with the results described above.
State recovery
When the job restores from the last successful checkpoint, initializeState is invoked:
public void initializeState(FunctionInitializationContext context) throws Exception {
this.helper = new StreamingFileSinkHelper<>(
bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask()),
context.isRestored(),
context.getOperatorStateStore(),
((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(),
bucketCheckInterval);
}
Constructing the StreamingFileSinkHelper in turn calls buckets.initializeState:
public StreamingFileSinkHelper(
Buckets<IN, ?> buckets,
boolean isRestored,
OperatorStateStore stateStore,
ProcessingTimeService procTimeService,
long bucketCheckInterval) throws Exception {
this.bucketCheckInterval = bucketCheckInterval;
this.buckets = buckets;
this.bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
this.maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
this.procTimeService = procTimeService;
if (isRestored) {
buckets.initializeState(bucketStates, maxPartCountersState);
}
long currentProcessingTime = procTimeService.getCurrentProcessingTime();
procTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
}
Following that call:
private void initializeActiveBuckets(final ListState<byte[]> bucketStates) throws Exception {
for (byte[] serializedRecoveredState : bucketStates.get()) {
final BucketState<BucketID> recoveredState =
SimpleVersionedSerialization.readVersionAndDeSerialize(
bucketStateSerializer, serializedRecoveredState); // deserialize the BucketState; if the checkpoint held no in-progress file, InProgressFileRecoverable is null, otherwise it is non-null -- this is the key point
handleRestoredBucketState(recoveredState);
}
}
private void handleRestoredBucketState(final BucketState<BucketID> recoveredState) throws Exception {
final BucketID bucketId = recoveredState.getBucketId();
if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} restoring: {}", subtaskIndex, recoveredState);
}
final Bucket<IN, BucketID> restoredBucket = bucketFactory
.restoreBucket(
subtaskIndex,
maxPartCounter,
bucketWriter,
rollingPolicy,
recoveredState,
outputFileConfig
);
updateActiveBucketId(bucketId, restoredBucket);
}
public Bucket<IN, BucketID> restoreBucket(
final int subtaskIndex,
final long initialPartCounter,
final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
return Bucket.restore(
subtaskIndex,
initialPartCounter,
bucketWriter,
rollingPolicy,
bucketState,
outputFileConfig);
}
static <IN, BucketID> Bucket<IN, BucketID> restore(
final int subtaskIndex,
final long initialPartCounter,
final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
return new Bucket<>(subtaskIndex, initialPartCounter, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
}
private Bucket(
final int subtaskIndex,
final long initialPartCounter,
final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
this(
subtaskIndex,
bucketState.getBucketId(),
bucketState.getBucketPath(),
initialPartCounter,
partFileFactory,
rollingPolicy,
outputFileConfig);
restoreInProgressFile(bucketState); // restore the in-progress file
commitRecoveredPendingFiles(bucketState);
}
private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
if (!state.hasInProgressFileRecoverable()) {
return;
}
// we try to resume the previous in-progress file
final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
if (bucketWriter.getProperties().supportsResume()) {
inProgressPart = bucketWriter.resumeInProgressFileFrom(
bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
} else {
// if the writer does not support resume, then we close the
// in-progress part and commit it, as done in the case of pending files.
bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
}
}
The key point is !state.hasInProgressFileRecoverable(): if the restored BucketState contains no in-progress file, the method returns right away; only when it does do we reach the code below. With the OnCheckpoint policy every part file is rolled just before the checkpoint is taken, so the state never carries an InProgressFileRecoverable and the resume path is never exercised; with the Default policy it can, and that is when the following code runs.
public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(final BucketID bucketID, final InProgressFileRecoverable inProgressFileRecoverable, final long creationTime) throws IOException {
final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable;
return resumeFrom(
bucketID,
recoverableWriter.recover(outputStreamBasedInProgressRecoverable.getResumeRecoverable()),//recover
outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
creationTime);
}
Now look at the Hadoop implementation of recover:
public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
if (recoverable instanceof HadoopFsRecoverable) {
return new HadoopRecoverableFsDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
}
else {
throw new IllegalArgumentException(
"Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
}
}
HadoopRecoverableFsDataOutputStream(
FileSystem fs,
HadoopFsRecoverable recoverable) throws IOException {
ensureTruncateInitialized();
this.fs = checkNotNull(fs);
this.targetFile = checkNotNull(recoverable.targetFile());
this.tempFile = checkNotNull(recoverable.tempFile());
safelyTruncateFile(fs, tempFile, recoverable);
out = fs.append(tempFile);
// sanity check
long pos = out.getPos();
if (pos != recoverable.offset()) {
IOUtils.closeQuietly(out);
throw new IOException("Truncate failed: " + tempFile +
" (requested=" + recoverable.offset() + " ,size=" + pos + ')');
}
}
private static void safelyTruncateFile(
        final FileSystem fileSystem,
        final Path path,
        final HadoopFsRecoverable recoverable) throws IOException {
    ensureTruncateInitialized();
    waitUntilLeaseIsRevoked(fileSystem, path);
    // truncate back and append
    boolean truncated;
    try {
        truncated = truncate(fileSystem, path, recoverable.offset());
    } catch (Exception e) {
        throw new IOException("Problem while truncating file: " + path, e);
    }
    if (!truncated) {
        // Truncate did not complete immediately, we must wait for
        // the operation to complete and release the lease.
        waitUntilLeaseIsRevoked(fileSystem, path);
    }
}
The Hadoop version check lives in the truncate method:
private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
throw new IllegalStateException("Truncation is not available in hadoop version < 2.7 , You are on Hadoop " + VersionInfo.getVersion());
}
if (truncateHandle != null) {
try {
return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
}
catch (InvocationTargetException e) {
ExceptionUtils.rethrowIOException(e.getTargetException());
}
catch (Throwable t) {
throw new IOException(
"Truncation of file failed because of access/linking problems with Hadoop's truncate call. " +
"This is most likely a dependency conflict or class loading problem.");
}
}
else {
throw new IllegalStateException("Truncation handle has not been initialized");
}
return false;
}
If the Hadoop version is below 2.7 an exception is thrown, which matches the error we saw earlier. At this point everything adds up.
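If you are unsure which Hadoop version your job actually sees on the classpath (shaded dependencies can make this non-obvious), a quick sanity check is to print what VersionInfo reports; it is the same value the check above compares against. A minimal sketch:
import org.apache.hadoop.util.VersionInfo;

public class PrintHadoopVersion {
    public static void main(String[] args) {
        // The same version string that Flink's truncate() guard inspects.
        System.out.println("Hadoop version on the classpath: " + VersionInfo.getVersion());
    }
}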
Conclusion
Only the combination of row format and the Default rolling policy requires Hadoop 2.7+; the other combinations (OnCheckpoint policy, bulk format, and SQL filesystem sinks) do not.