storm筆記:Trident狀態(tài)

storm筆記:Trident應(yīng)用中說了下Trident的使用弱判,這里說下Trident幾種狀態(tài)的變化及其對應(yīng)API的使用。

本文內(nèi)容來自Trident State锥惋,部分內(nèi)容根據(jù)實際情況做出修改昌腰。

Trident中有對狀態(tài)數(shù)據(jù)進(jìn)行讀取和寫入操作的一流抽象工具。狀態(tài)既可以保存在拓?fù)鋬?nèi)部膀跌,比如保存在內(nèi)容中并由HDFS存儲遭商,也可以通過外部存儲(比如Memcached或Cassandra)存儲在數(shù)據(jù)庫中。而對于Trident的API而言捅伤,這兩種機制沒有任何區(qū)別劫流。

Trident以容錯的方式管理狀態(tài),以便在重試或失敗時的狀態(tài)更新是冪等的。在大數(shù)據(jù)處理中祠汇,數(shù)據(jù)處理的冪等性是非常重要的一個指標(biāo)仍秤,這樣能夠保證每個消息即使處理了多次,結(jié)果也像是只處理了一次一樣座哩。

在進(jìn)行狀態(tài)更新時可能需要各種級別的容錯能力徒扶,在這之前,我們來看一個例子說明實現(xiàn)“恰好一次”語義所需的技巧根穷。比如谓着,正在對流中的數(shù)據(jù)進(jìn)行計數(shù)聚合操作,每次處理新的元組時滔金,都會將運行的計數(shù)結(jié)果存儲在數(shù)據(jù)庫中霉涨。

如果發(fā)生故障時,元組將重新執(zhí)行計數(shù)操作尘惧。這就會在執(zhí)行狀態(tài)更新時出現(xiàn)問題康栈,因為這個時候不知道是不是已經(jīng)更新過該元組狀態(tài)。也許還沒有處理該元組數(shù)據(jù)喷橙,這個時候就需要增加計數(shù)啥么。也許已經(jīng)處理該元組,并成功增加計數(shù)贰逾,但是在下一步的時候出現(xiàn)問題悬荣,這種情況下,就不應(yīng)該增加計數(shù)疙剑。也有可能是處理元組正常氯迂,更新計數(shù)是異常,這個時候就需要更新計數(shù)言缤。

所以說嚼蚀,如果只是在數(shù)據(jù)庫中存儲計數(shù)信息,就不知道元組是否已經(jīng)處理過管挟。因此轿曙,就需要更多的信息作為輔助。Trident提供了下面三個性質(zhì)僻孝,來實現(xiàn)“恰好一次”的處理:

  1. 元組都是以小批次處理
  2. 每批元組都會給出一個唯一ID拳芙,稱為事務(wù)ID(transaction id,txid)皮璧。如果批次重復(fù)處理舟扎,txid也會相同。
  3. 狀態(tài)的更新操作是按照元組批次的順序執(zhí)行的悴务。也就是說睹限,在批次2狀態(tài)更新成功之前譬猫,不會進(jìn)行批次3的狀態(tài)更新。

根據(jù)這些特性羡疗,就可以通過檢查到該元組的批次是否已被處理染服,并根據(jù)檢測結(jié)果采取適當(dāng)?shù)牟僮鞲聽顟B(tài)了。采取的具體操作取決于Spout的類型叨恨。Spout有三種類型:“非事務(wù)型(non-transactional)”柳刮,“事務(wù)型(transactional)”和“不透明事務(wù)型(opaque transactional)”。對應(yīng)的容錯能力也是三種:“非事務(wù)”痒钝,“事務(wù)”和“不透明事務(wù)”秉颗。下面來看看Spout的各個類型及對應(yīng)的容錯能力。

事務(wù)型Spout

Trident是按照批次發(fā)送元組進(jìn)行處理的送矩,每個批次的元組被賦予唯一的事務(wù)ID蚕甥。Spout的特性根據(jù)他們所提供容錯性保證機制來決定的,而且這種機制也會對每個批次發(fā)生作用栋荸。事務(wù)型Spout有如下特性:

  1. 每個批次的txid不變菇怀,對于一個特定的txid,重復(fù)執(zhí)行時晌块,它所包含的元組數(shù)據(jù)與第一次完全相同爱沟。
  2. 元組只會在一個批次出現(xiàn),不會重復(fù)(某個元組只會出現(xiàn)在一個批次中匆背,不會出現(xiàn)在多個批次中)呼伸。
  3. 每個元組都會出現(xiàn)一次(不會遺漏任何的元組數(shù)據(jù))

