Hyperledger-Fabric源碼分析(orderer-consensus-etcdraft)

背景

從區(qū)塊鏈的角度來說郎楼,kafka的方式是違背初衷的死嗦,試問中心化的kafka部署在哪里合適务甥,云牡辽?第三方機(jī)構(gòu)?可以說哪都不合適敞临,一個(gè)精心包裝的去中心化的架構(gòu)里面卻包含了中心化的服務(wù)态辛,真是如鯁在喉,不吐不快挺尿。好在奏黑,F(xiàn)abric早就意識(shí)到了這個(gè)問題,很早就在計(jì)劃要引入raft编矾。一個(gè)公開熟史,平等的聯(lián)盟鏈體系里,每個(gè)企業(yè)都能部署自己的排序服務(wù)窄俏。
在分布式一致性算法方面raft可以說非常成熟蹂匹,算法本身非常精妙。想要搞懂這部分實(shí)現(xiàn)凹蜈,還是需要一些背景知識(shí)的限寞,強(qiáng)烈建議先去學(xué)習(xí)下忍啸。
Fabric的這部分主要是用到了etcd的raft庫的實(shí)現(xiàn),實(shí)際就是raft算法的標(biāo)準(zhǔn)實(shí)現(xiàn)昆烁,至于網(wǎng)絡(luò)通訊及存儲(chǔ)部分吊骤,則留給應(yīng)用層自己。之后你可以看到Fabric還是做了不少工作静尼,以后如果etcdraft能獨(dú)立出來白粉,我想更有利于應(yīng)用接入。

https://ramcloud.stanford.edu/~ongaro/thesis.pdf

https://raft.github.io/

名詞解釋

名詞 解釋
Term 任期
Vote 選舉投票
Entry 日志數(shù)據(jù)條目
candidate 候選人
leader 領(lǐng)導(dǎo)者
follower 跟隨者
commit 提交
propose 提議

配置

Orderer: &OrdererDefaults
  OrdererType: etcdraft
  Addresses:
    - orderer1st-ordererorg:7050
    - orderer2nd-ordererorg:7050
    - orderer3rd-ordererorg:7050
  BatchTimeout: 2s
  BatchSize:
    MaxMessageCount: 500
    AbsoluteMaxBytes: 98 MB
    PreferredMaxBytes: 512 KB
  EtcdRaft:
    Consenters:
      - Host: orderer1st-ordererorg
        Port: 7050
        ClientTLSCert: ...
        ServerTLSCert: ...
      - Host: orderer2nd-ordererorg
        Port: 7050
        ClientTLSCert: ...
        ServerTLSCert: ...
      - Host: orderer3rd-ordererorg
        Port: 7050
        ClientTLSCert: ...
        ServerTLSCert: ...
    Options:
      TickInterval: 100
      ElectionTick: 10
      HeartbeatTick: 1
      MaxInflightMsgs: 256
      MaxSizePerMsg: 1048576
      SnapshotInterval: 500

可以看到Raftnode就是Orderer自己啦鼠渺,并沒有在Orderer上再建立Raft集群的概念鸭巴,跟kafka還是有區(qū)別。

Raft

Node

1552147356624.png

Raft庫有提供Node來與應(yīng)用層互動(dòng)拦盹。

名詞 解釋
Tick 這個(gè)就像是Raft的發(fā)條鹃祖,要每隔一段時(shí)間來調(diào)度這里,驅(qū)動(dòng)選舉和心跳
Advance 告訴raft普舆,上次推送的ready恬口,我已經(jīng)處理完畢,準(zhǔn)備好處理下一個(gè)Ready
Ready Raft世界的風(fēng)吹草動(dòng)會(huì)通知這里沼侣,這非常重要祖能,后面會(huì)講到
Step 將收到的消息寫入狀態(tài)機(jī)
ProposeConfChange 提交配置變更
Propose 提議寫入數(shù)據(jù)到日志中,可能會(huì)返回錯(cuò)誤蛾洛。
Campaign 調(diào)用該函數(shù)將驅(qū)動(dòng)節(jié)點(diǎn)進(jìn)入候選人狀態(tài)养铸,將競爭leader。
ApplyConfChange 應(yīng)用配置變更
type Ready struct {
   // The current volatile state of a Node.
   // SoftState will be nil if there is no update.
   // It is not required to consume or store SoftState.
   *SoftState

   // The current state of a Node to be saved to stable storage BEFORE
   // Messages are sent.
   // HardState will be equal to empty state if there is no update.
   pb.HardState

   // ReadStates can be used for node to serve linearizable read requests locally
   // when its applied index is greater than the index in ReadState.
   // Note that the readState will be returned when raft receives msgReadIndex.
   // The returned is only valid for the request that requested to read.
   ReadStates []ReadState

   // Entries specifies entries to be saved to stable storage BEFORE
   // Messages are sent.
   Entries []pb.Entry

   // Snapshot specifies the snapshot to be saved to stable storage.
   Snapshot pb.Snapshot

   // CommittedEntries specifies entries to be committed to a
   // store/state-machine. These have previously been committed to stable
   // store.
   CommittedEntries []pb.Entry

   // Messages specifies outbound messages to be sent AFTER Entries are
   // committed to stable storage.
   // If it contains a MsgSnap message, the application MUST report back to raft
   // when the snapshot has been received or has failed by calling ReportSnapshot.
   Messages []pb.Message

   // MustSync indicates whether the HardState and Entries must be synchronously
   // written to disk or if an asynchronous write is permissible.
   MustSync bool
}

