Flink 源碼:TM 端恢復(fù)及創(chuàng)建 OperatorState 的流程

本文僅為筆者平日學(xué)習(xí)記錄之用间坐,侵刪
原文:https://mp.weixin.qq.com/s/6Oi_1tP-7Jns3ZguMW7wLg

在之前《StreamTask 初始化流程》的文章中,省略掉了 TM 端恢復(fù) State 的詳細(xì)過程拔妥,本文主要講述:

  • OperatorState 的恢復(fù)和創(chuàng)建流程
  • Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)

一造垛、 TM 端恢復(fù) OperatorState 的流程

StateBackend 創(chuàng)建 OperatorStateBackend 時(shí) TM 端會(huì)恢復(fù) OperatorState会喝。目前 Flink 支持的三種 StateBackend 都對(duì)應(yīng)同一種 OperatorStateBackend俊卤,即:DefaultOperatorStateBackend,具體 new DefaultOperatorStateBackend 的過程由建造器 DefaultOperatorStateBackendBuilder 完成浸赫。

三種 StateBackend 的 createOperatorStateBackend 方法非常相似,源碼如下:

public OperatorStateBackend createOperatorStateBackend(
 Environment env,
 String operatorIdentifier,
 @Nonnull Collection<OperatorStateHandle> stateHandles,
 CloseableRegistry cancelStreamRegistry) throws Exception {

 return new DefaultOperatorStateBackendBuilder(
  env.getUserClassLoader(),
  env.getExecutionConfig(),
  isUsingAsynchronousSnapshots(),
  stateHandles,
  cancelStreamRegistry).build();
}

所有的初始化流程都在 DefaultOperatorStateBackendBuilder 類的 build 方法中赃绊,build 方法源碼如下所示:

@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
 AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
  new DefaultOperatorStateBackendSnapshotStrategy(XXX);
 OperatorStateRestoreOperation restoreOperation = 
    new OperatorStateRestoreOperation(XXX);
 try {
  // OperatorState 恢復(fù)流程
  restoreOperation.restore();
 } catch (Exception e) {
  IOUtils.closeQuietly(cancelStreamRegistryForBackend);
  throw new BackendBuildingException("XXX", e);
 }
 return new DefaultOperatorStateBackend(XXX);
}

build 方法中除了構(gòu)造了幾個(gè)對(duì)象以外既峡,重點(diǎn)執(zhí)行了 OperatorStateRestoreOperation 的 restore 方法,restore 方法就是恢復(fù)流程凭戴。

先介紹 OperatorStateRestoreOperation 類中兩個(gè)重要的 Map:

  • registeredOperatorStates 用于保存 StateName 和 ListState 的映射關(guān)系涧狮;
  • registeredBroadcastStates 用于保存 StateName 和 BroadcastState 的映射關(guān)系

restore 源碼如下所示:

// OperatorStateRestoreOperation 類中兩個(gè)重要的 Map
// 保存 StateName 和 ListState 的映射關(guān)系
private final Map<String, PartitionableListState<?>> registeredOperatorStates;

// 保存 StateName 和 BroadcastState 的映射關(guān)系
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