這是最簡單最容易理解的一種Spout類型,數(shù)據(jù)流被分割成固定的批次靠汁。storm中有與Kafka集成的事務(wù)型Spout的擴展蜂大,代碼在這里闽铐。

既然事務(wù)型Spout這么簡單易懂蝶怔,為什么不在Trident中完全使用事務(wù)型Spout呢?其實就在于它的容錯能力兄墅。比如踢星,TransactionalTridentKafkaSpout的工作方式是,同一個txid的批次中將包含kafka所有分區(qū)的元組隙咸。一旦某個批次發(fā)出后沐悦,出現(xiàn)異常,需要重新發(fā)出五督,就需要完全相同的元組集合才能滿足事務(wù)型Spout要求的語義藏否。但是這個時候,kafka某個節(jié)點異常(節(jié)點關(guān)閉或分區(qū)不可用)充包,就無法獲取完全相同的的一批元組副签,那整個拓?fù)渚蜁?yīng)為第3條語義(批次按順序執(zhí)行)停止遥椿。

這就是要有“不透明事務(wù)型”Spout的原因了,它能夠容忍數(shù)據(jù)源節(jié)點丟失淆储,而且又能保證數(shù)據(jù)恰好被操作一次冠场。

注:對kafka比較熟悉的應(yīng)該會想到,如果某一個topic支持復(fù)制本砰,那即使一個節(jié)點不可用碴裙,還會有其他復(fù)制節(jié)點頂上,那TransactionalTridentKafkaSpout也能夠避免上面的問題点额。

下面繼續(xù)看看如何設(shè)計一個支持恰好一次特性的“事務(wù)型”Spout語義(簡單的說就是同一個txid對應(yīng)的批次元組數(shù)據(jù)完全一致)的狀態(tài)實現(xiàn)舔株,這種狀態(tài)稱為“事務(wù)型狀態(tài)”。

比如咖楣,現(xiàn)在有一個單詞計數(shù)的拓?fù)涠桨剩枰獙卧~計數(shù)存儲在key/value數(shù)據(jù)庫中。key是單詞诱贿,value中包含單詞數(shù)量娃肿。另外,為了確定同一批次元組是否已經(jīng)被執(zhí)行珠十,需要將txid也一同存儲在value中料扰。這樣,當(dāng)需要更新單詞數(shù)量的時候焙蹭,先比較txid是否相同晒杈,如果相同,就跳過更新孔厉。如果不同拯钻,就更新計數(shù)。

考慮這個為什么它工作的例子撰豺。 假設(shè)您正在處理由以下批次元組組成的txid 3:

比如粪般,要處理一個txid是3的一批元組:

["man"]
["man"]
["dog"]

目前數(shù)據(jù)庫中存儲的數(shù)據(jù)為:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

在這個時候,發(fā)現(xiàn)“man”對應(yīng)的txid是1污桦,當(dāng)前的txid是3亩歹,就可以更新了。然后“dog”對應(yīng)的txid是3凡橱,說明同一批次的元組數(shù)據(jù)已經(jīng)發(fā)送過了小作,就不需要更新。從這點可以看出稼钩,txid是3的批次元組是重復(fù)發(fā)送的顾稀,在更新“dog”數(shù)量后,在更新“man”數(shù)量前坝撑,出現(xiàn)了錯誤静秆。最后的結(jié)果就是:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

不透明事務(wù)型Spout

前面已經(jīng)提過氮块,不透明事務(wù)型Spout不能保證相同txid對應(yīng)的批次中的元組數(shù)據(jù)完全一致。其特點如下:

  1. 每個元組都會在有且僅有一個批次中處理成功诡宗。

