死磕hyperledger fabric源碼|kafka共識(shí)排序

死磕hyperledger fabric源碼|kafka共識(shí)排序

文章及代碼:https://github.com/blockchainGuide/

分支:v1.1.0

d1e794177969e09552b173b7d6eaea19

概述

Orderer共識(shí)組件提供HandleChain()方法創(chuàng)建通道綁定的共識(shí)組件鏈對(duì)象(consensus.Chain接口)剩拢,包括Solosolo.chain類型)、Kafkakafka.chainImpl類型)等類型,屬于通道共識(shí)組件的重要實(shí)現(xiàn)模塊啼县,并設(shè)置到鏈支持對(duì)象的cs.Chain字段甘邀。共識(shí)組件鏈對(duì)象提供Orderer共識(shí)排序服務(wù)伟骨,負(fù)責(zé)關(guān)聯(lián)通道上交易排序仔夺、打包出塊、提交賬本南捂、通道管理等工作吴裤,目前采用Golang通道或Kafka集群作為共識(shí)排序后端,接收來(lái)自Broadcast服務(wù)過(guò)濾轉(zhuǎn)發(fā)的交易消息并進(jìn)行排序溺健。

kafka共識(shí)排序服務(wù)

orderer服務(wù)集群

Orderer節(jié)點(diǎn)采用Sarama開(kāi)源的Kafka第三方庫(kù)構(gòu)建Kafka共識(shí)組件麦牺,可以同時(shí)接受處理多個(gè)客戶端發(fā)送的交易消息請(qǐng)求,能夠有效提高Orderer節(jié)點(diǎn)處理交易消息的并發(fā)能力鞭缭。同時(shí)剖膳,可利用Kafka集群在單一分區(qū)內(nèi)按序收集相同主題消息(消息序號(hào)唯一)的功能,來(lái)保證交易消息具有確定性的順序(以消息序號(hào)排序)岭辣,從而實(shí)現(xiàn)對(duì)交易排序達(dá)成全局共識(shí)的目的吱晒。

Kafka生產(chǎn)者按照主題(Topic)生產(chǎn)消息并進(jìn)行發(fā)布,Kafka服務(wù)器集群自動(dòng)對(duì)消息主題進(jìn)行分類沦童。同一個(gè)主題的消息都會(huì)被收集到一個(gè)或多個(gè)分區(qū)文件中仑濒,按照FIFO的順序追加到文件尾部,并且每個(gè)消息在分區(qū)中都會(huì)有一個(gè)OFFSET位置偏移量作為該消息的唯一標(biāo)識(shí)ID偷遗。目前躏精,Hyperledger Fabric基于Kafka集群為每個(gè)通道創(chuàng)建綁定了一個(gè)主題(即鏈ID,chainID)鹦肿,并且只設(shè)置一個(gè)分區(qū)(分區(qū)號(hào)為0)。Kafka消費(fèi)者管理多個(gè)分區(qū)消費(fèi)者并訂閱指定分區(qū)的主題消息辅柴,包括主題(即chainID)箩溃、分區(qū)號(hào)(目前只有1個(gè)分區(qū)號(hào)為0的分區(qū))、起始偏移量(開(kāi)始訂閱的消息位置offset)等碌嘀。

