Hyperledger-Fabric源碼分析(Gossip-StateInfo)

StateInfo是用來傳播peer的狀態(tài)信息給其他成員没龙。

struct

type StateInfo struct {
   Timestamp *PeerTime 
   PkiId     []byte   
   // channel_MAC is an authentication code that proves
   // that the peer that sent this message knows
   // the name of the channel.
   Channel_MAC          []byte      
   Properties           *Properties 
}

type Properties struct {
    LedgerHeight         uint64       
    LeftChannel          bool         
    Chaincodes           []*Chaincode 
}

初始化

stateInfMsg := &proto.StateInfo{
        Channel_MAC: GenerateMAC(gc.pkiID, gc.chainID),
        PkiId:       gc.pkiID,
        Timestamp: &proto.PeerTime{
            IncNum: gc.incTime,
            SeqNum: uint64(time.Now().UnixNano()),
        },
        Properties: &proto.Properties{
            LeftChannel:  leftChannel,
            LedgerHeight: ledgerHeight,
            Chaincodes:   chaincodes,
        },
    }
    m := &proto.GossipMessage{
        Nonce: 0,
        Tag:   proto.GossipMessage_CHAN_OR_ORG,
        Content: &proto.GossipMessage_StateInfo{
            StateInfo: stateInfMsg,
        },
    }
  • 可以看到stateinfo的組成
    • pkiid,你可以理解為peer的標(biāo)識(shí)id陪白,內(nèi)部其實(shí)就是mspid+cert算出來的一個(gè)摘要。
    • Channel_MAC膳灶,pkiid+chainid生成MAC咱士,背后就是sha256計(jì)算hash
    • 時(shí)間戳
    • 屬性代表著三個(gè)觸發(fā)stateinfo消息的地方
      • 該節(jié)點(diǎn)退出通道
      • 有新的block寫入,更新peer的賬本height
      • chaincode更新
        • 這個(gè)看起來不太好理解轧钓。我跟進(jìn)了下序厉,chaincode部署成功會(huì)觸發(fā)這里。換句話說如果cc部署成功毕箍,是通過這個(gè)消息讓成員知道的弛房。

觸發(fā)點(diǎn)

commitblock

func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {

    // Commit block with available private transactions
    if err := s.ledger.StoreBlock(block, pvtData); err != nil {
        logger.Errorf("Got error while committing(%+v)", errors.WithStack(err))
        return err
    }

    // Update ledger height
    s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChainID(s.chainID))
    logger.Debugf("[%s] Committed block [%d] with %d transaction(s)",
        s.chainID, block.Header.Number, len(block.Data.Data))

    return nil
}
  • 前面就是提交block到賬本了
  • 后來開始UpdateLedgerHeight,開始處理賬本新的height

UpdateLedgerHeight

func (gc *gossipChannel) UpdateLedgerHeight(height uint64) {
    gc.Lock()
    defer gc.Unlock()

    var chaincodes []*proto.Chaincode
    var leftChannel bool
    if prevMsg := gc.stateInfoMsg; prevMsg != nil {
        leftChannel = prevMsg.GetStateInfo().Properties.LeftChannel
        chaincodes = prevMsg.GetStateInfo().Properties.Chaincodes
    }
    gc.updateProperties(height, chaincodes, leftChannel)
}
  • 因?yàn)橹皇歉耯eight而柑,所以其他的狀態(tài)沿用之前的stateInfoMsg
  • 下面開始要廣播前的最后準(zhǔn)備工作了

updateStateInfo

func (gc *gossipChannel) updateStateInfo(msg *proto.SignedGossipMessage) {
   gc.stateInfoMsgStore.Add(msg)
   gc.ledgerHeight = msg.GetStateInfo().Properties.LedgerHeight
   gc.stateInfoMsg = msg
   atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}
  • stateInfoMsgStore用來保存收到的成員發(fā)來的所有的stateinfo消息文捶,包括自己的。
  • 更新自己的height
  • 每給其他成員分享一次stateinfo的時(shí)候媒咳,都會(huì)自己保留一份粹排,以備不時(shí)之需。這種情況正好用上涩澡。
  • 啟動(dòng)shouldGossipStateInfo開關(guān)

