深入分析Flink的operator chain(算子鏈)機(jī)制

前言

這篇是昨晚沒寫完的鸣个,今晚補(bǔ)全發(fā)出來邻眷。

Flink算子鏈簡(jiǎn)介

“為什么我的Flink作業(yè)Web UI中只顯示出了一個(gè)框,并且Records Sent和Records Received指標(biāo)都是0?是我的程序?qū)懙糜袉栴}嗎臀突?”

筆者在Flink社區(qū)群里經(jīng)常能看到類似這樣的疑問。這種情況幾乎都不是程序有問題贾漏,而是因?yàn)镕link的operator chain——即算子鏈機(jī)制導(dǎo)致的候学,即提交的作業(yè)的執(zhí)行計(jì)劃中,所有算子的并發(fā)實(shí)例(即sub-task)都因?yàn)闈M足特定條件而串成了整體來執(zhí)行磕瓷,自然就觀察不到算子之間的數(shù)據(jù)流量了盒齿。

當(dāng)然上述是一種特殊情況。我們更常見到的是只有部分算子得到了算子鏈機(jī)制的優(yōu)化困食,如官方文檔中出現(xiàn)過多次的下圖所示边翁,注意Source和map()算子。

算子鏈機(jī)制的好處是顯而易見的:所有chain在一起的sub-task都會(huì)在同一個(gè)線程(即TaskManager的slot)中執(zhí)行硕盹,能夠減少不必要的數(shù)據(jù)交換符匾、序列化和上下文切換,從而提高作業(yè)的執(zhí)行效率瘩例。

鋪墊了這么多啊胶,接下來就通過源碼簡(jiǎn)單看看算子鏈產(chǎn)生的條件,以及它是如何在Flink Runtime中實(shí)現(xiàn)的垛贤。

邏輯計(jì)劃中的算子鏈

對(duì)Flink Runtime稍有了解的看官應(yīng)該知道焰坪,F(xiàn)link作業(yè)的執(zhí)行計(jì)劃會(huì)用三層圖結(jié)構(gòu)來表示,即:

  • StreamGraph——原始邏輯執(zhí)行計(jì)劃
  • JobGraph——優(yōu)化的邏輯執(zhí)行計(jì)劃(Web UI中看到的就是這個(gè))
  • ExecutionGraph——物理執(zhí)行計(jì)劃

算子鏈?zhǔn)窃趦?yōu)化邏輯計(jì)劃時(shí)加入的聘惦,也就是由StreamGraph生成JobGraph的過程中某饰。那么我們來到負(fù)責(zé)生成JobGraph的o.a.f.streaming.api.graph.StreamingJobGraphGenerator類,查看其核心方法createJobGraph()的源碼善绎。

private JobGraph createJobGraph() {
    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    setChaining(hashes, legacyHashes, chainedOperatorHashes);

    setPhysicalEdges();
    // 略......

    return jobGraph;
}

可見黔漂,該方法會(huì)先計(jì)算出StreamGraph中各個(gè)節(jié)點(diǎn)的哈希碼作為唯一標(biāo)識(shí),并創(chuàng)建一個(gè)空的Map結(jié)構(gòu)保存即將被鏈在一起的算子的哈希碼禀酱,然后調(diào)用setChaining()方法炬守,如下源碼所示。

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
    }
}

可見是逐個(gè)遍歷StreamGraph中的Source節(jié)點(diǎn)剂跟,并調(diào)用createChain()方法减途。createChain()是邏輯計(jì)劃層創(chuàng)建算子鏈的核心方法酣藻,完整源碼如下,有點(diǎn)長(zhǎng)观蜗。

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    if (!builtVertices.contains(startNodeId)) {
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes =
            chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);
        OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;
    } else {
        return new ArrayList<>();
    }
}

