Storm 測試

本文將學習如何使用java創(chuàng)建Storm拓撲

Storm集群的組件

Storm集群類似于Hadoop集群绝页,只不過 Hadoop 上運行"MapReduce jobs"迄汛, Storm 上運行"topologies"。
兩者最大的差別是槽畔,MapReducejobs 最終是完成的栈妆,而 topologies 是一直處理消息(或直到你殺死它)。

集群 任務名稱 任務時效性
Storm topologies(拓撲) 一直處理消息(或直到你殺死它)
Hadoop MapReduce jobs 最終是完成的

Storm集群上有兩種節(jié)點:master 和 worker 節(jié)點

  • master:
    運行一個名為 Nimbus 的守護進程,
    負責在集群周圍分發(fā)代碼厢钧,
    為機器分配任務以及監(jiān)控故障鳞尔。
    (類似 Hadoop 的 JobTracker)
  • worker:
    運行一個名為 Supervisor 的守護進程,
    負責監(jiān)聽、并根據(jù)需要啟動早直、停止 "Nimbus" 分配給其的任務铅檩。
    每個工作進程都執(zhí)行拓撲的子集。 運行拓撲由分布在許多計算機上的許多工作進程組成莽鸿。

Nimbus 和 Supervisors 之間的協(xié)調(diào)是通過 Zookeeper 實現(xiàn)的昧旨。
此外,Nimbus 守護程序和 Supervisors 守護程序是 fail-fast 和 stateless;
所有狀態(tài)都保存在Zookeeper或本地磁盤上祥得。這意味著你可以通過 kill -9 殺死 Nimbus 或者 Supervisors 兔沃,但是它們會像沒事一樣重新開始。
這種設計使Storm集群非常穩(wěn)定级及。

Storm集群的組件.png

Topologies

要想在 Storm 上進行實時計算乒疏,你需要創(chuàng)建一個 topologies 。
topologies 是一個計算圖饮焦,topologies中的每個節(jié)點包含計算邏輯怕吴,并且通過節(jié)點之間的連接定義了數(shù)據(jù)在節(jié)點之間的流動方向。

運行拓撲很簡單县踢。首先转绷,將所有代碼和依賴項打包到一個jar中。然后硼啤,運行如下命令:

storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
額 這個命令沒啥好解釋的....

Streams

Stream 是一個無限的元組序列议经,是 Storm 抽象的核心。
Storm 提供了以分布式和可靠的方式進行 Stream 傳換的原語谴返。
比如將微博 Stream 轉(zhuǎn)換為轉(zhuǎn)換熱門主題 Stream煞肾。

Storm 為進行 Stream 轉(zhuǎn)換提供 spouts 和 bolt 兩個基本原語。

  • spouts
    spouts 是 Stream 的來源嗓袱。
    例如籍救,spout可以讀取Kestrel隊列中的元組并將其作為 Stream 發(fā)出∏ǎ或者 spout 可以連接到Twitter API并發(fā)出推文流蝙昙。
  • bolt
    bolt會消耗任意數(shù)量的輸入流闪萄,進行一些處理,并可能發(fā)出新的流耸黑。
    像從推文流計算趨勢主題流之類的復雜流轉(zhuǎn)換桃煎,需要多個步驟篮幢,因此需要多個 bolt 大刊。
    bolt 可以執(zhí)行任何操作,包括運行函數(shù)三椿,過濾元組缺菌,進行流聚合,進行流連接搜锰,與數(shù)據(jù)庫對話等等伴郁。

spout 和 bolt 網(wǎng)絡被打包成一個 topology ,這是提交給 Storm 集群執(zhí)行的頂級抽象蛋叼。
拓撲是流轉(zhuǎn)換的圖形焊傅,其中每個節(jié)點都是一個 spout 或 bolt 。
圖中的表示哪些 bolt 訂閱了哪些流狈涮。
當一個 spout 或 bolt 向一個流發(fā)出一個元組時狐胎,它會將元組發(fā)送給訂閱該流的每個 bolt 。

Streams 抽象.png

拓撲中節(jié)點之間的鏈接指示應如何傳遞元組歌馍。
如上圖握巢,Spout A 和Bolt B 之間有鏈接,Spout A 到 Bolt C 之間有鏈接松却,以及從 Bolt B 到 Bolt C 之間有鏈接暴浦。
那么每次 Spout A 發(fā)出一個元組時,它都會將元組發(fā)送給 Bolt B 和 Bolt C .所有 Bolt B 的輸出元組也將送給 Bolt C.

