Flink源碼分析之深度解讀流式數(shù)據(jù)寫入hive

前言

前段時間我們講解了flink1.11中如何將流式數(shù)據(jù)寫入文件系統(tǒng)和hive [flink 1.11 使用sql將流式數(shù)據(jù)寫入hive]衰猛,今天我們來從源碼的角度深入分析一下霞揉。以便朋友們對flink流式數(shù)據(jù)寫入hive有一個深入的了解,以及在出現(xiàn)問題的時候知道該怎么調(diào)試童番。

其實我們可以想一下這個工作大概是什么流程芬失,首先要寫入hive故河,我們首先要從hive的元數(shù)據(jù)里拿到相關(guān)的hive表的信息巍虫,比如存儲的路徑是哪里彭则,以便往那個目錄寫數(shù)據(jù),還有存儲的格式是什么占遥,orc還是parquet俯抖,這樣我們需要調(diào)用對應(yīng)的實現(xiàn)類來進行寫入,其次這個表是否是分區(qū)表瓦胎,寫入數(shù)據(jù)是動態(tài)分區(qū)還是靜態(tài)分區(qū)芬萍,這些都會根據(jù)場景的不同而選擇不同的寫入策略。

寫入數(shù)據(jù)的時候肯定不會把所有數(shù)據(jù)寫入一個文件搔啊,那么文件的滾動策略是什么呢担忧?寫完了數(shù)據(jù)我們?nèi)绾胃耯ive的元數(shù)據(jù)信息,以便我們可以及時讀取到相應(yīng)的數(shù)據(jù)呢坯癣?

我畫了一個簡單的流程圖,大家可以先看下最欠,接下來我們帶著這些疑問示罗,一步步的從源碼里探索這些功能是如何實現(xiàn)的。

寫入hive流程圖

數(shù)據(jù)流處理

我們這次主要是分析flink如何將類似kafka的流式數(shù)據(jù)寫入到hive表芝硬,我們先來一段簡單的代碼:

        //構(gòu)造hive catalog
        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
        String version = "3.1.2";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.useDatabase("db1");

        tEnv.createTemporaryView("kafka_source_table", dataStream);

        String insertSql = "insert into  hive.db1.fs_table SELECT userId, amount, " +
                           " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM kafka_source_table";
        tEnv.executeSql(insertSql);

系統(tǒng)在啟動的時候會首先解析sql蚜点,獲取相應(yīng)的屬性,然后會通過java的SPI機制加載TableFactory的所有子類拌阴,包含TableSourceFactory和TableSinkFactory绍绘,之后,會根據(jù)從sql中解析的屬性循環(huán)判斷使用哪個工廠類迟赃,具體的操作是在TableFactoryUtil類的方法里面實現(xiàn)的陪拘。

比如對于上面的sql,解析之后纤壁,發(fā)現(xiàn)是要寫入一個表名為hive.db1.fs_table的hive sink左刽。所以系統(tǒng)在調(diào)用TableFactoryUtil#findAndCreateTableSink(TableSinkFactory.Context context)方法以后,得到了TableSinkFactory的子類HiveTableFactory酌媒,然后調(diào)用相應(yīng)的createTableSink方法來創(chuàng)建相應(yīng)的sink欠痴,也就是HiveTableSink迄靠。

我們來簡單看下HiveTableSink的變量和結(jié)構(gòu)。

/**
 * Table sink to write to Hive tables.
 */
public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {

    private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);

    private final boolean userMrWriter;
    //是否有界喇辽,用來區(qū)分是批處理還是流處理
    private final boolean isBounded;
    private final JobConf jobConf;
    private final CatalogTable catalogTable;
    private final ObjectIdentifier identifier;
    private final TableSchema tableSchema;
    private final String hiveVersion;
    private final HiveShim hiveShim;

    private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();

    private boolean overwrite = false;
    private boolean dynamicGrouping = false;

我們看到它實現(xiàn)了AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink三個接口掌挚,這三個接口決定了hive sink實現(xiàn)的功能,數(shù)據(jù)只能是append模式的菩咨,數(shù)據(jù)是可分區(qū)的吠式、并且數(shù)據(jù)是可以被覆蓋寫的。

類里面的這些變量旦委,看名字就大概知道是什么意思了奇徒,就不做解釋了,講一下HiveShim缨硝,我們在構(gòu)造方法里看到hiveShim是和hive 的版本有關(guān)的摩钙,所以其實這個類我們可以理解為對不同hive版本操作的一層封裝。

hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);