發(fā)送點(diǎn)

func (gc *gossipChannel) publishStateInfo() {
   if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) {
      return
   }
   gc.RLock()
   stateInfoMsg := gc.stateInfoMsg
   gc.RUnlock()
   gc.Gossip(stateInfoMsg)
   if len(gc.GetMembership()) > 0 {
      atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0))
   }
}
  • 可以看到最后會(huì)在這里將消息Gossip出去顽耳,里面是用emitter模塊去處理。emitter你暫時(shí)不用關(guān)心,里面根據(jù)不同的消息類型來決定點(diǎn)對(duì)點(diǎn)發(fā)送還是群發(fā)射富,不過在這里你只用知道發(fā)出去就好了膝迎。有時(shí)間我會(huì)專門講這個(gè)模塊。
  • 那么發(fā)送是怎么觸發(fā)的呢胰耗?

時(shí)機(jī)

func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
   chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
   gc := &gossipChannel{
      incTime:                   uint64(time.Now().UnixNano()),
      selfOrg:                   org,
      pkiID:                     pkiID,
      mcs:                       mcs,
      Adapter:                   adapter,
      logger:                    util.GetLogger(util.ChannelLogger, adapter.GetConf().ID),
      stopChan:                  make(chan struct{}, 1),
      shouldGossipStateInfo:     int32(0),
      stateInfoPublishScheduler: time.NewTicker(adapter.GetConf().PublishStateInfoInterval),
      stateInfoRequestScheduler: time.NewTicker(adapter.GetConf().RequestStateInfoInterval),
      orgs:                      []api.OrgIdentityType{},
      chainID:                   chainID,
   }

   ...

   // Periodically publish state info
   go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
   ...
}
  • 在初始化GossipChannel的時(shí)候限次,會(huì)定時(shí)來監(jiān)聽是否有新的stateinfo消息需要發(fā)布。
  • GossipChannel看名字你也大概能猜到干嘛的宪郊,這是專門給同channel的成員進(jìn)行g(shù)ossip服務(wù)的掂恕。

接受點(diǎn)

中間省略了很多地方,這個(gè)消息的專題都是這種風(fēng)格弛槐,盡量不要被其他的細(xì)節(jié)給干擾懊亡。總之乎串,消息已經(jīng)被peer收到店枣,下面我們看下收到后,怎么處理叹誉。

GossipService

GossipService是統(tǒng)管gossip服務(wù)的鸯两,所有的動(dòng)作都由這里發(fā)起,消息處理也不例外

消息驗(yàn)證

func (g *gossipServiceImpl) validateStateInfoMsg(msg *proto.SignedGossipMessage) error {
    verifier := func(identity []byte, signature, message []byte) error {
        pkiID := g.idMapper.GetPKIidOfCert(api.PeerIdentityType(identity))
        if pkiID == nil {
            return errors.New("PKI-ID not found in identity mapper")
        }
        return g.idMapper.Verify(pkiID, signature, message)
    }
    identity, err := g.idMapper.Get(msg.GetStateInfo().PkiId)
    if err != nil {
        return errors.WithStack(err)
    }
    return msg.Verify(identity, verifier)
}
  • 這里主要做兩個(gè)事情
  • 一长豁,判斷當(dāng)前消息的Pkiid是否認(rèn)識(shí)钧唐,這是消息接收的基礎(chǔ)。因?yàn)镚ossip有機(jī)制能同步成員列表匠襟,如果有不認(rèn)識(shí)節(jié)點(diǎn)钝侠,那么就問題大了。
  • 二酸舍,對(duì)方消息是私鑰進(jìn)行數(shù)字簽名的帅韧,這里用本地的公鑰進(jìn)行簽名校驗(yàn)。這也是安全的基礎(chǔ)啃勉。

消息處理

