節(jié)點啟動時會啟動以下幾種服務
1歉秫、net service p2p服務模塊
2触徐、core engine 區(qū)塊處理模塊
3、consensus module 共識服務模塊
4券册、tx pool 交易服務模塊
5蚣常、sync service 異步服務模塊
代碼在module/blockchain/blockchain_start.go:11
交易分類:配置交易和普通交易
根據流程圖閱讀源碼
1. 交易生成分為客戶端發(fā)送市咽,其他節(jié)點同步過來
用戶/APP發(fā)送交易:用戶通過sdk創(chuàng)建交易并簽名后,通過RPC方式調用遠程服務器invoke
方法module/rpcserver/api_service.go:149
核心代碼如下
// invoke contract according to TxType
func (s *ApiService) invoke(tx *commonPb.Transaction, source protocol.TxSource) *commonPb.TxResponse {
switch tx.Payload.TxType {
//查詢交易
case commonPb.TxType_QUERY_CONTRACT:
return s.dealQuery(tx, source)
//invoke交易
case commonPb.TxType_INVOKE_CONTRACT:
return s.dealTransact(tx, source)
}
}
// dealTransact - deal transact tx
func (s *ApiService) dealTransact(tx *commonPb.Transaction, source protocol.TxSource) *commonPb.TxResponse {
...
err = s.chainMakerServer.AddTx(tx.Payload.ChainId, tx, source)
...
}
其中chainMakerServer
對象是管理整個區(qū)塊鏈項目的最頂層的類module/blockchain/chainmaker_server.go
代碼如下
// ChainMakerServer manage all blockchains
type ChainMakerServer struct {
// net shared by all chains 管理網絡 libp2p 或者是Liquid
net protocol.Net
// blockchains known by this node 管理多個鏈存儲
blockchains sync.Map // map[string]*Blockchain
}
2. 將新交易加入到交易池txPool
位于chainmaker.org/chainmaker/txpool-single/v2@v2.1.0/tx_pool_impl.go:175
核心代碼如下
func (pool *txPoolImpl) AddTx(tx *commonPb.Transaction, source protocol.TxSource) error {
...
// 存儲交易
memTx := &mempoolTxs{isConfigTxs: false, txs: []*commonPb.Transaction{tx}, source: source}
if utils.IsConfigTx(tx) || utils.IsManageContractAsConfigTx(tx,
pool.chainConf.ChainConfig().Contract.EnableSqlSupport) {
memTx.isConfigTxs = true
}
t := time.NewTimer(time.Second)
defer t.Stop()
select {
// 交易存儲
case pool.addTxsCh <- memTx:
case <-t.C:
pool.log.Warnf("add transaction timeout")
return fmt.Errorf("add transaction timeout")
}
// 節(jié)點廣播交易
if source == protocol.RPC {
pool.broadcastTx(tx.Payload.TxId, txMsg)
}
return nil
}
3. 節(jié)點廣播交易pool.broadcastTx(tx.Payload.TxId, txMsg)
這也就是第二種方式對應交易流程圖中的1',節(jié)點收到廣播的交易后加入交易池抵蚊。
4. 當節(jié)點啟動時施绎,交易池服務txPool
會創(chuàng)建一個定時任務,來監(jiān)聽新交易
func (pool *txPoolImpl) listen() {
// 定時觸發(fā)
flushTicker := time.NewTicker(time.Duration(pool.flushTicker) * time.Second)
defer flushTicker.Stop()
for {
select {
// 當收到新交易時進行觸發(fā)
case poolTxs := <-pool.addTxsCh:
pool.flushOrAddTxsToCache(poolTxs)
case <-flushTicker.C:
// 從緩存池中撈數據
if pool.cache.isFlushByTime() && pool.cache.txCount() > 0 {
pool.flushCommonTxToQueue(nil)
}
case <-pool.stopCh:
return
}
}
}
當收到新交易時贞绳,會判斷交易是否是配置交易谷醉,如果是配置交易直接放入到配置交易隊列中,否則放入普通交易隊列 核心代碼如下
func (pool *txPoolImpl) flushOrAddTxsToCache(memTxs *mempoolTxs) {
...
// 如果是配置交易冈闭,則加入配置消息隊列
if memTxs.isConfigTxs {
pool.flushConfigTxToQueue(memTxs)
return
}
// 普通交易孤紧,如果到的交易緩存臨界值 就加入消息隊列中
if pool.cache.isFlushByTxCount(memTxs) {
pool.flushCommonTxToQueue(memTxs)
} else {
pool.cache.addMemoryTxs(memTxs)
}
}
加入隊列后,會向msgbus
發(fā)送一個交易池信號拒秘,通知其他模塊
func (pool *txPoolImpl) flushCommonTxToQueue(memTxs *mempoolTxs) {
defer func() {
// 向msgbus 發(fā)出塊信號
pool.updateAndPublishSignal()
pool.cache.reset()
}()
// 加入消息隊列
// RPC:來自RPC的交易不驗證基礎的交易信息(如交易ID、時間戳是否符合規(guī)范)臭猜、不驗證交易者的證書躺酒;因為RPC模塊已做此類校驗;成功添加至交易池的交易會廣播給其它連接的節(jié)點
// P2P:其它節(jié)點的廣播的交易蔑歌,進行所有的校驗
// INTERNAL:如果節(jié)點在同一高度接收到多個驗證有效的區(qū)塊羹应,當其中某個區(qū)塊上鏈后,其余的相同高度區(qū)塊內的交易會被重新添加進交易池次屠,防止這些交易被拋棄园匹。
rpcTxs, p2pTxs, internalTxs := pool.cache.mergeAndSplitTxsBySource(memTxs)
pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: rpcTxs, source: protocol.RPC})
pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: p2pTxs, source: protocol.P2P})
pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: internalTxs, source: protocol.INTERNAL})
}
5. 根據交易池是否已滿,觸發(fā)新區(qū)塊生成
在CoreEngine
位于module/core/syncmode/core_syncmode_impl.go
里監(jiān)聽消息總線發(fā)出的消息劫灶,代碼如下
// OnMessage consume a message from message bus
func (c *CoreEngine) OnMessage(message *msgbus.Message) {
// 1. receive proposal status from consensus
// 2. receive verify block from consensus
// 3. receive commit block message from consensus
// 4. receive propose signal from txpool
// 5. receive build proposal signal from chained bft consensus
switch message.Topic {
...
// 收到交易池信號
case msgbus.TxPoolSignal:
if signal, ok := message.Payload.(*txpoolpb.TxPoolSignal); ok {
c.blockProposer.OnReceiveTxPoolSignal(signal)
}
...
}
}
其中blockProposer
位于module/core/syncmode/proposer/block_proposer_impl.go
發(fā)出生成提議候選區(qū)塊裸违,核心代碼如下
// OnReceiveTxPoolSignal, receive txpool signal and deliver to chan txpool signal 收到交易信號
func (bp *BlockProposerImpl) OnReceiveTxPoolSignal(txPoolSignal *txpoolpb.TxPoolSignal) {
bp.txPoolSignalC <- txPoolSignal
}
// Start, start proposing loop
func (bp *BlockProposerImpl) startProposingLoop() {
for {
select {
// 普通交易區(qū)塊則由定時器來生成區(qū)塊,由配置文件決定
case <-bp.proposeTimer.C:
if !bp.isSelfProposer() {
break
}
// 生成候選區(qū)塊
go bp.proposeBlock()
case signal := <-bp.txPoolSignalC:
if !bp.isSelfProposer() {
break
}
// 如果接受到的是修改配置的交易,則立馬生成候選區(qū)塊
if signal.SignalType != txpoolpb.SignalType_BLOCK_PROPOSE {
break
}
// 生成候選區(qū)塊
go bp.proposeBlock()
case <-bp.exitC:
bp.proposeTimer.Stop()
bp.log.Info("block proposer loop stoped")
return
}
}
}
// proposeBlock, to check if proposer can propose block right now
// if so, start proposing
func (bp *BlockProposerImpl) proposeBlock() {
...
//開始創(chuàng)建提案區(qū)塊
go bp.proposing(proposingHeight, lastBlock.Header.BlockHash)
...
//等提案區(qū)塊完成
<-bp.finishProposeC
}
// proposing, propose a block in new height
func (bp *BlockProposerImpl) proposing(height uint64, preHash []byte) *commonpb.Block {
startTick := utils.CurrentTimeMillisSeconds()
defer bp.yieldProposing()
//從提案緩存里獲取 當前高度的提案
selfProposedBlock := bp.proposalCache.GetSelfProposedBlockAt(height)
if selfProposedBlock != nil {
if bytes.Equal(selfProposedBlock.Header.PreBlockHash, preHash) {
// Repeat propose block if node has proposed before at the same height
bp.proposalCache.SetProposedAt(height)
_, txsRwSet, _ := bp.proposalCache.GetProposedBlock(selfProposedBlock)
bp.msgBus.Publish(msgbus.ProposedBlock, &consensuspb.ProposalBlock{Block: selfProposedBlock, TxsRwSet: txsRwSet})
bp.log.Infof("proposer success repeat [%d](txs:%d,hash:%x)",
selfProposedBlock.Header.BlockHeight, selfProposedBlock.Header.TxCount, selfProposedBlock.Header.BlockHash)
return nil
}
bp.proposalCache.ClearTheBlock(selfProposedBlock)
// Note: It is not possible to re-add the transactions in the deleted block to txpool; because some transactions may
// be included in other blocks to be confirmed, and it is impossible to quickly exclude these pending transactions
// that have been entered into the block. Comprehensive considerations, directly discard this block is the optimal
// choice. This processing method may only cause partial transaction loss at the current node, but it can be solved
// by rebroadcasting on the client side.
bp.txPool.RetryAndRemoveTxs(nil, selfProposedBlock.Txs)
}
// retrieve tx batch from tx pool
startFetchTick := utils.CurrentTimeMillisSeconds()
//從交易池里 撈出一批交易
fetchBatch := bp.txPool.FetchTxBatch(height)
// 檢查重復交易
checkedBatch := bp.txDuplicateCheck(fetchBatch)
txCapacity := int(bp.chainConf.ChainConfig().Block.BlockTxCapacity)
// 根據配置獲取每塊最大的交易數 默認100 超過的就放到交易池里
if len(checkedBatch) > txCapacity {
// check if checkedBatch > txCapacity, if so, strict block tx count according to config,
// and put other txs back to txpool.
txRetry := checkedBatch[txCapacity:]
checkedBatch = checkedBatch[:txCapacity]
bp.txPool.RetryAndRemoveTxs(txRetry, nil)
}
// 生成區(qū)塊 生成讀寫集 生成提案區(qū)塊并放入提案區(qū)塊緩存proposalCache中
block, timeLasts, err := bp.generateNewBlock(height, preHash, checkedBatch)
...
_, rwSetMap, _ := bp.proposalCache.GetProposedBlock(block)
newBlock := new(commonpb.Block)
if common.IfOpenConsensusMessageTurbo(bp.chainConf) {
...
} else {
newBlock = block
}
...
//提交生成的提案區(qū)塊
bp.msgBus.Publish(msgbus.ProposedBlock, &consensuspb.ProposalBlock{Block: newBlock, TxsRwSet: rwSetMap})
...
return block
}
關于如何生成新區(qū)塊請看http://www.reibang.com/p/fb6a9644246d
由于本運行環(huán)境采用的是TBFT共識本昏,所以由TBFT共識模塊接收msgbus.ProposedBlock
消息供汛,
6. 開始共識tbft
接收總線消息module/consensus/tbft/consensus_tbft_impl.go
代碼如下
//接收MsgBus總線消息
func (consensus *ConsensusTBFTImpl) OnMessage(message *msgbus.Message) {
switch message.Topic {
// 獲取提案區(qū)塊 放入通道
case msgbus.ProposedBlock:
consensus.proposedBlockC <- proposedBlock
case msgbus.VerifyResult:
consensus.verifyResultC <- verifyResult
case msgbus.RecvConsensusMsg:
consensus.externalMsgC <- tbftMsg
case msgbus.BlockInfo:
consensus.blockHeightC <- blockInfo.Block.Header.BlockHeight
}
}
//通道處理
func (consensus *ConsensusTBFTImpl) handle() {
loop := true
for loop {
select {
case proposedBlock := <-consensus.proposedBlockC:
//處理提案區(qū)塊
consensus.handleProposedBlock(proposedBlock, false)
case result := <-consensus.verifyResultC:
consensus.handleVerifyResult(result, false)
case height := <-consensus.blockHeightC:
consensus.handleBlockHeight(height)
case msg := <-consensus.externalMsgC:
consensus.handleConsensusMsg(msg)
// 共識內部消息
case msg := <-consensus.internalMsgC:
consensus.handleConsensusMsg(msg)
case ti := <-consensus.timeScheduler.GetTimeoutC():
consensus.handleTimeout(ti, false)
case <-consensus.closeC:
loop = false
}
}
}
關于共識具體實現(xiàn) 后期寫一篇學習筆記記錄一下,這里先看共識完成后的流程,bft共識最后階段就是commitBlock
代碼如下
func (consensus *ConsensusTBFTImpl) commitBlock(block *common.Block) {
if localconf.ChainMakerConfig.DebugConfig.IsCommitWithoutPublish {
...
} else {
// 通知總線完成共識
consensus.msgbus.Publish(msgbus.CommitBlock, block)
}
}
監(jiān)聽共識完成的消息 是在CoreEngine
里module/core/syncmode/core_syncmode_impl.go
代碼如下
// OnMessage consume a message from message bus
func (c *CoreEngine) OnMessage(message *msgbus.Message) {
// 1. receive proposal status from consensus
// 2. receive verify block from consensus
// 3. receive commit block message from consensus
// 4. receive propose signal from txpool
// 5. receive build proposal signal from chained bft consensus
switch message.Topic {
...
case msgbus.CommitBlock:
if block, ok := message.Payload.(*commonpb.Block); ok {
// 落塊
if err := c.BlockCommitter.AddBlock(block); err != nil {
...
}
}
...
}
}
具體的落塊流程 請參考http://www.reibang.com/p/1e169e8c7df4