Hyperledger Fabric采用Kafka集群對(duì)單個(gè)或多個(gè)Orderer排序節(jié)點(diǎn)提交的交易消息進(jìn)行排序涣旨。此時(shí),Orderer排序節(jié)點(diǎn)同時(shí)充當(dāng)Kafka集群的消息生產(chǎn)者(分區(qū))和消費(fèi)者股冗,發(fā)布消息與訂閱消息到Kafka集群上的同一個(gè)主題分區(qū)霹陡,即先將Peer節(jié)點(diǎn)提交的交易消息轉(zhuǎn)發(fā)給Kafka服務(wù)端,同時(shí)止状,從指定主題的Kafka分區(qū)上按順序獲取排序后的交易消息并自動(dòng)過(guò)濾重啟的交易消息烹棉。這期間可能會(huì)存在網(wǎng)絡(luò)時(shí)延造成獲取消息時(shí)間的差異。如果不考慮丟包造成消息丟失的情況怯疤,則所有Orderer節(jié)點(diǎn)獲取消息的順序與數(shù)量應(yīng)該是確定的和一致的浆洗。同時(shí),采用相同的Kafka共識(shí)組件鏈對(duì)象與出塊規(guī)則等集峦,以保證所有Orderer節(jié)點(diǎn)都可以創(chuàng)建與更新相同配置的通道伏社,并切割生成相同的批量交易集合出塊抠刺,再“同步”構(gòu)造出相同的區(qū)塊數(shù)據(jù),從而基于Kafka集群達(dá)成全局共識(shí)摘昌,以保證區(qū)塊數(shù)據(jù)的全局一致性速妖。

啟動(dòng)共識(shí)組件鏈對(duì)象

啟動(dòng)入口:

orderer/consensus/kafka/chain.go/Start()

func (chain *chainImpl) Start() {
    go startThread(chain)
}
func startThread(chain *chainImpl) {
    ...
    //創(chuàng)建kafka生產(chǎn)者
    chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
    ...
    // Kafka生產(chǎn)者發(fā)送CONNECT消息建立連接
    if err = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); err != nil {
        logger.Panicf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err)
    }
    ...
    //創(chuàng)建Kafka消費(fèi)者
    chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
    ...
    //創(chuàng)建Kafka分區(qū)消費(fèi)者
    chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)
    ...
    close(chain.startChan) // 已經(jīng)啟動(dòng)共識(shí)組件鏈對(duì)象,不阻塞Broadcast
    chain.errorChan = make(chan struct{}) // 創(chuàng)建errorChan通道聪黎,不阻塞Deliver服務(wù)處理句柄
    ...
    chain.processMessagesToBlocks() //創(chuàng)建消息處理循環(huán)罕容,循環(huán)處理訂閱分區(qū)上接收到的消息
}

startThread函數(shù)首先創(chuàng)建kafka生產(chǎn)者,發(fā)布消息到指定主題(即通道ID)和分區(qū)號(hào)的通道分區(qū)(chain.channel)上挺举。

然后發(fā)送CONNECT消息建立連接杀赢,該消息指定了主題Topic字段為鏈ID、Key字段為分區(qū)號(hào)0湘纵、Value字段為CONNECT類型消息負(fù)載等脂崔。訂閱該主題的Kafka(分區(qū))消費(fèi)者會(huì)接收到該消息。

接著創(chuàng)建指定Kafka分區(qū)和Broker服務(wù)器配置的Kafka消費(fèi)者對(duì)象梧喷,并設(shè)置從指定主題(鏈ID)和分區(qū)號(hào)(0)的Kafka分區(qū)上獲取消息砌左。

最后,調(diào)用processMessagesToBlocks()方法創(chuàng)建消息處理循環(huán)铺敌,負(fù)責(zé)處理從Kafka集群中接收到的訂閱消息汇歹。

處理消息

processMessagesToBlocks接收到正常的Kafka分區(qū)消費(fèi)者消息會(huì)根據(jù)kafka的消息類型進(jìn)行處理,包括以下幾種類型:

  • Kafka- Message_Regular
  • KafkaMessage_TimeToCut
  • KafkaMessage_Connect