func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {
    ...

    if msg.IsChannelRestricted() {
        if gc := g.chanState.lookupChannelForMsg(m); gc == nil {
            // If we're not in the channel, we should still forward to peers of our org
            // in case it's a StateInfo message
            if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && msg.IsStateInfoMsg() {
                if g.stateInfoMsgStore.Add(msg) {
                    g.emitter.Add(&emittedGossipMessage{
                        SignedGossipMessage: msg,
                        filter:              m.GetConnectionInfo().ID.IsNotSameFilter,
                    })
                }
            }
            if !g.toDie() {
                g.logger.Debug("No such channel", msg.Channel, "discarding message", msg)
            }
        } else {
            ...
            gc.HandleMessage(m)
        }
        return
    }

    ...
}
  • 前面校驗(yàn)部分過了后忽舟,基本上消息沒有大的問題
  • 下面開始正式處理了,這里需要一些背景知識(shí)淮阐。首先一個(gè)Peer加入一個(gè)channel, 就會(huì)有一個(gè)GossipChannel相伴叮阅。所以如果查不到這個(gè)gc,那么說明Peer不在這個(gè)channel里面枝嘶。
  • 這里是描述了兩個(gè)場(chǎng)景
    • 首先該P(yáng)eer不在這個(gè)channel里面帘饶,但屬于同一個(gè)Org,也就是組織群扶,那么處于優(yōu)化的目的及刻,可以盡快將消息擴(kuò)散出去镀裤,以盡快讓同channel的節(jié)點(diǎn)處理。再次遇到emitter缴饭,忽略它暑劝。
    • 如果同屬一個(gè)channel,那么開始交給所屬的GossipChannel來處理颗搂。

GossipChannel

消息校驗(yàn)

func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
    ...
    if m.IsStateInfoMsg() {
        si := m.GetStateInfo()
        expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
        if !bytes.Equal(expectedMAC, si.Channel_MAC) {
            gc.logger.Warning("Message contains wrong channel MAC(", si.Channel_MAC, "), expected", expectedMAC)
            return false
        }
        return true
    }
    ...
}

  • 通用校驗(yàn)担猛,我們這里就不費(fèi)功夫了,主要看stateinfo的丢氢。

  • 可以看到這里主要做MAC校驗(yàn)傅联,這個(gè)校驗(yàn)感覺沒什么用,并不能保證其完整性疚察。

消息處理

if m.IsDataMsg() || m.IsStateInfoMsg() {
   added := false

   if m.IsDataMsg() {
      ...
   } else { // StateInfoMsg verification should be handled in a layer above
      //  since we don't have access to the id mapper here
      added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
   }

   if added {
      // Forward the message
      gc.Forward(msg)
      // DeMultiplex to local subscribers
      gc.DeMultiplex(m)

      if m.IsDataMsg() {
         gc.blocksPuller.Add(msg.GetGossipMessage())
      }
   }
  • 當(dāng)然了這里會(huì)直接加到gc的stateInfoMsgStore里面存起來蒸走。當(dāng)然了Add也不是那么簡(jiǎn)單。簡(jiǎn)單看看貌嫡。

    func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
       if !cache.MessageStore.CheckValid(msg) {
          return false
       }
       if !cache.verify(msg) {
          return false
       }
       added := cache.MessageStore.Add(msg)
       if added {
          pkiID := msg.GetStateInfo().PkiId
          cache.MembershipStore.Put(pkiID, msg)
       }
       return added
    }
    
    • CheckValid會(huì)將msg跟本地保存做比較比驻,如果是全新的或比已有的新,會(huì)判定有效
    • verify主要是校驗(yàn)消息發(fā)起人是否同屬一個(gè)Channel岛抄,還有就是這個(gè)節(jié)點(diǎn)是否有讀取成員狀態(tài)的權(quán)力别惦。
    • 按pkiID的維度冗余一遍
  • 如果Add成功,第一件事就是讓其他人知道夫椭。通過emitter轉(zhuǎn)發(fā)出去

  • DeMultiplex是本地的一個(gè)多路分發(fā)的模塊掸掸,里面可以增加訂閱器,來訂閱感興趣的消息類型蹭秋,進(jìn)而處理猾漫。不過幸運(yùn)的是,stateinfo沒有人訂閱感凤,所以這里不擴(kuò)散了。

使用