在Raft世界里一切風(fēng)吹草動(dòng)差不多都在這里了轧膘,應(yīng)用層要跟raft來互動(dòng)的話钞螟,這里是一切動(dòng)作的起源。搞懂這些字段的作用是理解實(shí)現(xiàn)的關(guān)鍵谎碍。

名詞 解釋
SoftState 記錄的是當(dāng)前任期的leader是誰鳞滨,以及該節(jié)點(diǎn)在raft集群的角色,易變的狀態(tài)不需要保存
HardState 需要寫入持久化存儲(chǔ)中蟆淀,包括:節(jié)點(diǎn)當(dāng)前Term太援、Vote、Commit
Entries 在向其他集群發(fā)送消息之前需要先寫入持久化存儲(chǔ)的日志數(shù)據(jù)
Snapshot 需要寫入持久化存儲(chǔ)中的快照數(shù)據(jù)
CommittedEntries 需要輸入到狀態(tài)機(jī)中的數(shù)據(jù)扳碍,這些數(shù)據(jù)之前已經(jīng)被保存到持久化存儲(chǔ)中了
Messages 在entries被寫入持久化存儲(chǔ)中以后,需要發(fā)送出去的數(shù)據(jù)

Raft->Orderer

case rd := <-n.Ready():
   if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
      n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
   }

   if !raft.IsEmptySnap(rd.Snapshot) {
      n.chain.snapC <- &rd.Snapshot
   }

   // skip empty apply
   if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
      n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
   }

   n.Advance()

   // TODO(jay_guo) leader can write to disk in parallel with replicating
   // to the followers and them writing to their disks. Check 10.2.1 in thesis
   n.send(rd.Messages)

這里處理了Raft發(fā)來的Ready通知仙蛉。

  1. 首先不管怎么樣笋敞,只要收到Ready,先把Entries荠瘪,HardState夯巷,Snapshot存儲(chǔ)在本地赛惩。要注意存下來并不代表會(huì)寫入狀態(tài)機(jī),先收下來比較重要趁餐,Raft之后會(huì)保證哪些是需要應(yīng)用到狀態(tài)機(jī)的喷兼。因?yàn)镽aft庫沒有存儲(chǔ)支持,所以需要應(yīng)用進(jìn)行接管后雷。
  2. 如果含有snapshot快照季惯,通知snapC,這里后面再講
  3. len(rd.CommittedEntries) != 0 || rd.SoftState != nil臀突,這里說明如果有CommittedEntries或SoftState變更勉抓,通知applyC
  4. 全部處理完,Advance候学,通知Raft處理完畢藕筋,可以發(fā)下一個(gè)Ready了。
  5. 因?yàn)镽aft庫沒有網(wǎng)絡(luò)支持梳码,所以node間的消息交互需要應(yīng)用進(jìn)行接管隐圾。這個(gè)后面再講。

存儲(chǔ)

func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error {
    if err := rs.wal.Save(hardstate, entries); err != nil {
        return err
    }

    if !raft.IsEmptySnap(snapshot) {
        if err := rs.saveSnap(snapshot); err != nil {
            return err
        }

        if err := rs.ram.ApplySnapshot(snapshot); err != nil {
            if err == raft.ErrSnapOutOfDate {
                rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d",
                    snapshot.Metadata.Term, snapshot.Metadata.Index)
            } else {
                rs.lg.Fatalf("Unexpected programming error: %s", err)
            }
        }
    }

    if err := rs.ram.Append(entries); err != nil {
        return err
    }

    return nil
}
  1. HardState和Entries寫入WAL
  2. Snapshot寫入snap
  3. Snapshot和Entries放到MemoryStorage掰茶,可以看成是storage的cache層

快照

case sn := <-c.snapC:
            if sn.Metadata.Index <= c.appliedIndex {
                c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
                break
            }

            b := utils.UnmarshalBlockOrPanic(sn.Data)
            c.lastSnapBlockNum = b.Header.Number
            c.confState = sn.Metadata.ConfState
            c.appliedIndex = sn.Metadata.Index

            if err := c.catchUp(sn); err != nil {
                    sn.Metadata.Term, sn.Metadata.Index, err)
            }
  1. 如果狀態(tài)機(jī)的index比快照還要新暇藏,那繼續(xù)下去沒有意義了
  2. 將快照的數(shù)據(jù)給chain做更新
  3. 注意這里的confState,里面記錄了成員列表符匾,以及l(fā)earner列表叨咖。
  4. 基本上收到快照的都是不成器的follower或新來的learner,要努力跟leader保持一致啊胶,所以要調(diào)用catchUp
  5. 多提一句甸各,learner不參加選舉,是因?yàn)樗浜筇嗔搜嫫海瑸榱瞬粩_亂民主程序的正常進(jìn)行趣倾,先靠邊站,等你跟我一致了某饰,你再來把儒恋。
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
   b, err := utils.UnmarshalBlock(snap.Data)
   if err != nil {
      return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
   }

   if c.lastBlock.Header.Number >= b.Header.Number {
      c.logger.Warnf("Snapshot is at block %d, local block number is %d, no sync needed", b.Header.Number, c.lastBlock.Header.Number)
      return nil
   }

   puller, err := c.createPuller()
   if err != nil {
      return errors.Errorf("failed to create block puller: %s", err)
   }
   defer puller.Close()

   var block *common.Block
   next := c.lastBlock.Header.Number + 1

   c.logger.Infof("Catching up with snapshot taken at block %d, starting from block %d", b.Header.Number, next)

   for next <= b.Header.Number {
      block = puller.PullBlock(next)
      if block == nil {
         return errors.Errorf("failed to fetch block %d from cluster", next)
      }
      if utils.IsConfigBlock(block) {
         c.support.WriteConfigBlock(block, nil)
      } else {
         c.support.WriteBlock(block, nil)
      }

      next++
   }

   c.lastBlock = block
   c.logger.Infof("Finished syncing with cluster up to block %d (incl.)", b.Header.Number)
   return nil
}
  1. 這里有個(gè)技巧是快照是怎么構(gòu)建的,這里后面會(huì)講到黔漂,其實(shí)就是保存的那段快照區(qū)間的最后一個(gè)block诫尽。
  2. 這里用到了Puller,這里底層就是對(duì)接的Orderer的deliver服務(wù)拉取block炬守。
  3. 下面就很明顯了牧嫉,去拉取該區(qū)間的block的同時(shí)寫入本地賬本,并更新lastblock標(biāo)記位。

