Hyperledger-Fabric源碼分析(Gossip-PullEngine)

/* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items
   identified by string numbers.
   The protocol is as follows:
   1) The Initiator sends a Hello message with a specific NONCE to a set of remote peers.
   2) Each remote peer responds with a digest of its messages and returns that NONCE.
   3) The initiator checks the validity of the NONCEs received, aggregates the digests,
      and crafts a request containing specific item ids it wants to receive from each remote peer and then
      sends each request to its corresponding peer.
   4) Each peer sends back the response containing the items requested, if it still holds them and the NONCE.

    Other peer                          Initiator
    O <-------- Hello <NONCE> -------------------------  O
   /|\    --------- Digest <[3,5,8, 10...], NONCE> -------->     /|\
    | <-------- Request <[3,8], NONCE> -----------------      |
   / \    --------- Response <[item3, item8], NONCE>------->     / \

*/
  • 先看下官方的解釋好了五慈。
  • 還記得之前的RemoteStateRequest篇么格了?那個(gè)似乎也是去拉取block啊假抄,這里似乎也是啊劫拢,下面我們具體看下陶冷,為什么要有這兩種機(jī)制碴开,他們所面對(duì)的場(chǎng)景是什么。

Hello

發(fā)起點(diǎn)

func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, df DigestFilter) *PullEngine {
    engine := &PullEngine{
        PullAdapter:        participant,
        stopFlag:           int32(0),
        state:              util.NewSet(),
        item2owners:        make(map[string][]string),
        peers2nonces:       make(map[string]uint64),
        nonces2peers:       make(map[uint64]string),
        acceptingDigests:   int32(0),
        acceptingResponses: int32(0),
        incomingNONCES:     util.NewSet(),
        outgoingNONCES:     util.NewSet(),
        digFilter:          df,
        digestWaitTime:     util.GetDurationOrDefault("peer.gossip.digestWaitTime", defDigestWaitTime),
        requestWaitTime:    util.GetDurationOrDefault("peer.gossip.requestWaitTime", defRequestWaitTime),
        responseWaitTime:   util.GetDurationOrDefault("peer.gossip.responseWaitTime", defResponseWaitTime),
    }

    go func() {
        for !engine.toDie() {
            time.Sleep(sleepTime)
            if engine.toDie() {
                return
            }
            engine.initiatePull()
        }
    }()

    return engine
}

func (engine *PullEngine) initiatePull() {
   engine.lock.Lock()
   defer engine.lock.Unlock()

   engine.acceptDigests()
   for _, peer := range engine.SelectPeers() {
      nonce := engine.newNONCE()
      engine.outgoingNONCES.Add(nonce)
      engine.nonces2peers[nonce] = peer
      engine.peers2nonces[peer] = nonce
      engine.Hello(peer, nonce)
   }

   time.AfterFunc(engine.digestWaitTime, func() {
      engine.processIncomingDigests()
   })
}
  • acceptDigests表示正式開(kāi)始一次基于hello-digest流程青伤,串行執(zhí)行。
  • 當(dāng)PullEngine啟動(dòng)的時(shí)候每隔一定時(shí)間會(huì)隨機(jī)選取節(jié)點(diǎn)發(fā)起Hello請(qǐng)求
  • 這里需要注意的是殴瘦,會(huì)記錄Hello請(qǐng)求的Nonce和peer節(jié)點(diǎn)的關(guān)系狠角。應(yīng)該是后面會(huì)以這個(gè)來(lái)跟蹤執(zhí)行情況。
  • 發(fā)出的Hello會(huì)記錄到outgoingNONCES里面
  • 超時(shí)的情況蚪腋,之后再講

請(qǐng)求

func (p *pullMediatorImpl) Hello(dest string, nonce uint64) {
   helloMsg := &proto.GossipMessage{
      Channel: p.config.Channel,
      Tag:     p.config.Tag,
      Content: &proto.GossipMessage_Hello{
         Hello: &proto.GossipHello{
            Nonce:    nonce,
            Metadata: nil,
            MsgType:  p.config.MsgType,
         },
      },
   }

   p.logger.Debug("Sending", p.config.MsgType, "hello to", dest)
   sMsg, err := helloMsg.NoopSign()
   if err != nil {
      p.logger.Errorf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
      return
   }
   p.Sndr.Send(sMsg, p.peersWithEndpoints(dest)...)
}
  • 沒(méi)什么好講的丰歌,注意下nonce就好。下面看下對(duì)方收到hello怎么處理屉凯。

