Flink 源碼之StreamTask

Flink源碼分析系列文檔目錄

請點擊:Flink 源碼分析系列文檔目錄

前言

本篇我們一起分析下Flink中流處理作業(yè)的初始化和執(zhí)行邏輯淤刃。

AbstractInvokable

AbstractInvokableTaskManager中運行的所有任務的父類晒他。所有的讀取上游數(shù)據(jù),用戶數(shù)據(jù)處理邏輯(map逸贾,filter算子以及用戶自己編寫的processFunction等等)和發(fā)送處理過的數(shù)據(jù)到下游相關邏輯都在該類的invoke方法中得到執(zhí)行陨仅。

AbstractInvokable中與任務執(zhí)行相關的2個方法為:

  • invoke方法:啟動任務執(zhí)行的入口方法。實現(xiàn)類必須重寫這個方法耕陷。
  • cancel方法:任務被取消或者是用戶終止任務的時候被調用

它有兩個實現(xiàn)類:

  • BatchTask:所有批處理類型Task的基類掂名。
  • StreamTask:所有流處理類型Task的基類。

我們以流處理為重點哟沫,下面詳細介紹下StreamTask這個類饺蔑。

AbstractInvokable的創(chuàng)建

在開始分析StreamTask之前我們需要了解下它是在何處,如何被創(chuàng)建出來的嗜诀。

翻閱Task線程的處理邏輯猾警,不難發(fā)現(xiàn)它的invoke變量初始化位于TaskdoRun方法。

invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

這一行代碼使用用戶代碼類加載器(userCodeClassLoader)隆敢,調用目標類唯一參數(shù)為Environment類型的構造方法发皿,創(chuàng)建出invokable對象。

private static AbstractInvokable loadAndInstantiateInvokable(
    ClassLoader classLoader, String className, Environment environment) throws Throwable {

    final Class<? extends AbstractInvokable> invokableClass;
    try {
        // 使用指定的classloader加載className對應的class拂蝎,并轉換為AbstractInvokable類型
        invokableClass =
            Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);
    } catch (Throwable t) {
        throw new Exception("Could not load the task's invokable class.", t);
    }

    Constructor<? extends AbstractInvokable> statelessCtor;

    try {
        // 獲取構造函數(shù)
        statelessCtor = invokableClass.getConstructor(Environment.class);
    } catch (NoSuchMethodException ee) {
        throw new FlinkException("Task misses proper constructor", ee);
    }

    // instantiate the class
    try {
        //noinspection ConstantConditions  --> cannot happen
        // 傳入environment變量穴墅,創(chuàng)建出新的對象
        return statelessCtor.newInstance(environment);
    } catch (InvocationTargetException e) {
        // directly forward exceptions from the eager initialization
        throw e.getTargetException();
    } catch (Exception e) {
        throw new FlinkException("Could not instantiate the task's invokable class.", e);
    }
}

StreamTask

StreamTask類是所有流處理任務的基類。Task由TaskManager部署和執(zhí)行。Task是本地運行單元玄货。每一個Task包含了一個或多個operator皇钞。這些operator在同一個OperatorChain中。
StreamTask任務執(zhí)行生命周期包含:

  1. setInitialState:設置各個operator的初始狀態(tài)松捉。對應initializeState方法夹界。
  2. 調用 invoke方法。

其中invoke方法包含的邏輯可細分為:

  • 創(chuàng)建出task相關配置隘世,創(chuàng)建OperatorChain可柿。
  • 執(zhí)行operator的setup邏輯。
  • 執(zhí)行task相關的初始化邏輯丙者。
  • 加載并初始化operator的狀態(tài)复斥。
  • 調用各個operator的open方法。
  • 執(zhí)行各個operator內的數(shù)據(jù)處理邏輯械媒。
  • 關閉operator永票。
  • 銷毀operator。
  • 任務清理操作滥沫。

下面我們從代碼層面詳細分析下invoke方法的處理流程侣集。

invoke方法

本節(jié)我們分析StreamTask核心執(zhí)行邏輯invoke方法。invoke方法如下所示:

@Override
public final void invoke() throws Exception {
    try {
        // 調用作業(yè)執(zhí)行前相關準備邏輯
        beforeInvoke();

        // final check to exit early before starting to run
        // 如果任務被取消兰绣,拋出異常退出
        if (canceled) {
            throw new CancelTaskException();
        }

        // let the task do its work
        // 執(zhí)行用戶編寫的task邏輯
        runMailboxLoop();

        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        // 再次檢查如果任務被取消世分,拋出異常退出
        if (canceled) {
            throw new CancelTaskException();
        }

        // 執(zhí)行調用后相關邏輯
        afterInvoke();
    } catch (Throwable invokeException) {
        failing = !canceled;
        try {
            cleanUpInvoke();
        }
        // TODO: investigate why Throwable instead of Exception is used here.
        catch (Throwable cleanUpException) {
            Throwable throwable =
                ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
            ExceptionUtils.rethrowException(throwable);
        }
        ExceptionUtils.rethrowException(invokeException);
    }
    
    // 執(zhí)行invoke后清理操作
    cleanUpInvoke();
}

beforeInvoke方法

beforeInvoke方法主要為task的初始化操作,包含創(chuàng)建OperatorChain缀辩,讀取上游數(shù)據(jù)和下游數(shù)據(jù)輸出配置等臭埋。詳細內容如下:

protected void beforeInvoke() throws Exception {
    disposedOperators = false;
    LOG.debug("Initializing {}.", getName());

    // 創(chuàng)建出OperatorChain
    // OperatorChain是JobGraph生成時的一箱優(yōu)化措施
    // 將復合條件的多個StreamNode(對應數(shù)據(jù)變換操作)合并到一個chain中
    // 他們會被調度到同一個StreamTask中執(zhí)行
    operatorChain = new OperatorChain<>(this, recordWriter);
    // 獲取OperatorChain中第一個operator
    mainOperator = operatorChain.getMainOperator();

    // task specific initialization
    // 執(zhí)行task專屬的初始化工作
    // 這個是抽象方法
    // 具體邏輯需要在子類中實現(xiàn)
    init();

    // save the work of reloading state, etc, if the task is already canceled
    if (canceled) {
        throw new CancelTaskException();
    }

    // -------- Invoke --------
    LOG.debug("Invoking {}", getName());

    // we need to make sure that any triggers scheduled in open() cannot be
    // executed before all operators are opened
    // task動作必須在StreamTaskActionExecutor中執(zhí)行,防止出現(xiàn)并發(fā)執(zhí)行問題臀玄,影響checkpoint
    // 該executor實際為StreamTaskActionExecutor.IMMEDIATE瓢阴,即在當前線程直接運行
    actionExecutor.runThrowing(
        () -> {
            // 創(chuàng)建SequentialChannelStateReader,用于讀取checkpoint時保存的channel狀態(tài)
            SequentialChannelStateReader reader =
                getEnvironment()
                .getTaskStateManager()
                .getSequentialChannelStateReader();
            // 獲取ResultPartitionWriter狀態(tài)
            reader.readOutputData(
                getEnvironment().getAllWriters(),
                !configuration.isGraphContainingLoops());

            // 初始化OperatorChain中所有的operator
            // 調用他們的initializeState(初始化狀態(tài))和open(包含初始化動作)方法
            operatorChain.initializeStateAndOpenOperators(
                createStreamTaskStateInitializer());

            channelIOExecutor.execute(
                () -> {
                    try {
                        // 獲取InputGate狀態(tài)
                        reader.readInputData(getEnvironment().getAllInputGates());
                    } catch (Exception e) {
                        asyncExceptionHandler.handleAsyncException(
                            "Unable to read channel state", e);
                    }
                });

            for (InputGate inputGate : getEnvironment().getAllInputGates()) {
                // 在inputGate狀態(tài)被讀取之后執(zhí)行
                inputGate
                    .getStateConsumedFuture()
                    .thenRun(
                    () ->
                    // 在task線程中執(zhí)行
                    mainMailboxExecutor.execute(
                        // 執(zhí)行請求partition方法
                        inputGate::requestPartitions,
                        "Input gate request partitions"));
            }
        });
    // 水池狀態(tài)為正在執(zhí)行
    isRunning = true;
}