// Operator State 真正的 restore 流程
@Override
public Void restore() throws Exception {
  // stateHandles 為空炕矮,表示沒有要恢復(fù)的 State
 if (stateHandles.isEmpty()) {
  return null;
 }

 // 遍歷所有 stateHandles
 for (OperatorStateHandle stateHandle : stateHandles) {
  // 通過 stateHandle 可以獲取 InputStream 讀取數(shù)據(jù)
  FSDataInputStream in = stateHandle.openInputStream();
  try {
   List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
    backendSerializationProxy.getOperatorStateMetaInfoSnapshots();

   // 從元數(shù)據(jù)中創(chuàng)建 PartitionableListStates么夫,并沒有恢復(fù)真正的 State
   for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
    final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
     new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
        
    // registeredOperatorStates 中維護(hù)的 StateName 與 ListState 的映射關(guān)系
    PartitionableListState<?> listState = registeredOperatorStates
          .get(restoredSnapshot.getName());
        
    // listState == null 表示當(dāng)前 State 還未創(chuàng)建,則創(chuàng)建肤视,并保存到 map 中
    if (null == listState) {
     // 這里只是依賴 MetaInfo 創(chuàng)建了 PartitionableListState档痪,并沒有恢復(fù)真正的 State 數(shù)據(jù)
     listState = new PartitionableListState<>(restoredMetaInfo);
     // 創(chuàng)建出的 State 數(shù)據(jù) put 到 registeredOperatorStates 中
     registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
    }
   }

   // 真正恢復(fù) State 的操作
   for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
    stateHandle.getStateNameToPartitionOffsets().entrySet()) {
    final String stateName = nameToOffsets.getKey();
        // 通過 StateName 從 registeredOperatorStates 中獲取 ListState
        // 因?yàn)橹耙呀?jīng)根據(jù)元數(shù)據(jù)創(chuàng)建了 State,
        // 所以這里 get 不到邢滑,只能是因?yàn)楫?dāng)前的 StateName 屬于 BroadcastState
    PartitionableListState<?> listStateForName = 
            registeredOperatorStates.get(stateName);
        
    // listState 為 null腐螟,表示恢復(fù) Broadcast 相關(guān)的 State
    if (listStateForName == null) {
     BackendWritableBroadcastState<?, ?> broadcastStateForName = 
            registeredBroadcastStates.get(stateName);
     deserializeBroadcastStateValues(broadcastStateForName, 
                                          in, nameToOffsets.getValue());
    } else {
     // 恢復(fù) ListState,將恢復(fù)出來(lái)的元素 add 到 ListState 中
     deserializeOperatorStateValues(listStateForName, 
                                         in, nameToOffsets.getValue());
    }
   }
  } finally {
   Thread.currentThread().setContextClassLoader(restoreClassLoader);
   if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
    IOUtils.closeQuietly(in);
   }
  }
 }
 return null;
}

restore 方法中拿到的就是 JM 分配給當(dāng)前 subtask 的 stateHandles困后,如果 stateHandles 為空表示沒有要恢復(fù)的 State 則直接返回 null乐纸,可能是因?yàn)槿蝿?wù)是直接啟動(dòng),而不是從 Checkpoint 處恢復(fù)摇予。否則 stateHandles 不為空的情況汽绢,就遍歷一個(gè)個(gè) OperatorStateHandle,通過 stateHandle 可以獲取 InputStream 讀取數(shù)據(jù)侧戴。

首先讀出元數(shù)據(jù)宁昭,用于創(chuàng)建 PartitionableListState,并沒有真正恢復(fù) State 數(shù)據(jù)酗宋,PartitionableListState 是 OperatorState 對(duì) ListState 的具體實(shí)現(xiàn)积仗。ListState 維護(hù)在 registeredOperatorStates 這個(gè) Map 中,通過 StateName 從 registeredOperatorStates 中 get蜕猫,get 不到時(shí)寂曹,通過元數(shù)據(jù)創(chuàng)建 State,并存放在 registeredOperatorStates 中回右。

代碼中省略了 BroadcastState 的創(chuàng)建流程隆圆,整體流程與 ListState 流程類似,只不過 BroadcastState 維護(hù)在 registeredBroadcastStates 中楣黍。

最后真正的恢復(fù) State 數(shù)據(jù)匾灶,對(duì)于 ListState 而言將恢復(fù)出來(lái)的元素 add 到 ListState 中∽馄恢復(fù) State 數(shù)據(jù)的過程其實(shí)用反序列化器對(duì)狀態(tài)數(shù)據(jù)反序列化生成對(duì)象的過程阶女。反序列化器維護(hù)在 PartitionableListState 的元數(shù)據(jù)中颊糜。

到這里 OperatorState 就恢復(fù)完成,此時(shí)映射關(guān)系已經(jīng)保存到 OperatorStateRestoreOperation 類的兩個(gè) Map 集合中⊥翰龋現(xiàn)在又回到 DefaultOperatorStateBackendBuilder 類的 build 方法中衬鱼,就會(huì)發(fā)現(xiàn)其實(shí)這兩個(gè) Map 是好多地方共享的。這里再貼一下 build 方法的完整源碼憔杨,重點(diǎn)關(guān)注兩個(gè) Map:

@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
 // 保存 StateName 和 ListState 的映射關(guān)系
 Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();
 // 保存 StateName 和 BroadcastState 的映射關(guān)系
 Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates = 
    new HashMap<>();

 CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
  
 OperatorStateRestoreOperation restoreOperation = new OperatorStateRestoreOperation(
  cancelStreamRegistry,
  userClassloader,
  // 將兩個(gè) Map 傳遞進(jìn)去鸟赫,即:restore 過程中,映射關(guān)系會(huì)存儲(chǔ)在這兩個(gè) Map 中
  registeredOperatorStates,
  registeredBroadcastStates,
  restoreStateHandles
 );
 try {
  // OperatorState 恢復(fù)流程
  restoreOperation.restore();
 } catch (Exception e) {
  IOUtils.closeQuietly(cancelStreamRegistryForBackend);
  throw new BackendBuildingException("XXX", e);
 }
 AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
  new DefaultOperatorStateBackendSnapshotStrategy(
   userClassloader,
   asynchronousSnapshots,
   // 將兩個(gè) Map 傳遞給 DefaultOperatorStateBackendSnapshotStrategy
   registeredOperatorStates,
   registeredBroadcastStates,
   cancelStreamRegistryForBackend);
  
 return new DefaultOperatorStateBackend(
  executionConfig,
  cancelStreamRegistryForBackend,
  // 再將兩個(gè) Map 傳遞給 DefaultOperatorStateBackend
  registeredOperatorStates,
  registeredBroadcastStates,
  new HashMap<>(),
  new HashMap<>(),
  snapshotStrategy
 );
}

可以看到 build 方法剛開始會(huì) new 兩個(gè) Map消别,然后傳遞給了 OperatorStateRestoreOperation抛蚤,之后 OperatorStateRestoreOperation 的 restore 流程(也就是上述分析的恢復(fù)流程)實(shí)際上將 Checkpoint 中恢復(fù)出來(lái)的映射關(guān)系保存到了這兩個(gè) Map 中。之后兩個(gè) Map 又傳遞給了 DefaultOperatorStateBackendSnapshotStrategy 和 DefaultOperatorStateBackend寻狂。

所以得出結(jié)論:DefaultOperatorStateBackend 中持有從 Checkpoint 處恢復(fù)出來(lái)的 StateName 與具體 State 的映射關(guān)系岁经。

到這里 DefaultOperatorStateBackend 就創(chuàng)建完成了,同時(shí)留兩個(gè)問題:

  • 上面流程雖然將 OperatorState 從 Checkpoint 中恢復(fù)了蛇券,但用戶在算子中創(chuàng)建的 State 如何與 Checkpoint 中恢復(fù)的 OperatorState 關(guān)聯(lián)起來(lái)呢缀壤?

  • 另外對(duì)于直接啟動(dòng),不從 Checkpoint 處恢復(fù)的任務(wù)纠亚,OperatorState 又是如何創(chuàng)建出來(lái)的塘慕?

帶著這兩個(gè)問題閱讀下面流程。

二蒂胞、 用戶定義的 OperatorState 創(chuàng)建流程

Flink 源碼中最典型的使用 OperatorState 的場(chǎng)景就是 FlinkConsumer 使用 ListState 去維護(hù) Kafka 的 offset 信息图呢,所以本文就從這塊源碼入手,看一下這個(gè) ListState 創(chuàng)建流程啤誊。

FlinkKafkaConsumerBase 類的 initializeState 方法中用到了 getUnionListState 創(chuàng)建一個(gè) ListState岳瞭,簡(jiǎn)潔版源碼如下所示:

@Override
public final void initializeState(FunctionInitializationContext context) {
 OperatorStateStore stateStore = context.getOperatorStateStore();
 unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
   OFFSETS_STATE_NAME,
   TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
}

這里調(diào)用的 OperatorStateStore 的 getUnionListState 方法。OperatorStateStore 是個(gè)接口蚊锹,它只有一個(gè)實(shí)現(xiàn)類瞳筏,就是前面創(chuàng)建出來(lái)的 DefaultOperatorStateBackend。所以這里會(huì)調(diào)用 DefaultOperatorStateBackend 類的 getUnionListState 方法牡昆。不過 DefaultOperatorStateBackend 中還有一個(gè) getListState(ListStateDescriptor stateDescriptor) 方法姚炕,這也就是 OperatorState 類型的 ListState 兩種獲取方式《妫可以看一下這兩個(gè)方法的源碼:

@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
 return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}

@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) {
 return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}

源碼中可以看到柱宦,無(wú)論業(yè)務(wù)使用的是 getListState 還是 getUnionListState 方法獲取 ListState ,最后都會(huì)調(diào)用同一個(gè)方法播瞳,即:getListState(ListStateDescriptor stateDescriptor, OperatorStateHandle.Mode mode)掸刊。加了一個(gè)參數(shù) OperatorStateHandle.Mode 用于區(qū)分 OperatorState 的模式:

  • getListState 對(duì)應(yīng) SPLIT_DISTRIBUTE 模式
  • getUnionListState 對(duì)應(yīng) UNION 模式

getListState(stateDescriptor, mode) 方法源碼如下所示:

// 無(wú)論是 getListState 還是 getUnionListState 方法都會(huì)調(diào)用這里,
// 只不過傳遞的 Mode 參數(shù)不同而已
private <S> ListState<S> getListState(
  ListStateDescriptor<S> stateDescriptor,
  OperatorStateHandle.Mode mode) throws StateMigrationException {

 String name = Preconditions.checkNotNull(stateDescriptor.getName());

 TypeSerializer<S> partitionStateSerializer = 
    Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

 PartitionableListState<S> partitionableListState = (PartitionableListState<S>) 
    registeredOperatorStates.get(name);

 // registeredOperatorStates 中維護(hù)的是 Checkpoint 中恢復(fù)的 StateName 和 ListState 的映射關(guān)系
 // 如果 partitionableListState == null 表示從 Checkpoint 中沒有恢復(fù)出這個(gè) State赢乓,
 // 即:這是一個(gè)新的 State忧侧,則新建一個(gè) PartitionableListState石窑,并維護(hù)在 Map 中
 if (null == partitionableListState) {
  partitionableListState = new PartitionableListState<>(
   new RegisteredOperatorStateBackendMetaInfo<>(
    name,
    partitionStateSerializer,
    mode));

  registeredOperatorStates.put(name, partitionableListState);
 } else {
  // State 已經(jīng)從 Checkpoint 中恢復(fù)了,檢查兼容性問題
  // 這里會(huì)檢查 StateName 和 AssignmentMode 是否可以匹配
  checkStateNameAndMode(
    partitionableListState.getStateMetaInfo().getName(),
    name,
    partitionableListState.getStateMetaInfo().getAssignmentMode(),
    mode);

  RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo =
   partitionableListState.getStateMetaInfo();

  // 檢查 序列化是否兼容
  TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

  TypeSerializerSchemaCompatibility<S> stateCompatibility =
   restoredPartitionableListStateMetaInfo.
      updatePartitionStateSerializer(newPartitionStateSerializer);
  //  不兼容蚓炬,則拋出異常
  if (stateCompatibility.isIncompatible()) {
   throw new StateMigrationException("XXX.");
  }
  partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
 }

 accessedStatesByName.put(name, partitionableListState);
 // 返回 State
 return partitionableListState;
}

getListState(stateDescriptor, mode) 方法首先通過 name 從 registeredOperatorStates 中 get 對(duì)應(yīng)的 ListState 保存到 partitionableListState 中松逊,registeredOperatorStates 維護(hù)的是 Checkpoint 中恢復(fù)的 StateName 和 ListState 的映射關(guān)系。所以 partitionableListState == null 表示從 Checkpoint 中沒有恢復(fù)出這個(gè) State肯夏,即:這是一個(gè)新的 State经宏,所以新建一個(gè) PartitionableListState,并保存在 registeredOperatorStates 中驯击。

反之烁兰,partitionableListState != null 表示 State 已經(jīng)從 Checkpoint 中恢復(fù)了,開始檢查兼容性余耽,首先會(huì)檢查 Checkpoint 中恢復(fù)的 State 和用戶新申請(qǐng)的 StateName 和 AssignmentMode 是否可以匹配缚柏。

  • StateName 和 name 肯定是匹配的,因?yàn)?partitionableListState 是根據(jù) name get 出來(lái)的碟贾。
  • AssignmentMode 枚舉用于區(qū)分應(yīng)用層使用的 getListState 恢復(fù)還是 getUnionListState 恢復(fù),getListState 表示 SPLIT_DISTRIBUTE 模式轨域,getUnionListState 表示 UNION 模式袱耽。如果 State 中存儲(chǔ)的是 SPLIT_DISTRIBUTE 模式,但任務(wù)恢復(fù)時(shí)干发,代碼改成了 getUnionListState朱巨,實(shí)際上 State 不能正常恢復(fù)的枉长。

