Storm Trident 之 Trident API介紹

Trident是什么

Trident是Storm上的高層次抽象平夜,它能夠在提供高吞吐量的能力同時(shí)(每秒幾百萬(wàn)消息),也提供了有狀態(tài)的流式處理和低延遲分布式查詢的能力哄陶。它類似Pig這種高級(jí)批處理工具颜说,Trident提供了joins喷楣、aggregations、grouping恩伺、functions以及filters等功能函數(shù)赴背。

Trident主要提供了以下功能:

  • 常見的流分析操作,比如join晶渠、aggregation等凰荚。具體就是Trident提供的API操作。
  • 一次性處理語(yǔ)義(exactly-once)褒脯。
  • 事務(wù)數(shù)據(jù)存儲(chǔ)(transaction)便瑟。

Trident核心數(shù)據(jù)模型是一系列批處理(Batch)的流,也就是說(shuō)雖然Storm Trident處理的是Stream番川,但是處理過(guò)程中Trident將Stream分隔成Batch來(lái)進(jìn)行處理到涂。

Batch

所以Stream會(huì)被切分成一個(gè)個(gè)Batch分布到集群中,所有應(yīng)用在Stream上的函數(shù)都會(huì)具體應(yīng)用到每個(gè)節(jié)點(diǎn)的Batch上中爽彤,來(lái)實(shí)現(xiàn)并行計(jì)算养盗。

為什么使用Trident

Storm Topology適合一些無(wú)統(tǒng)計(jì)、不需要Transaction(事務(wù))的應(yīng)用适篙,比如過(guò)濾往核、清洗數(shù)據(jù)等場(chǎng)景。Topology在開啟Ack的情況下嚷节,能夠保證數(shù)據(jù)不丟失但可能重復(fù)聂儒。
而Trident適合需要嚴(yán)格不丟不重復(fù)消息的場(chǎng)景虎锚,比如交易額統(tǒng)計(jì)。Trident通過(guò)事務(wù)來(lái)實(shí)現(xiàn)eactly-once衩婚,保證數(shù)據(jù)不丟不重復(fù)窜护。但同時(shí),使用Trident會(huì)使其性能有所下降非春。

Triden API

Trident API可以分為Spout操作和Bolt操作柱徙,對(duì)于Bolt操作提供常見的流數(shù)據(jù)分析操作。
Bolt Trident提供了五種類型的操作:

  • 本地分區(qū)操作(Partition-local operations)奇昙,操作應(yīng)用到本地每個(gè)分區(qū)上护侮,這部分操作不會(huì)產(chǎn)生網(wǎng)絡(luò)傳輸。
  • 重分區(qū)操作储耐,對(duì)數(shù)據(jù)流進(jìn)行重新分區(qū)羊初,但是不會(huì)改變數(shù)據(jù)內(nèi)容,這部分操作會(huì)有網(wǎng)絡(luò)傳輸什湘。
  • 聚合操作长赞,這部分操作會(huì)有網(wǎng)絡(luò)傳輸。
  • 流分組操作闽撤。
  • 合并(meger)和連接(join)操作得哆。

Trident Spout

Trident與Storm topology一樣也是使用Spout作為Trident拓?fù)涞臄?shù)據(jù)源。Trident Spout提供了更復(fù)雜的API腹尖,因?yàn)樗瓤梢垣@取事務(wù)數(shù)據(jù)源柳恐,也可以獲取非事務(wù)數(shù)據(jù)源。

對(duì)于非事務(wù)的Spout热幔,可以使用普通的Storm IRichSpout接口:

TridentTopology topology = new TridentTopology();
topology.newStream("myspoutid",new RichSpout());

Trident拓?fù)渲欣稚瑁蠸pout都需要指定一個(gè)唯一的流的標(biāo)識(shí),比如這里的“myspoutid”(整個(gè)集群級(jí)別的唯一標(biāo)識(shí))绎巨。Trident使用該唯一標(biāo)識(shí)存儲(chǔ)Spout元數(shù)據(jù)近尚,比如txId(事務(wù)ID)以及其它Spout相關(guān)的信息。
可以通過(guò)如下配置场勤,來(lái)配置Zookeeper保存Spout的元數(shù)據(jù)戈锻。