applyC

SoftState
case app := <-c.applyC:
   if app.soft != nil {
      newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
      if newLeader != soft.Lead {
         c.logger.Infof("Raft leader changed: %d -> %d", soft.Lead, newLeader)
         c.Metrics.LeaderChanges.Add(1)

         atomic.StoreUint64(&c.lastKnownLeader, newLeader)

         if newLeader == c.raftID {
            propC, cancelProp = becomeLeader()
         }

         if soft.Lead == c.raftID {
            becomeFollower()
         }
      }

    ...

      soft = raft.SoftState{Lead: newLeader, RaftState: app.soft.RaftState}

      // notify external observer
      select {
      case c.observeC <- soft:
      default:
      }
   }
  1. 收到這個(gè)通知酣藻,就代表可能變天了曹洽,要換領(lǐng)導(dǎo)。
  2. 看下新來的領(lǐng)導(dǎo)任命書跟現(xiàn)在所知的是不是一個(gè)人辽剧,如果不是送淆,不好意思就是這么現(xiàn)實(shí),開始工作交接怕轿。
  3. 看下是不是自己當(dāng)選偷崩,如是becomeLeader。不用懷疑撤卢。
  4. 如果上次是本人當(dāng)選环凿,這次換人的話,那leader職權(quán)得立即停止放吩,becomeFollower智听。
  5. 記錄最新得softstate通知observeC,不過當(dāng)前外部沒有人關(guān)注這個(gè)事情渡紫。
becomeLeader
becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
        c.Metrics.IsLeader.Set(1)

        c.blockInflight = 0
        c.justElected = true
        submitC = nil
        ch := make(chan *common.Block, c.opts.MaxInflightMsgs)

        // if there is unfinished ConfChange, we should resume the effort to propose it as
        // new leader, and wait for it to be committed before start serving new requests.
        if cc := c.getInFlightConfChange(); cc != nil {
            // The reason `ProposeConfChange` should be called in go routine is documented in `writeConfigBlock` method.
            go func() {
                if err := c.Node.ProposeConfChange(context.TODO(), *cc); err != nil {
                    c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
                }
            }()

            c.confChangeInProgress = cc
            c.configInflight = true
        }

        // Leader should call Propose in go routine, because this method may be blocked
        // if node is leaderless (this can happen when leader steps down in a heavily
        // loaded network). We need to make sure applyC can still be consumed properly.
        ctx, cancel := context.WithCancel(context.Background())
        go func(ctx context.Context, ch <-chan *common.Block) {
            for {
                select {
                case b := <-ch:
                    data := utils.MarshalOrPanic(b)
                    if err := c.Node.Propose(ctx, data); err != nil {
                        c.logger.Errorf("Failed to propose block %d to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
                        return
                    }
                    c.logger.Debugf("Proposed block %d to raft consensus", b.Header.Number)

                case <-ctx.Done():
                    c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
                    return
                }
            }
        }(ctx, ch)

        return ch, cancel
    }

其實(shí)最主要就是后面的函數(shù)到推,外部調(diào)用propC, cancelProp = becomeLeader(),會(huì)循環(huán)監(jiān)聽propC通道惕澎,然后將data用Node.Propose發(fā)給Raft莉测,這個(gè)后面再講。

再Raft的世界里唧喉,leader就是王道捣卤,它的話就是圣旨,只有l(wèi)eader才有資格Propose東西出去八孝。所以選上的最重要的事情就是拿到與Raft的溝通權(quán)力董朝。

如果有配置未提交,c.Node.ProposeConfChange

這里什么地方會(huì)通知到ch干跛,這里賣個(gè)關(guān)子子姜,后面會(huì)講到。

becomeFollower
becomeFollower := func() {
   cancelProp()
   c.blockInflight = 0
   _ = c.support.BlockCutter().Cut()
   stop()
   submitC = c.submitC
   bc = nil
   c.Metrics.IsLeader.Set(0)
}

交出權(quán)力的心情是痛苦的楼入,我們看下做了什么哥捕?

  1. 原來以為cancelProp會(huì)斷開與propC的關(guān)系,現(xiàn)在看來do nothing嘉熊,一是從理論上來說遥赚,follower不會(huì)有propose的機(jī)會(huì),二是給最后一次的超時(shí)補(bǔ)償做準(zhǔn)備阐肤。

  2. blockInflight是代表說leader會(huì)記錄propose出去的block鸽捻,是不是在Raft里面形成了大多數(shù)一致,如果達(dá)成一致,leader會(huì)在本地commit御蒲,這個(gè)時(shí)候才會(huì)移除掉這條記錄。

  3. c.support.BlockCutter().Cut(), 這里有個(gè)疑問诊赊,這種調(diào)法會(huì)清理掉pendingBatch厚满,真的這么肯定到這里不會(huì)剩下沒有處理完的么?

    func (r *receiver) Cut() []*cb.Envelope {
       r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(time.Since(r.PendingBatchStartTime).Seconds())
       r.PendingBatchStartTime = time.Time{}
       batch := r.pendingBatch
       r.pendingBatch = nil
       r.pendingBatchSizeBytes = 0
       return batch
    }
    
  4. stop碧磅,既然上面已經(jīng)清掉了pending碘箍,那這里再stop pendingbatch的超時(shí)處理,也就沒有什么問題鲸郊。

  5. submitC通道是代表接受客戶端的數(shù)據(jù)提交丰榴,這個(gè)后面再講。

  6. bc就是blockcreator秆撮,里面保存的最近一次創(chuàng)建block的信息四濒,既然你都卸任了,這些也就沒什么意義了职辨。