Storm拓撲中的每個節(jié)點并行執(zhí)行晓锻。
在拓撲中歌焦,你可以為每個節(jié)點指定所需的并行度,Storm將在集群中生成該數(shù)量的線程以執(zhí)行砚哆。

拓撲會一直執(zhí)行(或直到你殺死它)同规。
Storm會自動重新分配失敗的任務。
此外窟社,Storm保證不會丟失數(shù)據(jù)券勺,即使計算機出現(xiàn)故障并且消息丟失也是如此。


Data model

Storm使用元組作為其數(shù)據(jù)模型灿里。
元組是一個命名的值列表关炼,元組中的字段可以是任何類型的對象。
Storm支持所有原始類型匣吊,字符串和字節(jié)數(shù)組作為元組字段值儒拂。
要使用其他類型的對象寸潦,需要為該類型實現(xiàn)一個序列化程序。

拓撲中的每個節(jié)點都必須聲明它發(fā)出的元組的輸出字段社痛。
例如下面代碼中的bolt 聲明它發(fā)出2元組见转,字段為 "double" 和 "triple"

package com.aaa.test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author lillcol
 * 2019/7/18-11:46
 */
public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));//聲明["double", "triple"]組件的輸出字段
    }
}

一個簡單的拓撲(A simple topology)

如何實現(xiàn)一個簡單的拓撲?
本地 idea測試
sbt構(gòu)建

// libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"

定義一個Spout蒜哀,此處采用隨機數(shù)

package com.test.storm;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * @author lillcol
 * 2019/7/18-12:03
 */
public class TestWordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        //Spouts負責向拓撲中發(fā)送新消息
        Utils.sleep(100);
        //每隔100ms就會從列表中隨機選一個單詞發(fā)出
        final String[] words = new String[]{"hellow", "lillcol", "study", "storm"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        collector.emit(new Values(word));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

定義Bolt斩箫,功能接收到的信息追加"levelUp!"

package com.test.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author lillcol
 * 2019/7/18-12:04
 */
public class ExclamationBolt extends BaseRichBolt {
    OutputCollector collector;

   //prepare方法為 Bolt 提供了一個OutputCollector用于從 Bolt 中發(fā)出元組 。
   //元組可以隨時的從prepare撵儿,execute乘客,cleanup,甚至在另一個線程中異步發(fā)出淀歇。
   //當前prepare實現(xiàn)只是將OutputCollector作為實例變量保存易核,以便稍后在execute方法中使用。
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector =collector;
    }
    //execute方法從一個Bolt的輸入接收一個元組浪默。
    //此execute取數(shù)組的第一個字段并發(fā)出追加字符串“l(fā)evelUp!” 得字符串牡直。
    //如果您實現(xiàn)了一個訂閱多個輸入源的bolt,您可以通過使用Tuple#getSourceComponent方法找出Tuple來自哪個組件纳决。
    @Override
    public void execute(Tuple input) {
        String sourceComponent = input.getSourceComponent();
        //輸入元組作為第一個參數(shù)傳遞emit
        collector.emit(input, new Values(input.getString(0) + "levelUp!"));
        System.out.println(input.getString(0));
        // 輸入元組在最后一行被激活碰逸。這些是Storm的可靠性API的一部分,用于保證不會丟失數(shù)據(jù)
        collector.ack(input);
    }

    //declareOutputFields方法聲明ExclamationBolt發(fā)出1元組岳链,其中一個字段稱為“word”花竞。
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

//如果implements IRichBol
//還需要重寫下面兩個方法
//當Bolt被關(guān)閉時調(diào)用cleanup方法,并且應該清除所有打開的資源掸哑。
//無法保證在集群上調(diào)用此方法:例如约急,如果任務正在運行的計算機爆炸,則無法調(diào)用該方法苗分。
 @Override
    public void cleanup() {
    }
//getComponentConfiguration方法允許配置此組件運行方式的各個方面    
@Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
//但是一般情況下我們不需要這兩個方法厌蔽,所以我們可以通過繼承BaseRichBolt來定義Bolt

定義調(diào)用類

package com.test.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.shade.org.apache.jute.Utils;
import org.apache.storm.topology.TopologyBuilder;


/**
 * @author lillcol
 * 2019/7/18-12:03
 */
public class SimpleTopology {
    public static void main(String[] args) throws Exception {
        SimpleTopology topology = new SimpleTopology();
        topology.runLocal(60);

    }

    public void runLocal(int waitSeconds) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //第一個參數(shù)是給Spout一個id "words", 
        //第二個參數(shù)是要調(diào)用的Spout類
        //第三個參數(shù)是節(jié)點所需的并行度摔癣,是可選的奴饮。它指示應在群集中執(zhí)行該組件的線程數(shù)。如果省略它择浊,Storm將只為該節(jié)點分配一個線程戴卜。
        topologyBuilder.setSpout("words", new TestWordSpout(), 1);
        //Bolt的參數(shù)與Spout
        //只是要通過shuffleGrouping 指定數(shù)據(jù)來源"words")
        //“shuffle grouping”意味著元組應該從輸入任務隨機分配到bolt的任務中。
        topologyBuilder.setBolt("DoubleAndTripleBolt1", new ExclamationBolt(), 1)
                .shuffleGrouping("words");
        //一個Bolt可以接收多個數(shù)據(jù)來源琢岩,是要多次調(diào)用shuffleGrouping即可       
        topologyBuilder.setBolt("DoubleAndTripleBolt2", new ExclamationBolt(), 1)
                .shuffleGrouping("DoubleAndTripleBolt1")
                .shuffleGrouping("words");

        //loacl 測試
        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word_count", config, topologyBuilder.createTopology());

        org.apache.storm.utils.Utils.sleep(1000*10);
        cluster.killTopology("word_count");
        cluster.shutdown();
    }
}

