死磕hyperledger fabric源碼|交易廣播
文章及代碼:https://github.com/blockchainGuide/
分支:v1.1.0
前言
Hyperledger Fabric
提供了Broadcast(srv ab.AtomicBroadcast_BroadcastServer)
交易廣播服務(wù)接口澜搅,接收客戶端提交的簽名交易消息請求假栓,交由共識組件鏈對象對交易進行排序與執(zhí)行通道管理脐瑰,按照交易出塊規(guī)則切割打包总棵,構(gòu)造新區(qū)塊并提交賬本。同時仅叫,通過Deliver()
區(qū)塊分發(fā)服務(wù)接口帜篇,將區(qū)塊數(shù)據(jù)發(fā)送給通道組織內(nèi)發(fā)起請求的Leader
主節(jié)點,再基于Gossip
消息協(xié)議廣播到組織內(nèi)的其他節(jié)點上诫咱,從而實現(xiàn)廣播交易消息的目的笙隙。
Broadcast服務(wù)消息處理
Orderer
節(jié)點啟動時已經(jīng)在本地的gRPC
服務(wù)器上注冊了Orderer
排序服務(wù)器,并創(chuàng)建了Broadcast
服務(wù)處理句柄坎缭。當客戶端調(diào)用Broadcast()
服務(wù)接口發(fā)起服務(wù)請求時竟痰,Orderer
排序服務(wù)器會調(diào)用Broadcast()→s.bh.Handle()
方法處理請求签钩,流程如下:
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
...
return s.bh.Handle(&broadcastMsgTracer{
...
})
}
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
...
}
主要就是這個Handle
的處理,分析如下:
①:等待接收處理消息
msg, err := srv.Recv()
②:解析獲取通道頭部chdr坏快、配置交易消息標志位isConfig铅檩、通道鏈支持對象(通道消息處理器)
chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)
③:檢查共識組件鏈對象是否準備好接收新的交易消息
if err = processor.WaitReady(); err != nil {}
④:分類處理消息
處理普通消息
4.1 解析獲取通道的最新配置序號
configSeq, err := processor.ProcessNormalMsg(msg)
/orderer/common/msgprocessor/standardchannel.go
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
configSeq = s.support.Sequence()
err = s.filters.Apply(env)
return
}
configSeq是最新配置序號,默認初始值為0莽鸿,新建應(yīng)用通道后該配置序號自增為1昧旨,通過比較該序號就能判斷當前通道配置版本是否發(fā)生了更新,從而確定當前交易消息是否需要重新過濾與重新排序祥得。
接著就是使用自帶的默認通道消息過濾器過濾消息兔沃,有以下過濾條件:
- 驗證不能為空
- 拒絕過期的簽名者身份證書
- 消息最大字節(jié)數(shù)過濾器(98MB)
- 消息簽名驗證過濾器
4.2 構(gòu)造新的普通交易消息并發(fā)送到共識組件鏈對象請求處理
err = processor.Order(msg, configSeq)
這里我們只關(guān)注kafka
的共識組件處理。
首先序列化消息级及,然后將該消息發(fā)送到Kafka
集群的指定分區(qū)上請求排序乒疏,再轉(zhuǎn)發(fā)給Kafka
共識組件鏈對象請求打包出塊。
/orderer/consensus/kafka/chain.go
func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
marshaledEnv, err := utils.Marshal(env)
if err != nil {
return fmt.Errorf("cannot enqueue, unable to marshal envelope because = %s", err)
}
if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq, originalOffset)) {
return fmt.Errorf("cannot enqueue")
}
return nil
}
我們來看看enqueue方法是如何做的:
func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
select {
case <-chain.startChan: // // 共識組件在啟動階段啟動完成
select {
case <-chain.haltChan: // 已經(jīng)關(guān)閉chain.startChan通道
...
}
//// 創(chuàng)建Kafka生產(chǎn)者消息
message := newProducerMessage(chain.channel, payload)
//// 發(fā)送消息到Kafka集群請求排序
if _, _, err = chain.producer.SendMessage(message); err != nil {
...
}
}
處理通道配置交易消息
4.3 獲取配置交易消息與通道的最新配置序號
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
代碼位置:/orderer/common/msgprocessor/systemchannel.go/ProcessConfigUpdateMsg,大概做了以下事情:
- 獲取消息中的通道ID
- 檢查消息中的通道ID與當前通道ID是否一致,一致的話交由標準通道處理器處理
- 創(chuàng)建新應(yīng)用通道的通道配置實體Bundle結(jié)構(gòu)對象
- 構(gòu)造新的通道配置更新交易消息(ConfigEnvelope類型)饮焦,注意將該消息的通道配置序號更新為1
- 創(chuàng)建內(nèi)層的通道配置交易消息(CONFIG類型)
- 創(chuàng)建外層的配置交易消息(ORDERER_TRANSACTION類型)
- 應(yīng)用系統(tǒng)通道的消息過濾器
- 返回新的通道配置交易消息與當前系統(tǒng)通道的配置序號
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
channelID, err := utils.ChannelID(envConfigUpdate) // 獲取消息中的通道ID
...
//檢查消息中的通道ID與當前通道ID是否一致
if channelID == s.support.ChainID() {
//// 交由標準通道處理器處理
return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
}
...
// 創(chuàng)建新的應(yīng)用通道怕吴,其通道配置序號默認初始化為0
// 創(chuàng)建新應(yīng)用通道的通道配置實體Bundle結(jié)構(gòu)對象
bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
...
//構(gòu)造新的通道配置更新交易消息(ConfigEnvelope類型),注意將該消息的通道配置序號更新為1
newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
...
//創(chuàng)建內(nèi)層的通道配置交易消息(CONFIG類型)
newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
...
//創(chuàng)建外層的配置交易消息(ORDERER_TRANSACTION類型)
wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
...
// 應(yīng)用系統(tǒng)通道的消息過濾器
err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction)
...
//返回新的通道配置交易消息與當前系統(tǒng)通道的配置序號
return wrappedOrdererTransaction, s.support.Sequence(), nil
4.4 構(gòu)造新的配置交易消息發(fā)送到共識組件鏈對象請求排序
err = processor.Configure(config, configSeq)
這里我們依舊只是考慮kafka
共識組件县踢,processor.Configure()
方法實際上是調(diào)用chainImpl.configure()
方法转绷,同樣構(gòu)造Kafka
常規(guī)消息(KafkaMessageRegular
類型)。其中硼啤,Class
消息類別屬于KafkaMessageRegular_CONFIG
類型暇咆,包含了通道配置交易消息、 通道配置序號configSeq
與初始消息偏移量originalOffset(0)
丙曙。接著,調(diào)用chain.enqueue()
方法其骄,將其發(fā)送到Kafka
集群上指定主題(chainID
)和分區(qū)號(0)的分區(qū)上亏镰,同時,由Kafka
共識組件鏈對象分區(qū)消費者channelConsumer
獲取該消息拯爽,再交由給Kafka
共識組件鏈對象請求打包出塊索抓。
⑤:發(fā)送成功處理狀態(tài)響應(yīng)消息
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
整個流程圖如下:
參考
https://github.com/blockchainGuide/ (文章圖片代碼資料)
微信公眾號:區(qū)塊鏈技術(shù)棧