CommittedEntries
func (c *Chain) apply(ents []raftpb.Entry) {
    if len(ents) == 0 {
        return
    }

    if ents[0].Index > c.appliedIndex+1 {
        c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
    }

    var appliedb uint64
    var position int
    for i := range ents {
        switch ents[i].Type {
        case raftpb.EntryNormal:
            if len(ents[i].Data) == 0 {
                break
            }

            // We need to strictly avoid re-applying normal entries,
            // otherwise we are writing the same block twice.
            if ents[i].Index <= c.appliedIndex {
                c.logger.Debugf("Received block with raft index (%d) <= applied index (%d), skip", ents[i].Index, c.appliedIndex)
                break
            }

            block := utils.UnmarshalBlockOrPanic(ents[i].Data)
            c.writeBlock(block, ents[i].Index)

            appliedb = block.Header.Number
            c.Metrics.CommittedBlockNumber.Set(float64(appliedb))
            position = i
            c.accDataSize += uint32(len(ents[i].Data))

        ...

    if c.accDataSize >= c.sizeLimit {
        select {
        case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
            c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
                "taking snapshot at block %d, last snapshotted block number is %d",
                c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum)
            c.accDataSize = 0
            c.lastSnapBlockNum = appliedb
            c.Metrics.SnapshotBlockNumber.Set(float64(appliedb))
        default:
            c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotInterval is too small")
        }
    }

    return
}

再Raft的世界里面盗蟆,確認(rèn)提交的有兩種entry啦,一種就是所謂的普通舒裤,一種就是配置變更喳资。

  1. 遍歷普通日志,如果是已經(jīng)寫入狀態(tài)機(jī)腾供,也就是寫入本地賬本的block, 那當(dāng)然要拒絕仆邓,免得重復(fù)。
  2. 接下就是writeblock到本地啦伴鳖。
  3. 然后記錄這次處理到第幾個(gè)了节值,最后再統(tǒng)計(jì)這次總共處理的datasize,就是blocksize累加啦黎侈。這個(gè)之后會(huì)有妙用察署,后面再講。

下面我們看下writeBlock的邏輯

writeBlock
func (c *Chain) writeBlock(block *common.Block, index uint64) {
   if c.blockInflight > 0 {a
      c.blockInflight-- // only reduce on leader
   }
   c.lastBlock = block

   c.logger.Debugf("Writing block %d to ledger", block.Header.Number)

   if utils.IsConfigBlock(block) {
      c.writeConfigBlock(block, index)
      return
   }

   c.raftMetadataLock.Lock()
   c.opts.RaftMetadata.RaftIndex = index
   m := utils.MarshalOrPanic(c.opts.RaftMetadata)
   c.raftMetadataLock.Unlock()

   c.support.WriteBlock(block, m)
}
  1. blockInflight前面講過了峻汉,這里收到代表我發(fā)出去的propose收到了群眾的強(qiáng)烈支持贴汪,那這個(gè)提案就過了。剩下就是好好把提案落地就好休吠。
  2. 配置部分有機(jī)會(huì)單獨(dú)講扳埂,本身不影響主要的流程,這里先跳過
  3. c.support.WriteBlock(block, m)瘤礁,就是寫本地賬本啦阳懂。
if c.accDataSize >= c.sizeLimit {
   select {
   case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
      c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
         "taking snapshot at block %d, last snapshotted block number is %d",
         c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum)
      c.accDataSize = 0
      c.lastSnapBlockNum = appliedb
      c.Metrics.SnapshotBlockNumber.Set(float64(appliedb))
   default:
      c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotInterval is too small")
   }
}

func (rs *RaftStorage) TakeSnapshot(i uint64, cs raftpb.ConfState, data []byte) error {
    rs.lg.Debugf("Creating snapshot at index %d from MemoryStorage", i)
    snap, err := rs.ram.CreateSnapshot(i, &cs, data)
    if err != nil {
        return errors.Errorf("failed to create snapshot from MemoryStorage: %s", err)
    }

    if err = rs.saveSnap(snap); err != nil {
        return err
    }

    rs.snapshotIndex = append(rs.snapshotIndex, snap.Metadata.Index)

    // Keep some entries in memory for slow followers to catchup
    if i > rs.SnapshotCatchUpEntries {
        compacti := i - rs.SnapshotCatchUpEntries
        rs.lg.Debugf("Purging in-memory raft entries prior to %d", compacti)
        if err = rs.ram.Compact(compacti); err != nil {
            if err == raft.ErrCompacted {
                rs.lg.Warnf("Raft entries prior to %d are already purged", compacti)
            } else {
                rs.lg.Fatalf("Failed to purge raft entries: %s", err)
            }
        }
    }

    rs.lg.Infof("Snapshot is taken at index %d", i)

    rs.gc()
    return nil
}
  1. 如果累加的accDataSize超過閾值,這里會(huì)將寫入的最后一個(gè)block的相關(guān)信息通知給gcC通道。
  2. gcC再轉(zhuǎn)調(diào)takeSnapshot
  3. TakeSnapshot里面很簡單岩调,就是生成snapshot巷燥,包括任期,最后一次的日志下標(biāo)以及block号枕。保存到本地snap缰揪。
  4. rs.gc里面涉及到一個(gè)閾值,MaxSnapshotFiles葱淳,如果超過钝腺,需要清理文件。首當(dāng)其沖的是wal赞厕,看下是不是有比快照還要老的日志艳狐,有的話清掉。既然都有快照了皿桑,wal日志也就沒有存在的意義了毫目。Raft的世界里index是一切衡量的基礎(chǔ)。snap文件就簡單唁毒,超過多少就刪多少蒜茴。