runMailboxLoop方法

runMailboxLoop方法啟動task的數(shù)據(jù)輸入和處理邏輯:

public void runMailboxLoop() throws Exception {
    mailboxProcessor.runMailboxLoop();
}

MailBoxProcessorStreamTask的構造函數(shù)中創(chuàng)建出來:

this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);

mailboxProcessor.runMailboxLoop()方法可以理解為在actionExecutor線程池執(zhí)行processInput方法荣恐。

processInput方法從上游(StreamTaskNetworkInput叠穆,InputGate)讀取數(shù)據(jù)。這部分邏輯參見Flink 源碼之節(jié)點間通信硼被。

afterInvoke

afterInvoke方法內容如下渗磅,概括起來為task執(zhí)行完畢后的清理工作检访,關閉operator等仔掸。

protected void afterInvoke() throws Exception {
    LOG.debug("Finished task {}", getName());
    getCompletionFuture().exceptionally(unused -> null).join();

    final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();

    // close all operators in a chain effect way
    // 關閉OperatorChain中所有的operator
    // 從前向后依次調用各個operator的close方法
    operatorChain.closeOperators(actionExecutor);

    // make sure no further checkpoint and notification actions happen.
    // at the same time, this makes sure that during any "regular" exit where still
    actionExecutor.runThrowing(
        () -> {

            // make sure no new timers can come
            // 停止timer服務
            FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);

            // let mailbox execution reject all new letters from this point
            // 準備關閉mailboxProcessor,不再接受新的事件
            mailboxProcessor.prepareClose();

            // only set the StreamTask to not running after all operators have been closed!
            // See FLINK-7430
            // 設置task狀態(tài)為停止
            isRunning = false;
        });
    // processes the remaining mails; no new mails can be enqueued
    // 處理積壓的事件
    mailboxProcessor.drain();

    // make sure all timers finish
    // 等待所有的time都停止
    timersFinishedFuture.get();

    LOG.debug("Closed operators for task {}", getName());

    // make sure all buffered data is flushed
    // 處理掉buffer中的所有數(shù)據(jù)
    operatorChain.flushOutputs();

    // make an attempt to dispose the operators such that failures in the dispose call
    // still let the computation fail
    // 依次廢棄掉OperatorChain中的所有operator(順序為從頭到尾)
    disposeAllOperators();
}

StreamTask的子類

StreamTask是所有流處理計算任務的父類嘉汰,它本身是一個抽象類。為了處理不同類型的StreamOperator状勤,StreamTask有多種不同的實現(xiàn)鞋怀。幾個典型的實現(xiàn)如下:

  • OneInputStreamTask:處理OneInputStreamOperator,即只有一個輸入流的StreamOperator持搜。
  • TwoInputStreamTask:處理TwoInputStreamOperator密似,具有2個輸入流。
  • MultipleInputStreamTask:處理MultipleInputStreamOperator葫盼,具有多個輸入流残腌。
  • SourceStreamTask:處理StreamSource,即數(shù)據(jù)源贫导。

接下來我們重點關注這些類實現(xiàn)的抽象方法抛猫。

OneInputStreamTask的init方法

它的init方法主要流程為創(chuàng)建網(wǎng)絡輸入與輸出,創(chuàng)建inputProcessor用于從網(wǎng)絡輸入讀取數(shù)據(jù)孩灯,反序列化之后傳遞給網(wǎng)絡輸出闺金。最后初始化數(shù)據(jù)流監(jiān)控。代碼和分析如下:

@Override
public void init() throws Exception {
    // 獲取流作業(yè)配置
    StreamConfig configuration = getConfiguration();
    // 獲取網(wǎng)絡輸入流數(shù)量
    int numberOfInputs = configuration.getNumberOfNetworkInputs();

    if (numberOfInputs > 0) {
        // 創(chuàng)建一個CheckpointedInputGate
        // 該類型InputGate擁有一個CheckpointBarrierHandler峰档,用來處理接收到的CheckpointBarrier
        CheckpointedInputGate inputGate = createCheckpointedInputGate();
        // 監(jiān)控相關败匹,設置流入數(shù)據(jù)條數(shù)計數(shù)器
        Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);
        // 創(chuàng)建StreamTaskNetworkOutput
        // 發(fā)送反序列化后的數(shù)據(jù)給task處理流程
        DataOutput<IN> output = createDataOutput(numRecordsIn);
        // 創(chuàng)建StreamTaskNetworkInput
        // 包裝了CheckpointedInputGate,從中讀取網(wǎng)絡接收到的原始數(shù)據(jù)并發(fā)給反序列化器
        StreamTaskInput<IN> input = createTaskInput(inputGate);

        // 讀取輸入流配置
        StreamConfig.InputConfig[] inputConfigs =
            configuration.getInputs(getUserCodeClassLoader());
        StreamConfig.InputConfig inputConfig = inputConfigs[0];
        // 如果要求對數(shù)據(jù)排序
        // 含義為數(shù)據(jù)按照key字段分組
        // 在一段時間內只會給task提供同一分組的數(shù)據(jù)
        // 不同組的數(shù)據(jù)不會頻繁交替出現(xiàn)
        if (requiresSorting(inputConfig)) {
            checkState(
                !configuration.isCheckpointingEnabled(),
                "Checkpointing is not allowed with sorted inputs.");
            input = wrapWithSorted(input);
        }

        // 注冊流入數(shù)據(jù)條數(shù)計數(shù)器監(jiān)控
        getEnvironment()
            .getMetricGroup()
            .getIOMetricGroup()
            .reuseRecordsInputCounter(numRecordsIn);

        // 創(chuàng)建inputProcessor
        // 從網(wǎng)絡讀取數(shù)據(jù)讥巡,反序列化后給output掀亩,然后把反序列化后的數(shù)據(jù)交給OperatorChain
        inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain);
    }
    // 創(chuàng)建watermark監(jiān)控
    mainOperator
        .getMetricGroup()
        .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge);
    // wrap watermark gauge since registered metrics must be unique
    getEnvironment()
        .getMetricGroup()
        .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue);
}

其中創(chuàng)建CheckpointedInputGate的過程在 Flink 源碼之分布式快照 有介紹,請大家查閱欢顷。

TwoInputStreamTask的init方法

它的初始化方法和OneInputStreamTask的類似槽棍,只不過需要創(chuàng)建兩個InputGateTwoInputStreamTask對應CoOperator抬驴,即有兩個輸入流的operator(比如CoFlatmap)刹泄。

@Override
public void init() throws Exception {
    StreamConfig configuration = getConfiguration();
    ClassLoader userClassLoader = getUserCodeClassLoader();

    int numberOfInputs = configuration.getNumberOfNetworkInputs();

    ArrayList<IndexedInputGate> inputList1 = new ArrayList<>();
    ArrayList<IndexedInputGate> inputList2 = new ArrayList<>();

    List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

    for (int i = 0; i < numberOfInputs; i++) {
        int inputType = inEdges.get(i).getTypeNumber();
        IndexedInputGate reader = getEnvironment().getInputGate(i);
        switch (inputType) {
            case 1:
                // 如果是輸入流1,加入到inputList1中
                inputList1.add(reader);
                break;
            case 2:
                // 如果是輸入流2怎爵,加入到inputList2中
                inputList2.add(reader);
                break;
            default:
                throw new RuntimeException("Invalid input type number: " + inputType);
        }
    }

    // 創(chuàng)建CheckpointedInputGate特石,包裝了UnionInputGate
    // 包裝了多個InputGate姆蘸,ID相同的channel會被合并
    // 這里創(chuàng)建出兩個UnionInputGate逞敷,每個UnionInputGate合并了多個inputType相同的InputGate
    // 最后根據(jù)這個InputGate推捐,創(chuàng)建出StreamTwoInputProcessor
    createInputProcessor(
        inputList1, inputList2, gateIndex -> inEdges.get(gateIndex).getPartitioner());

    // 監(jiān)控相關部分堪簿,這里省略
    // ...
}

MultipleInputStreamTask和上面的邏輯類似椭更,不再贅述虑瀑。

SourceStreamTask的init方法