transactional.zookeeper.servers:Zookeeper主機(jī)列表
transactional.zookeeper.port:Zookeeper集群端口
transactional.zookeeper.root:在Zookeeper存儲(chǔ)元數(shù)據(jù)的根目錄

下面是Trident Spout一些類型:

  • ITridentSpout:最通用API接口,可以支持事務(wù)和不透明事務(wù)語(yǔ)義和媳。一般會(huì)用這個(gè)API分區(qū)的特性格遭,而不是直接使用該接口。
  • IBatchSpout:非事務(wù)Spout留瞳,每次發(fā)射一個(gè)Batch的元組拒迅。
  • IPartitionedTridentSpout:事務(wù)Spout,從分區(qū)數(shù)據(jù)源讀取數(shù)據(jù),比如Kafka集群璧微。
  • IOpaquePartitionedTridentSpout:不透明事務(wù)Spout作箍,從分區(qū)數(shù)據(jù)源中讀取數(shù)據(jù)。

本地分區(qū)操作

本地分區(qū)操作不會(huì)產(chǎn)生網(wǎng)絡(luò)傳輸前硫,并且會(huì)獨(dú)立的應(yīng)用到batch的每個(gè)分區(qū)上胞得。

函數(shù)操作(Functions)

函數(shù)用于接受一個(gè)tuple,并且指定接收這個(gè)tuple的哪些field屹电,它會(huì)發(fā)射(emit)0個(gè)或多個(gè)tuple阶剑。輸出的tuple feild會(huì)被追加到原始tuple的后面,如果不輸出tuple就意味著這個(gè)tuple被過(guò)濾掉了危号。比如下面的實(shí)例:

class MyFunction extends BaseFunction {
        /**
         * 在每個(gè)元組上面執(zhí)行該邏輯函數(shù)个扰,并且發(fā)射0個(gè)或多個(gè)元組
         *
         * @param tuple     傳入的元組
         * @param collector 用于發(fā)射元組的收集器實(shí)例
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            //tuple.getInteger(0)接收第一個(gè)Field
            for(int i=0; i< tuple.getInteger(0); i++) {
                collector.emit(new Values(i));
            }
        }
    }

假設(shè)我們有一個(gè)“mystream”流,其中每個(gè)tuple包含以下filed ['a','b','c']葱色,比如有以下元組。

[1,2,3]
[2,1,2]
[10,0,1]

我們將每個(gè)元組都經(jīng)過(guò)以下MyFunction操作:

//將每個(gè)tuple的字段"b"應(yīng)用于MyFunction娘香,并且產(chǎn)生新字段"d"追加到原tuple的字段中
mystream.each(new Fields("b"),new MyFunction(),new Fields('d'))

得到數(shù)據(jù)為:

//[1,2,3]的emit苍狰,其中0和1是新字段“d”
[1,2,3,0]
[1,2,3,1]
//[2,1,2]的emit,其中0是新字段“d”
[2,1,2,0]
//[10,0,1]不滿足需求烘绽,過(guò)濾掉了

過(guò)濾操作(Filters)

接收一個(gè)tuple淋昭,并決定這個(gè)tuple是否應(yīng)該被保留。比如我們有以下Filter操作:

class MyFilter extends BaseFilter {
        /**
         * 確定是否應(yīng)該從流中過(guò)濾元組
         *
         * @param tuple 被評(píng)估的元組
         * @return 返回"false"則該元組被拋棄安接,返回"true"則該元組被保留
         */
        @Override
        public boolean isKeep(TridentTuple tuple) {
            //每個(gè)tuple中的第一個(gè)Field
            return tuple.getInteger(0) > 1;
        }
    }

同樣以Function中的實(shí)例進(jìn)行操作:

mystream.filter(new MyFilter())翔忽。

輸出數(shù)據(jù):

