Flink in Practice: Does Flink Really Require Hadoop 2.7+?

Background

I have recently been using the filesystem connector to write to HDFS, which is implemented on top of StreamingFileSink. The official documentation lists a few important notes; the first one reads:

When using Hadoop < 2.7, please use the OnCheckpointRollingPolicy which rolls part files on every checkpoint. The reason is that if part files “traverse” the checkpoint interval, then, upon recovery from a failure the StreamingFileSink may use the truncate() method of the filesystem to discard uncommitted data from the in-progress file. This method is not supported by pre-2.7 Hadoop versions and Flink will throw an exception.

In other words: when running on a Hadoop version older than 2.7, use the OnCheckpointRollingPolicy to roll part files. The reason is that a part file may span multiple checkpoints; when recovering from a failure, StreamingFileSink may call the filesystem's truncate() method to discard the uncommitted data in the in-progress file, and truncate() is only supported on Hadoop 2.7+.

In which concrete scenarios does a pre-2.7 Hadoop actually break? I ran some experiments to find out.

驗(yàn)證

SQL job

I tested by building the shaded Hadoop dependency (flink-shaded-hadoop) against different Hadoop versions; how exactly to build that package deserves a separate post.
The result: the same SQL job, run against both Hadoop 2.6 and 2.7, recovered from a checkpoint without any problem.
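
The post does not include the exact SQL, but the job I tested was roughly of the shape sketched below; table names, fields, paths and connector options are placeholders, and the DDL is submitted through the Table API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlHdfsSinkDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source and sink definitions; all connector options here are placeholders.
        tEnv.executeSql(
                "CREATE TABLE kafka_src (msg STRING) WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'topic',"
                + " 'properties.bootstrap.servers' = 'xxx:9092',"
                + " 'properties.group.id' = 'test',"
                + " 'format' = 'csv')");
        tEnv.executeSql(
                "CREATE TABLE hdfs_sink (msg STRING) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = 'hdfs://xxx/zs_test_sql',"
                + " 'format' = 'csv')");

        // INSERT INTO submits the streaming job.
        tEnv.executeSql("INSERT INTO hdfs_sink SELECT msg FROM kafka_src");
    }
}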

That is a bit odd. Doesn't the documentation say this scenario can fail? Why is a SQL job unaffected? The reason is explained further below.

Streaming job

I wrote a demo job; the code is as follows:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxx:9092");
        properties.setProperty("group.id", "test");
        DataStream<String> src = env
                .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

        // DefaultRollingPolicy: roll on file size / time thresholds
        src.addSink(StreamingFileSink
                .forRowFormat(
                        new Path("hdfs://xxx/zs_test"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder().build()).build());

        /* OnCheckpointRollingPolicy: roll every part file at each checkpoint
        src.addSink(StreamingFileSink
                .forRowFormat(
                        new Path("hdfs://xxx/zs_test"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build()).build());
                */
        env.execute("sink to hdfs");
    }

The rolling policy decides when a file is turned from a temporary file into a final one (in-progress → finished); there are two built-in policies, DefaultRollingPolicy and OnCheckpointRollingPolicy.
StreamingFileSink also supports two output formats, RowFormat and BulkFormat; a BulkFormat sketch follows below.
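
For completeness, a BulkFormat sink looks roughly like the sketch below; this is not from the original post and assumes flink-parquet is on the classpath (org.apache.flink.formats.parquet.avro.ParquetAvroWriters) and that MyRecord is a hypothetical Avro-generated class with a static String -> MyRecord parse() helper:

        // Minimal BulkFormat sketch; MyRecord and MyRecord::parse are hypothetical.
        StreamingFileSink<MyRecord> bulkSink = StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs://xxx/zs_test_bulk"),
                        ParquetAvroWriters.forSpecificRecord(MyRecord.class))
                // bulk formats only accept checkpoint-based rolling policies
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

        src.map(MyRecord::parse).addSink(bulkSink);
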
I first tested RowFormat under both rolling policies against the two Hadoop versions. The result: with the OnCheckpoint policy, both 2.6 and 2.7 recover normally; with the Default policy, 2.7 recovers but 2.6 does not, failing with the following error:

2020-10-22 16:59:11
java.io.IOException: Problem while truncating file: hdfs://xxxx/zs_test/2020-10-22--16/.part-2-5.inprogress.2848fb32-b428-45ab-8b85-f44f41f56e5d
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:167)
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:90)
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:83)
at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.resumeInProgressFileFrom(OutputStreamBasedPartFileWriter.java:91)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:134)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:121)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:74)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:427)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Truncation is not available in hadoop version < 2.7 , You are on Hadoop 2.6.0
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.truncate(HadoopRecoverableFsDataOutputStream.java:197)
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:165)
... 25 more

The error message is self-explanatory: Hadoop 2.7+ is required.
That basically settles it, but I also verified the BulkFormat case; it turns out BulkFormat only supports the OnCheckpoint policy, and recovery then works on both Hadoop versions.

Why

Why does only the RowFormat + Default policy combination put a requirement on the Hadoop version while the other combinations do not? Let's look at the source code.

SQL

As mentioned above, SQL jobs do not have this problem. Why? The source code gives the answer, and it also serves as a supplement to the earlier post on the FileSystem Connector.
FileSystemTableSink#consumeDataStream

public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
        RowDataPartitionComputer computer = new RowDataPartitionComputer(
                defaultPartName,
                schema.getFieldNames(),
                schema.getFieldDataTypes(),
                partitionKeys.toArray(new String[0]));

        EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
        OutputFileConfig outputFileConfig = OutputFileConfig.builder()
                .withPartPrefix("part-" + UUID.randomUUID().toString())
                .build();
        FileSystemFactory fsFactory = FileSystem::get;
        FileSystemWithUserFactory fsWithUserFactory = FileSystem::getWithUser;

        if (isBounded) {
            FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
            builder.setPartitionComputer(computer);
            builder.setDynamicGrouped(dynamicGrouping);
            builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
            builder.setFormatFactory(createOutputFormatFactory());
            builder.setMetaStoreFactory(metaStoreFactory);
            builder.setFileSystemFactory(fsFactory);
            builder.setOverwrite(overwrite);
            builder.setStaticPartitions(staticPartitions);
            builder.setTempPath(toStagingPath());
            builder.setOutputFileConfig(outputFileConfig);
            return dataStream.writeUsingOutputFormat(builder.build())
                    .setParallelism(dataStream.getParallelism());
        } else {
            // Streaming jobs are unbounded, so execution takes this branch
            Configuration conf = new Configuration();
            properties.forEach(conf::setString);
            // The configured format determines the writer: e.g. parquet and orc produce bulk writers
            Object writer = createWriter();
            TableBucketAssigner assigner = new TableBucketAssigner(computer);
            // Note: TableRollingPolicy extends CheckpointRollingPolicy, so SQL jobs always use a checkpoint-based policy
            TableRollingPolicy rollingPolicy = new TableRollingPolicy(
                    !(writer instanceof Encoder),
                    conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                    conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

            BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
            if (writer instanceof Encoder) {
                //noinspection unchecked
                bucketsBuilder = StreamingFileSink.forRowFormat(
                        path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
                        .withBucketAssigner(assigner)
                        .withOutputFileConfig(outputFileConfig)
                        .withRollingPolicy(rollingPolicy);
            } else {
                //noinspection unchecked
                bucketsBuilder = StreamingFileSink.forBulkFormat(
                        path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
                        .withBucketAssigner(assigner)
                        .withOutputFileConfig(outputFileConfig)
                        .withRollingPolicy(rollingPolicy);
            }
            return createStreamingSink(
                    conf,
                    path,
                    partitionKeys,
                    tableIdentifier,
                    overwrite,
                    dataStream,
                    bucketsBuilder,
                    metaStoreFactory,
                    fsFactory,
                    fsWithUserFactory,
                    conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
        }
    }

As the code above shows, the SQL filesystem connector always uses a checkpoint-based rolling policy. Its effect is that at every checkpoint all in-progress files are renamed to final, readable files; as long as the checkpoint succeeds, every file is in the finished state and nothing is left in-progress.
The Default policy instead rolls a file once a size or time threshold is met and renames it at some later checkpoint, so an in-progress file may span multiple checkpoints, i.e. the checkpointed state can contain in-progress files.
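
To make the contrast concrete, the sketch below shows what a checkpoint-based policy essentially looks like; it is equivalent in spirit to OnCheckpointRollingPolicy, since the CheckpointRollingPolicy base class already answers shouldRollOnCheckpoint() with true:

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

// Minimal sketch: roll only at checkpoints, never on individual events or processing time,
// so no in-progress file ever survives a successful checkpoint.
public class RollOnCheckpointOnlyPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
        return false; // never roll because of a single record
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false; // never roll on a timer; rolling happens via shouldRollOnCheckpoint()
    }
}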

Streaming

A DataStream (Streaming) job is free to choose both the format and the rolling policy, which leads to the results described above.

State recovery

When recovering from the last successful checkpoint, initializeState is called:

public void initializeState(FunctionInitializationContext context) throws Exception {
        this.helper = new StreamingFileSinkHelper<>(
                bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask()),
                context.isRestored(),
                context.getOperatorStateStore(),
                ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(),
                bucketCheckInterval);
    }

The StreamingFileSinkHelper constructor then calls buckets.initializeState:

public StreamingFileSinkHelper(
            Buckets<IN, ?> buckets,
            boolean isRestored,
            OperatorStateStore stateStore,
            ProcessingTimeService procTimeService,
            long bucketCheckInterval) throws Exception {
        this.bucketCheckInterval = bucketCheckInterval;
        this.buckets = buckets;
        this.bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
        this.maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
        this.procTimeService = procTimeService;

        if (isRestored) {
            buckets.initializeState(bucketStates, maxPartCountersState);
        }

        long currentProcessingTime = procTimeService.getCurrentProcessingTime();
        procTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
    }

Following the call chain:

private void initializeActiveBuckets(final ListState<byte[]> bucketStates) throws Exception {
        for (byte[] serializedRecoveredState : bucketStates.get()) {
            // Deserialize the BucketState. Key point: if the checkpoint contained no in-progress
            // file, its InProgressFileRecoverable is null; otherwise it is non-null.
            final BucketState<BucketID> recoveredState =
                    SimpleVersionedSerialization.readVersionAndDeSerialize(
                            bucketStateSerializer, serializedRecoveredState);
            handleRestoredBucketState(recoveredState);
        }
    }

private void handleRestoredBucketState(final BucketState<BucketID> recoveredState) throws Exception {
        final BucketID bucketId = recoveredState.getBucketId();

        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} restoring: {}", subtaskIndex, recoveredState);
        }

        final Bucket<IN, BucketID> restoredBucket = bucketFactory
                .restoreBucket(
                        subtaskIndex,
                        maxPartCounter,
                        bucketWriter,
                        rollingPolicy,
                        recoveredState,
                        outputFileConfig
                );

        updateActiveBucketId(bucketId, restoredBucket);
    }

public Bucket<IN, BucketID> restoreBucket(
            final int subtaskIndex,
            final long initialPartCounter,
            final BucketWriter<IN, BucketID> bucketWriter,
            final RollingPolicy<IN, BucketID> rollingPolicy,
            final BucketState<BucketID> bucketState,
            final OutputFileConfig outputFileConfig) throws IOException {

        return Bucket.restore(
                subtaskIndex,
                initialPartCounter,
                bucketWriter,
                rollingPolicy,
                bucketState,
                outputFileConfig);
    }

static <IN, BucketID> Bucket<IN, BucketID> restore(
            final int subtaskIndex,
            final long initialPartCounter,
            final BucketWriter<IN, BucketID> bucketWriter,
            final RollingPolicy<IN, BucketID> rollingPolicy,
            final BucketState<BucketID> bucketState,
            final OutputFileConfig outputFileConfig) throws IOException {
        return new Bucket<>(subtaskIndex, initialPartCounter, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
    }
    
private Bucket(
            final int subtaskIndex,
            final long initialPartCounter,
            final BucketWriter<IN, BucketID> partFileFactory,
            final RollingPolicy<IN, BucketID> rollingPolicy,
            final BucketState<BucketID> bucketState,
            final OutputFileConfig outputFileConfig) throws IOException {

        this(
                subtaskIndex,
                bucketState.getBucketId(),
                bucketState.getBucketPath(),
                initialPartCounter,
                partFileFactory,
                rollingPolicy,
                outputFileConfig);

        restoreInProgressFile(bucketState); // restore the in-progress file, if any
        commitRecoveredPendingFiles(bucketState);
    }
    
private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
        if (!state.hasInProgressFileRecoverable()) { // no in-progress file in the restored state, nothing to resume
            return;
        }

        // we try to resume the previous in-progress file
        final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();

        if (bucketWriter.getProperties().supportsResume()) {
            inProgressPart = bucketWriter.resumeInProgressFileFrom(
                    bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
        } else {
            // if the writer does not support resume, then we close the
            // in-progress part and commit it, as done in the case of pending files.
            bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
        }
    }

The key is !state.hasInProgressFileRecoverable(): if the restored state contains no in-progress file, the method returns right away; only otherwise does execution reach the code below.

public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(final BucketID bucketID, final InProgressFileRecoverable inProgressFileRecoverable, final long creationTime) throws IOException {
            final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable;
            return resumeFrom(
                bucketID,
                recoverableWriter.recover(outputStreamBasedInProgressRecoverable.getResumeRecoverable()),//recover
                outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
                creationTime);
        }

Now look at the Hadoop implementation of recover:

public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
        if (recoverable instanceof HadoopFsRecoverable) {
            return new HadoopRecoverableFsDataOutputStream(fs, (HadoopFsRecoverable) recoverable);
        }
        else {
            throw new IllegalArgumentException(
                    "Hadoop File System cannot recover a recoverable for another file system: " + recoverable);
        }
    }

HadoopRecoverableFsDataOutputStream(
            FileSystem fs,
            HadoopFsRecoverable recoverable) throws IOException {

        ensureTruncateInitialized();

        this.fs = checkNotNull(fs);
        this.targetFile = checkNotNull(recoverable.targetFile());
        this.tempFile = checkNotNull(recoverable.tempFile());

        safelyTruncateFile(fs, tempFile, recoverable);

        out = fs.append(tempFile);

        // sanity check
        long pos = out.getPos();
        if (pos != recoverable.offset()) {
            IOUtils.closeQuietly(out);
            throw new IOException("Truncate failed: " + tempFile +
                    " (requested=" + recoverable.offset() + " ,size=" + pos + ')');
        }
    }

private static void safelyTruncateFile(
            final FileSystem fileSystem,
            final Path path,
            final HadoopFsRecoverable recoverable) throws IOException {

        ensureTruncateInitialized();

        waitUntilLeaseIsRevoked(fileSystem, path);

        // truncate back and append
        boolean truncated;
        try {
            truncated = truncate(fileSystem, path, recoverable.offset());
        } catch (Exception e) {
            throw new IOException("Problem while truncating file: " + path, e);
        }

        if (!truncated) {
            // Truncate did not complete immediately, we must wait for
            // the operation to complete and release the lease.
            waitUntilLeaseIsRevoked(fileSystem, path);
        }
    }

The Hadoop version check lives in the truncate method:

private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
        if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
            throw new IllegalStateException("Truncation is not available in hadoop version < 2.7 , You are on Hadoop " + VersionInfo.getVersion());
        }

        if (truncateHandle != null) {
            try {
                return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
            }
            catch (InvocationTargetException e) {
                ExceptionUtils.rethrowIOException(e.getTargetException());
            }
            catch (Throwable t) {
                throw new IOException(
                        "Truncation of file failed because of access/linking problems with Hadoop's truncate call. " +
                                "This is most likely a dependency conflict or class loading problem.");
            }
        }
        else {
            throw new IllegalStateException("Truncation handle has not been initialized");
        }
        return false;
    }

If the Hadoop version is below 2.7, an exception is thrown, which matches the error seen above. That explains everything.
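
As a side note, the version test ultimately relies on the Hadoop version string reported by the runtime. A rough sketch of what a check like HadoopUtils.isMinHadoopVersion(2, 7) boils down to (not the actual Flink implementation):

import org.apache.hadoop.util.VersionInfo;

public final class HadoopVersionCheck {

    // Returns true if the Hadoop version on the classpath is at least major.minor.
    // Rough sketch only; the real HadoopUtils.isMinHadoopVersion may differ in details.
    public static boolean isAtLeast(int major, int minor) {
        String[] parts = VersionInfo.getVersion().split("\\.");  // e.g. "2.6.0" -> ["2", "6", "0"]
        int maj = Integer.parseInt(parts[0]);
        int min = Integer.parseInt(parts[1]);
        return maj > major || (maj == major && min >= minor);
    }
}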

Conclusion

A Hadoop version requirement (2.7+) only exists for the RowFormat + Default rolling policy combination; all other combinations work on older Hadoop versions, so Flink does not unconditionally depend on Hadoop 2.7+.
