Order組件即Fabric區(qū)塊鏈的共識服務(wù)互订,負(fù)責(zé)對不同Client發(fā)送的交易進(jìn)行排序流妻,Order組件的現(xiàn)版本中有Solo模式和Kafka兩種實(shí)現(xiàn)(Solo模式太簡單僅適用于開發(fā)模式,本文后面所述均針對基于Kafka的實(shí)現(xiàn))革砸,本文將結(jié)合源碼分析闡述Order組件的架構(gòu)及其運(yùn)行原理除秀。
1. 整體架構(gòu)分析
首先我們來了解一下官方的架構(gòu)設(shè)計(jì)圖,如下圖該架構(gòu)使用Kafka集群提供排序服務(wù)及其容錯性算利。這種排序服務(wù)包括多個(gè)Order節(jié)點(diǎn)(OSN)以及Kafka集群組成册踩,排序服務(wù)Client可以連接多個(gè)OSN但是OSN節(jié)點(diǎn)互相不通訊。
排序服務(wù)的基本工作原理是這樣的:
- 排序服務(wù)Client向OSN發(fā)送交易效拭;
- OSN節(jié)點(diǎn)對交易進(jìn)行相關(guān)檢查暂吉,符合條件之后會將交易發(fā)送給Kafka集群胖秒;
- OSN節(jié)點(diǎn)從Kafka集群拉取交易消息并對交易消息進(jìn)行打包將打包之后的交易batch寫入本地?cái)?shù)據(jù)庫;
- OSN節(jié)點(diǎn)按客戶端Deliver請求從本地?cái)?shù)據(jù)庫讀取區(qū)塊返回慕的;
這種設(shè)計(jì)主要利用了Kafka的兩個(gè)特性(如下圖所示)阎肝,1. 發(fā)送到Kafka的消息會按序存儲并且保證消費(fèi)者能夠按序消費(fèi);2. Kafka允許對消息進(jìn)行分類按照消息的Topic進(jìn)行分區(qū)肮街,分區(qū)內(nèi)部消息依然有序风题;
其中特性1幫助Fabric實(shí)現(xiàn)了多節(jié)點(diǎn)交易的順序一致性,特性2幫助Fabric實(shí)現(xiàn)了多通道架構(gòu)(Kafka的消費(fèi)者可以選擇訂閱其感興趣的Topic);
Fabric的這種Order的設(shè)計(jì)個(gè)人感覺好處在于極大的發(fā)揮了Kafka的高可擴(kuò)展嫉父、高可用以及順序一致性沛硅。然而劣勢也比較明顯,首先Kafka以及OSN的節(jié)點(diǎn)對Fabric網(wǎng)絡(luò)來說是一個(gè)中心化存在違背了去中心化的區(qū)塊鏈宗旨绕辖,其次Kafka以及OSN中保存所有的交易信息摇肌,對隱私保護(hù)不是很好;最后Kafka的一致性協(xié)議不能容忍拜占庭錯誤在安全性上和類PBFT算法相比較弱仪际;最后的最后就是這種架構(gòu)極大增加了新手入門以及運(yùn)營維護(hù)的成本围小。
2. 關(guān)鍵接口分析
OSN為一個(gè)獨(dú)立的組件也是通過gRPC的方式對外提供服務(wù),其主要有兩個(gè)服務(wù)接口, 其中Broadcast接口用于接收Client的排序請求树碱,Deliver是在排序之后給Client發(fā)送的流式Blocks.
service AtomicBroadcast {
rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {}
rpc Deliver(stream common.Envelope) returns (stream DeliverResponse) {}
}
如下圖所示是實(shí)現(xiàn)這兩個(gè)接口的相關(guān)類圖吩抓,其中Server是OSN的啟動入口,其實(shí)現(xiàn)了Order提供的兩個(gè)服務(wù)Broadcast和Deliver赴恨。Broadcast和Deliver又是通過broadcast.go和deliver.go中的Handle方法具體實(shí)現(xiàn)疹娶。
Broadcast的具體實(shí)現(xiàn)如下,broadcast.go的Handle方法中開啟一個(gè)循環(huán)進(jìn)行如下2~4的操作:
- 抽取遠(yuǎn)程地址信息伦连,
addr := util.ExtractRemoteAddress(srv.Context())
- 接收消息雨饺,
msg, err := srv.Recv()
- 獲取BroadcastSupport,
chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)
- 按照當(dāng)前配置檢查該msg是否合法
configSeq, err := processor.ProcessNormalMsg(msg)
- 對交易進(jìn)行排序,Kafka的實(shí)現(xiàn)是將交易盡心序列化并發(fā)送到Kafka集群
marshaledEnv, err := utils.Marshal(env)
if err != nil {
logger.Errorf("[channel: %s] cannot enqueue, unable to marshal envelope = %s", chain.support.ChainID(), err)
return false
}
// We're good to go
payload := utils.MarshalOrPanic(newRegularMessage(marshaledEnv))
message := newProducerMessage(chain.channel, payload)
if _, _, err := chain.producer.SendMessage(message); err != nil {...}
Deliver的實(shí)現(xiàn)同樣也是在Handle方法中開啟一個(gè)服務(wù)惑淳,如下讀取來自客戶端的Deliver請求并執(zhí)行deliverBlocks操作额港。
for {
logger.Debugf("Attempting to read seek info message from %s", addr)
envelope, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}
if err := ds.deliverBlocks(srv, envelope); err != nil {
return err
}
logger.Debugf("Waiting for new SeekInfo from %s", addr)
}
deliverBlocks內(nèi)部實(shí)現(xiàn)邏輯如下:
- 解析客戶端地址并反序列化消息內(nèi)容并提取頭部信息:
addr := util.ExtractRemoteAddress(srv.Context())
payload, err := utils.UnmarshalPayload(envelope.Payload)
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
- 獲取對應(yīng)channel的服務(wù)組件
chain, ok := ds.sm.GetChain(chdr.ChannelId)
- 讀取所需的block的索引信息
seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
if seekInfo.Start == nil || seekInfo.Stop == nil {
logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
cursor, number := chain.Reader().Iterator(seekInfo.Start)
- 循環(huán)讀取DB中的block并返回給客戶端。
至此Broadcast和Deliver的接口大致流程分析完畢歧焦。