#[1,2,3]中的第一個(gè)字段不大于1,所以被過(guò)濾掉
[2,1,2]
[10,0,1]

map和flatMap操作

map接收一個(gè)tuple盏檐,將其作用在map函數(shù)上歇式,并且返回經(jīng)map函數(shù)處理過(guò)的tuple字段值。
比如下面的實(shí)例:

class UpperMap implements MapFunction {
        /**
         * 流中的每個(gè)trident元組調(diào)用
         *
         * @param input 接受trident tuple
         * @return 返回轉(zhuǎn)換之后的值
         */
        @Override
        public Values execute(TridentTuple input) {
            //只返回原tuple中第一個(gè)Filed的大寫字符串(其它filed被丟棄了)
            return new Values(input.getString(0).toUpperCase());
        }
    }

flatMap類似map胡野,但是它會(huì)分兩步執(zhí)行:執(zhí)行flat將所有元素展開材失,然后每個(gè)元素使用map函數(shù)。比如[[1,2],3,4]經(jīng)過(guò)flat操作后得到元素集合為[1,2,3,4]硫豆。比如我們有以下實(shí)例:

class SplitFlatMap implements FlatMapFunction {
        /**
         * 流中的每個(gè)trident元組調(diào)用
         *
         * @param input 接收的trident tuple
         * @return 一個(gè)可迭代的結(jié)果集
         */
        @Override
        public Iterable<Values> execute(TridentTuple input) {
            List<Values> resultValues = new ArrayList<>();
            //獲取一個(gè)Filed并將其以空格作為切割
            for(String word : input.getString(0).split(" ")){
                resultValues.add(new Values(word));
            }
            return resultValues;
        }
    }

我們通過(guò)上面的flatMap和Map就可以得到一個(gè)流的所有大寫詞組流了龙巨。

mystream.flatMap(new SplitFlatMap()).map(new UpperMap())。

通常我們也可以將map或flatMap的輸出結(jié)果命名一個(gè)新字段:

mystream.flatMap(new SplitFlatMap(),new Fields("word"))

peek操作

peek操作一般用來(lái)debug熊响,比如查看上一步的操作結(jié)果旨别。假如我們有以下peek操作。

class PrintPeek implements Consumer {
        /**
         * 對(duì)于輸入的每個(gè)trident元組應(yīng)用以下操作
         *
         * @param input 接收的trident 元組
         */
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getString(0));
        }
    }

以下處理操作汗茄,能把轉(zhuǎn)換大寫之后的tuple打印打出來(lái):

mystream.flatMap(new SplitFlatMap()).map(new UpperMap()).peek(new PrintPeek());

min和minBy操作

返回一批(Batch)元組中的每個(gè)分區(qū)的最小值秸弛。
比如一批(Batch)元組有以下三個(gè)partition,它們對(duì)應(yīng)的Field為['device-id','count']。

Partiton 0:
[213,15]
[125,21]
[100,10]

Partition 1:
[123,20]
[215,32]
[183,25]

針對(duì)以上數(shù)據(jù)統(tǒng)計(jì)count最小的device-id:

mystream.minBy(new Fields("count"));

返回結(jié)果:

Partition 0:
[100,10]

Partition 1:
[123,20]

除了以上使用方式胆屿,我們還可以通過(guò)傳入比較器來(lái)使用min和minBy:

public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) 
public Stream min(Comparator<TridentTuple> comparator)
max和maxBy操作

max和maxBy操作同min/minBy操作奥喻,只不過(guò)返回最大值。

mystream.maxBy(new Fields("count"));

上面實(shí)例輸出結(jié)果為:

Partition 0:
[125,21]
Partition 1:
[215,32]

max和maxBy也提供了自定義比較器的方法:

public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) 
public Stream max(Comparator<TridentTuple> comparator) 

窗口操作(Window)

Trident流能夠處理具有相同窗口的元素非迹,對(duì)它們進(jìn)行聚合操作环鲤,然后將聚合結(jié)果向下發(fā)送。Storm支持兩種窗口操作:翻滾窗口(Tumbing window)和滑動(dòng)窗口(Sliding window)憎兽。

