死磕hyperledger fabric源碼|交易廣播

死磕hyperledger fabric源碼|交易廣播

文章及代碼:https://github.com/blockchainGuide/

分支:v1.1.0

40fe3d4a84cc46e22caf5e06071c3aa7

前言

Hyperledger Fabric提供了Broadcast(srv ab.AtomicBroadcast_BroadcastServer)交易廣播服務(wù)接口澜搅,接收客戶端提交的簽名交易消息請求假栓,交由共識組件鏈對象對交易進行排序與執(zhí)行通道管理脐瑰,按照交易出塊規(guī)則切割打包总棵,構(gòu)造新區(qū)塊并提交賬本。同時仅叫,通過Deliver()區(qū)塊分發(fā)服務(wù)接口帜篇,將區(qū)塊數(shù)據(jù)發(fā)送給通道組織內(nèi)發(fā)起請求的Leader主節(jié)點,再基于Gossip消息協(xié)議廣播到組織內(nèi)的其他節(jié)點上诫咱,從而實現(xiàn)廣播交易消息的目的笙隙。

Broadcast服務(wù)消息處理

Orderer節(jié)點啟動時已經(jīng)在本地的gRPC服務(wù)器上注冊了Orderer排序服務(wù)器,并創(chuàng)建了Broadcast服務(wù)處理句柄坎缭。當客戶端調(diào)用Broadcast()服務(wù)接口發(fā)起服務(wù)請求時竟痰,Orderer排序服務(wù)器會調(diào)用Broadcast()→s.bh.Handle()方法處理請求签钩,流程如下:

func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
...
    return s.bh.Handle(&broadcastMsgTracer{
    ...
    })
}
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
  ...
}

主要就是這個Handle的處理,分析如下:

①:等待接收處理消息

msg, err := srv.Recv()

②:解析獲取通道頭部chdr坏快、配置交易消息標志位isConfig铅檩、通道鏈支持對象(通道消息處理器)

chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)

③:檢查共識組件鏈對象是否準備好接收新的交易消息

if err = processor.WaitReady(); err != nil {}

④:分類處理消息

處理普通消息

4.1 解析獲取通道的最新配置序號

configSeq, err := processor.ProcessNormalMsg(msg)
/orderer/common/msgprocessor/standardchannel.go
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
    configSeq = s.support.Sequence()
    err = s.filters.Apply(env)
    return
}

configSeq是最新配置序號,默認初始值為0莽鸿,新建應(yīng)用通道后該配置序號自增為1昧旨,通過比較該序號就能判斷當前通道配置版本是否發(fā)生了更新,從而確定當前交易消息是否需要重新過濾與重新排序祥得。

接著就是使用自帶的默認通道消息過濾器過濾消息兔沃,有以下過濾條件:

  • 驗證不能為空
  • 拒絕過期的簽名者身份證書
  • 消息最大字節(jié)數(shù)過濾器(98MB)
  • 消息簽名驗證過濾器

4.2 構(gòu)造新的普通交易消息并發(fā)送到共識組件鏈對象請求處理

err = processor.Order(msg, configSeq) 

這里我們只關(guān)注kafka的共識組件處理。

首先序列化消息级及,然后將該消息發(fā)送到Kafka集群的指定分區(qū)上請求排序乒疏,再轉(zhuǎn)發(fā)給Kafka共識組件鏈對象請求打包出塊。

/orderer/consensus/kafka/chain.go
func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
    marshaledEnv, err := utils.Marshal(env)
    if err != nil {
        return fmt.Errorf("cannot enqueue, unable to marshal envelope because = %s", err)
    }
    if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq, originalOffset)) {
        return fmt.Errorf("cannot enqueue")
    }
    return nil
}

我們來看看enqueue方法是如何做的:

func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
    logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
    select {
    case <-chain.startChan: // // 共識組件在啟動階段啟動完成
        select {
        case <-chain.haltChan: //  已經(jīng)關(guān)閉chain.startChan通道
        ...
            }
            //// 創(chuàng)建Kafka生產(chǎn)者消息
            message := newProducerMessage(chain.channel, payload)
            //// 發(fā)送消息到Kafka集群請求排序
            if _, _, err = chain.producer.SendMessage(message); err != nil {
            ...
    }
}

處理通道配置交易消息

4.3 獲取配置交易消息與通道的最新配置序號

config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)