StateName 和 AssignmentMode 檢查完畢后冀续,會(huì)檢查序列化是否兼容,不兼容必峰,則拋出異常洪唐。兼容則會(huì)返回 State。

上述流程就回答了最開始提的兩個(gè)問題:

  1. OperatorState 從 Checkpoint 中恢復(fù)后吼蚁,用戶在算子中創(chuàng)建的 State 如何與 Checkpoint 中恢復(fù)的 OperatorState 關(guān)聯(lián)起來(lái)呢凭需?

    答:依賴 registeredOperatorStates 這個(gè) Map 維護(hù)了 StateName 和 ListState 的映射關(guān)系,用戶創(chuàng)建 State 是通過 StateName 從 registeredOperatorStates 中查找肝匆,如果能找到粒蜈,對(duì)其進(jìn)行兼容性檢查,檢查通過就會(huì)返回從 Checkpoint 中恢復(fù)的 ListState旗国,從而完成了關(guān)聯(lián)枯怖。

  2. 對(duì)于直接啟動(dòng),不從 Checkpoint 處恢復(fù)的任務(wù)能曾,OperatorState 又是如何創(chuàng)建出來(lái)的度硝?

    答:對(duì)于直接啟動(dòng)的任務(wù)设捐,registeredOperatorStates 肯定是空的。創(chuàng)建 State 時(shí)塘淑,從 registeredOperatorStates 中 get 不到萝招,所以就創(chuàng)建一個(gè)新的 PartitionableListState,并保存在 registeredOperatorStates 中存捺。

到這里槐沼,OperatorState 就完成了恢復(fù),且用戶的 State 也正常的創(chuàng)建出來(lái)了捌治。

三岗钩、總結(jié)

文中首先介紹了 OperatorState 的恢復(fù)和創(chuàng)建流程,并介紹從 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)的肖油。后續(xù)將會(huì)詳細(xì)介紹 KeyedState 的恢復(fù)創(chuàng)建流程以及如何將 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)兼吓。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市森枪,隨后出現(xiàn)的幾起案子视搏,更是在濱河造成了極大的恐慌,老刑警劉巖县袱,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件浑娜,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡式散,警方通過查閱死者的電腦和手機(jī)筋遭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)暴拄,“玉大人漓滔,你說(shuō)我怎么就攤上這事」耘瘢” “怎么了响驴?”我有些...
    開封第一講書人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)那伐。 經(jīng)常有香客問我踏施,道長(zhǎng),這世上最難降的妖魔是什么罕邀? 我笑而不...
    開封第一講書人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任畅形,我火速辦了婚禮,結(jié)果婚禮上诉探,老公的妹妹穿的比我還像新娘日熬。我一直安慰自己,他們只是感情好肾胯,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開白布竖席。 她就那樣靜靜地躺著耘纱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪毕荐。 梳的紋絲不亂的頭發(fā)上束析,一...
    開封第一講書人閱讀 51,287評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音憎亚,去河邊找鬼员寇。 笑死,一個(gè)胖子當(dāng)著我的面吹牛第美,可吹牛的內(nèi)容都是我干的蝶锋。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼什往,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼扳缕!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起别威,我...
    開封第一講書人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤躯舔,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后兔港,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體庸毫,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年衫樊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片利花。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡科侈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出炒事,到底是詐尸還是另有隱情臀栈,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布挠乳,位于F島的核電站权薯,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏睡扬。R本人自食惡果不足惜盟蚣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望卖怜。 院中可真熱鬧屎开,春花似錦、人聲如沸马靠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至逞度,卻和暖如春额划,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背档泽。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工俊戳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人茁瘦。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓品抽,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親甜熔。 傳聞我的和親對(duì)象是個(gè)殘疾皇子圆恤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354