處理

func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
   engine.incomingNONCES.Add(nonce)

   time.AfterFunc(engine.requestWaitTime, func() {
      engine.incomingNONCES.Remove(nonce)
   })

   a := engine.state.ToArray()
   var digest []string
   filter := engine.digFilter(context)
   for _, item := range a {
      dig := item.(string)
      if !filter(dig) {
         continue
      }
      digest = append(digest, dig)
   }
   if len(digest) == 0 {
      return
   }
   engine.SendDigest(digest, nonce, context)
}
  • 這里也是一樣會(huì)記錄收到的hello請(qǐng)求的nonce到本地incomingNONCES
  • 將本地的state里面的blocknums序號(hào)集組裝成Digest返回立帖,這里以blockpullengine為例,每次收到block消息時(shí)候神得,都會(huì)將block序號(hào)冗余到engine的state里面厘惦。
  • 下面我們看下Digest的情況

Digest

請(qǐng)求

func (p *pullMediatorImpl) SendDigest(digest []string, nonce uint64, context interface{}) {
   digMsg := &proto.GossipMessage{
      Channel: p.config.Channel,
      Tag:     p.config.Tag,
      Nonce:   0,
      Content: &proto.GossipMessage_DataDig{
         DataDig: &proto.DataDigest{
            MsgType: p.config.MsgType,
            Nonce:   nonce,
            Digests: util.StringsToBytes(digest),
         },
      },
   }
   remotePeer := context.(proto.ReceivedMessage).GetConnectionInfo()
   if p.logger.IsEnabledFor(zapcore.DebugLevel) {
      p.logger.Debug("Sending", p.config.MsgType, "digest:", digMsg.GetDataDig().FormattedDigests(), "to", remotePeer)
   }

   context.(proto.ReceivedMessage).Respond(digMsg)
}
  • 不會(huì)再生成新的Nonce,這里會(huì)沿用之前的nonce哩簿。

處理

func (engine *PullEngine) OnDigest(digest []string, nonce uint64, context interface{}) {
   if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) {
      return
   }

   engine.lock.Lock()
   defer engine.lock.Unlock()

   for _, n := range digest {
      if engine.state.Exists(n) {
         continue
      }

      if _, exists := engine.item2owners[n]; !exists {
         engine.item2owners[n] = make([]string, 0)
      }

      engine.item2owners[n] = append(engine.item2owners[n], engine.nonces2peers[nonce])
   }
}
  • 基本就是統(tǒng)計(jì)本地沒(méi)有的block序號(hào),將序號(hào)所屬的peer節(jié)點(diǎn)記錄下來(lái)酝静,還記得發(fā)hello的時(shí)候的nonces2peers么节榜,當(dāng)時(shí)記錄了hello是發(fā)給哪個(gè)節(jié)點(diǎn)的。
  • 這里需要注意的是item2owners别智,這里會(huì)最終組成一組映射宗苍,擁有某個(gè)block的peer列表

小結(jié)

好了,至此薄榛,整個(gè)流程處理了一半讳窟,發(fā)起點(diǎn)也收到了節(jié)點(diǎn)反饋的Digest數(shù)據(jù)。知道了自己和對(duì)方的差異的部分敞恋,那么什么時(shí)候開(kāi)始正式拉取數(shù)據(jù)呢丽啡?下面我們繼續(xù)往下深入。

Request

發(fā)起點(diǎn)

func (engine *PullEngine) initiatePull() {
   ...

   time.AfterFunc(engine.digestWaitTime, func() {
      engine.processIncomingDigests()
   })
}
  • 還記得這部分代碼么硬猫?就是hello的超時(shí)處理的部分补箍,很明顯了改执,Request的發(fā)起點(diǎn)就是等待digestWaitTime超時(shí)。