Tumbing window

元組根據(jù)處理時(shí)間或計(jì)數(shù)分組到一個(gè)窗口中冷离,任何元組只屬于其中一個(gè)窗口。

//返回一個(gè)元組流的聚合結(jié)果纯命,它是滾動(dòng)窗口內(nèi)每windowCount個(gè)數(shù)的聚合結(jié)果
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一個(gè)元組流的聚合結(jié)果西剥,這些元組是一個(gè)窗口的聚合結(jié)果,該窗口在windowDuration的持續(xù)時(shí)間內(nèi)滾動(dòng)
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Sliding window

元組在每個(gè)滑動(dòng)間隔的窗口內(nèi)分組亿汞,一個(gè)元組可能屬于多個(gè)窗口瞭空。

//返回一個(gè)元組流的聚合結(jié)果,它是滑動(dòng)窗口每windowCount個(gè)元組樹的聚合結(jié)果疗我,并在slideCount之后滑動(dòng)窗口
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一個(gè)元組流的聚合結(jié)果咆畏,該窗口在slidingInterval持續(xù)滑動(dòng),并在windowDuration處完成一個(gè)窗口
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Common window

除了上面提供的滾動(dòng)窗口api和滑動(dòng)窗口api吴裤,Trident還提供了公用窗口api旧找,通過(guò)windowConfig可以支持任意窗口。

public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields)

Trident window api需要使用WindowsStoreFactor存儲(chǔ)接收到的元組和聚合值麦牺。目前钮蛛,Trident提供了HBaseWindowsStoreFactor的HBase實(shí)現(xiàn)。

partitionAggregate操作

partitionAggregate對(duì)一批(batch)元組的每個(gè)分區(qū)進(jìn)行聚合剖膳,與前面Function在元組后面追加不同F(xiàn)ield不同魏颓,partitionAggregate會(huì)使用發(fā)射出去的元組替換接收進(jìn)來(lái)的元組。比如以下實(shí)例:
比如有以下數(shù)據(jù)吱晒,對(duì)應(yīng)的Field分別為["a","b"]:

Partition 0:
["a":1]
["b":2]

Partition 1:
["c":2]
["d":2]

使用partitionAggregate進(jìn)行求和:

mystream.partitionAggregate(new Fields("b"),new Sum(),new Fields("sum"))

經(jīng)過(guò)partitionAggregate函數(shù)之后的結(jié)果為:

Partition 0:
["sum":3]

Partition 1:
["sum":4]

Trident API提供了三個(gè)聚合器接口:CombinerAggregator琼开、ReducerAggregator和Aggregator。

CombinerAggregator操作

CombinerAggregator只返回單個(gè)tuple枕荞,并且這個(gè)tuple只包含一個(gè)Field柜候。每個(gè)元組首先都經(jīng)過(guò)init函數(shù)進(jìn)行預(yù)處理,然后在執(zhí)行combine函數(shù)來(lái)計(jì)算接受到的tuple躏精,直到最后一個(gè)tuple到達(dá)渣刷。如果分區(qū)內(nèi)沒(méi)有tuple,則會(huì)通過(guò)zero函數(shù)發(fā)射結(jié)果矗烛。

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

比如以下實(shí)例:

class Count implements CombinerAggregator<Long> {
        @Override
        public Long init(TridentTuple tuple) {
            //計(jì)數(shù)辅柴,每個(gè)tuple代表一個(gè)數(shù)
            return 1L;
        }
        @Override
        public Long combine(Long val1, Long val2) {
            return val1 + val2;
        }
        @Override
        public Long zero() {
            return 0L;
        }
    }
ReducerAggregator操作

ReducerAggregator通過(guò)init方法提供一個(gè)初始值箩溃,然后每個(gè)輸入的tuple迭代這個(gè)值,最后產(chǎn)生一個(gè)唯一的tuple輸出碌嘀。

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