運行結(jié)果:
study
study
studylevelUp!
study
study
studylevelUp!
hellow
hellow
hellowlevelUp!
lillcol
lillcol
lillcollevelUp!
hellow
hellow
hellowlevelUp!
hellow
hellow
hellowlevelUp!
lillcol
lillcol
lillcollevelUp!
. . .

異常

可能出現(xiàn)異常1:

java.lang.NoClassDefFoundError: org/apache/storm/topology/IRichSpout
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at java.lang.Class.getMethod0(Class.java:3018)
    at java.lang.Class.getMethod(Class.java:1784)
    at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.storm.topology.IRichSpout
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more
Error: A JNI error has occurred, please check your installation and try again

這個是因為在sbt構(gòu)建的時候  % "provided" 意思是已提供相關(guān)jar投剥,但是我們idea測試的時候并沒有相關(guān)jar
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"

所以不能用上面的語句,改成下面的即可
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"

maven 對應著改就可以了

可能出現(xiàn)異常2:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
    at org.apache.storm.LocalCluster.<clinit>(LocalCluster.java:128)
    at com.test.storm.SimpleTopology.runLocal(SimpleTopology.java:28)
    at com.test.storm.SimpleTopology.main(SimpleTopology.java:16)
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
    ... 7 more
    
錯誤報的很明顯
log4j-over-slf4j.jar AND slf4j-log4j12.jar  沖突了
我的解決辦法是在測試的時候隨便刪掉一個担孔,但是生產(chǎn)的時候在可能沖突的依賴中把它去掉

Storm 的 的hellow word(word count)
//定義Spout WordReader
package com.test.storm;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

/**
 * @author lillcol
 * 2019/7/19-9:17
 */
public class WordReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    /**
     * open方法江锨,接收三個參數(shù):
     * 第一個是創(chuàng)建Topology的配置吃警,
     * 第二個是所有的Topology數(shù)據(jù)
     * 第三個是用來把Spout的數(shù)據(jù)發(fā)射給bolt
     **/
    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            //獲取創(chuàng)建Topology時指定的要讀取的文件路徑
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["
                    + conf.get("wordFile") + "]");
        }
        //初始化發(fā)射器
        this.collector = collector;
    }

    /**
     * nextTuple是Spout最主要的方法:
     * 在這里我們讀取文本文件,并把它的每一行發(fā)射出去(給bolt)
     * 這個方法會不斷被調(diào)用啄育,為了降低它對CPU的消耗酌心,當任務完成時讓它sleep一下
     **/
    @Override
    public void nextTuple() {
//如果要看到tail效果,去掉這個  if (completed) 語句塊挑豌,我測試的時候不去掉看不到效果只有報錯
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return;
        }
        String str;
        BufferedReader bufferedReader = new BufferedReader(fileReader);
        try {
            while ((str = bufferedReader.readLine()) != null) {
                //發(fā)送一行
                collector.emit(new Values(str), str);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            completed = true;
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

//定義Bolt WordSplit 實現(xiàn)切割
package com.test.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author lillcol
 * 2019/7/19-9:38
 */
public class WordSplit implements IRichBolt {
    private OutputCollector collector;


    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * execute是bolt中最重要的方法:
     * 當接收到一個tuple時安券,此方法被調(diào)用
     * 這個方法的作用就是把接收到的每一行切分成單個單詞,并把單詞發(fā)送出去(給下一個bolt處理)
     **/
    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] words = line.split(",| |\\|");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                List a = new ArrayList();
                a.add(input);
                collector.emit(a, new Values(word));
            }
        }
        collector.ack(input);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