func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
    ...
    for { // 消息處理循環(huán)
        select {
        ...
        case in, ok := <-chain.channelConsumer.Messages(): //接收到正常的Kafka分區(qū)消費(fèi)者消息
            ...
            select {
            case <-chain.errorChan: // If this channel was closed...  // 如果該通道已經(jīng)關(guān)閉偿凭,則重新創(chuàng)建該通道
                ...
            switch msg.Type.(type) { //分析Kafka消息類型
            case *ab.KafkaMessage_Connect: //Kafka連接消息  由于錯(cuò)誤而重新恢復(fù)Kafka消費(fèi)者分區(qū)訂閱流程
                _ = chain.processConnect(chain.ChainID()) //處理CONNECT連接消息产弹, 不做任何事情
                counts[indexProcessConnectPass]++         // 成功處理消息計(jì)數(shù)增1
            case *ab.KafkaMessage_TimeToCut: // Kafka定時(shí)切割生成區(qū)塊消息
                if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
                    logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
                    logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
                    counts[indexProcessTimeToCutError]++
                    return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
                }
                counts[indexProcessTimeToCutPass]++ // 成功處理消息計(jì)數(shù)增1
            case *ab.KafkaMessage_Regular: // Kafka常規(guī)消息
                if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil { // 處理Kafka常 規(guī)消息
                    ...
                    counts[indexProcessRegularError]++
        }...
      }
        case <-chain.timer: // 超時(shí)定時(shí)器
            if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil { //發(fā)送TimeToCut類型消息,請(qǐng)求打包出塊
            ...
                counts[indexSendTimeToCutError]++
            } ...
        }
    }
}

①:KafkaMessage_Connect類型消息

Kafka連接消息用于測(cè)試連通Kafka分區(qū)消費(fèi)者的工作狀態(tài)弯囊,用于驗(yàn)證Kafka共識(shí)組件的正常工作狀態(tài)與排除故障痰哨,并調(diào)用chain.processConnect(chain.ChainID())方法處理該消息。

②:KafkaMessage_TimeToCut類型消息

processMessagesToBlocks()方法可調(diào)用chain.processTimeToCut()方法處理TIMETOCUT類型消息匾嘱。如果消息中的區(qū)塊號(hào)ttcNumber不是當(dāng)前Orderer節(jié)點(diǎn)當(dāng)前通道賬本中下一個(gè)打包出塊的區(qū)塊號(hào)(最新區(qū)塊號(hào)lastCutBlockNumber+1)斤斧,則直接丟棄不處理。否則霎烙,調(diào)用BlockCutter().Cut()方法撬讽,切割當(dāng)前該通道上待處理的緩存交易消息列表為批量交易集合batch([]*cb.Envelope),再調(diào)用CreateNextBlock(batch)方法構(gòu)造新區(qū)塊并提交賬本悬垃。最后游昼,調(diào)用WriteBlock(block,metadata)方法尝蠕,更新區(qū)塊元數(shù)據(jù)并提交賬本酱床,同時(shí)更新Kafka共識(shí)組件鏈對(duì)象的最新區(qū)塊號(hào)lastCutBlockNumber增1。

事實(shí)上趟佃,Orderer服務(wù)集群節(jié)點(diǎn)獨(dú)立打包出塊的時(shí)間點(diǎn)通常不是完全同步的扇谣,同時(shí)還可能會(huì)重復(fù)接收其他Orderer節(jié)點(diǎn)提交的TIMETOCUT類型消息(重復(fù)區(qū)塊號(hào))昧捷。此時(shí),Orderer節(jié)點(diǎn)以接收到的第一個(gè)TIMETOCUT類型消息為準(zhǔn)罐寨,打包出塊并提交到賬本靡挥,再更新當(dāng)前通道的最新區(qū)塊號(hào)lastCutBlockNumber。這樣鸯绿,processTimeToCut()方法就能利用最新的lastCutBlockNumber過(guò)濾掉其他重復(fù)的TIMETOCUT類型消息跋破,以保證所有Orderer節(jié)點(diǎn)上賬本區(qū)塊文件的數(shù)據(jù)同步,實(shí)際上是將原先的時(shí)間同步機(jī)制轉(zhuǎn)換為消息同步機(jī)制瓶蝴。

③:KafkaMessage_Regular類型消息

包括通道配置交易消息(KafkaMessageRegular_CONFIG類型)和普通交易消息(KafkaMessageRegular_NORMAL類型)毒返。 詳細(xì)的分析將會(huì)在processRegular方法中體現(xiàn)。

處理配置交易消息