[OpaqueTridentKafkaSpout](http://github.com/apache/storm/tree/v1.1.0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java)具有這種特性滔蝉,同時對kafka節(jié)點異常有很好的容錯性。OpaqueTridentKafkaSpout在發(fā)送一個批次元組的時候塔沃,會從上次成功之后的位置開始發(fā)送蝠引,這樣就能夠保證元組不會漏發(fā)或重發(fā)。

基于上面的特點蛀柴,不透明事務(wù)型Spout就不同通過txid來直接判斷是否可以跳過狀態(tài)更新螃概,因為具有相同txid的批次中元組可能發(fā)生了變化。

這就需要存儲更多的狀態(tài)信息了鸽疾,而不僅僅是一個結(jié)果和一個txid了吊洼,還需要存儲前一個結(jié)果值。

比如制肮,當(dāng)前批次的計數(shù)是2冒窍,需要進(jìn)行一次狀態(tài)更新,數(shù)據(jù)庫中的數(shù)據(jù)如下:

{
  "value": 4,
  "prevValue": 1,
  "txid": 2
}

如果當(dāng)前的txid是3豺鼻,與數(shù)據(jù)庫中的不同综液。在這種情況下,需要將prevValue的值該為value的值儒飒,value的值增加2谬莹,更新txid為3,最后的結(jié)果就是:

{
  "value": 6,
  "prevValue": 4,
  "txid": 3
}

如果當(dāng)前的txid是2桩了,等于數(shù)據(jù)庫中的txid附帽。因為txid相同,說明上一次txid為2的批次處理失敗井誉,但是本次的元組可能與上一次不同了蕉扮。這個時候,就需要使用本次數(shù)據(jù)覆蓋上次處理結(jié)果送悔。也就是說慢显,prevValue值不變爪模,value的值改為prevValue加2欠啤,txid不變,最后的結(jié)果如下:

{
  "value": 3,
  "prevValue": 1,
  "txid": 2
}

這種方式的可行性依賴于Trident的強順序性屋灌。也就是說洁段,一旦開始處理一個新的批次,就不會重復(fù)執(zhí)行上一個批次共郭。不透明事務(wù)型Spout保證了不同批次之間沒有重復(fù)的情況祠丝,也就是每個元組只會在一個批次中處理成功疾呻,所以就可以放心的使用前一個值與當(dāng)前值覆蓋已存數(shù)據(jù)了。

非事務(wù)型Spout

非事務(wù)型Spout不能為批次提供任何保證写半。所以可能出現(xiàn)"至多一次"的處理岸蜗,即在某個批次處理過程中失敗了,但是不會在重新處理叠蝇;也可能提供“至少一次”的處理璃岳,即可能會有多個批次分別處理某個元組。也就是沒有辦法實現(xiàn)“恰好一次”的語義悔捶。

不同類型spout和狀態(tài)總結(jié)

下面是不同的spout/狀態(tài)組合是否支持“恰好一次”處理語義:

Spouts vs States

不透明事務(wù)狀態(tài)有最強的容錯性铃慷,但是因為存儲txid和兩個結(jié)果帶來更大的開銷。事務(wù)型狀態(tài)只需要存儲一個狀態(tài)結(jié)果蜕该,但是只對事務(wù)型Spout有效犁柜。非事務(wù)型狀態(tài)要求存儲的數(shù)據(jù)更少,但是不能實現(xiàn)“恰好一次”的處理語義堂淡。

所以在選擇容錯與存儲空間中馋缅,需要根據(jù)具體的需要選擇合適的組合。

狀態(tài)API

根據(jù)前面來看绢淀,“恰好一次”語義的原理有些復(fù)雜股囊,但是作為用戶,并不需要了解這些txid對比更啄、多值存儲稚疹,因為Trident已經(jīng)在State中封裝了所有容錯處理邏輯,只需要想下面著用攜帶碼就行:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
                .parallelismHint(6);

所有的不透明事務(wù)狀態(tài)邏輯已經(jīng)封裝在MemcachedState.opaque中祭务,另外内狗,狀態(tài)更新會自動調(diào)整為批次操作,這樣可以減少與數(shù)據(jù)庫之間反復(fù)交互帶來的資源浪費义锥。

基本的State接口只有兩個方法:

public interface State {
    void beginCommit(Long txid); // 對于像DRPC流發(fā)生的partitionPersist這樣的事情柳沙,可以是null
    void commit(Long txid);
}

前面已經(jīng)說過,狀態(tài)更新開始和結(jié)束時都會獲取txid拌倍。Trident并不關(guān)心狀態(tài)如何操作赂鲤,使用哪種方式更新,使用哪種方式讀取柱恤。

假如有一個包含用戶地址信息的定制數(shù)據(jù)庫数初,需要使用Trident與數(shù)據(jù)庫交互,State擴展類中包含對于用戶信息的getter和setter方法:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocation(long userId, String location) {
        // 向數(shù)據(jù)庫設(shè)置地址信息
    }

    public String getLocation(long userId) {
        // 從數(shù)據(jù)庫中獲取地址信息
    }
}

