死磕hyperledger fabric源碼|Deliver區(qū)塊分發(fā)
文章及代碼:https://github.com/blockchainGuide/
分支:v1.1.0
概述
Orderer
排序服務(wù)器提供了區(qū)塊分發(fā)服務(wù)接口当凡,接收客戶端提交的區(qū)塊請求消息(Envelope
類型缀磕,通道頭部類型是DELIVER_SEEK_INFO
逢享、CONFIG_UPDATE
等)鞭盟,根據(jù)該消息封裝的區(qū)塊搜索信息對象(SeekInfo
類型)贪薪,包括查找最舊區(qū)塊SeekOldest類型叠赐、查找最新區(qū)塊SeekNewest
類型慧妄、查找指定位置區(qū)塊SeekSpecified
類型等对妄,構(gòu)造對應(yīng)請求范圍的范圍查詢結(jié)果迭代器河绽,讀取Orderer
節(jié)點指定通道賬本上的區(qū)塊數(shù)據(jù)己单,同時,建立消息處理循環(huán)耙饰,基于該結(jié)果迭代器依次讀取請求的區(qū)塊數(shù)據(jù)結(jié)果纹笼,發(fā)送給組織的Leader主節(jié)點等請求節(jié)點。
Orderer
節(jié)點啟動時在本地gRPC
服務(wù)器上注冊了Orderer
排序服務(wù)器苟跪,并創(chuàng)建了Deliver服務(wù)處理句柄廷痘。當客戶端發(fā)起Deliver
服務(wù)請求時,Orderer
排序服務(wù)器就調(diào)用Deliver()
方法處理消息請求件已。
Diliver消息服務(wù)處理
入口在orderer/common/server/server.go/Deliver()
方法中:
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
...
policyChecker := func(env *cb.Envelope, channelID string) error { // 定義策略檢查器
chain, ok := s.GetChain(channelID) // 獲取指定通道的鏈支持對象
if !ok {
return errors.Errorf("channel %s not found", channelID)
}
// 創(chuàng)建消息過濾器
sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain)
return sf.Apply(env) // 過濾消息
}
server := &deliverMsgTracer{
DeliverSupport: &deliverHandlerSupport{AtomicBroadcast_DeliverServer: srv},
msgTracer: msgTracer{
debug: s.debug,
function: "Deliver",
},
}
// Deliver服務(wù)消息處理
return s.dh.Handle(deliver.NewDeliverServer(server, policyChecker, s.sendProducer(srv)))
}
大概做了以下幾件事:
- 定義策略檢查器:用于檢查接收的區(qū)塊請求消息必須滿足指定通道上的訪問控制權(quán)限策略的要求
- 獲取指定通道的鏈支持對象
- 創(chuàng)建消息過濾器笋额,過濾消息
- Deliver服務(wù)消息處理區(qū)塊請求
我們來看是如何處理的,進入到s.dh.Handle
:
/common/deliver/deliver.go/Handle
func (ds *deliverHandler) Handle(srv *DeliverServer) error {
...
// 等待消息請求并進行處理
for {
...
envelope, err := srv.Recv() // 等待接收客戶端發(fā)送的區(qū)塊消息請求
...
// 從Orderer節(jié)點本地指定通道的區(qū)塊賬本中獲取指定區(qū)塊篷扩,并向客戶端發(fā)送請求
if err := ds.deliverBlocks(srv, envelope); err != nil {
return err
}
...
}
}
不言而喻兄猩,直接進入到deliverBlocks
,這部分的內(nèi)容是最核心的,逐步分析如下:
①:解析PayLoad,檢查header和ChannelHeader的合法性
payload, err := utils.UnmarshalPayload(envelope.Payload) // 解析消息負載
...
if payload.Header == nil {}
// 解析通道頭部
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
err = ds.validateChannelHeader(srv, chdr) // 驗證通道頭部合法性
②:從chains字典中獲取指定通道(chainID)的鏈支持對象chain鉴未,并檢查該對象是否存在錯誤信息
chain, ok := ds.sm.GetChain(chdr.ChannelId) // 獲取指定通道的鏈支持對象
③:創(chuàng)建訪問控制對象,并檢查消息簽名是否符合指定的通道讀權(quán)限策略**
accessControl, err := newSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
...
err := accessControl.evaluate()
④:解析區(qū)塊搜索信息SeekInfo結(jié)構(gòu)對象
seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {}
⑤:檢查起始位置與結(jié)束位置的合法性
if seekInfo.Start == nil || seekInfo.Stop == nil {}
⑥:創(chuàng)建區(qū)塊賬本迭代器并獲取起始區(qū)塊號枢冤,同時設(shè)置起始位置
cursor, number := chain.Reader().Iterator(seekInfo.Start)
Iterator
根據(jù)startPosition.Type
起始位置對象的類型計算起始區(qū)塊號startingBlockNumbe
,類型如下:
SeekPosition_Oldest:搜索最舊的區(qū)塊,將起始區(qū)塊號
startingBlockNumber
設(shè)置為 0歼狼;SeekPosition_Newest:搜索最新的區(qū)塊掏导,將起始區(qū)塊號
startingBlockNumber
設(shè)置為當前通道賬本的最新區(qū)塊號info.Height-1
,即賬本高度減1羽峰;SeekPosition_Specified:搜索指定位置的區(qū)塊,將起始區(qū)塊號
startingBlockNumber
設(shè)置為指定起始位置的區(qū)塊號start.Specified.Number
添瓷。
Iterator
方法的大致功能如下: common/ledger/blockledger/file/impl.go/Iterator
func (fl *FileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
var startingBlockNumber uint64
switch start := startPosition.Type.(type) { // 分析起始位置類型
case *ab.SeekPosition_Oldest: // 搜索最舊區(qū)塊梅屉,區(qū)塊號為0
startingBlockNumber = 0
case *ab.SeekPosition_Newest: // 搜索最新區(qū)塊
info, err := fl.blockStore.GetBlockchainInfo() // 獲取區(qū)塊鏈信息
if err != nil {
logger.Panic(err)
}
newestBlockNumber := info.Height - 1 // 最新區(qū)塊號
startingBlockNumber = newestBlockNumber
case *ab.SeekPosition_Specified: // 搜索指定位置區(qū)塊
startingBlockNumber = start.Specified.Number
height := fl.Height()
if startingBlockNumber > height { // 若超過高度,則報錯
return &blockledger.NotFoundErrorIterator{}, 0
}
default:
return &blockledger.NotFoundErrorIterator{}, 0
}
// 構(gòu)造區(qū)塊迭代器
iterator, err := fl.blockStore.RetrieveBlocks(startingBlockNumber)
if err != nil {
return &blockledger.NotFoundErrorIterator{}, 0
}
// 構(gòu)造賬本區(qū)塊迭代器
return &fileLedgerIterator{ledger: fl, blockNumber: startingBlockNumber, commonIterator: iterator}, startingBlockNumber
}
⑦:循環(huán)讀取區(qū)塊數(shù)據(jù)鳞贷,從本地區(qū)塊賬本中獲取指定區(qū)塊號范圍內(nèi)的區(qū)塊數(shù)據(jù)坯汤,并依次順序發(fā)送給請求客戶端
7.1 未找到數(shù)據(jù)返回
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > chain.Reader().Height()-1 {
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}
7.2 獲取下一個數(shù)據(jù)
block, status := nextBlock(cursor, erroredChan) // 從本地賬本獲取下一個區(qū)塊
if status != cb.Status_SUCCESS {...}
7.3 再次檢查是否滿足訪問控制策略要求
if err := accessControl.evaluate(); err != nil {}
7.4 發(fā)送區(qū)塊數(shù)據(jù)
if err := sendBlockReply(srv, block); err != nil { }
7.5 循環(huán)結(jié)束,發(fā)送成功狀態(tài)
if err := sendStatusReply(srv, cb.Status_SUCCESS);
Deliver服務(wù)客戶端
以Leader
主節(jié)點為例搀愧,分析Deliver
服務(wù)客戶端從Orderer
節(jié)點請求獲取區(qū)塊的流程惰聂。
初始化Deliver服務(wù)實例
入口:gossip/service/gossip_service.go/InitializeChannel
func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string, support Support) {
...
g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator)
if g.deliveryService[chainID] == nil { // 檢查是否已經(jīng)存在Deliver服務(wù)實例
var err error
g.deliveryService[chainID], err = g.deliveryFactory.Service(g, endpoints, g.mcs) // 檢查是否已經(jīng)存在Deliver服務(wù)實例
...
// peer.gossip.useLeaderElection與peer.gossip.orgLeader是互斥的兩個配置參數(shù)疆偿,
// 如果將兩個都設(shè)置為true且沒有被定義,則會引起Peer節(jié)點錯誤
// 啟用Leader主節(jié)點動態(tài)選舉機制
leaderElection := viper.GetBool("peer.gossip.useLeaderElection")
// 靜態(tài)設(shè)置為組織Leader主節(jié)點
isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")
...
if leaderElection { // 啟用了動態(tài)Leader主節(jié)點選舉機制
logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, support.Committer))
} else if isStaticOrgLeader {
// 若靜態(tài)指定了Leader主節(jié)點搓幌,則連接 Orderer節(jié)點請求區(qū)塊數(shù)據(jù)
// 啟動指定通道上的Deliver服務(wù)實例請求獲取區(qū)塊數(shù)據(jù)
g.deliveryService[chainID].StartDeliverForChannel(chainID, support.Committer, func() {})
} ....
}
首先檢查是否已經(jīng)存在Deliver
實例杆故,然后根據(jù)Leader
主節(jié)點動態(tài)選舉機制還是靜態(tài)指定了Leader
主節(jié)點分別進入不同的分支,如果是靜態(tài)指定了Leader
主節(jié)點溉愁,則連接 Orderer
節(jié)點請求區(qū)塊數(shù)據(jù),啟動指定通道上的Deliver
服務(wù)實例請求獲取區(qū)塊數(shù)據(jù)处铛。接下來關(guān)注啟動Deliver
服務(wù)實例。
啟動Deliver服務(wù)實例
主要做了以下事:
①:獲取綁定指定通道的區(qū)塊提供者
if _, exist := d.blockProviders[chainID];
②:不存在區(qū)塊提供者
client := d.newClient(chainID, ledgerInfo)
func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
requester := &blocksRequester{ //定義區(qū)塊請求者blocksRequester結(jié)構(gòu)對象
tls: comm.TLSEnabled(),
chainID: chainID,
}
//定義broadcastSetup()方法
broadcastSetup := func(bd blocksprovider.BlocksDeliverer) error {
return requester.RequestBlocks(ledgerInfoProvider) // 請求區(qū)塊數(shù)據(jù)
}
...
//創(chuàng)建connProducer對象
connProd := comm.NewConnectionProducer(d.conf.ConnFactory(chainID), d.conf.Endpoints)
//// 創(chuàng)建broadcastClient客戶端
bClient := NewBroadcastClient(connProd, d.conf.ABCFactory, broadcastSetup, backoffPolicy)
requester.client = bClient // 設(shè)置到區(qū)塊請求者對象的客戶端
return bClient
}
2.1 創(chuàng)建Deliver服務(wù)實例上的 broadcastClient客戶端
client := d.newClient(chainID, ledgerInfo)
2.2 創(chuàng)建指定通道關(guān)聯(lián)的區(qū)塊提供者
d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
2.3 啟動goroutine開始從Orderer節(jié)點請求獲取區(qū)塊拐揭,并發(fā)送到組織內(nèi)其他Peer節(jié)點
go func() {
d.blockProviders[chainID].DeliverBlocks() // 請求獲取區(qū)塊數(shù)據(jù)
finalizer()
}()
接下來就是調(diào)用區(qū)塊提供者對象的DeliverBlocks()
方法撤蟆,向Orderer
節(jié)點發(fā)送消息請求的區(qū)塊數(shù)據(jù)。
請求獲取區(qū)塊數(shù)據(jù)
入口在:core/deliverservice/blocksprovider/blocksprovider.go/DeliverBlocks()
,具體分析如下:
①:接收消息
msg, err := b.client.Recv()
② :根據(jù)消息類型進行處理
大致有以下幾種消息類型:
- DeliverResponse_Status:用于描述
Deliver
服務(wù)請求執(zhí)行狀態(tài)堂污。 - DeliverResponse_Block:包含請求獲取的區(qū)塊數(shù)據(jù)家肯。
2.1 DeliverResponse_Status分支
如果DeliverBlocks()方法接收到Status_SUCCESS狀態(tài),則說明本次區(qū)塊請求處理成功盟猖,表示已經(jīng)接收完畢區(qū)塊請求消息指定范圍內(nèi)的區(qū)塊數(shù)據(jù)讨衣。除此以外的其他狀態(tài)消息都是非成功的執(zhí)行狀態(tài)消息,包括Status_BAD_REQUEST扒披、Status_FORBIDDEN等
if t.Status == common.Status_SUCCESS {}
if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {}
if t.Status == common.Status_BAD_REQUEST {
b.client.Disconnect(false)
} else {
b.client.Disconnect(true)
}
2.2 DeliverResponse_Block分支
2.2.1 獲取區(qū)塊號
seqNum := t.Block.Header.Number
2.2.2獲取經(jīng)過序列化的區(qū)塊字節(jié)數(shù)組
marshaledBlock, err := proto.Marshal(t.Block)
2.2.3驗證區(qū)塊
err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), seqNum, marshaledBlock);
2.2.4獲取通道Peer節(jié)點數(shù)量
numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
2.2.5創(chuàng)建消息負載和Gossip消息
payload := createPayload(seqNum, marshaledBlock)
gossipMsg := createGossipMsg(b.chainID, payload)
2.2.6添加消息負載到本地消息負載緩沖區(qū)值依,等待提交賬本
err := b.gossip.AddPayload(b.chainID, payload)
2.2.7通過Gossip消息協(xié)議發(fā)送區(qū)塊消息到組織內(nèi)的其他節(jié)點
基于Gossip
消息協(xié)議將DataMsg
類型數(shù)據(jù)消息(只含有區(qū)塊數(shù)據(jù))分發(fā)到組織內(nèi)的其他Peer
節(jié)點上,并保存到該節(jié)點的消息存儲器上碟案。
b.gossip.Gossip(gossipMsg)
參考
https://github.com/blockchainGuide/ (文章圖片代碼資料)
微信公眾號:區(qū)塊鏈技術(shù)棧