先解釋一下方法開頭創(chuàng)建的3個(gè)List結(jié)構(gòu):

  • transitiveOutEdges:當(dāng)前算子鏈在JobGraph中的出邊列表臊恋,同時(shí)也是createChain()方法的最終返回值;
  • chainableOutputs:當(dāng)前能夠鏈在一起的StreamGraph邊列表墓捻;
  • nonChainableOutputs:當(dāng)前不能夠鏈在一起的StreamGraph邊列表抖仅。

接下來,從Source開始遍歷StreamGraph中當(dāng)前節(jié)點(diǎn)的所有出邊砖第,調(diào)用isChainable()方法判斷是否可以被鏈在一起(這個(gè)判斷邏輯稍后會(huì)講到)撤卢。可以鏈接的出邊被放入chainableOutputs列表梧兼,否則放入nonChainableOutputs列表放吩。

對(duì)于chainableOutputs中的邊,就會(huì)以這些邊的直接下游為起點(diǎn)羽杰,繼續(xù)遞歸調(diào)用createChain()方法延展算子鏈渡紫。對(duì)于nonChainableOutputs中的邊,由于當(dāng)前算子鏈的延展已經(jīng)到頭考赛,就會(huì)以這些“斷點(diǎn)”為起點(diǎn)惕澎,繼續(xù)遞歸調(diào)用createChain()方法試圖創(chuàng)建新的算子鏈。也就是說颜骤,邏輯計(jì)劃中整個(gè)創(chuàng)建算子鏈的過程都是遞歸的唧喉,亦即實(shí)際返回時(shí),是從Sink端開始返回的忍抽。

然后要判斷當(dāng)前節(jié)點(diǎn)是不是算子鏈的起始節(jié)點(diǎn)八孝。如果是,則調(diào)用createJobVertex()方法為算子鏈創(chuàng)建一個(gè)JobVertex(即JobGraph中的節(jié)點(diǎn))鸠项,也就形成了我們?cè)赪eb UI中看到的JobGraph效果:

最后干跛,還需要將各個(gè)節(jié)點(diǎn)的算子鏈數(shù)據(jù)寫入各自的StreamConfig中,算子鏈的起始節(jié)點(diǎn)要額外保存下transitiveOutEdges祟绊。StreamConfig在后文的物理執(zhí)行階段會(huì)再次用到楼入。

形成算子鏈的條件

來看看isChainable()方法的代碼。

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}

由此可得久免,上下游算子能夠chain在一起的條件還是非常苛刻的(老生常談了)扭弧,列舉如下:

  • 上下游算子實(shí)例處于同一個(gè)SlotSharingGroup中(之后再提)阎姥;
  • 下游算子的鏈接策略(ChainingStrategy)為ALWAYS——既可以與上游鏈接,也可以與下游鏈接鸽捻。我們常見的map()呼巴、filter()等都屬此類泽腮;
  • 上游算子的鏈接策略為HEAD或ALWAYS。HEAD策略表示只能與下游鏈接衣赶,這在正常情況下是Source算子的專屬诊赊;
  • 兩個(gè)算子間的物理分區(qū)邏輯是ForwardPartitioner,可參見之前寫過的《聊聊Flink DataStream的八種物理分區(qū)邏輯》府瞄;
  • 兩個(gè)算子間的shuffle方式不是批處理模式碧磅;
  • 上下游算子實(shí)例的并行度相同;
  • 沒有禁用算子鏈遵馆。

禁用算子鏈

用戶可以在一個(gè)算子上調(diào)用startNewChain()方法強(qiáng)制開始一個(gè)新的算子鏈鲸郊,或者調(diào)用disableOperatorChaining()方法指定它不參與算子鏈。代碼位于SingleOutputStreamOperator類中货邓,都是通過改變算子的鏈接策略實(shí)現(xiàn)的秆撮。

@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
    return setChainingStrategy(ChainingStrategy.NEVER);
}

@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
    return setChainingStrategy(ChainingStrategy.HEAD);
}