tablesink處理數(shù)據(jù)流的方法是consumeDataStream查辩,我們來重點分析下胖笛。

hive基本信息獲取

首先會通過hive的配置連接到hive的元數(shù)據(jù)庫,得到hive表的基本信息宜岛。

        String[] partitionColumns = getPartitionKeys().toArray(new String[0]);
        String dbName = identifier.getDatabaseName();
        String tableName = identifier.getObjectName();
        try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(
                new HiveConf(jobConf, HiveConf.class), hiveVersion)) {
            Table table = client.getTable(dbName, tableName);
            StorageDescriptor sd = table.getSd();
  • 獲取到hive的表的信息长踊,也就是Table對象。
  • 獲取表的一些存儲信息萍倡,StorageDescriptor對象身弊,這里面包含了hive表的存儲路徑、存儲格式等等列敲。

流阱佛、批判斷

接下來判斷寫入hive是批處理還是流處理

if (isBounded){
   ......   
   //batch 
    
} else {
   ......
   //streaming
    
}

由于這次我們主要分析flink的流處理,所以對于batch就暫且跳過戴而,進入else凑术,也就是流處理。

在這里所意,定義了一些基本的配置:

  • 桶分配器TableBucketAssigner淮逊,簡單來說就是如何確定數(shù)據(jù)的分區(qū),比如按時間扶踊,還是按照字段的值等等泄鹏。
  • 滾動策略,如何生成下一個文件秧耗,按照時間命满,還是文件的大小等等。
  • 構(gòu)造bulkFactory绣版,目前只有parquet和orc的列存儲格式使用bulkFactory
    //桶分配器
    TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
    
    //滾動策略
    TableRollingPolicy rollingPolicy = new TableRollingPolicy(
                        true,
                        conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                        conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

    //構(gòu)造bulkFactory
    Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);

createBulkWriterFactory方法主要是用于構(gòu)造寫入列存儲格式的工廠類胶台,目前只支持parquet和orc格式歼疮,首先定義用于構(gòu)造工廠類的一些參數(shù),比如字段的類型诈唬,名稱等等韩脏,之后根據(jù)不同類型構(gòu)造不同的工廠類。如果是parquet格式铸磅,最終構(gòu)造的是ParquetWriterFactory工廠類赡矢,如果是orc格式,根據(jù)hive的版本不同阅仔,分別構(gòu)造出OrcBulkWriterFactory或者是OrcNoHiveBulkWriterFactory吹散。

寫入格式判斷

如果是使用MR的writer或者是行格式,進入if邏輯八酒,使用HadoopPathBasedBulkFormatBuilder空民,如果是列存儲格式,進入else邏輯羞迷,使用StreamingFileSink來寫入數(shù)據(jù).

                if (userMrWriter || !bulkFactory.isPresent()) {
                    HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
                    builder = new HadoopPathBasedBulkFormatBuilder<>(
                            new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
                            .withRollingPolicy(rollingPolicy)
                            .withOutputFileConfig(outputFileConfig);
                    LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
                } else {
                    builder = StreamingFileSink.forBulkFormat(
                            new org.apache.flink.core.fs.Path(sd.getLocation()),
                            new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
                            .withBucketAssigner(assigner)
                            .withRollingPolicy(rollingPolicy)
                            .withOutputFileConfig(outputFileConfig);
                    LOG.info("Hive streaming sink: Use native parquet&orc writer.");
                }

在大數(shù)據(jù)處理中界轩,列式存儲比行存儲有著更好的查詢效率,所以我們這次以列式存儲為主衔瓮,聊聊StreamingFileSink是如何寫入列式數(shù)據(jù)的浊猾。通過代碼我們看到在構(gòu)造buckets builder的時候,使用了前面剛生成的bucket assigner热鞍、輸出的配置葫慎、以及文件滾動的策略。

構(gòu)造分區(qū)提交算子

在HiveTableSink#consumeDataStream方法的最后薇宠,進入了FileSystemTableSink#createStreamingSink方法偷办,這個方法主要做了兩件事情,一個是創(chuàng)建了用于流寫入的算子StreamingFileWriter昼接,另一個是當(dāng)存在分區(qū)列并且在配置文件配置了分區(qū)文件提交策略的時候,構(gòu)造了一個用于提交分區(qū)文件的算子StreamingFileCommitter悴晰,這個算子固定的只有一個并發(fā)度慢睡。

        StreamingFileWriter fileWriter = new StreamingFileWriter(
                rollingCheckInterval,
                bucketsBuilder);
        DataStream<CommitMessage> writerStream = inputStream.transform(
                StreamingFileWriter.class.getSimpleName(),
                TypeExtractor.createTypeInfo(CommitMessage.class),
                fileWriter).setParallelism(inputStream.getParallelism());

        DataStream<?> returnStream = writerStream;

        // save committer when we don't need it.
        if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
            StreamingFileCommitter committer = new StreamingFileCommitter(
                    path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
            returnStream = writerStream
                    .transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)
                    .setParallelism(1)
                    .setMaxParallelism(1);
        }

我們看到在代碼中,inputStream經(jīng)過transform方法铡溪,最終將要提交的數(shù)據(jù)轉(zhuǎn)換成CommitMessage格式漂辐,然后發(fā)送給它的下游StreamingFileCommitter算子,也就是說StreamingFileCommitter將會接收StreamingFileWriter中收集的數(shù)據(jù)棕硫。

詳解StreamingFileWriter

這個StreamingFileWriter我們可以理解為一個算子級別的寫入文件的sink髓涯,它對StreamingFileSink進行了一些包裝,然后添加了一些其他操作哈扮,比如提交分區(qū)信息等等纬纪。我們簡單看下這個類的結(jié)構(gòu),并簡單聊聊各個方法的作用蚓再。

public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
        implements OneInputStreamOperator<RowData, CommitMessage>, BoundedOneInput{
        
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        ......................... 
    }       

        @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
      ......................... 
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
      ......................... 
    }

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
      ......................... 
    }

    /**
     * Commit up to this checkpoint id, also send inactive partitions to downstream for committing.
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
          ......................... 
    }
        
        @Override
    public void endInput() throws Exception {
          ......................... 
    }

    @Override
    public void dispose() throws Exception {
      ......................... 
    }       
            

           
}
        
  • initializeState :初始化狀態(tài)的方法,在這里構(gòu)造了要寫入文件的buckets包各,以及具體寫入文件的StreamingFileSinkHelper等等摘仅。
  • snapshotState:這個方法主要是進行每次checkpoint的時候調(diào)用。
  • processWatermark這個方法通過名字就能看出來问畅,是處理水印的娃属,比如往下游發(fā)送水印等等。
  • processElement:處理元素最核心的方法护姆,每來一條數(shù)據(jù)矾端,都會進入這個方法進行處理。
  • notifyCheckpointComplete卵皂,每次checkpoint完成的時候調(diào)用該方法秩铆。在這里,收集了一些要提交的分區(qū)的信息渐裂,用于分區(qū)提交豺旬。
  • endInput:不再有更多的數(shù)據(jù)進來,也就是輸入結(jié)束的時候調(diào)用柒凉。
  • dispose:算子的生命周期結(jié)束的時候調(diào)用族阅。

簡述StreamingFileSink

StreamingFileSink我們來簡單的描述下,通過名字我們就能看出來膝捞,這是一個用于將流式數(shù)據(jù)寫入文件系統(tǒng)的sink坦刀,它集成了checkpoint提供exactly once語義。

在StreamingFileSink里有一個bucket的概念蔬咬,我們可以理解為數(shù)據(jù)寫入的目錄鲤遥,每個bucket下可以寫入多個文件。它提供了一個BucketAssigner的概念用于生成bucket林艘,進來的每一個數(shù)據(jù)在寫入的時候都會判斷下要寫入哪個bucket盖奈,默認(rèn)的實現(xiàn)是DateTimeBucketAssigner,每小時生成一個bucket狐援。

它根據(jù)不同的寫入格式分別使用StreamingFileSink#forRowFormat或者StreamingFileSink#forBulkFormat來進行相應(yīng)的處理钢坦。

此外,該sink還提供了一個RollingPolicy用于決定數(shù)據(jù)的滾動策略啥酱,比如文件到達多大或者經(jīng)過多久就關(guān)閉當(dāng)前文件爹凹,開啟下一個新文件。

具體的寫入ORC格式的數(shù)據(jù)镶殷,可以參考下這個文章:
flink 1.11 流式數(shù)據(jù)ORC格式寫入file 禾酱,由于我們這次主要是講整體寫入hive的流程,這個sink就不做太具體的講解了。

分區(qū)信息提交

StreamingFileWriter#notifyCheckpointComplete 調(diào)用commitUpToCheckpoint在checkpoint完成的時候觸發(fā)了分區(qū)的提交操作颤陶。

    private void commitUpToCheckpoint(long checkpointId) throws Exception {
        helper.commitUpToCheckpoint(checkpointId);
        CommitMessage message = new CommitMessage(
                checkpointId,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                new ArrayList<>(inactivePartitions));
        output.collect(new StreamRecord<>(message));
        inactivePartitions.clear();
    }

在這里颗管,我們看到,使用inactivePartitions構(gòu)造了CommitMessage對象指郁,然后使用output.collect將這個提交數(shù)據(jù)收集起來忙上,也就是上文我們提到的這里收集到的這個數(shù)據(jù)將會發(fā)給StreamingFileCommitter算子來處理。

而inactivePartitions里面的數(shù)據(jù)是什么時候添加進來的呢闲坎,也就是什么時候才會生成要提交的分區(qū)呢疫粥?我們跟蹤一下代碼,發(fā)現(xiàn)是給寫入文件的buckets添加了一個監(jiān)聽器,在bucket成為非活躍狀態(tài)之后腰懂,觸發(fā)監(jiān)聽器梗逮,然后將對應(yīng)的bucket id 添加到inactivePartitions集合。

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        ..........................
        buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {

            @Override
            public void bucketCreated(Bucket<RowData, String> bucket) {
            }

            @Override
            public void bucketInactive(Bucket<RowData, String> bucket) {
                inactivePartitions.add(bucket.getBucketId());
            }
        });
    }

而通知bucket變?yōu)榉腔顒訝顟B(tài)又是什么情況會觸發(fā)呢绣溜?從代碼注釋我們看到慷彤,到目前為止該bucket已接收的所有記錄都已提交后,則該bucket將變?yōu)榉腔顒訝顟B(tài)怖喻。

提交分區(qū)算子

這是一個單并行度的算子底哗,用于提交寫入文件系統(tǒng)的分區(qū)信息。具體的處理步驟如下:

  • 從上游收集要提交的分區(qū)信息
  • 判斷某一個checkpoint下锚沸,所有的子任務(wù)是否都已經(jīng)接收了分區(qū)的數(shù)據(jù)
  • 獲取分區(qū)提交觸發(fā)器跋选。(目前支持partition-time和process-time)
  • 使用分區(qū)提交策略去依次提交分區(qū)信息(可以配置多個分區(qū)策略)

這里我們主要講一下 StreamingFileCommitter#processElement方法是如何對進來的每個提交數(shù)據(jù)進行處理的。

    @Override
    public void processElement(StreamRecord<CommitMessage> element) throws Exception {
        CommitMessage message = element.getValue();
        for (String partition : message.partitions) {
            trigger.addPartition(partition);
        }

        if (taskTracker == null) {
            taskTracker = new TaskTracker(message.numberOfTasks);
        }
        boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);
        if (needCommit) {
            commitPartitions(message.checkpointId);
        }
    }
    

我們看到哗蜈,從上游接收到CommitMessage元素前标,然后從里面得到要提交的分區(qū),添加到PartitionCommitTrigger里(變量trigger)距潘,然后通過taskTracker來判斷一下炼列,該checkpoint每個子任務(wù)是否已經(jīng)接收到了分區(qū)數(shù)據(jù),最后通過commitPartitions方法來提交分區(qū)信息音比。

進入commitPartitions方法俭尖,看看是如何提交分區(qū)的。

    private void commitPartitions(long checkpointId) throws Exception {
        List<String> partitions = checkpointId == Long.MAX_VALUE ?
                trigger.endInput() :
                trigger.committablePartitions(checkpointId);
        if (partitions.isEmpty()) {
            return;
        }

        try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
            for (String partition : partitions) {
                LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
                LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
                Path path = new Path(locationPath, generatePartitionPath(partSpec));
                PartitionCommitPolicy.Context context = new PolicyContext(
                        new ArrayList<>(partSpec.values()), path);
                for (PartitionCommitPolicy policy : policies) {
                    if (policy instanceof MetastoreCommitPolicy) {
                        ((MetastoreCommitPolicy) policy).setMetastore(metaStore);
                    }
                    policy.commit(context);
                }
            }
        }
    }

從trigger中獲取該checkpoint下的所有要提交的分區(qū)洞翩,放到一個List集合partitions中稽犁,在提交的分區(qū)不為空的情況下,循環(huán)遍歷要配置的分區(qū)提交策略PartitionCommitPolicy菱农,然后提交分區(qū)缭付。

分區(qū)提交觸發(fā)器

目前系統(tǒng)提供了兩種分區(qū)提交的觸發(fā)器柿估,PartitionTimeCommitTigger和ProcTimeCommitTigger循未,分別用于處理什么時候提交分區(qū)。

  • ProcTimeCommitTigger 主要依賴于分區(qū)的創(chuàng)建時間和delay,當(dāng)處理時間大于'partition creation time' + 'delay'的時候的妖,將提交這個分區(qū)
  • PartitionTimeCommitTigger 依賴于水印绣檬,當(dāng)水印的值大于 partition-time + delay的時候提交這個分區(qū)。

分區(qū)提交策略

目前系統(tǒng)提供了一個接口PartitionCommitPolicy嫂粟,用于提交分區(qū)的信息娇未,目前系統(tǒng)提供了以下幾種方案,

  • 一種是METASTORE星虹,主要是用于提交hive的分區(qū)零抬,比如創(chuàng)建hive分區(qū)等等
  • 還有一種是SUCCESS_FILE,也就是往對應(yīng)的分區(qū)目錄下寫一個success文件宽涌。
  • 此外平夜,系統(tǒng)還提供了一個對外的自定義實現(xiàn),用于用戶自定義分區(qū)提交卸亮,比如提交分區(qū)之后合并小文件等等忽妒。自定義提交策略的時候,需要實現(xiàn)PartitionCommitPolicy接口兼贸,并將提交策略置為custom段直。

我在網(wǎng)上也看到過一些實現(xiàn)該接口用于合并小文件的示例,但是我個人覺得其實有點不太完美溶诞,因為這個合并小文件可能會涉及很多的問題:

  • 合并的時候如何保證事務(wù)鸯檬,保證合并的同時如何有讀操作不會發(fā)生臟讀
  • 事務(wù)的一致性,如果合并出錯了怎么回滾
  • 合并小文件的性能是否跟得上很澄,目前flink只提供了一個單并行度的提交算子京闰。
  • 如何多并發(fā)合并寫入

所以暫時我也沒有想到一個完美的方案用于flink來合并小文件。

總結(jié)

通過上述的描述甩苛,我們簡單聊了一下flink是如何將流式數(shù)據(jù)寫入hive的蹂楣,但是可能每個人在做的過程中還是會遇到各種各種的環(huán)境問題導(dǎo)致的寫入失敗,比如window和linux系統(tǒng)的差異讯蒲,hdfs版本的差異痊土,系統(tǒng)時區(qū)的配置等等,在遇到一些個性化的問題之后墨林,就可能需要大家去針對自己的問題去個性化的debug了赁酝。

更多干貨信息,歡迎關(guān)注我的公眾號【大數(shù)據(jù)技術(shù)與應(yīng)用實戰(zhàn)】

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末旭等,一起剝皮案震驚了整個濱河市酌呆,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌搔耕,老刑警劉巖隙袁,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡菩收,警方通過查閱死者的電腦和手機梨睁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來娜饵,“玉大人坡贺,你說我怎么就攤上這事∠湮瑁” “怎么了遍坟?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長晴股。 經(jīng)常有香客問我政鼠,道長,這世上最難降的妖魔是什么队魏? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任公般,我火速辦了婚禮,結(jié)果婚禮上胡桨,老公的妹妹穿的比我還像新娘官帘。我一直安慰自己,他們只是感情好昧谊,可當(dāng)我...
    茶點故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布刽虹。 她就那樣靜靜地躺著,像睡著了一般呢诬。 火紅的嫁衣襯著肌膚如雪涌哲。 梳的紋絲不亂的頭發(fā)上儒溉,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天筹裕,我揣著相機與錄音,去河邊找鬼灭红。 笑死狗唉,一個胖子當(dāng)著我的面吹牛初烘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播分俯,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼肾筐,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了缸剪?” 一聲冷哼從身側(cè)響起吗铐,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎杏节,沒想到半個月后唬渗,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體讥此,經(jīng)...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年谣妻,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片卒稳。...
    茶點故事閱讀 39,731評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡蹋半,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出充坑,到底是詐尸還是另有隱情减江,我是刑警寧澤,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布捻爷,位于F島的核電站辈灼,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏也榄。R本人自食惡果不足惜巡莹,卻給世界環(huán)境...
    茶點故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望甜紫。 院中可真熱鬧降宅,春花似錦、人聲如沸囚霸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拓型。三九已至额嘿,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間劣挫,已是汗流浹背册养。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留压固,地道東北人捕儒。 一個月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像邓夕,于是被迫代替她去往敵國和親刘莹。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,629評論 2 354