//定義Bolt WordCounter 實現(xiàn)統(tǒng)計
package com.test.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lillcol
 * 2019/7/19-10:01
 */
public class WordCounter implements IRichBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            counters.put(str, counters.get(str) + 1);
        }
//如果要看到tail效果浮毯,應該在這里打印統(tǒng)計信息
// System.out.println(str+":"+counters.get(str) );
        collector.ack(input);
    }

//這里只是最后一次打印完疫,要tail效果不應該在這里打印統(tǒng)計信息泰鸡。
    @Override
    public void cleanup() {
        System.out.println("--Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
        counters.clear();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

//定義主類
package com.test.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

/**
 * @author lillcol
 * 2019/7/19-10:33
 */
public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordReader",new WordReader(),1);
        topologyBuilder.setBolt("WordSplit",new WordSplit(),1)
                .shuffleGrouping("wordReader");
        topologyBuilder.setBolt("",new WordCounter(),2)
                .shuffleGrouping("WordSplit");

        //配置
        Config config = new Config();
        config.put("wordsFile","D:\\stromFile");
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        //創(chuàng)建一個本地模式cluster
        LocalCluster localCluster = new LocalCluster();
        //提交Topology
        localCluster.submitTopology("WordCountTopology",config,topologyBuilder.createTopology());

        Thread.sleep(2000);//這個時間要控制好债蓝,太短看不到效果
        localCluster.shutdown();

    }
}
//輸出結(jié)果
11:35:16.845 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shutting down executor :[2, 2]
11:35:16.845 [Thread-37--executor[2, 2]] INFO  o.a.s.u.Utils - Async loop interrupted!
--Word Counter [-2] --
Thread[SLOT_1027:73
40673ms:1
11:34:31.865:1
30724ms:1
11:34:23.065:1
11:34:27.365:1
. . .


11:35:16.846 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shut down executor :[2, 2]
11:35:16.846 [SLOT_1024] INFO  o.a.s.e.ExecutorShutdown - Shutting down executor :[1, 1]
11:35:16.846 [Thread-38--executor[1, 1]] INFO  o.a.s.u.Utils - Async loop interrupted!
--Word Counter [-1] --
Thread[SLOT_1027:87
11:34:31.465:1
29524ms:1
26024ms:1
. . . 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市盛龄,隨后出現(xiàn)的幾起案子饰迹,更是在濱河造成了極大的恐慌,老刑警劉巖余舶,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件啊鸭,死亡現(xiàn)場離奇詭異,居然都是意外死亡匿值,警方通過查閱死者的電腦和手機赠制,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挟憔,“玉大人钟些,你說我怎么就攤上這事“硖罚” “怎么了政恍?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長达传。 經(jīng)常有香客問我篙耗,道長,這世上最難降的妖魔是什么宪赶? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任宗弯,我火速辦了婚禮,結(jié)果婚禮上搂妻,老公的妹妹穿的比我還像新娘蒙保。我一直安慰自己,他們只是感情好叽讳,可當我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布追他。 她就那樣靜靜地躺著坟募,像睡著了一般。 火紅的嫁衣襯著肌膚如雪邑狸。 梳的紋絲不亂的頭發(fā)上懈糯,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天,我揣著相機與錄音单雾,去河邊找鬼赚哗。 笑死,一個胖子當著我的面吹牛硅堆,可吹牛的內(nèi)容都是我干的屿储。 我是一名探鬼主播,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼渐逃,長吁一口氣:“原來是場噩夢啊……” “哼够掠!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起茄菊,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤疯潭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后面殖,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體竖哩,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年脊僚,在試婚紗的時候發(fā)現(xiàn)自己被綠了相叁。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡辽幌,死狀恐怖增淹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情舶衬,我是刑警寧澤埠通,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站逛犹,受9級特大地震影響端辱,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜虽画,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一舞蔽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧码撰,春花似錦渗柿、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽颊亮。三九已至,卻和暖如春陨溅,著一層夾襖步出監(jiān)牢的瞬間终惑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工门扇, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留雹有,地道東北人。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓臼寄,卻偏偏與公主長得像霸奕,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子吉拳,可洞房花燭夜當晚...
    茶點故事閱讀 45,086評論 2 355