然后就需要一個StateFactory來創(chuàng)建Trident所需的State對象梗顺,LocationDB所需的StateFactory大體結(jié)構(gòu)如下:

public class LocationDBFactory implements StateFactory {
    public State makeState(Map conf, int partitionIndex, int numPartitions) {
        return new LocationDB();
    }
}

Trident提供了用于查詢狀態(tài)源的QueryFunction接口泡孩,以及更新狀態(tài)源的StateUpdater接口。比如寺谤,查詢LocationDB中用戶信息的QueryLocation

TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"));

QueryLocation的代碼如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<String> ret = new ArrayList();
        for (TridentTuple input : inputs) {
            ret.add(state.getLocation(input.getLong(0)));
        }
        return ret;
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}

QueryFunction操作分為兩步:首先仑鸥,Trident會將收集到的數(shù)據(jù)放在一個批次中吮播,發(fā)送給batchRetrieve方法。在這個例子中眼俊,batchRetrieve方法收到的是一些用戶id意狠。batchRetrieve會返回一組與輸入元組長度相同的結(jié)果。輸入元組與輸出結(jié)果中各個元素是彼此對應(yīng)的疮胖。

從這點來看摄职,上面的LocationDB類并沒有發(fā)揮Trident批處理優(yōu)勢,所以需要盡心改造:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
        // set locations in bulk
    }

    public List<String> bulkGetLocations(List<Long> userIds) {
        // get locations in bulk
    }
}

對應(yīng)的QueryLocation類如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<Long> userIds = new ArrayList<Long>();
        for (TridentTuple input : inputs) {
            userIds.add(input.getLong(0));
        }
        return state.bulkGetLocations(userIds);
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}

這段代碼大幅減少了數(shù)據(jù)庫操作获列。

對于更新狀態(tài)谷市,可以使用StateUpdater接口。比如下面的更新操作:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {
    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<Long> ids = new ArrayList<Long>();
        List<String> locations = new ArrayList<String>();
        for (TridentTuple t : tuples) {
            ids.add(t.getLong(0));
            locations.add(t.getString(1));
        }
        state.setLocationsBulk(ids, locations);
    }
}

對應(yīng)的更新操作拓?fù)渲芯涂梢允沁@樣:

TridentTopology topology = new TridentTopology();
TridentState locations =
        topology.newStream("locations", locationsSpout)
                .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater());

partitionPersist方法會更新狀態(tài)击孩,StateUpdater接口接收一批元組和狀態(tài)信息迫悠,然后更新狀態(tài)。上面的LocationUpdater類中僅僅是從元組中抓取用戶id和地址信息巩梢,然后對狀態(tài)執(zhí)行批量處理创泄。然后,partitionPersist會返回一個表示更新狀態(tài)后的TridentState對象括蝠。隨后就可以在拓?fù)涞钠渌胤绞褂?code>stateQuery方法查詢狀態(tài)鞠抑。

StateUpdaterupdateState方法中有一個TridentCollector參數(shù),這個對象是可以將發(fā)送進(jìn)來的元組發(fā)送到一個新的數(shù)據(jù)流中忌警。在這個例子中沒有用到搁拙。如果需要進(jìn)行比如向數(shù)據(jù)庫更新計數(shù)值的后續(xù)操作,可以通過TridentState#newValuesStream方法獲取新的數(shù)據(jù)流數(shù)據(jù)法绵。

persistentAggregate

Trident使用一個名為persistentAggregate的方法更新狀態(tài)箕速。前面已經(jīng)出現(xiàn)過,這里再寫一遍:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