如果要在整個(gè)運(yùn)行時(shí)環(huán)境中禁用算子鏈,調(diào)用StreamExecutionEnvironment.disableOperatorChaining()方法即可换况。

物理計(jì)劃中的算子鏈

在JobGraph轉(zhuǎn)換成ExecutionGraph并交由TaskManager執(zhí)行之后职辨,會(huì)生成調(diào)度執(zhí)行的基本任務(wù)單元——StreamTask,負(fù)責(zé)執(zhí)行具體的StreamOperator邏輯戈二。在StreamTask.invoke()方法中舒裤,初始化了狀態(tài)后端、checkpoint存儲(chǔ)和定時(shí)器服務(wù)之后挽拂,可以發(fā)現(xiàn):

operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

構(gòu)造出了一個(gè)OperatorChain實(shí)例惭每,這就是算子鏈在實(shí)際執(zhí)行時(shí)的形態(tài)。解釋一下OperatorChain中的幾個(gè)主要屬性亏栈。

private final StreamOperator<?>[] allOperators;
private final RecordWriterOutput<?>[] streamOutputs;
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
private final OP headOperator;
  • headOperator:算子鏈的第一個(gè)算子台腥,對(duì)應(yīng)JobGraph中的算子鏈起始節(jié)點(diǎn);
  • allOperators:算子鏈中的所有算子绒北,倒序排列黎侈,即headOperator位于該數(shù)組的末尾;
  • streamOutputs:算子鏈的輸出闷游,可以有多個(gè)峻汉;
  • chainEntryPoint:算子鏈的“入口點(diǎn)”,它的含義將在后文說明脐往。

由上可知休吠,所有StreamTask都會(huì)創(chuàng)建OperatorChain。如果一個(gè)算子無法進(jìn)入算子鏈业簿,也會(huì)形成一個(gè)只有headOperator的單個(gè)算子的OperatorChain瘤礁。

OperatorChain構(gòu)造方法中的核心代碼如下。

for (int i = 0; i < outEdgesInOrder.size(); i++) {
    StreamEdge outEdge = outEdgesInOrder.get(i);
    RecordWriterOutput<?> streamOutput = createStreamOutput(
        recordWriters.get(i),
        outEdge,
        chainedConfigs.get(outEdge.getSourceId()),
        containingTask.getEnvironment());
    this.streamOutputs[i] = streamOutput;
    streamOutputMap.put(outEdge, streamOutput);
}

// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
    containingTask,
    configuration,
    chainedConfigs,
    userCodeClassloader,
    streamOutputMap,
    allOps);

if (operatorFactory != null) {
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
    headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
} else {
    headOperator = null;
}

// add head operator to end of chain
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);

首先會(huì)遍歷算子鏈整體的所有出邊梅尤,并調(diào)用createStreamOutput()方法創(chuàng)建對(duì)應(yīng)的下游輸出RecordWriterOutput柜思。然后就會(huì)調(diào)用createOutputCollector()方法創(chuàng)建物理的算子鏈岩调,并返回chainEntryPoint,這個(gè)方法比較重要赡盘,部分代碼如下号枕。

private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

    // create collectors for the network outputs
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }

    // Create collectors for the chained outputs
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
        WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
            containingTask,
            chainedOpConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators,
            outputEdge.getOutputTag());
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }
    // 以下略......
}

該方法從上一節(jié)提到的StreamConfig中分別取出出邊和鏈接邊的數(shù)據(jù),并創(chuàng)建各自的Output陨享。出邊的Output就是將數(shù)據(jù)發(fā)往算子鏈之外下游的RecordWriterOutput葱淳,而鏈接邊的輸出要靠createChainedOperator()方法。

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators,
        OutputTag<IN> outputTag) {
    // create the output that the operator writes to first. this may recursively create more operators
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
        containingTask,
        operatorConfig,
        chainedConfigs,
        userCodeClassloader,
        streamOutputs,
        allOperators);

    // now create the operator and give it the output collector to write its output to
    StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
    OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator(
            containingTask, operatorConfig, chainedOperatorOutput);

    allOperators.add(chainedOperator);

    WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
    if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
        currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
    }
    else {
        TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
    }

    // wrap watermark gauges since registered metrics must be unique
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
    return currentOperatorOutput;
}