func (engine *PullEngine) processIncomingDigests() {
   engine.ignoreDigests()

   engine.lock.Lock()
   defer engine.lock.Unlock()

   requestMapping := make(map[string][]string)
   for n, sources := range engine.item2owners {
      // select a random source
      source := sources[util.RandomInt(len(sources))]
      if _, exists := requestMapping[source]; !exists {
         requestMapping[source] = make([]string, 0)
      }
      // append the number to that source
      requestMapping[source] = append(requestMapping[source], n)
   }

   engine.acceptResponses()

   for dest, seqsToReq := range requestMapping {
      engine.SendReq(dest, seqsToReq, engine.peers2nonces[dest])
   }

   time.AfterFunc(engine.responseWaitTime, engine.endPull)
}
  • ignoreDigests表示hello-digest結(jié)束坑雅,開(kāi)始request-response流程
  • 統(tǒng)計(jì)item2owners辈挂,計(jì)算本地沒(méi)有的block部分,準(zhǔn)備去擁有方去拉取

請(qǐng)求

func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) {
   req := &proto.GossipMessage{
      Channel: p.config.Channel,
      Tag:     p.config.Tag,
      Nonce:   0,
      Content: &proto.GossipMessage_DataReq{
         DataReq: &proto.DataRequest{
            MsgType: p.config.MsgType,
            Nonce:   nonce,
            Digests: util.StringsToBytes(items),
         },
      },
   }
   if p.logger.IsEnabledFor(zapcore.DebugLevel) {
      p.logger.Debug("Sending", req.GetDataReq().FormattedDigests(), "to", dest)
   }
   sMsg, err := req.NoopSign()
   if err != nil {
      p.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
      return
   }
   p.Sndr.Send(sMsg, p.peersWithEndpoints(dest)...)
}

只需要注意nonce裹粤,其他沒(méi)什么好講的终蒂。說(shuō)明整個(gè)過(guò)程雙方都會(huì)密切關(guān)心這個(gè)流程的狀態(tài)。

處理

func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{}) {
   if !engine.incomingNONCES.Exists(nonce) {
      return
   }
   engine.lock.Lock()
   defer engine.lock.Unlock()

   filter := engine.digFilter(context)
   var items2Send []string
   for _, item := range items {
      if engine.state.Exists(item) && filter(item) {
         items2Send = append(items2Send, item)
      }
   }

   if len(items2Send) == 0 {
      return
   }

   go engine.SendRes(items2Send, context, nonce)
}
  • 基本上就是拿到對(duì)方發(fā)來(lái)的請(qǐng)求遥诉,來(lái)滿足對(duì)方的需求拇泣,馬上來(lái)看看Response發(fā)送會(huì)組裝些什么?

Response

請(qǐng)求

func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce uint64) {
   items2return := []*proto.Envelope{}
   p.RLock()
   defer p.RUnlock()
   for _, item := range items {
      if msg, exists := p.itemID2Msg[item]; exists {
         items2return = append(items2return, msg.Envelope)
      }
   }
   returnedUpdate := &proto.GossipMessage{
      Channel: p.config.Channel,
      Tag:     p.config.Tag,
      Nonce:   0,
      Content: &proto.GossipMessage_DataUpdate{
         DataUpdate: &proto.DataUpdate{
            MsgType: p.config.MsgType,
            Nonce:   nonce,
            Data:    items2return,
         },
      },
   }
   remotePeer := context.(proto.ReceivedMessage).GetConnectionInfo()
   p.logger.Debug("Sending", len(returnedUpdate.GetDataUpdate().Data), p.config.MsgType, "items to", remotePeer)
   context.(proto.ReceivedMessage).Respond(returnedUpdate)
}
  • itemID2Msg突那,這里暫存了收到的block
  • 所以這里就是根據(jù)收到block序號(hào)列表挫酿,然后組裝items2return返回給對(duì)方。

處理