比如同樣使用RecuerAggregator來(lái)實(shí)現(xiàn)計(jì)數(shù)器:

 class  Count implements ReducerAggregator<Long> {
        @Override
        public Long init() {
            return 0L;
        }

        @Override
        public Long reduce(Long curr, TridentTuple tuple) {
            return curr + 1;
        }
    }
Aggregator

執(zhí)行聚合操作最通用的接口就是Aggregator了涣旨,它能夠發(fā)射任意數(shù)量的元組,每個(gè)元組可以包含任意數(shù)量的字段股冗。

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

它的執(zhí)行流程是:

  1. 在處理Batch之前調(diào)用init方法霹陡,它返回一個(gè)聚合的狀態(tài)值,傳遞給aggregate和complete方法止状。
  2. 為批處理分區(qū)中的每個(gè)tuple調(diào)用aggregate方法烹棉,此方法可以更新狀態(tài)值,也可以發(fā)射元組怯疤。
  3. 當(dāng)aggregator處理完Batch分區(qū)的所有元組后調(diào)用complete方法浆洗。

使用Aggregator來(lái)實(shí)現(xiàn)計(jì)數(shù)器:

class CountAgg extends BaseAggregator<CountAgg.CountState> {
        class CountState{
            long count = 0;
        }

        @Override
        public CountState init(Object batchId, TridentCollector collector) {
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            state.count += 1;
        }

        @Override
        public void complete(CountState state, TridentCollector collector) {
            collector.emit(new Values(state.count));
        }
    }

狀態(tài)查詢(stateQuery)和分區(qū)持久化(partitionPersist)

stateQuery用于查詢狀態(tài)源,partitionPersist用于更新狀態(tài)源集峦。具體使用方式可查看:http://storm.apache.org/releases/1.2.2/Trident-state.html

投影(projection)操作

projection操作用于只保留指定的字段伏社,比如元組有字段["a","b","c","d"],通過(guò)以下投影操作塔淤,輸出流只會(huì)包含["c","d"]洛口。

mystream.projection(new Fields("c","d"));

重分區(qū)操作

Repartition操作運(yùn)行一個(gè)函數(shù)來(lái)改變?cè)M在任務(wù)之間的分布,調(diào)整分區(qū)數(shù)也可能會(huì)導(dǎo)致Repartition操作凯沪。重分區(qū)操作會(huì)引發(fā)網(wǎng)絡(luò)傳輸。下面是重分區(qū)的相關(guān)函數(shù):

  • shuffle:使用隨機(jī)算法來(lái)均衡tuple到每個(gè)分區(qū)买优。
  • broadcast:每個(gè)tuple被廣播到所有分區(qū)上妨马,使用DRPC時(shí)使用這種方法比較多,比如每個(gè)分區(qū)上做stateQuery杀赢。
  • global:所有tuple都發(fā)送到一個(gè)分區(qū)上烘跺,這個(gè)分區(qū)用來(lái)處理stream。
  • batchGlobal:一個(gè)batch中的所有tuple會(huì)發(fā)送到一個(gè)分區(qū)中脂崔,不同batch的元組會(huì)被發(fā)送到不同分區(qū)上滤淳。
  • partition:通過(guò)一個(gè)自定義的分區(qū)函數(shù)來(lái)進(jìn)行分區(qū),這個(gè)自定義函數(shù)需要實(shí)現(xiàn)org.apache.storm.grouping.CustomStreamGrouping砌左。

聚合操作

Trident提供了aggregate和persistentAggregate方法脖咐,aggregate運(yùn)行在每個(gè)batch中,而persistentAggregate將聚合所有Batch汇歹,并將結(jié)果保存在一個(gè)狀態(tài)源上屁擅。
我們前面講的aggregate、CombinerAggregator和ReducerAggregator運(yùn)行在patitionAggregation上是本地分區(qū)操作产弹。如果直接作用于流上派歌,則是對(duì)全局進(jìn)行聚合。
在對(duì)全局流進(jìn)行聚合時(shí),Aggregator和ReducerAggregator會(huì)首先重分區(qū)到一個(gè)單分區(qū)胶果,然后在該分區(qū)上執(zhí)行聚合函數(shù)匾嘱。而CombinerAggregator則會(huì)首先聚合每個(gè)分區(qū),然后重分區(qū)到單個(gè)分區(qū)早抠,在網(wǎng)絡(luò)傳輸中完成聚合操作霎烙。所以我們應(yīng)該盡量用CombinerAggregator,因?yàn)樗痈咝А?/p>