@Override
protected void init() {
    // we check if the source is actually inducing the checkpoints, rather
    // than the trigger
    // 獲取數(shù)據(jù)源數(shù)據(jù)產(chǎn)生邏輯SourceFunction
    SourceFunction<?> source = mainOperator.getUserFunction();
    // 如果source實現(xiàn)了這個接口舌狗,說明接收到CheckpointCoordinator發(fā)來的觸發(fā)checkpoint消息之時source不觸發(fā)checkpoint
    // checkpoint的觸發(fā)由輸入數(shù)據(jù)控制
    if (source instanceof ExternallyInducedSource) {
        externallyInducedCheckpoints = true;

        // 創(chuàng)建checkpoint觸發(fā)鉤子
        ExternallyInducedSource.CheckpointTrigger triggerHook =
            new ExternallyInducedSource.CheckpointTrigger() {

            @Override
            public void triggerCheckpoint(long checkpointId) throws FlinkException {
                // TODO - we need to see how to derive those. We should probably not
                // encode this in the
                // TODO -   source's trigger message, but do a handshake in this task
                // between the trigger
                // TODO -   message from the master, and the source's trigger
                // notification
                final CheckpointOptions checkpointOptions =
                    CheckpointOptions.forConfig(
                    CheckpointType.CHECKPOINT,
                    CheckpointStorageLocationReference.getDefault(),
                    configuration.isExactlyOnceCheckpointMode(),
                    configuration.isUnalignedCheckpointsEnabled(),
                    configuration.getAlignmentTimeout());
                final long timestamp = System.currentTimeMillis();

                final CheckpointMetaData checkpointMetaData =
                    new CheckpointMetaData(checkpointId, timestamp);

                try {
                    // 調用StreamTask的異步觸發(fā)checkpoint方法
                    SourceStreamTask.super
                        .triggerCheckpointAsync(
                        checkpointMetaData, checkpointOptions)
                        .get();
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new FlinkException(e.getMessage(), e);
                }
            }
        };

        ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
    }
    // 配置checkpoint啟動延遲時間監(jiān)控
    getEnvironment()
        .getMetricGroup()
        .getIOMetricGroup()
        .gauge(
        MetricNames.CHECKPOINT_START_DELAY_TIME,
        this::getAsyncCheckpointStartDelayNanos);
}

StreamTask從上游獲取數(shù)據(jù)

StreamTask從上游獲取數(shù)據(jù)的調用鏈為:

  • StreamTask.processInput
  • inputProcessor.processInput
  • StreamTaskNetworkInput.emitNext
  • inputGate.pollNext
  • inputChannel.getNextBuffer

StreamTask通過InputGate從上游其他Task獲取到數(shù)據(jù)铭污。每個InputGate包含一個或多個InputChannel嘹狞,根據(jù)數(shù)據(jù)是否走網(wǎng)絡通信磅网,這些InputChannel分為RemoteInputChannelLocalInputChannel涧偷。其中RemoteInputChannel使用Netty通過網(wǎng)絡從上游task的ResultSubPartition獲取數(shù)據(jù)燎潮,適用與本task和上游task運行在不同集群節(jié)點的情況确封。和它相反的是LocalInputChannel,適用于本task和上游task運行在同一節(jié)點的情況颜曾,從上游task獲取數(shù)據(jù)不需要走網(wǎng)絡通信稠诲。

這部分邏輯的詳細分析诡曙,參見 Flink 源碼之節(jié)點間通信

數(shù)據(jù)傳遞給OperatorChain

這一段邏輯我們從StreamTaskNetworkInputprocessElement方法開始分析聚请。

StreamTaskprocessInput方法為處理數(shù)據(jù)邏輯的入口驶赏。這個方法調用了StreamOneInputProcessor的同名方法煤傍,命令StreamTaskNetworkInput一直循環(huán)不停的從InputGate中獲取數(shù)據(jù)蚯姆。對于獲取到的數(shù)據(jù)龄恋,需要先交給反序列化器郭毕,將二進制數(shù)據(jù)反序列化為StreamRecord對象显押。接著交給processElement方法處理乘碑。