前面講了這么多StateInfo消息粒督,那么問題來了陪竿,這消息收下來到底干嘛用呢?舉兩個(gè)例子就清楚了屠橄。當(dāng)然里面的Properties還有別的用處族跛,這里先不展開。

func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool {
   peerIdentity := gc.GetIdentityByPKIID(member.PKIid)
   if len(peerIdentity) == 0 {
      gc.logger.Warning("Identity for peer", member.PKIid, "doesn't exist")
      return false
   }
   msg := gc.stateInfoMsgStore.MsgByID(member.PKIid)
   if msg == nil {
      return false
   }
   return true
}
  • 這里如果能從stateInfoMsgStore里面找到锐墙,說明這個(gè)member同屬一個(gè)通道礁哄。
// If we don't have a StateInfo message from the peer,
// no way of validating its eligibility in the channel.
if gc.stateInfoMsgStore.MsgByID(msg.GetConnectionInfo().ID) == nil {
   gc.logger.Debug("Don't have StateInfo message of peer", msg.GetConnectionInfo())
   return
}
if !gc.eligibleForChannelAndSameOrg(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
   gc.logger.Warning(msg.GetConnectionInfo(), "isn't eligible for pulling blocks of", string(gc.chainID))
   return
}
  • 這里也是一樣,上面判定同屬一個(gè)通道溪北,下面判定是否同屬一個(gè)通道下的同一個(gè)組織桐绒。

總結(jié)

  • StateInfo主要給別人分享是否有新的cc部署完成夺脾,是否退出通道,是否有新的block寫入茉继。
  • 不管如何咧叭,第一時(shí)間轉(zhuǎn)發(fā)消息給成員,利于消息擴(kuò)散烁竭。
  • 然后自循環(huán)菲茬,對(duì)外發(fā)布狀態(tài)更新。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末派撕,一起剝皮案震驚了整個(gè)濱河市婉弹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌终吼,老刑警劉巖镀赌,帶你破解...
    沈念sama閱讀 221,430評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異衔峰,居然都是意外死亡佩脊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門垫卤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來威彰,“玉大人,你說我怎么就攤上這事穴肘⌒危” “怎么了?”我有些...
    開封第一講書人閱讀 167,834評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵评抚,是天一觀的道長(zhǎng)豹缀。 經(jīng)常有香客問我,道長(zhǎng)慨代,這世上最難降的妖魔是什么邢笙? 我笑而不...
    開封第一講書人閱讀 59,543評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮侍匙,結(jié)果婚禮上氮惯,老公的妹妹穿的比我還像新娘。我一直安慰自己想暗,他們只是感情好妇汗,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,547評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著说莫,像睡著了一般杨箭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上储狭,一...
    開封第一講書人閱讀 52,196評(píng)論 1 308
  • 那天互婿,我揣著相機(jī)與錄音捣郊,去河邊找鬼。 笑死擒悬,一個(gè)胖子當(dāng)著我的面吹牛模她,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播懂牧,決...
    沈念sama閱讀 40,776評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼侈净,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了僧凤?” 一聲冷哼從身側(cè)響起畜侦,我...
    開封第一講書人閱讀 39,671評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎躯保,沒想到半個(gè)月后旋膳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,221評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡途事,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,303評(píng)論 3 340
  • 正文 我和宋清朗相戀三年验懊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片尸变。...
    茶點(diǎn)故事閱讀 40,444評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡义图,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出召烂,到底是詐尸還是另有隱情碱工,我是刑警寧澤,帶...
    沈念sama閱讀 36,134評(píng)論 5 350
  • 正文 年R本政府宣布奏夫,位于F島的核電站怕篷,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏酗昼。R本人自食惡果不足惜廊谓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,810評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望麻削。 院中可真熱鬧蹂析,春花似錦、人聲如沸碟婆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)竖共。三九已至,卻和暖如春俺祠,著一層夾襖步出監(jiān)牢的瞬間公给,已是汗流浹背借帘。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留淌铐,地道東北人肺然。 一個(gè)月前我還...
    沈念sama閱讀 48,837評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像腿准,于是被迫代替她去往敵國(guó)和親际起。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,455評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容