代碼位置:/orderer/common/msgprocessor/systemchannel.go/ProcessConfigUpdateMsg,大概做了以下事情:

  • 獲取消息中的通道ID
  • 檢查消息中的通道ID與當前通道ID是否一致,一致的話交由標準通道處理器處理
  • 創(chuàng)建新應(yīng)用通道的通道配置實體Bundle結(jié)構(gòu)對象
  • 構(gòu)造新的通道配置更新交易消息(ConfigEnvelope類型)饮焦,注意將該消息的通道配置序號更新為1
  • 創(chuàng)建內(nèi)層的通道配置交易消息(CONFIG類型)
  • 創(chuàng)建外層的配置交易消息(ORDERER_TRANSACTION類型)
  • 應(yīng)用系統(tǒng)通道的消息過濾器
  • 返回新的通道配置交易消息與當前系統(tǒng)通道的配置序號
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    channelID, err := utils.ChannelID(envConfigUpdate) // 獲取消息中的通道ID
    ...
    //檢查消息中的通道ID與當前通道ID是否一致
    if channelID == s.support.ChainID() {
        //// 交由標準通道處理器處理
        return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
    }
    ...
    // 創(chuàng)建新的應(yīng)用通道怕吴,其通道配置序號默認初始化為0
    // 創(chuàng)建新應(yīng)用通道的通道配置實體Bundle結(jié)構(gòu)對象
    bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
    ...
    //構(gòu)造新的通道配置更新交易消息(ConfigEnvelope類型),注意將該消息的通道配置序號更新為1
    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
    ...
    //創(chuàng)建內(nèi)層的通道配置交易消息(CONFIG類型)
    newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
    ...
    //創(chuàng)建外層的配置交易消息(ORDERER_TRANSACTION類型)
    wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
    ...
    // 應(yīng)用系統(tǒng)通道的消息過濾器
    err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction)
    ...
    //返回新的通道配置交易消息與當前系統(tǒng)通道的配置序號
    return wrappedOrdererTransaction, s.support.Sequence(), nil

4.4 構(gòu)造新的配置交易消息發(fā)送到共識組件鏈對象請求排序

err = processor.Configure(config, configSeq)

這里我們依舊只是考慮kafka共識組件县踢,processor.Configure()方法實際上是調(diào)用chainImpl.configure()方法转绷,同樣構(gòu)造Kafka常規(guī)消息(KafkaMessageRegular類型)。其中硼啤,Class消息類別屬于KafkaMessageRegular_CONFIG類型暇咆,包含了通道配置交易消息、 通道配置序號configSeq與初始消息偏移量originalOffset(0)丙曙。接著,調(diào)用chain.enqueue()方法其骄,將其發(fā)送到Kafka集群上指定主題(chainID)和分區(qū)號(0)的分區(qū)上亏镰,同時,由Kafka共識組件鏈對象分區(qū)消費者channelConsumer獲取該消息拯爽,再交由給Kafka共識組件鏈對象請求打包出塊索抓。

⑤:發(fā)送成功處理狀態(tài)響應(yīng)消息

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})

整個流程圖如下:

image-20210125112957202

參考

https://github.com/blockchainGuide/ (文章圖片代碼資料)

微信公眾號:區(qū)塊鏈技術(shù)棧

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市毯炮,隨后出現(xiàn)的幾起案子逼肯,更是在濱河造成了極大的恐慌,老刑警劉巖桃煎,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件篮幢,死亡現(xiàn)場離奇詭異,居然都是意外死亡为迈,警方通過查閱死者的電腦和手機三椿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門缺菌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人搜锰,你說我怎么就攤上這事伴郁。” “怎么了蛋叼?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵焊傅,是天一觀的道長。 經(jīng)常有香客問我狈涮,道長狐胎,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任薯嗤,我火速辦了婚禮顽爹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘骆姐。我一直安慰自己镜粤,他們只是感情好,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布玻褪。 她就那樣靜靜地躺著肉渴,像睡著了一般。 火紅的嫁衣襯著肌膚如雪带射。 梳的紋絲不亂的頭發(fā)上同规,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天,我揣著相機與錄音窟社,去河邊找鬼券勺。 笑死,一個胖子當著我的面吹牛灿里,可吹牛的內(nèi)容都是我干的关炼。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼匣吊,長吁一口氣:“原來是場噩夢啊……” “哼儒拂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起色鸳,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤社痛,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后命雀,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蒜哀,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年咏雌,在試婚紗的時候發(fā)現(xiàn)自己被綠了凡怎。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片校焦。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖统倒,靈堂內(nèi)的尸體忽然破棺而出寨典,到底是詐尸還是另有隱情,我是刑警寧澤房匆,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布耸成,位于F島的核電站,受9級特大地震影響浴鸿,放射性物質(zhì)發(fā)生泄漏井氢。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一岳链、第九天 我趴在偏房一處隱蔽的房頂上張望花竞。 院中可真熱鬧,春花似錦掸哑、人聲如沸约急。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽厌蔽。三九已至,卻和暖如春摔癣,著一層夾襖步出監(jiān)牢的瞬間奴饮,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工择浊, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留戴卜,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓琢岩,卻偏偏與公主長得像叉瘩,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子粘捎,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容