if c.justElected {
   msgInflight := c.Node.lastIndex() > c.appliedIndex
   if msgInflight {
      c.logger.Debugf("There are in flight blocks, new leader should not serve requests")
      continue
   }

   if c.configInflight {
      c.logger.Debugf("There is config block in flight, new leader should not serve requests")
      continue
   }

   c.logger.Infof("Start accepting requests as Raft leader at block %d", c.lastBlock.Header.Number)
   bc = &blockCreator{
      hash:   c.lastBlock.Header.Hash(),
      number: c.lastBlock.Header.Number,
      logger: c.logger,
   }
   submitC = c.submitC
   c.justElected = false
} else if c.configInflight {
   c.logger.Info("Config block or ConfChange in flight, pause accepting transaction")
   submitC = nil
} else if c.blockInflight < c.opts.MaxInflightMsgs {
   submitC = c.submitC
}
  1. justElected就代表剛選上那會(huì)。過程自己體會(huì)浆西。
  2. msgInflight就代表有MemoryStorage的entry還沒有寫入賬本啦粉私,不宜出門接客
  3. configInflight也是一樣,有Raft配置變更或config block進(jìn)來還沒有生效前近零,更加不宜出門接客
  4. 如果前面都過了诺核,submitC = c.submitC就代表結(jié)果submitC通道,正式開始開門迎客久信。需要注意的是之后可進(jìn)不到這里哦窖杀。
  5. 如果之前有過配置變更的干擾,c.blockInflight < c.opts.MaxInflightMsgs這里就是給她重新出門接客的機(jī)會(huì)裙士。
  6. 還記不記得becomeFollower的時(shí)候立馬就能接客入客,而leader條件很多,說明leader要求高嘛腿椎。

到這里基本把Raft到Orderer的處理都講完了桌硫。

Messages

   n.Advance()

   // TODO(jay_guo) leader can write to disk in parallel with replicating
   // to the followers and them writing to their disks. Check 10.2.1 in thesis
   n.send(rd.Messages)

Advance的意思是這波Ready我已經(jīng)處理完了,我準(zhǔn)備好再處理

前面提到過啃炸,EtcdRaft只關(guān)注算法本身铆隘,集群節(jié)點(diǎn)間怎么通訊,不是它關(guān)注的點(diǎn)南用,不過當(dāng)然了膀钠,消息要發(fā)給誰掏湾,它是知道的,只不過想讓你代勞而已肿嘲。

func (n *node) send(msgs []raftpb.Message) {
   n.unreachableLock.RLock()
   defer n.unreachableLock.RUnlock()

   for _, msg := range msgs {
      if msg.To == 0 {
         continue
      }

      status := raft.SnapshotFinish

      msgBytes := utils.MarshalOrPanic(&msg)
      err := n.rpc.SendConsensus(msg.To, &orderer.ConsensusRequest{Channel: n.chainID, Payload: msgBytes})
      if err != nil {
         // TODO We should call ReportUnreachable if message delivery fails
         n.logSendFailure(msg.To, err)

         status = raft.SnapshotFailure
      } else if _, ok := n.unreachable[msg.To]; ok {
         n.logger.Infof("Successfully sent StepRequest to %d after failed attempt(s)", msg.To)
         delete(n.unreachable, msg.To)
      }

      if msg.Type == raftpb.MsgSnap {
         n.ReportSnapshot(msg.To, status)
      }
   }
}

還記得之前將snap寫入存儲(chǔ)吧融击?到這里一般的情況可以將狀態(tài)置為SnapshotFinish,但是保險(xiǎn)起見雳窟,這波消息只要發(fā)送失敗就認(rèn)為這次快照存儲(chǔ)失敗砚嘴,情愿重發(fā)一次。

最后ReportSnapshot就是用來向Leader報(bào)告你發(fā)給我的快照的執(zhí)行情況涩拙。

Orderer->Raft

Orderer是怎么把消息發(fā)給Raft的呢?Fabric剝離了底層共識(shí)算法與Orderer的耦合耸采,讓替換成為可能兴泥。看過之前kafka和solo的對(duì)這個(gè)應(yīng)該很熟悉虾宇。

type Consenter interface {
   // Order accepts a message or returns an error indicating the cause of failure
   // It ultimately passes through to the consensus.Chain interface
   Order(env *cb.Envelope, configSeq uint64) error

   // Configure accepts a reconfiguration or returns an error indicating the cause of failure
   // It ultimately passes through to the consensus.Chain interface
   Configure(config *cb.Envelope, configSeq uint64) error

   // WaitReady blocks waiting for consenter to be ready for accepting new messages.
   // This is useful when consenter needs to temporarily block ingress messages so
   // that in-flight messages can be consumed. It could return error if consenter is
   // in erroneous states. If this blocking behavior is not desired, consenter could
   // simply return nil.
   WaitReady() error
}

