死磕hyperledger fabric源碼|kafka共識(shí)排序
文章及代碼:https://github.com/blockchainGuide/
分支:v1.1.0
概述
Orderer共識(shí)組件提供HandleChain()方法創(chuàng)建通道綁定的共識(shí)組件鏈對(duì)象(consensus.Chain
接口)剩拢,包括Solo
(solo.chain
類型)、Kafka
(kafka.chainImpl
類型)等類型,屬于通道共識(shí)組件的重要實(shí)現(xiàn)模塊啼县,并設(shè)置到鏈支持對(duì)象的cs.Chain
字段甘邀。共識(shí)組件鏈對(duì)象提供Orderer共識(shí)排序服務(wù)伟骨,負(fù)責(zé)關(guān)聯(lián)通道上交易排序仔夺、打包出塊、提交賬本南捂、通道管理等工作吴裤,目前采用Golang
通道或Kafka
集群作為共識(shí)排序后端,接收來(lái)自Broadcast
服務(wù)過(guò)濾轉(zhuǎn)發(fā)的交易消息并進(jìn)行排序溺健。
kafka共識(shí)排序服務(wù)
orderer服務(wù)集群
Orderer
節(jié)點(diǎn)采用Sarama
開(kāi)源的Kafka
第三方庫(kù)構(gòu)建Kafka
共識(shí)組件麦牺,可以同時(shí)接受處理多個(gè)客戶端發(fā)送的交易消息請(qǐng)求,能夠有效提高Orderer
節(jié)點(diǎn)處理交易消息的并發(fā)能力鞭缭。同時(shí)剖膳,可利用Kafka
集群在單一分區(qū)內(nèi)按序收集相同主題消息(消息序號(hào)唯一)的功能,來(lái)保證交易消息具有確定性的順序(以消息序號(hào)排序)岭辣,從而實(shí)現(xiàn)對(duì)交易排序達(dá)成全局共識(shí)的目的吱晒。
Kafka
生產(chǎn)者按照主題(Topic
)生產(chǎn)消息并進(jìn)行發(fā)布,Kafka
服務(wù)器集群自動(dòng)對(duì)消息主題進(jìn)行分類沦童。同一個(gè)主題的消息都會(huì)被收集到一個(gè)或多個(gè)分區(qū)文件中仑濒,按照FIFO
的順序追加到文件尾部,并且每個(gè)消息在分區(qū)中都會(huì)有一個(gè)OFFSET
位置偏移量作為該消息的唯一標(biāo)識(shí)ID偷遗。目前躏精,Hyperledger Fabric
基于Kafka
集群為每個(gè)通道創(chuàng)建綁定了一個(gè)主題(即鏈ID,chainID
)鹦肿,并且只設(shè)置一個(gè)分區(qū)(分區(qū)號(hào)為0)。Kafka消費(fèi)者管理多個(gè)分區(qū)消費(fèi)者并訂閱指定分區(qū)的主題消息辅柴,包括主題(即chainID
)箩溃、分區(qū)號(hào)(目前只有1個(gè)分區(qū)號(hào)為0的分區(qū))、起始偏移量(開(kāi)始訂閱的消息位置offset
)等碌嘀。
Hyperledger Fabric采用Kafka
集群對(duì)單個(gè)或多個(gè)Orderer
排序節(jié)點(diǎn)提交的交易消息進(jìn)行排序涣旨。此時(shí),Orderer
排序節(jié)點(diǎn)同時(shí)充當(dāng)Kafka
集群的消息生產(chǎn)者(分區(qū))和消費(fèi)者股冗,發(fā)布消息與訂閱消息到Kafka集群上的同一個(gè)主題分區(qū)霹陡,即先將Peer
節(jié)點(diǎn)提交的交易消息轉(zhuǎn)發(fā)給Kafka服務(wù)端,同時(shí)止状,從指定主題的Kafka
分區(qū)上按順序獲取排序后的交易消息并自動(dòng)過(guò)濾重啟的交易消息烹棉。這期間可能會(huì)存在網(wǎng)絡(luò)時(shí)延造成獲取消息時(shí)間的差異。如果不考慮丟包造成消息丟失的情況怯疤,則所有Orderer
節(jié)點(diǎn)獲取消息的順序與數(shù)量應(yīng)該是確定的和一致的浆洗。同時(shí),采用相同的Kafka共識(shí)組件鏈對(duì)象與出塊規(guī)則等集峦,以保證所有Orderer節(jié)點(diǎn)都可以創(chuàng)建與更新相同配置的通道伏社,并切割生成相同的批量交易集合出塊抠刺,再“同步”構(gòu)造出相同的區(qū)塊數(shù)據(jù),從而基于Kafka
集群達(dá)成全局共識(shí)摘昌,以保證區(qū)塊數(shù)據(jù)的全局一致性速妖。
啟動(dòng)共識(shí)組件鏈對(duì)象
啟動(dòng)入口:
orderer/consensus/kafka/chain.go/Start()
func (chain *chainImpl) Start() {
go startThread(chain)
}
func startThread(chain *chainImpl) {
...
//創(chuàng)建kafka生產(chǎn)者
chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
...
// Kafka生產(chǎn)者發(fā)送CONNECT消息建立連接
if err = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); err != nil {
logger.Panicf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err)
}
...
//創(chuàng)建Kafka消費(fèi)者
chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
...
//創(chuàng)建Kafka分區(qū)消費(fèi)者
chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)
...
close(chain.startChan) // 已經(jīng)啟動(dòng)共識(shí)組件鏈對(duì)象,不阻塞Broadcast
chain.errorChan = make(chan struct{}) // 創(chuàng)建errorChan通道聪黎,不阻塞Deliver服務(wù)處理句柄
...
chain.processMessagesToBlocks() //創(chuàng)建消息處理循環(huán)罕容,循環(huán)處理訂閱分區(qū)上接收到的消息
}
startThread
函數(shù)首先創(chuàng)建kafka
生產(chǎn)者,發(fā)布消息到指定主題(即通道ID)和分區(qū)號(hào)的通道分區(qū)(chain.channel)上挺举。
然后發(fā)送CONNECT
消息建立連接杀赢,該消息指定了主題Topic
字段為鏈ID、Key
字段為分區(qū)號(hào)0湘纵、Value
字段為CONNECT
類型消息負(fù)載等脂崔。訂閱該主題的Kafka
(分區(qū))消費(fèi)者會(huì)接收到該消息。
接著創(chuàng)建指定Kafka
分區(qū)和Broker
服務(wù)器配置的Kafka
消費(fèi)者對(duì)象梧喷,并設(shè)置從指定主題(鏈ID)和分區(qū)號(hào)(0)的Kafka
分區(qū)上獲取消息砌左。
最后,調(diào)用processMessagesToBlocks()
方法創(chuàng)建消息處理循環(huán)铺敌,負(fù)責(zé)處理從Kafka
集群中接收到的訂閱消息汇歹。
處理消息
processMessagesToBlocks
接收到正常的Kafka
分區(qū)消費(fèi)者消息會(huì)根據(jù)kafka
的消息類型進(jìn)行處理,包括以下幾種類型:
- Kafka- Message_Regular
- KafkaMessage_TimeToCut
- KafkaMessage_Connect
func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
...
for { // 消息處理循環(huán)
select {
...
case in, ok := <-chain.channelConsumer.Messages(): //接收到正常的Kafka分區(qū)消費(fèi)者消息
...
select {
case <-chain.errorChan: // If this channel was closed... // 如果該通道已經(jīng)關(guān)閉偿凭,則重新創(chuàng)建該通道
...
switch msg.Type.(type) { //分析Kafka消息類型
case *ab.KafkaMessage_Connect: //Kafka連接消息 由于錯(cuò)誤而重新恢復(fù)Kafka消費(fèi)者分區(qū)訂閱流程
_ = chain.processConnect(chain.ChainID()) //處理CONNECT連接消息产弹, 不做任何事情
counts[indexProcessConnectPass]++ // 成功處理消息計(jì)數(shù)增1
case *ab.KafkaMessage_TimeToCut: // Kafka定時(shí)切割生成區(qū)塊消息
if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
counts[indexProcessTimeToCutError]++
return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
}
counts[indexProcessTimeToCutPass]++ // 成功處理消息計(jì)數(shù)增1
case *ab.KafkaMessage_Regular: // Kafka常規(guī)消息
if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil { // 處理Kafka常 規(guī)消息
...
counts[indexProcessRegularError]++
}...
}
case <-chain.timer: // 超時(shí)定時(shí)器
if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil { //發(fā)送TimeToCut類型消息,請(qǐng)求打包出塊
...
counts[indexSendTimeToCutError]++
} ...
}
}
}
①:KafkaMessage_Connect類型消息
Kafka
連接消息用于測(cè)試連通Kafka
分區(qū)消費(fèi)者的工作狀態(tài)弯囊,用于驗(yàn)證Kafka
共識(shí)組件的正常工作狀態(tài)與排除故障痰哨,并調(diào)用chain.processConnect(chain.ChainID())
方法處理該消息。
②:KafkaMessage_TimeToCut類型消息
processMessagesToBlocks
()方法可調(diào)用chain.processTimeToCut()
方法處理TIMETOCUT
類型消息匾嘱。如果消息中的區(qū)塊號(hào)ttcNumber
不是當(dāng)前Orderer
節(jié)點(diǎn)當(dāng)前通道賬本中下一個(gè)打包出塊的區(qū)塊號(hào)(最新區(qū)塊號(hào)lastCutBlockNumber
+1)斤斧,則直接丟棄不處理。否則霎烙,調(diào)用BlockCutter().Cut()
方法撬讽,切割當(dāng)前該通道上待處理的緩存交易消息列表為批量交易集合batch([]*cb.Envelope)
,再調(diào)用CreateNextBlock(batch)
方法構(gòu)造新區(qū)塊并提交賬本悬垃。最后游昼,調(diào)用WriteBlock(block,metadata)
方法尝蠕,更新區(qū)塊元數(shù)據(jù)并提交賬本酱床,同時(shí)更新Kafka共識(shí)組件鏈對(duì)象的最新區(qū)塊號(hào)lastCutBlockNumber
增1。
事實(shí)上趟佃,Orderer
服務(wù)集群節(jié)點(diǎn)獨(dú)立打包出塊的時(shí)間點(diǎn)通常不是完全同步的扇谣,同時(shí)還可能會(huì)重復(fù)接收其他Orderer節(jié)點(diǎn)提交的TIMETOCUT類型消息(重復(fù)區(qū)塊號(hào))昧捷。此時(shí),Orderer
節(jié)點(diǎn)以接收到的第一個(gè)TIMETOCUT
類型消息為準(zhǔn)罐寨,打包出塊并提交到賬本靡挥,再更新當(dāng)前通道的最新區(qū)塊號(hào)lastCutBlockNumber
。這樣鸯绿,processTimeToCut
()方法就能利用最新的lastCutBlockNumber
過(guò)濾掉其他重復(fù)的TIMETOCUT
類型消息跋破,以保證所有Orderer
節(jié)點(diǎn)上賬本區(qū)塊文件的數(shù)據(jù)同步,實(shí)際上是將原先的時(shí)間同步機(jī)制轉(zhuǎn)換為消息同步機(jī)制瓶蝴。
③:KafkaMessage_Regular類型消息
包括通道配置交易消息(KafkaMessageRegular_CONFIG類型)和普通交易消息(KafkaMessageRegular_NORMAL類型)毒返。 詳細(xì)的分析將會(huì)在processRegular
方法中體現(xiàn)。
處理配置交易消息
我們先大概的看一下ProcessRegular中關(guān)于處理配置交易消息的代碼部分,因?yàn)檫@部分相當(dāng)?shù)拈L(zhǎng)舷手,必須先看個(gè)概覽:
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
...
commitConfigMsg := func(message *cb.Envelope, newOffset int64){...}
seq := chain.Sequence() // 獲取當(dāng)前通道的最新配置序號(hào)
...
switch regularMessage.Class {
case ab.KafkaMessageRegular_UNKNOWN: // 未知消息類型
...
case ab.KafkaMessageRegular_NORMAL: // 普通交易消息類型
...
case ab.KafkaMessageRegular_CONFIG: // 通道配置交易消息
...
}
...
}
我們直接跳轉(zhuǎn)到case ab.KafkaMessageRegular_CONFIG
進(jìn)行分析:
①:如果regularMessage.OriginalOffset 不為 0
說(shuō)明這是重新過(guò)濾驗(yàn)證和排序的通道配置交易消息拧簸。
1.1 過(guò)濾重復(fù)提交的消息
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {}
1.2 確認(rèn)是否是最近重新驗(yàn)證且重新排序的配置交易消息,并且通道配置序號(hào)是最新的
if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset &®ularMessage.ConfigSeq == seq {
// 因此男窟,關(guān)閉通道并解除Broadcast服務(wù)處理句柄阻塞等待盆赤,通知重新接收消息進(jìn)行處理
close(chain.doneReprocessingMsgInFlight)
}
1.3 主動(dòng)更新本通道的最近重新提交排序的配置交易消息初始偏移量lastResubmitted
存在其他Orderer
節(jié)點(diǎn)重新提交了配置消息,但是本地Orderer
節(jié)點(diǎn)沒(méi)有重新提交該消息歉眷。因此這里需要更新本通道的最近重新提交排序的配置交易消息初始偏移量lastResubmitted牺六。
if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
}
②:regularMessage.OriginalOffset為 0
說(shuō)明是第一次提交通道配置交易消息,而不是重新驗(yàn)證和重新排序的汗捡。
2.1 如果消息中的配置序號(hào)regularMessage.ConfigSeq小于當(dāng)前通道的最新配置序號(hào)seq
則說(shuō)明已經(jīng)更新了通道配置(配置序號(hào)較高)淑际,然后再處理當(dāng)前配置交易消息(配置序號(hào)較低)。將會(huì)調(diào)用ProcessConfigMsg
重新過(guò)濾和處理該消息扇住。
接著通過(guò)configure
重新提交該配置消息進(jìn)行排序庸追,重置消息初始偏移量。然后再更新最近重新提交消息的偏移量台囱。
if regularMessage.ConfigSeq < seq {
...
configEnv, configSeq, err := chain.ProcessConfigMsg(env)
if err := chain.configure(configEnv, configSeq, receivedOffset); err != nil {...}
// 阻塞接收消息處理,更新最近重新提交消息的偏移量
chain.lastResubmittedConfigOffset = receivedOffset
//創(chuàng)建通道阻塞Broadcast服務(wù)接收處理消息
chain.doneReprocessingMsgInFlight = make(chan struct{})
}
③:提交配置交易消息執(zhí)行通道管理操作
經(jīng)過(guò)上面的①和②過(guò)濾掉不符合條件的情況读整,接下來(lái)就提交配置交易消息執(zhí)行通道管理操作簿训,核心函數(shù):commitConfigMsg(env, offset)
3.1 將當(dāng)前緩存交易消息切割成批量交易集合
batch := chain.BlockCutter().Cut()
3.2 創(chuàng)建新區(qū)塊block
block := chain.CreateNextBlock(batch)
3.3 構(gòu)造Kafka元數(shù)據(jù)
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ //構(gòu)造Kafka元數(shù)據(jù)
LastOffsetPersisted: receivedOffset - 1, // 偏移量減1
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
3.4 寫入?yún)^(qū)塊
通過(guò)區(qū)塊寫組件提交新區(qū)塊到賬本,更新當(dāng)前通道的最新區(qū)塊號(hào)chain.lastCutBlockNumber增1
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
接著更新本鏈的lastOriginal- OffsetProcessed為newOffset參數(shù)米间,然后做和上面差不多的事情:
chain.lastOriginalOffsetProcessed = newOffset
block := chain.CreateNextBlock([]*cb.Envelope{message}) // 構(gòu)造新區(qū)塊
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ // 構(gòu)造Kafka元數(shù)據(jù)
LastOffsetPersisted: receivedOffset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteConfigBlock(block, metadata) // 寫入配置區(qū)塊
chain.lastCutBlockNumber++ // 最新區(qū)塊號(hào)增1
不管是上面的WriteBlock
還是WriteConfigBlock
底層都是調(diào)用的commitBlock
强品,如下:
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
... // 添加塊簽名
bw.addBlockSignature(bw.lastBlock)
// 添加最新的配置簽名
bw.addLastConfigSignature(bw.lastBlock)
// 寫入新塊
err := bw.support.Append(bw.lastBlock)
...
}
接下來(lái)再討論kafka共識(shí)組件如何處理普通交易消息的。
處理普通交易消息
還是先回到 processRegular
方法屈糊,關(guān)于處理普通消息的方法大概如下:
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
...
case ab.KafkaMessageRegular_NORMAL: // 普通交易消息類型
// 如果OriginalOffset不是0的榛,則說(shuō)明該消息是重新驗(yàn)證且重新提交排序的
if regularMessage.OriginalOffset != 0 {
...
// 如果消息偏移量不大于lastOriginalOffsetProcessed最近已處理消息的偏移量,
// 則說(shuō)明已經(jīng)處理過(guò)該消息逻锐,此時(shí)應(yīng)丟棄返回夫晌,防止重復(fù)處理其他Orderer提交的相同偏移 量的普通交易消息
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
...
}
// // 檢查通道的配置序號(hào)是否更新
if regularMessage.ConfigSeq < seq {
...
//// 消息的配置序號(hào)低雕薪,需要重新驗(yàn)證過(guò)濾消息
configSeq, err := chain.ProcessNormalMsg(env)
...
//重新提交普通交易消息
if err := chain.order(env, configSeq, receivedOffset); err != nil {}
...
}
// advance lastOriginalOffsetProcessed iff message is re-validated and re-ordered
//當(dāng)且僅當(dāng)消息重新驗(yàn)證和重新排序時(shí),才需要修正lastOriginalOffsetProcessed偏移量
offset := regularMessage.OriginalOffset
if offset == 0 {
offset = chain.lastOriginalOffsetProcessed
}
// 提交處理普通交易消息晓淀,offset為最近處理的普通交易消息偏移量
commitNormalMsg(env, offset)
}
處理普通交易消息的流程與處理配置交易消息的流程基本類似所袁,主要看最后的commitNormalMsg(env, offset)
,我們來(lái)繼續(xù)分析:
commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
//// 添加所接收的消息到緩存交易消息列表凶掰,并切割成批量交易集合列表batches
batches, pending := chain.BlockCutter().Ordered(message)
...
if len(batches) == 0 {
// 如果不存在批量交易集合燥爷,則啟動(dòng)定時(shí)器周期性地發(fā)送切割出塊消息n
chain.lastOriginalOffsetProcessed = newOffset
if chain.timer == nil {
chain.timer = time.After(chain.SharedConfig().BatchTimeout())
...
return
}
chain.timer = nil
offset := receivedOffset // 設(shè)置當(dāng)前消息偏移量
if pending || len(batches) == 2 {
offset-- // 計(jì)算第1個(gè)批量交易消息的偏移量是offset減1
} else { // 只有1個(gè)批量交易集合構(gòu)成1個(gè)區(qū)塊
//// 設(shè)置第1個(gè)批量交易集合的消息偏移量為newOffset
chain.lastOriginalOffsetProcessed = newOffset
}
//// 構(gòu)造并提交第1個(gè)區(qū)塊
block := chain.CreateNextBlock(batches[0])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteBlock(block, metadata) // 更新區(qū)塊元數(shù)據(jù),并提交區(qū)塊到賬本
chain.lastCutBlockNumber++ // 更新當(dāng)前通道上最近出塊的區(qū)塊號(hào)增1
...
// Commit the second block if exists
//// 檢查第2個(gè)批量交易集合懦窘,構(gòu)造并提交第2個(gè)區(qū)塊
if len(batches) == 2 {
chain.lastOriginalOffsetProcessed = newOffset
offset++ // 設(shè)置第2個(gè)批量交易集合的消息偏移量offset加1
block := chain.CreateNextBlock(batches[1])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: newOffset,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
...
}
}
首先將新的普通交易消息添加到當(dāng)前的緩存交易列表前翎,并切割成批量交易集合列表batches ,但最多只能包含2個(gè)批量交易集合,并且第2個(gè)批量交易集合最多包含1個(gè)交易畅涂。最終也是調(diào)用的WriteBlock
寫入到賬本港华。
到此為止整個(gè)processRegular
()方法處理消息結(jié)束舰褪。
總結(jié)及參考
kafka共識(shí)排序的邏輯其實(shí)是比較簡(jiǎn)單的般婆,大概的流程如下 :
https://github.com/blockchainGuide/ (文章圖片代碼資料在里面)
微信公眾號(hào):區(qū)塊鏈技術(shù)棧