我們一眼就可以看到霉咨,這個(gè)方法遞歸調(diào)用了上述createOutputCollector()方法蛙紫,與邏輯計(jì)劃階段類似,通過不斷延伸Output來產(chǎn)生chainedOperator(即算子鏈中除了headOperator之外的算子)途戒,并逆序返回坑傅,這也是allOperators數(shù)組中的算子順序?yàn)榈剐虻脑颉?/p>

chainedOperator產(chǎn)生之后,將它們通過ChainingOutput連接起來喷斋,形成如下圖所示的結(jié)構(gòu)唁毒。

圖片來自:http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/

最后來看看ChainingOutput.collect()方法是如何輸出數(shù)據(jù)流的。

@Override
public void collect(StreamRecord<T> record) {
    if (this.outputTag != null) {
        // we are only responsible for emitting to the main input
        return;
    }
    pushToOperator(record);
}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
        // we are only responsible for emitting to the side-output specified by our
        // OutputTag.
        return;
    }
    pushToOperator(record);
}

protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        operator.setKeyContextElement1(castRecord);
        operator.processElement(castRecord);
    }
    catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}

可見是通過調(diào)用鏈接算子的processElement()方法星爪,直接將數(shù)據(jù)推給下游處理了浆西。也就是說,OperatorChain完全可以看做一個(gè)由headOperator和streamOutputs組成的單個(gè)算子顽腾,其內(nèi)部的chainedOperator和ChainingOutput都像是被黑盒遮蔽近零,同時(shí)沒有引入任何overhead。

打通了算子鏈在執(zhí)行層的邏輯抄肖,看官應(yīng)該會(huì)明白chainEntryPoint的含義了久信。由于它位于遞歸返回的終點(diǎn),所以它就是流入算子鏈的起始Output漓摩,即上圖中指向headOperator的RecordWriterOutput裙士。

The End

上半年過去了,希望下半年能夠多些好事發(fā)生管毙。

民那晚安腿椎。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市夭咬,隨后出現(xiàn)的幾起案子啃炸,更是在濱河造成了極大的恐慌,老刑警劉巖卓舵,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件南用,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)训枢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來忘巧,“玉大人恒界,你說我怎么就攤上這事⊙庾欤” “怎么了十酣?”我有些...
    開封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)际长。 經(jīng)常有香客問我耸采,道長(zhǎng),這世上最難降的妖魔是什么工育? 我笑而不...
    開封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任虾宇,我火速辦了婚禮,結(jié)果婚禮上如绸,老公的妹妹穿的比我還像新娘嘱朽。我一直安慰自己,他們只是感情好怔接,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開白布搪泳。 她就那樣靜靜地躺著,像睡著了一般扼脐。 火紅的嫁衣襯著肌膚如雪岸军。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天瓦侮,我揣著相機(jī)與錄音艰赞,去河邊找鬼。 笑死脏榆,一個(gè)胖子當(dāng)著我的面吹牛猖毫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播须喂,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼吁断,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了坞生?” 一聲冷哼從身側(cè)響起仔役,我...
    開封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎是己,沒想到半個(gè)月后又兵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年沛厨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了宙地。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡逆皮,死狀恐怖宅粥,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情电谣,我是刑警寧澤秽梅,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站剿牺,受9級(jí)特大地震影響企垦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜晒来,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一钞诡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧湃崩,春花似錦臭增、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至整陌,卻和暖如春拗窃,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背泌辫。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工随夸, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人震放。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓宾毒,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親殿遂。 傳聞我的和親對(duì)象是個(gè)殘疾皇子诈铛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348