我們先大概的看一下ProcessRegular中關(guān)于處理配置交易消息的代碼部分,因?yàn)檫@部分相當(dāng)?shù)拈L(zhǎng)舷手,必須先看個(gè)概覽:

func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
  ...
  commitConfigMsg := func(message *cb.Envelope, newOffset int64){...}
  seq := chain.Sequence() // 獲取當(dāng)前通道的最新配置序號(hào)
  ...
  switch regularMessage.Class {
    case ab.KafkaMessageRegular_UNKNOWN: // 未知消息類型
    ...
    case ab.KafkaMessageRegular_NORMAL: // 普通交易消息類型
        ...
    case ab.KafkaMessageRegular_CONFIG: // 通道配置交易消息
    ...
        }
    ...
}

我們直接跳轉(zhuǎn)到case ab.KafkaMessageRegular_CONFIG進(jìn)行分析:

①:如果regularMessage.OriginalOffset 不為 0

說(shuō)明這是重新過(guò)濾驗(yàn)證和排序的通道配置交易消息拧簸。

1.1 過(guò)濾重復(fù)提交的消息

if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {}

1.2 確認(rèn)是否是最近重新驗(yàn)證且重新排序的配置交易消息,并且通道配置序號(hào)是最新的

if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset &&regularMessage.ConfigSeq == seq {
  // 因此男窟,關(guān)閉通道并解除Broadcast服務(wù)處理句柄阻塞等待盆赤,通知重新接收消息進(jìn)行處理
  close(chain.doneReprocessingMsgInFlight) 
}

1.3 主動(dòng)更新本通道的最近重新提交排序的配置交易消息初始偏移量lastResubmitted

存在其他Orderer節(jié)點(diǎn)重新提交了配置消息,但是本地Orderer節(jié)點(diǎn)沒(méi)有重新提交該消息歉眷。因此這里需要更新本通道的最近重新提交排序的配置交易消息初始偏移量lastResubmitted牺六。

if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
                chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
            }

②:regularMessage.OriginalOffset為 0

說(shuō)明是第一次提交通道配置交易消息,而不是重新驗(yàn)證和重新排序的汗捡。

2.1 如果消息中的配置序號(hào)regularMessage.ConfigSeq小于當(dāng)前通道的最新配置序號(hào)seq

則說(shuō)明已經(jīng)更新了通道配置(配置序號(hào)較高)淑际,然后再處理當(dāng)前配置交易消息(配置序號(hào)較低)。將會(huì)調(diào)用ProcessConfigMsg重新過(guò)濾和處理該消息扇住。

接著通過(guò)configure重新提交該配置消息進(jìn)行排序庸追,重置消息初始偏移量。然后再更新最近重新提交消息的偏移量台囱。

if regularMessage.ConfigSeq < seq {
  ...
    configEnv, configSeq, err := chain.ProcessConfigMsg(env)
  if err := chain.configure(configEnv, configSeq, receivedOffset); err != nil {...}
  
  // 阻塞接收消息處理,更新最近重新提交消息的偏移量
  chain.lastResubmittedConfigOffset = receivedOffset 
  //創(chuàng)建通道阻塞Broadcast服務(wù)接收處理消息
  chain.doneReprocessingMsgInFlight = make(chan struct{})
}

③:提交配置交易消息執(zhí)行通道管理操作

經(jīng)過(guò)上面的①和②過(guò)濾掉不符合條件的情況读整,接下來(lái)就提交配置交易消息執(zhí)行通道管理操作簿训,核心函數(shù):commitConfigMsg(env, offset)

3.1 將當(dāng)前緩存交易消息切割成批量交易集合

batch := chain.BlockCutter().Cut()

3.2 創(chuàng)建新區(qū)塊block

block := chain.CreateNextBlock(batch)

3.3 構(gòu)造Kafka元數(shù)據(jù)

metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ //構(gòu)造Kafka元數(shù)據(jù)
                LastOffsetPersisted:         receivedOffset - 1, // 偏移量減1
                LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
                LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
            })

3.4 寫入?yún)^(qū)塊