只要是普通類型的事件都會(huì)走Order搓彻,來push到后端的共識(shí)服務(wù)。

func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
   return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}

這里封裝成SubmitRequest繼續(xù)往后傳遞

func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
   if err := c.isRunning(); err != nil {
      c.Metrics.ProposalFailures.Add(1)
      return err
   }

   leadC := make(chan uint64, 1)
   select {
   case c.submitC <- &submit{req, leadC}:
      lead := <-leadC
      if lead == raft.None {
         c.Metrics.ProposalFailures.Add(1)
         return errors.Errorf("no Raft leader")
      }

      if lead != c.raftID {
         if err := c.rpc.SendSubmit(lead, req); err != nil {
            c.Metrics.ProposalFailures.Add(1)
            return err
         }
      }

   case <-c.doneC:
      c.Metrics.ProposalFailures.Add(1)
      return errors.Errorf("chain is stopped")
   }

   return nil
}
  1. 當(dāng)然chain要是running狀態(tài)嘱朽,將收到的事件通知給submitC通道旭贬。

  2. 等待leadC的通知,先中斷下搪泳,看下要干嘛稀轨,其實(shí)就是返回當(dāng)前任期的leader,當(dāng)前任期不準(zhǔn)確岸军,應(yīng)該是最新一任leader奋刽,因?yàn)橛锌赡茉谀硞€(gè)任期leader沒有選出來,當(dāng)然這個(gè)幾率非常非常低艰赞,因?yàn)橛须S機(jī)超時(shí)的存在佣谐。

  3. 繼續(xù),拿到任命書方妖,看下如果是raft.None, 說明現(xiàn)在還沒有領(lǐng)導(dǎo)狭魂,直接return,表示這個(gè)事件我不能收党觅,收下真的處理不了雌澄。

  4. 如果leader不是本人,那問題大了仔役,在Raft的世界里只有l(wèi)eader才能發(fā)號(hào)施令掷伙,現(xiàn)在這個(gè)事件怎么辦?丟掉又可惜又兵,因?yàn)槿绻及l(fā)給leader任柜,那那邊壓力太大了卒废。既然你沒有篡位之意,那努力給你的領(lǐng)導(dǎo)分憂不是宙地,借助rpc將這個(gè)事件發(fā)給他好了摔认。RPC模塊是給orderer間通訊用的,也就是Raftnode間通訊用的宅粥。沒它整個(gè)體系你玩不轉(zhuǎn)的参袱,以后有機(jī)會(huì)再講把。

  5. 前面Raft->Orderer章節(jié)秽梅,我們講了用submitC來通知開門迎客抹蚀,下面我們看下接客會(huì)做些什么?

submitC

case s := <-submitC:
   if s == nil {
      // polled by `WaitReady`
      continue
   }

   if soft.RaftState == raft.StatePreCandidate || soft.RaftState == raft.StateCandidate {
      s.leader <- raft.None
      continue
   }

   s.leader <- soft.Lead
   if soft.Lead != c.raftID {
      continue
   }

   batches, pending, err := c.ordered(s.req)
   if err != nil {
      c.logger.Errorf("Failed to order message: %s", err)
   }
   if pending {
      start() // no-op if timer is already started
   } else {
      stop()
   }

   c.propose(propC, bc, batches...)

   if c.configInflight {
      c.logger.Info("Received config block, pause accepting transaction till it is committed")
      submitC = nil
   } else if c.blockInflight >= c.opts.MaxInflightMsgs {
      c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
         c.blockInflight, c.opts.MaxInflightMsgs)
      submitC = nil
   }
  1. 如果當(dāng)前節(jié)點(diǎn)的狀態(tài)是準(zhǔn)候選人或候選人企垦,那就沒什么好說了环壤,leader現(xiàn)在還沒有產(chǎn)生
  2. 如果soft.Lead != c.raftID,說明什么钞诡,說明最新任期不是自己哦郑现,沒有propose的權(quán)利,丟棄這次請(qǐng)求荧降。
  3. batches, pending, err := c.ordered(s.req)接箫,很熟悉了,這里負(fù)責(zé)出包朵诫。
  4. 如果還有剩下的事件沒有出包辛友,為了保證不浪費(fèi),啟動(dòng)計(jì)時(shí)器拗窃,來做補(bǔ)償瞎领,這部分后面再講。
  5. c.propose(propC, bc, batches...)随夸,重點(diǎn)九默,這里是真正給Raft發(fā)狀態(tài)的地方,后面講到宾毒。
  6. 最后無非就是一些異常情況驼修,會(huì)讓leader失去接受請(qǐng)求的能力。
func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
   for _, batch := range batches {
      b := bc.createNextBlock(batch)
      c.logger.Debugf("Created block %d, there are %d blocks in flight", b.Header.Number, c.blockInflight)

      select {
      case ch <- b:
      default:
         c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
      }

      // if it is config block, then we should wait for the commit of the block
      if utils.IsConfigBlock(b) {
         c.configInflight = true
      }

      c.blockInflight++
   }

   return
}

還記不記得前面講becomeLeader的時(shí)候提到的ch诈铛,這里最后會(huì)一個(gè)接一個(gè)的將block通知到ch乙各。再貼一遍那邊的代碼。