上面邏輯的分析請參見 Flink 源碼之節(jié)點間通信 讀取數(shù)據(jù)章節(jié)兽肤。

下面是processElement方法。該方法位于AbstractStreamTaskNetworkInput睦疫。參數(shù)中的output實際上就是StreamTaskNetworkOutput`對象蛤育。

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    // 首先判斷元素的類型瓦糕,可能是數(shù)據(jù)咕娄,watermark圣勒,延遲標記或者是流狀態(tài)
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(
            recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(
            recordOrMark.asStreamStatus(),
            flattenedChannelIndices.get(lastChannel),
            output);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}

StreamTaskNetworkOutput接收反序列化處理過的數(shù)據(jù)圣贸,發(fā)送給OperatorChain的第一個operator吁峻。

private static class StreamTaskNetworkOutput<IN> extends AbstractDataOutput<IN> {

    // 創(chuàng)建的時候傳入的是OperatorChain的mainOperator用含,即第一個operator
    private final OneInputStreamOperator<IN, ?> operator;

    private final WatermarkGauge watermarkGauge;
    private final Counter numRecordsIn;

    private StreamTaskNetworkOutput(
        OneInputStreamOperator<IN, ?> operator,
        StreamStatusMaintainer streamStatusMaintainer,
        WatermarkGauge watermarkGauge,
        Counter numRecordsIn) {
        super(streamStatusMaintainer);

        this.operator = checkNotNull(operator);
        this.watermarkGauge = checkNotNull(watermarkGauge);
        this.numRecordsIn = checkNotNull(numRecordsIn);
    }

    // 發(fā)送數(shù)據(jù)
    @Override
    public void emitRecord(StreamRecord<IN> record) throws Exception {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record);
        // 調用processElement方法,處理數(shù)據(jù)
        operator.processElement(record);
    }

    // 發(fā)送watermark
    @Override
    public void emitWatermark(Watermark watermark) throws Exception {
        watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
        // 調用processWatermark方法肠缔,處理watermark
        operator.processWatermark(watermark);
    }

    // 發(fā)送延遲標記明未,被用于統(tǒng)計數(shù)據(jù)在整個Flink處理流程中的耗時
    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        operator.processLatencyMarker(latencyMarker);
    }
}

OperatorChain的邏輯在后續(xù)博客中單獨分析趟妥。

本博客為作者原創(chuàng)披摄,歡迎大家參與討論和批評指正。如需轉載請注明出處义辕。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市傀蚌,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌撩幽,老刑警劉巖窜醉,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件酱虎,死亡現(xiàn)場離奇詭異擂涛,居然都是意外死亡撒妈,警方通過查閱死者的電腦和手機狰右,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門棋蚌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來谷暮,“玉大人湿弦,你說我怎么就攤上這事颊埃。” “怎么了班利?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵趾浅,是天一觀的道長皿哨。 經(jīng)常有香客問我纽谒,道長鼓黔,這世上最難降的妖魔是什么澳化? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任缎谷,我火速辦了婚禮列林,結果婚禮上希痴,老公的妹妹穿的比我還像新娘砌创。我一直安慰自己,他們只是感情好刽辙,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布扫倡。 她就那樣靜靜地躺著疚鲤,像睡著了一般缘挑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上诲宇,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天姑蓝,我揣著相機與錄音纺荧,去河邊找鬼宙暇。 笑死占贫,一個胖子當著我的面吹牛型奥,可吹牛的內容都是我干的桩引。 我是一名探鬼主播收夸,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼咽瓷!你這毒婦竟也來了舰讹?” 一聲冷哼從身側響起钻洒,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤素标,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后寓免,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體袜香,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年疾就,在試婚紗的時候發(fā)現(xiàn)自己被綠了艺蝴。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猜敢。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡鼠冕,死狀恐怖懈费,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情博脑,我是刑警寧澤憎乙,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站叉趣,受9級特大地震影響泞边,放射性物質發(fā)生泄漏。R本人自食惡果不足惜疗杉,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一阵谚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦梢什、人聲如沸闻牡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間坟桅,已是汗流浹背蓬戚。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留旭绒,地道東北人忽匈。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓折柠,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內容