if res := msg.GetDataUpdate(); res != nil {
   itemIDs = make([]string, len(res.Data))
   items = make([]*proto.SignedGossipMessage, len(res.Data))
   pullMsgType = ResponseMsgType
   for i, pulledMsg := range res.Data {
      msg, err := pulledMsg.ToGossipMessage()
      if err != nil {
         p.logger.Warningf("Data update contains an invalid message: %+v", errors.WithStack(err))
         return
      }
      p.MsgCons(msg)
      itemIDs[i] = p.IdExtractor(msg)
      items[i] = msg
      p.Lock()
      p.itemID2Msg[itemIDs[i]] = msg
      p.Unlock()
   }
   p.engine.OnRes(itemIDs, res.Nonce)
}
  • MsgCons(msg)這里會(huì)將拉取的block push到payloads里面愕难,等待commit到本地早龟,并同步最新的height給好朋友們。至于怎么到payloads的猫缭,細(xì)節(jié)這里就不提了葱弟,有興趣的可以自己研究。
  • 剩下的就是為了Pull機(jī)制來(lái)服務(wù)的猜丹,將收到的數(shù)據(jù)進(jìn)行下次pull的準(zhǔn)備芝加。加入本地state和itemID2Msg,什么的射窒,都跟前面的做一些呼應(yīng)藏杖。

補(bǔ)充

下面我們看下gossip里面是怎么使用這個(gè)PullEngine的,總的來(lái)說(shuō)分為兩塊脉顿,一是block蝌麸,前面也提到,另一個(gè)就是cert艾疟。

BlockPuller

func (gc *gossipChannel) createBlockPuller() pull.Mediator {
   conf := pull.Config{
      MsgType:           proto.PullMsgType_BLOCK_MSG,
      Channel:           []byte(gc.chainID),
      ID:                gc.GetConf().ID,
      PeerCountToSelect: gc.GetConf().PullPeerNum,
      PullInterval:      gc.GetConf().PullInterval,
      Tag:               proto.GossipMessage_CHAN_AND_ORG,
   }
   seqNumFromMsg := func(msg *proto.SignedGossipMessage) string {
      dataMsg := msg.GetDataMsg()
      if dataMsg == nil || dataMsg.Payload == nil {
         gc.logger.Warning("Non-data block or with no payload")
         return ""
      }
      return fmt.Sprintf("%d", dataMsg.Payload.SeqNum)
   }
   adapter := &pull.PullAdapter{
      Sndr:        gc,
      MemSvc:      gc.memFilter,
      IdExtractor: seqNumFromMsg,
      MsgCons: func(msg *proto.SignedGossipMessage) {
         gc.DeMultiplex(msg)
      },
   }

   adapter.IngressDigFilter = func(digestMsg *proto.DataDigest) *proto.DataDigest {
      gc.RLock()
      height := gc.ledgerHeight
      gc.RUnlock()
      digests := digestMsg.Digests
      digestMsg.Digests = nil
      for i := range digests {
         seqNum, err := strconv.ParseUint(string(digests[i]), 10, 64)
         if err != nil {
            gc.logger.Warningf("Can't parse digest %s : %+v", digests[i], errors.WithStack(err))
            continue
         }
         if seqNum >= height {
            digestMsg.Digests = append(digestMsg.Digests, digests[i])
         }

      }
      return digestMsg
   }

   return pull.NewPullMediator(conf, adapter)
}
  • 這里決定了首先接收的是PullMsgType_BLOCK_MSG類型消息
  • 然后會(huì)在DataMessage消息中提取SeqNum来吩,也就是blocknum
  • 過(guò)濾掉收到消息的height低于本地賬本的部分,沒(méi)有意義啦蔽莱,既然本地已經(jīng)有了弟疆。
  • 這里的MsgCons也就是消息的comsumer,最終會(huì)將收到的block提交到本地賬本
  • 之后就是走engine的標(biāo)準(zhǔn)流程了

CertPuller