go func(ctx context.Context, ch <-chan *common.Block) {
   for {
      select {
      case b := <-ch:
         data := utils.MarshalOrPanic(b)
         if err := c.Node.Propose(ctx, data); err != nil {
            c.logger.Errorf("Failed to propose block %d to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
            return
         }
         c.logger.Debugf("Proposed block %d to raft consensus", b.Header.Number)

      case <-ctx.Done():
         c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
         return
      }
   }
}(ctx, ch)

最終會(huì)調(diào)用c.Node.Propose(ctx, data)的方法幢竹。

Propose的意思就是將日志廣播出去耳峦,要群眾都盡量保存起來,但還沒有提交焕毫,等到leader收到半數(shù)以上的群眾都響應(yīng)說已經(jīng)保存完了蹲坷,leader這時(shí)就可以提交了驶乾,下一次Ready的時(shí)候就會(huì)帶上committedindex。

超時(shí)處理

case <-timer.C():
   ticking = false

   batch := c.support.BlockCutter().Cut()
   if len(batch) == 0 {
      c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
      continue
   }

   c.logger.Debugf("Batch timer expired, creating block")
   c.propose(propC, bc, batch) // we are certain this is normal block, no need to block

沒有新意循签,無非就是將pending的做一次Cut级乐,然后propose到Raft。

配置更新

配置部分是Raft不可忽略的一部分县匠,F(xiàn)abric是怎樣將成員的變更傳遞給Raft的风科?

首先我們回到Node接口,看下ProposeConfChange和ApplyConfChange乞旦。

一個(gè)是通知Raft贼穆,有配置變更。另外一個(gè)是接到Raft通知兰粉,有配置更新扮惦,立即執(zhí)行。

ProposeConfChange

func (c *Chain) writeBlock(block *common.Block, index uint64) {
  ...
   if utils.IsConfigBlock(block) {
      c.writeConfigBlock(block, index)
      return
   }
   ...
}

還記得么亲桦,前面提到的writeBlock里面會(huì)判斷當(dāng)前寫入的是不是configblock

func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
   metadata, raftMetadata := c.newRaftMetadata(block)

   var changes *MembershipChanges
   if metadata != nil {
      changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
   }

   confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
   raftMetadata.RaftIndex = index

   raftMetadataBytes := utils.MarshalOrPanic(raftMetadata)
   // write block with metadata
   c.support.WriteConfigBlock(block, raftMetadataBytes)
   c.configInflight = false

   // update membership
   if confChange != nil {
      // We need to propose conf change in a go routine, because it may be blocked if raft node
      // becomes leaderless, and we should not block `serveRequest` so it can keep consuming applyC,
      // otherwise we have a deadlock.
      go func() {
         // ProposeConfChange returns error only if node being stopped.
         // This proposal is dropped by followers because DisableProposalForwarding is enabled.
         if err := c.Node.ProposeConfChange(context.TODO(), *confChange); err != nil {
            c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
         }
      }()

      c.confChangeInProgress = confChange

      switch confChange.Type {
      case raftpb.ConfChangeAddNode:
         c.logger.Infof("Config block just committed adds node %d, pause accepting transactions till config change is applied", confChange.NodeID)
      case raftpb.ConfChangeRemoveNode:
         c.logger.Infof("Config block just committed removes node %d, pause accepting transactions till config change is applied", confChange.NodeID)
      default:
         c.logger.Panic("Programming error, encountered unsupported raft config change")
      }

      c.configInflight = true
   }

   c.raftMetadataLock.Lock()
   c.opts.RaftMetadata = raftMetadata
   c.raftMetadataLock.Unlock()
}

第一眼就可以得出一個(gè)結(jié)論,在Fabric的世界里浊仆,Raft的配置更新是包括在ConfigBlock里面的客峭,只不過在block寫入賬本之前會(huì)從里面剝離出來涉及到Raft的配置變更的部分,然后去通知Raft抡柿。

  1. 去ConfigBlock里面拿到EtcdRaft部分的配置舔琅,以及當(dāng)前在用的配置
  2. 做比對(duì),得出一份報(bào)告洲劣,這次新增了哪幾個(gè)節(jié)點(diǎn)备蚓,要?jiǎng)h除哪幾個(gè)幾點(diǎn)
  3. 得出報(bào)告還不行,還得結(jié)合Raft的規(guī)定囱稽,出局一份書面的申請(qǐng)郊尝。UpdateRaftMetadataAndConfChange就是干這個(gè)的。
func (mc *MembershipChanges) UpdateRaftMetadataAndConfChange(raftMetadata *etcdraft.RaftMetadata) *raftpb.ConfChange {
   if mc == nil || mc.TotalChanges == 0 {
      return nil
   }

   var confChange *raftpb.ConfChange

   // producing corresponding raft configuration changes
   if len(mc.AddedNodes) > 0 {
      nodeID := raftMetadata.NextConsenterId
      raftMetadata.Consenters[nodeID] = mc.AddedNodes[0]
      raftMetadata.NextConsenterId++
      confChange = &raftpb.ConfChange{
         ID:     raftMetadata.ConfChangeCounts,
         NodeID: nodeID,
         Type:   raftpb.ConfChangeAddNode,
      }
      raftMetadata.ConfChangeCounts++
      return confChange
   }

   if len(mc.RemovedNodes) > 0 {
      for _, c := range mc.RemovedNodes {
         for nodeID, node := range raftMetadata.Consenters {
            if bytes.Equal(c.ClientTlsCert, node.ClientTlsCert) {
               delete(raftMetadata.Consenters, nodeID)
               confChange = &raftpb.ConfChange{
                  ID:     raftMetadata.ConfChangeCounts,
                  NodeID: nodeID,
                  Type:   raftpb.ConfChangeRemoveNode,
               }
               raftMetadata.ConfChangeCounts++
               break
            }
         }
      }
   }

   return confChange
}
  1. 有沒有發(fā)現(xiàn)战惊,這里執(zhí)行下來每次只會(huì)更新一個(gè)節(jié)點(diǎn)流昏,意味著每次更新Raft成員信息的時(shí)候,每次只能新增或刪除一個(gè)節(jié)點(diǎn)吞获,否則剩下的是不會(huì)生效的况凉。這里感興趣的可以參考Raft論文,每次只變更一個(gè)節(jié)點(diǎn)各拷,是性價(jià)比高的實(shí)現(xiàn)刁绒。

    if len(confState.Nodes) == len(c.opts.RaftMetadata.Consenters) {
       // since configuration change could only add one node or
       // remove one node at a time, if raft nodes state size
       // equal to membership stored in block metadata field,
       // that means everything is in sync and no need to propose
       // update
       return nil
    }
    
  2. 寫入ConfigBlock到本地賬本

  3. c.Node.ProposeConfChange,這里就是通知Raft做配置更新了烤黍。

  4. 設(shè)置configInflight=true知市,表示現(xiàn)在有個(gè)配置更新已經(jīng)提案給Raft了傻盟,等通知。

  5. 記錄本次更新到confChangeInProgress用來之后的跟蹤進(jìn)度