通過(guò)區(qū)塊寫組件提交新區(qū)塊到賬本,更新當(dāng)前通道的最新區(qū)塊號(hào)chain.lastCutBlockNumber增1

chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++  

接著更新本鏈的lastOriginal- OffsetProcessed為newOffset參數(shù)米间,然后做和上面差不多的事情:

chain.lastOriginalOffsetProcessed = newOffset
        block := chain.CreateNextBlock([]*cb.Envelope{message}) // 構(gòu)造新區(qū)塊
        metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{     // 構(gòu)造Kafka元數(shù)據(jù)
            LastOffsetPersisted:         receivedOffset,
            LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
            LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
        })
        chain.WriteConfigBlock(block, metadata) // 寫入配置區(qū)塊
        chain.lastCutBlockNumber++              // 最新區(qū)塊號(hào)增1

不管是上面的WriteBlock還是WriteConfigBlock底層都是調(diào)用的commitBlock强品,如下:

func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
    ... // 添加塊簽名
    bw.addBlockSignature(bw.lastBlock)
  // 添加最新的配置簽名
    bw.addLastConfigSignature(bw.lastBlock)
    // 寫入新塊
    err := bw.support.Append(bw.lastBlock)
    ...
}

接下來(lái)再討論kafka共識(shí)組件如何處理普通交易消息的。

處理普通交易消息

還是先回到 processRegular方法屈糊,關(guān)于處理普通消息的方法大概如下:

func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
  ...
  case ab.KafkaMessageRegular_NORMAL: // 普通交易消息類型
        // 如果OriginalOffset不是0的榛,則說(shuō)明該消息是重新驗(yàn)證且重新提交排序的
        if regularMessage.OriginalOffset != 0 {
            ...
            // 如果消息偏移量不大于lastOriginalOffsetProcessed最近已處理消息的偏移量,
            // 則說(shuō)明已經(jīng)處理過(guò)該消息逻锐,此時(shí)應(yīng)丟棄返回夫晌,防止重復(fù)處理其他Orderer提交的相同偏移 量的普通交易消息
            if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
                ...
        }

        // // 檢查通道的配置序號(hào)是否更新
        if regularMessage.ConfigSeq < seq {
            ...
            //// 消息的配置序號(hào)低雕薪,需要重新驗(yàn)證過(guò)濾消息
            configSeq, err := chain.ProcessNormalMsg(env)
            ...
            //重新提交普通交易消息
      if err := chain.order(env, configSeq, receivedOffset); err != nil {}
                ...
        }
        // advance lastOriginalOffsetProcessed iff message is re-validated and re-ordered
        //當(dāng)且僅當(dāng)消息重新驗(yàn)證和重新排序時(shí),才需要修正lastOriginalOffsetProcessed偏移量
        offset := regularMessage.OriginalOffset
        if offset == 0 {
            offset = chain.lastOriginalOffsetProcessed
        }
        // 提交處理普通交易消息晓淀,offset為最近處理的普通交易消息偏移量
        commitNormalMsg(env, offset)
}

處理普通交易消息的流程與處理配置交易消息的流程基本類似所袁,主要看最后的commitNormalMsg(env, offset),我們來(lái)繼續(xù)分析:

commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
        //// 添加所接收的消息到緩存交易消息列表凶掰,并切割成批量交易集合列表batches
        batches, pending := chain.BlockCutter().Ordered(message)
        ...
        if len(batches) == 0 {
            // 如果不存在批量交易集合燥爷,則啟動(dòng)定時(shí)器周期性地發(fā)送切割出塊消息n
            chain.lastOriginalOffsetProcessed = newOffset
            if chain.timer == nil {
                chain.timer = time.After(chain.SharedConfig().BatchTimeout())
            ...
            return
        }
        chain.timer = nil
        offset := receivedOffset // 設(shè)置當(dāng)前消息偏移量
        if pending || len(batches) == 2 {
            offset-- // 計(jì)算第1個(gè)批量交易消息的偏移量是offset減1
        } else {  // 只有1個(gè)批量交易集合構(gòu)成1個(gè)區(qū)塊
            //// 設(shè)置第1個(gè)批量交易集合的消息偏移量為newOffset
            chain.lastOriginalOffsetProcessed = newOffset
        }
        //// 構(gòu)造并提交第1個(gè)區(qū)塊
        block := chain.CreateNextBlock(batches[0])
        metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
            LastOffsetPersisted:         offset,
            LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
            LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
        })
        chain.WriteBlock(block, metadata) // 更新區(qū)塊元數(shù)據(jù),并提交區(qū)塊到賬本
        chain.lastCutBlockNumber++ // 更新當(dāng)前通道上最近出塊的區(qū)塊號(hào)增1
    ...
        // Commit the second block if exists
        //// 檢查第2個(gè)批量交易集合懦窘,構(gòu)造并提交第2個(gè)區(qū)塊
        if len(batches) == 2 {
            chain.lastOriginalOffsetProcessed = newOffset
            offset++ // 設(shè)置第2個(gè)批量交易集合的消息偏移量offset加1

            block := chain.CreateNextBlock(batches[1])
            metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
                LastOffsetPersisted:         offset,
                LastOriginalOffsetProcessed: newOffset,
                LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
            })
            chain.WriteBlock(block, metadata)
            chain.lastCutBlockNumber++
            ...
        }
    }

首先將新的普通交易消息添加到當(dāng)前的緩存交易列表前翎,并切割成批量交易集合列表batches ,但最多只能包含2個(gè)批量交易集合,并且第2個(gè)批量交易集合最多包含1個(gè)交易畅涂。最終也是調(diào)用的WriteBlock寫入到賬本港华。

到此為止整個(gè)processRegular()方法處理消息結(jié)束舰褪。

總結(jié)及參考

kafka共識(shí)排序的邏輯其實(shí)是比較簡(jiǎn)單的般婆,大概的流程如下 :

image-20210126092717144

https://github.com/blockchainGuide/ (文章圖片代碼資料在里面)

微信公眾號(hào):區(qū)塊鏈技術(shù)棧

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末持舆,一起剝皮案震驚了整個(gè)濱河市嘲碱,隨后出現(xiàn)的幾起案子荸哟,更是在濱河造成了極大的恐慌理卑,老刑警劉巖座韵,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件公壤,死亡現(xiàn)場(chǎng)離奇詭異扇单,居然都是意外死亡商模,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門蜘澜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)施流,“玉大人,你說(shuō)我怎么就攤上這事鄙信〉纱祝” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵装诡,是天一觀的道長(zhǎng)银受。 經(jīng)常有香客問(wèn)我,道長(zhǎng)鸦采,這世上最難降的妖魔是什么宾巍? 我笑而不...
    開(kāi)封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮渔伯,結(jié)果婚禮上顶霞,老公的妹妹穿的比我還像新娘。我一直安慰自己锣吼,他們只是感情好选浑,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布蓝厌。 她就那樣靜靜地躺著,像睡著了一般鲜侥。 火紅的嫁衣襯著肌膚如雪褂始。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天描函,我揣著相機(jī)與錄音崎苗,去河邊找鬼。 笑死舀寓,一個(gè)胖子當(dāng)著我的面吹牛胆数,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播互墓,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼必尼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了篡撵?” 一聲冷哼從身側(cè)響起判莉,我...
    開(kāi)封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎育谬,沒(méi)想到半個(gè)月后券盅,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡膛檀,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年锰镀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咖刃。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡泳炉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出嚎杨,到底是詐尸還是另有隱情花鹅,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布枫浙,位于F島的核電站刨肃,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏自脯。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一斤富、第九天 我趴在偏房一處隱蔽的房頂上張望膏潮。 院中可真熱鬧,春花似錦满力、人聲如沸焕参。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)叠纷。三九已至刻帚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間涩嚣,已是汗流浹背崇众。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留航厚,地道東北人顷歌。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像幔睬,于是被迫代替她去往敵國(guó)和親眯漩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容