func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
   conf := pull.Config{
      MsgType:           proto.PullMsgType_IDENTITY_MSG,
      Channel:           []byte(""),
      ID:                g.conf.InternalEndpoint,
      PeerCountToSelect: g.conf.PullPeerNum,
      PullInterval:      g.conf.PullInterval,
      Tag:               proto.GossipMessage_EMPTY,
   }
   pkiIDFromMsg := func(msg *proto.SignedGossipMessage) string {
      identityMsg := msg.GetPeerIdentity()
      if identityMsg == nil || identityMsg.PkiId == nil {
         return ""
      }
      return fmt.Sprintf("%s", string(identityMsg.PkiId))
   }
   certConsumer := func(msg *proto.SignedGossipMessage) {
      idMsg := msg.GetPeerIdentity()
      if idMsg == nil || idMsg.Cert == nil || idMsg.PkiId == nil {
         g.logger.Warning("Invalid PeerIdentity:", idMsg)
         return
      }
      err := g.idMapper.Put(common.PKIidType(idMsg.PkiId), api.PeerIdentityType(idMsg.Cert))
      if err != nil {
         g.logger.Warningf("Failed associating PKI-ID with certificate: %+v", errors.WithStack(err))
      }
      g.logger.Debug("Learned of a new certificate:", idMsg.Cert)
   }
   adapter := &pull.PullAdapter{
      Sndr:            g.comm,
      MemSvc:          g.disc,
      IdExtractor:     pkiIDFromMsg,
      MsgCons:         certConsumer,
      EgressDigFilter: g.sameOrgOrOurOrgPullFilter,
   }
   return pull.NewPullMediator(conf, adapter)
}
  • 這里決定了接收的是PullMsgType_IDENTITY_MSG類型消息
  • 然后會(huì)在PeerIdentity消息中提取PkiId
  • certConsumer這里是消息的訂閱者盗冷,最終會(huì)將收到的消息存入idMapper
  • 之后就是走engine的標(biāo)準(zhǔn)流程了

總結(jié)

  • 至此怠苔,基本上你能區(qū)別RemoteStateRequest和DataRequest的區(qū)別。DataRequest是用來(lái)給本地的block進(jìn)行查漏補(bǔ)缺正塌,而RemoteStateRequest是block同步的主力嘀略,主要是跟leader的最新的block保持一致恤溶。
  • PullEngine不光是用來(lái)同步block,也用來(lái)同步peer節(jié)點(diǎn)的證書帜羊。分別叫BlockPullEngine和CertPullEngine咒程。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市讼育,隨后出現(xiàn)的幾起案子帐姻,更是在濱河造成了極大的恐慌,老刑警劉巖奶段,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件饥瓷,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡痹籍,警方通過(guò)查閱死者的電腦和手機(jī)呢铆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蹲缠,“玉大人棺克,你說(shuō)我怎么就攤上這事∠叨ǎ” “怎么了娜谊?”我有些...
    開(kāi)封第一講書人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)斤讥。 經(jīng)常有香客問(wèn)我纱皆,道長(zhǎng),這世上最難降的妖魔是什么芭商? 我笑而不...
    開(kāi)封第一講書人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任派草,我火速辦了婚禮,結(jié)果婚禮上铛楣,老公的妹妹穿的比我還像新娘澳眷。我一直安慰自己,他們只是感情好蛉艾,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著衷敌,像睡著了一般勿侯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上缴罗,一...
    開(kāi)封第一講書人閱讀 51,287評(píng)論 1 301
  • 那天助琐,我揣著相機(jī)與錄音,去河邊找鬼面氓。 笑死兵钮,一個(gè)胖子當(dāng)著我的面吹牛蛆橡,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播掘譬,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼泰演,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了葱轩?” 一聲冷哼從身側(cè)響起睦焕,我...
    開(kāi)封第一講書人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎靴拱,沒(méi)想到半個(gè)月后垃喊,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡袜炕,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年本谜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片偎窘。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡乌助,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出评架,到底是詐尸還是另有隱情眷茁,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布纵诞,位于F島的核電站上祈,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏浙芙。R本人自食惡果不足惜登刺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望嗡呼。 院中可真熱鬧纸俭,春花似錦、人聲如沸南窗。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)万伤。三九已至窒悔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間敌买,已是汗流浹背简珠。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留虹钮,地道東北人聋庵。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓膘融,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親祭玉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子氧映,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容