ApplyConfChange

回憶下初狰,當(dāng)我們的提案發(fā)到Raft后莫杈,我怎么知道成員都達(dá)成一致準(zhǔn)備開干呢?當(dāng)然是等待Ready的CommittedEntries啦奢入,最終會(huì)通知applyc通道筝闹。

case raftpb.EntryConfChange:
   var cc raftpb.ConfChange
   if err := cc.Unmarshal(ents[i].Data); err != nil {
      c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
      continue
   }

   c.confState = *c.Node.ApplyConfChange(cc)

   switch cc.Type {
   case raftpb.ConfChangeAddNode:
      c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
   case raftpb.ConfChangeRemoveNode:
      c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
   default:
      c.logger.Panic("Programming error, encountered unsupported raft config change")
   }

   // This ConfChange was introduced by a previously committed config block,
   // we can now unblock submitC to accept envelopes.
   if c.confChangeInProgress != nil &&
      c.confChangeInProgress.NodeID == cc.NodeID &&
      c.confChangeInProgress.Type == cc.Type {

      if err := c.configureComm(); err != nil {
         c.logger.Panicf("Failed to configure communication: %s", err)
      }

      c.confChangeInProgress = nil
      c.configInflight = false
      // report the new cluster size
      c.Metrics.ClusterSize.Set(float64(len(c.opts.RaftMetadata.Consenters)))
   }

   if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
      c.logger.Infof("Current node removed from replica set for channel %s", c.channelID)
      // calling goroutine, since otherwise it will be blocked
      // trying to write into haltC
      go c.Halt()
   }
}
  1. c.Node.ApplyConfChange(cc),總算是看到了腥光,這里就是執(zhí)行配置更新了关顷。
  2. 還記得上面會(huì)去記錄confChangeInProgress么?如果相等說明之前給Raft的提案武福,終于收到了響應(yīng)议双,大家都準(zhǔn)備好了,開始吧捉片。
  3. c.configureComm()會(huì)在cluster章節(jié)講解平痰,這里簡單的說就是按照最新的成員,構(gòu)建Raft網(wǎng)絡(luò)伍纫。
  4. 釋放configInflight和confChangeInProgress宗雇,代表本次配置更新完畢。
  5. 如果接收到的是刪除節(jié)點(diǎn)的通知莹规,看下是不是本人赔蒲,如果是,調(diào)用Halt良漱,想也知道舞虱,最終會(huì)去調(diào)Node的Stop,停掉該Raft節(jié)點(diǎn)母市。

最后

關(guān)于通訊層也是很重要的部分矾兜,這里不光是托管Raft的消息傳遞,也是支撐Orderer cluster的關(guān)鍵患久,下次單獨(dú)拿來講吧焕刮。

etcdraft的部分差不多就是這樣了,當(dāng)然了墙杯,有很多細(xì)節(jié)沒有涉及配并,比如config的部分。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末高镐,一起剝皮案震驚了整個(gè)濱河市溉旋,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嫉髓,老刑警劉巖观腊,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件邑闲,死亡現(xiàn)場離奇詭異,居然都是意外死亡梧油,警方通過查閱死者的電腦和手機(jī)苫耸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來儡陨,“玉大人褪子,你說我怎么就攤上這事∑澹” “怎么了嫌褪?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長胚股。 經(jīng)常有香客問我笼痛,道長,這世上最難降的妖魔是什么琅拌? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任缨伊,我火速辦了婚禮,結(jié)果婚禮上进宝,老公的妹妹穿的比我還像新娘倘核。我一直安慰自己,他們只是感情好即彪,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著活尊,像睡著了一般隶校。 火紅的嫁衣襯著肌膚如雪啤挎。 梳的紋絲不亂的頭發(fā)上蹲堂,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音佩研,去河邊找鬼铜犬。 笑死舞终,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的癣猾。 我是一名探鬼主播敛劝,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼纷宇!你這毒婦竟也來了夸盟?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤像捶,失蹤者是張志新(化名)和其女友劉穎上陕,沒想到半個(gè)月后桩砰,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡释簿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年亚隅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片庶溶。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡煮纵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出渐尿,到底是詐尸還是另有隱情醉途,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布砖茸,位于F島的核電站隘擎,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏凉夯。R本人自食惡果不足惜货葬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望劲够。 院中可真熱鬧震桶,春花似錦、人聲如沸征绎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽人柿。三九已至柴墩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間凫岖,已是汗流浹背江咳。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留哥放,地道東北人歼指。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像甥雕,于是被迫代替她去往敵國和親踩身。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容