partitionPersist是一個接收Trident聚合器作為參數(shù)并對狀態(tài)數(shù)據(jù)進(jìn)行更新的方法朋譬,persistentAggregate就是構(gòu)建于partitionPersist上層的一個編程抽象盐茎。在這個例子中,通過groupBy返回一個分組數(shù)據(jù)徙赢,Trident需要一個實現(xiàn)MapState接口的對象字柠。分組字段是狀態(tài)的key,聚合結(jié)果是狀態(tài)的value狡赐。MapState接口如下:

public interface MapState<T> extends State {
    List<T> multiGet(List<List<Object>> keys);
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

如果你需要在未分組的數(shù)據(jù)流上執(zhí)行聚合操作時窑业,Trident需要一個實現(xiàn)Snapshottable接口的對象:

public interface Snapshottable<T> extends State {
    T get();
    T update(ValueUpdater updater);
    void set(T o);
}

MemoryMapStateMemcachedState 都實現(xiàn)了這些接口.

實現(xiàn)MapState接口

實現(xiàn)MapState接口非常簡單,Trident幾乎把所有事都做完了阴汇。OpaqueMap数冬、TransactionalMapNonTransactionalMap都分別實現(xiàn)了各自的容錯語義节槐。只需要為這些類提供一個用于對不同key/value進(jìn)行批量獲取搀庶、批量修改的IBackingMap實現(xiàn)就行拐纱。IBackingMap接口如下:

public interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

OpaqueMap會使用OpaqueValue作為vals參數(shù)調(diào)用multiPut方法;TransactionalMap會使用TransactionalValue作為參數(shù)哥倔;NonTransactionalMaps會直接把拓?fù)鋵ο髠魅搿?/p>

Trident還提供了CachedMap類來實現(xiàn)key/value的自動LRU緩存操作秸架。

最后,Trident還提供了SnapshottableMap類咆蒿,該類通過將全局聚合的結(jié)果存入一個固定key中的方法將MapState對象轉(zhuǎn)化為Snapshottable對象东抹。

可以參考MemcachedState的實現(xiàn)來了解如何將這些工具結(jié)合在一起來提供一個高性能的MapState實現(xiàn)。MemcachedState支持不透明事務(wù)沃测、事務(wù)和非事務(wù)語義缭黔。


個人主頁: http://www.howardliu.cn

個人博文: storm筆記:Trident狀態(tài)

CSDN主頁: http://blog.csdn.net/liuxinghao

CSDN博文: storm筆記:Trident狀態(tài)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蒂破,隨后出現(xiàn)的幾起案子馏谨,更是在濱河造成了極大的恐慌,老刑警劉巖附迷,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惧互,死亡現(xiàn)場離奇詭異,居然都是意外死亡喇伯,警方通過查閱死者的電腦和手機喊儡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來稻据,“玉大人艾猜,你說我怎么就攤上這事∧砻酰” “怎么了箩朴?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長秋度。 經(jīng)常有香客問我炸庞,道長,這世上最難降的妖魔是什么荚斯? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任埠居,我火速辦了婚禮,結(jié)果婚禮上事期,老公的妹妹穿的比我還像新娘滥壕。我一直安慰自己,他們只是感情好兽泣,可當(dāng)我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布绎橘。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪称鳞。 梳的紋絲不亂的頭發(fā)上涮较,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機與錄音冈止,去河邊找鬼狂票。 笑死,一個胖子當(dāng)著我的面吹牛熙暴,可吹牛的內(nèi)容都是我干的闺属。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼周霉,長吁一口氣:“原來是場噩夢啊……” “哼掂器!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起俱箱,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤唉匾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后匠楚,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體巍膘,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年芋簿,在試婚紗的時候發(fā)現(xiàn)自己被綠了峡懈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡与斤,死狀恐怖肪康,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情撩穿,我是刑警寧澤磷支,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站食寡,受9級特大地震影響雾狈,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜抵皱,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一善榛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧呻畸,春花似錦移盆、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春叙甸,著一層夾襖步出監(jiān)牢的瞬間颖医,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工蚁署, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留便脊,地道東北人蚂四。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓光戈,卻偏偏與公主長得像,于是被迫代替她去往敵國和親遂赠。 傳聞我的和親對象是個殘疾皇子久妆,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容