mystream.aggregate(new Count(),new Fields("count"));

流分組操作

groupBy操作會(huì)重新分區(qū)流贝或,對(duì)指定字段執(zhí)行partitionBy操作吼过,指定字段相同的元組被劃分到相同的分區(qū)。goupBy操作如下圖:

Group

如果在流分組中運(yùn)行聚合器咪奖,聚合會(huì)在每個(gè)group中運(yùn)行盗忱,而不是對(duì)整個(gè)Batch操作。

合并和連接

Trident可以允許我們將不同流組合在一起羊赵,通過(guò)TridentTopology.merge()方法操作趟佃。

//合并流會(huì)以第一個(gè)流的輸出字段來(lái)命名
topology.mege(stream1,stream2,stream3);

另一種合并流的方式是連接,類似于SQL那樣的連接昧捷,要求輸入是有限的闲昭。所以Trident的join只適用于來(lái)自Spout的每個(gè)小Bath之間。
比如有一個(gè)流包含["key1","val1","val2"]靡挥,另一個(gè)流包含["key2","val1","val2"]序矩,通過(guò)以下連接操作:

//Trident需要join之后的流重新命名,因?yàn)檩斎肓骺赡艽嬖谥貜?fù) 字段跋破。
mystream.join(stream1,new Fields("key1"),stream2,new Fields("key2"),new Fields("key","a","b","c"簸淀,"d"))
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市毒返,隨后出現(xiàn)的幾起案子租幕,更是在濱河造成了極大的恐慌,老刑警劉巖拧簸,帶你破解...
    沈念sama閱讀 216,496評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件劲绪,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡盆赤,警方通過(guò)查閱死者的電腦和手機(jī)贾富,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)牺六,“玉大人祷安,你說(shuō)我怎么就攤上這事⊥闷颍” “怎么了汇鞭?”我有些...
    開封第一講書人閱讀 162,632評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵凉唐,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我霍骄,道長(zhǎng)台囱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,180評(píng)論 1 292
  • 正文 為了忘掉前任读整,我火速辦了婚禮簿训,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘米间。我一直安慰自己强品,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評(píng)論 6 388
  • 文/花漫 我一把揭開白布屈糊。 她就那樣靜靜地躺著的榛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逻锐。 梳的紋絲不亂的頭發(fā)上夫晌,一...
    開封第一講書人閱讀 51,165評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音昧诱,去河邊找鬼晓淀。 笑死,一個(gè)胖子當(dāng)著我的面吹牛盏档,可吹牛的內(nèi)容都是我干的凶掰。 我是一名探鬼主播,決...
    沈念sama閱讀 40,052評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蜈亩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼懦窘!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起勺拣,我...
    開封第一講書人閱讀 38,910評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鱼填,沒(méi)想到半個(gè)月后药有,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡苹丸,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評(píng)論 2 332
  • 正文 我和宋清朗相戀三年愤惰,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赘理。...
    茶點(diǎn)故事閱讀 39,711評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宦言,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出商模,到底是詐尸還是另有隱情奠旺,我是刑警寧澤蜘澜,帶...
    沈念sama閱讀 35,424評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站响疚,受9級(jí)特大地震影響鄙信,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜忿晕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評(píng)論 3 326
  • 文/蒙蒙 一装诡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧践盼,春花似錦鸦采、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至谅河,卻和暖如春咱旱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背绷耍。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工吐限, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人褂始。 一個(gè)月前我還...
    沈念sama閱讀 47,722評(píng)論 2 368
  • 正文 我出身青樓诸典,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親崎苗。 傳聞我的和親對(duì)象是個(gè)殘疾皇子狐粱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容