ChainMaker2.1.0交易詳解

節(jié)點啟動時會啟動以下幾種服務

1歉秫、net service p2p服務模塊
2触徐、core engine 區(qū)塊處理模塊
3、consensus module 共識服務模塊
4券册、tx pool 交易服務模塊
5蚣常、sync service 異步服務模塊
代碼在module/blockchain/blockchain_start.go:11

交易分類:配置交易和普通交易

交易流程

根據流程圖閱讀源碼

1. 交易生成分為客戶端發(fā)送市咽,其他節(jié)點同步過來

用戶/APP發(fā)送交易:用戶通過sdk創(chuàng)建交易并簽名后,通過RPC方式調用遠程服務器invoke方法module/rpcserver/api_service.go:149 核心代碼如下

// invoke contract according to TxType
func (s *ApiService) invoke(tx *commonPb.Transaction, source protocol.TxSource) *commonPb.TxResponse {
    switch tx.Payload.TxType {
    //查詢交易
    case commonPb.TxType_QUERY_CONTRACT:
        return s.dealQuery(tx, source)
     //invoke交易
    case commonPb.TxType_INVOKE_CONTRACT:
        return s.dealTransact(tx, source)
    }
}
// dealTransact - deal transact tx
func (s *ApiService) dealTransact(tx *commonPb.Transaction, source protocol.TxSource) *commonPb.TxResponse {
...
    err = s.chainMakerServer.AddTx(tx.Payload.ChainId, tx, source)
...
}

其中chainMakerServer對象是管理整個區(qū)塊鏈項目的最頂層的類module/blockchain/chainmaker_server.go代碼如下

// ChainMakerServer manage all blockchains
type ChainMakerServer struct {
    // net shared by all chains 管理網絡 libp2p 或者是Liquid
    net protocol.Net
    // blockchains known by this node 管理多個鏈存儲 
    blockchains sync.Map // map[string]*Blockchain
}
2. 將新交易加入到交易池txPool位于chainmaker.org/chainmaker/txpool-single/v2@v2.1.0/tx_pool_impl.go:175 核心代碼如下
func (pool *txPoolImpl) AddTx(tx *commonPb.Transaction, source protocol.TxSource) error {
    ...
    // 存儲交易
    memTx := &mempoolTxs{isConfigTxs: false, txs: []*commonPb.Transaction{tx}, source: source}
    if utils.IsConfigTx(tx) || utils.IsManageContractAsConfigTx(tx,
        pool.chainConf.ChainConfig().Contract.EnableSqlSupport) {
        memTx.isConfigTxs = true
    }
    t := time.NewTimer(time.Second)
    defer t.Stop()
    select {
       // 交易存儲
    case pool.addTxsCh <- memTx:
    case <-t.C:
        pool.log.Warnf("add transaction timeout")
        return fmt.Errorf("add transaction timeout")
    }
    // 節(jié)點廣播交易 
    if source == protocol.RPC {
        pool.broadcastTx(tx.Payload.TxId, txMsg)
    }
    return nil
}

3. 節(jié)點廣播交易pool.broadcastTx(tx.Payload.TxId, txMsg)這也就是第二種方式對應交易流程圖中的1',節(jié)點收到廣播的交易后加入交易池抵蚊。
4. 當節(jié)點啟動時施绎,交易池服務txPool 會創(chuàng)建一個定時任務,來監(jiān)聽新交易
func (pool *txPoolImpl) listen() {
    // 定時觸發(fā)
    flushTicker := time.NewTicker(time.Duration(pool.flushTicker) * time.Second)
    defer flushTicker.Stop()
    for {
        select {
          // 當收到新交易時進行觸發(fā)
        case poolTxs := <-pool.addTxsCh:
            pool.flushOrAddTxsToCache(poolTxs)
        case <-flushTicker.C:
          // 從緩存池中撈數據
            if pool.cache.isFlushByTime() && pool.cache.txCount() > 0 {
                pool.flushCommonTxToQueue(nil)
            }
        case <-pool.stopCh:
            return
        }
    }
}

當收到新交易時贞绳,會判斷交易是否是配置交易谷醉,如果是配置交易直接放入到配置交易隊列中,否則放入普通交易隊列 核心代碼如下

func (pool *txPoolImpl) flushOrAddTxsToCache(memTxs *mempoolTxs) {
    ...
     // 如果是配置交易冈闭,則加入配置消息隊列
    if memTxs.isConfigTxs {
        pool.flushConfigTxToQueue(memTxs)
        return
    }
     // 普通交易孤紧,如果到的交易緩存臨界值 就加入消息隊列中
    if pool.cache.isFlushByTxCount(memTxs) {
        pool.flushCommonTxToQueue(memTxs)
    } else {
        pool.cache.addMemoryTxs(memTxs)
    }
}

加入隊列后,會向msgbus發(fā)送一個交易池信號拒秘,通知其他模塊

func (pool *txPoolImpl) flushCommonTxToQueue(memTxs *mempoolTxs) {
    defer func() {
       //  向msgbus 發(fā)出塊信號
        pool.updateAndPublishSignal()
        pool.cache.reset()
    }()
    // 加入消息隊列
    // RPC:來自RPC的交易不驗證基礎的交易信息(如交易ID、時間戳是否符合規(guī)范)臭猜、不驗證交易者的證書躺酒;因為RPC模塊已做此類校驗;成功添加至交易池的交易會廣播給其它連接的節(jié)點
    // P2P:其它節(jié)點的廣播的交易蔑歌,進行所有的校驗
    // INTERNAL:如果節(jié)點在同一高度接收到多個驗證有效的區(qū)塊羹应,當其中某個區(qū)塊上鏈后,其余的相同高度區(qū)塊內的交易會被重新添加進交易池次屠,防止這些交易被拋棄园匹。
    rpcTxs, p2pTxs, internalTxs := pool.cache.mergeAndSplitTxsBySource(memTxs)
    pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: rpcTxs, source: protocol.RPC})
    pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: p2pTxs, source: protocol.P2P})
    pool.queue.addTxsToCommonQueue(&mempoolTxs{txs: internalTxs, source: protocol.INTERNAL})
}
5. 根據交易池是否已滿,觸發(fā)新區(qū)塊生成

CoreEngine 位于module/core/syncmode/core_syncmode_impl.go里監(jiān)聽消息總線發(fā)出的消息劫灶,代碼如下

// OnMessage consume a message from message bus
func (c *CoreEngine) OnMessage(message *msgbus.Message) {
    // 1. receive proposal status from consensus
    // 2. receive verify block from consensus
    // 3. receive commit block message from consensus
    // 4. receive propose signal from txpool
    // 5. receive build proposal signal from chained bft consensus
    switch message.Topic {
    ... 
// 收到交易池信號
    case msgbus.TxPoolSignal:
        if signal, ok := message.Payload.(*txpoolpb.TxPoolSignal); ok {
            c.blockProposer.OnReceiveTxPoolSignal(signal)
        }
    ...
    }
}

其中blockProposer 位于module/core/syncmode/proposer/block_proposer_impl.go發(fā)出生成提議候選區(qū)塊裸违,核心代碼如下

// OnReceiveTxPoolSignal, receive txpool signal and deliver to chan txpool signal 收到交易信號
func (bp *BlockProposerImpl) OnReceiveTxPoolSignal(txPoolSignal *txpoolpb.TxPoolSignal) {
    bp.txPoolSignalC <- txPoolSignal
}
// Start, start proposing loop
func (bp *BlockProposerImpl) startProposingLoop() {
    for {
        select {
            // 普通交易區(qū)塊則由定時器來生成區(qū)塊,由配置文件決定
        case <-bp.proposeTimer.C:
            if !bp.isSelfProposer() {
                break
            }
            // 生成候選區(qū)塊
            go bp.proposeBlock()

        case signal := <-bp.txPoolSignalC:
            if !bp.isSelfProposer() {
                break
            }
            // 如果接受到的是修改配置的交易,則立馬生成候選區(qū)塊
            if signal.SignalType != txpoolpb.SignalType_BLOCK_PROPOSE {
                break
            }
           // 生成候選區(qū)塊
            go bp.proposeBlock()

        case <-bp.exitC:
            bp.proposeTimer.Stop()
            bp.log.Info("block proposer loop stoped")
            return
        }
    }
}
// proposeBlock, to check if proposer can propose block right now
// if so, start proposing
func (bp *BlockProposerImpl) proposeBlock() {
...
 //開始創(chuàng)建提案區(qū)塊
    go bp.proposing(proposingHeight, lastBlock.Header.BlockHash)
...
//等提案區(qū)塊完成
    <-bp.finishProposeC
}

// proposing, propose a block in new height
func (bp *BlockProposerImpl) proposing(height uint64, preHash []byte) *commonpb.Block {
    startTick := utils.CurrentTimeMillisSeconds()
    defer bp.yieldProposing()
//從提案緩存里獲取 當前高度的提案
    selfProposedBlock := bp.proposalCache.GetSelfProposedBlockAt(height)
    if selfProposedBlock != nil {
        if bytes.Equal(selfProposedBlock.Header.PreBlockHash, preHash) {
            // Repeat propose block if node has proposed before at the same height
            bp.proposalCache.SetProposedAt(height)
            _, txsRwSet, _ := bp.proposalCache.GetProposedBlock(selfProposedBlock)
            bp.msgBus.Publish(msgbus.ProposedBlock, &consensuspb.ProposalBlock{Block: selfProposedBlock, TxsRwSet: txsRwSet})
            bp.log.Infof("proposer success repeat [%d](txs:%d,hash:%x)",
                selfProposedBlock.Header.BlockHeight, selfProposedBlock.Header.TxCount, selfProposedBlock.Header.BlockHash)
            return nil
        }
        bp.proposalCache.ClearTheBlock(selfProposedBlock)
        // Note: It is not possible to re-add the transactions in the deleted block to txpool; because some transactions may
        // be included in other blocks to be confirmed, and it is impossible to quickly exclude these pending transactions
        // that have been entered into the block. Comprehensive considerations, directly discard this block is the optimal
        // choice. This processing method may only cause partial transaction loss at the current node, but it can be solved
        // by rebroadcasting on the client side.
        bp.txPool.RetryAndRemoveTxs(nil, selfProposedBlock.Txs)
    }

    // retrieve tx batch from tx pool
    startFetchTick := utils.CurrentTimeMillisSeconds()
    //從交易池里 撈出一批交易
    fetchBatch := bp.txPool.FetchTxBatch(height)
  // 檢查重復交易
    checkedBatch := bp.txDuplicateCheck(fetchBatch)
    txCapacity := int(bp.chainConf.ChainConfig().Block.BlockTxCapacity)
// 根據配置獲取每塊最大的交易數 默認100 超過的就放到交易池里
    if len(checkedBatch) > txCapacity {
        // check if checkedBatch > txCapacity, if so, strict block tx count according to  config,
        // and put other txs back to txpool.
        txRetry := checkedBatch[txCapacity:]
        checkedBatch = checkedBatch[:txCapacity]
        bp.txPool.RetryAndRemoveTxs(txRetry, nil)
    }
// 生成區(qū)塊 生成讀寫集 生成提案區(qū)塊并放入提案區(qū)塊緩存proposalCache中
    block, timeLasts, err := bp.generateNewBlock(height, preHash, checkedBatch)
...
    _, rwSetMap, _ := bp.proposalCache.GetProposedBlock(block)

    newBlock := new(commonpb.Block)
    if common.IfOpenConsensusMessageTurbo(bp.chainConf) {
        ...
    } else {
        newBlock = block
    }
...
//提交生成的提案區(qū)塊
    bp.msgBus.Publish(msgbus.ProposedBlock, &consensuspb.ProposalBlock{Block: newBlock, TxsRwSet: rwSetMap})
...
    return block
}

關于如何生成新區(qū)塊請看http://www.reibang.com/p/fb6a9644246d
由于本運行環(huán)境采用的是TBFT共識本昏,所以由TBFT共識模塊接收msgbus.ProposedBlock 消息供汛,

6. 開始共識tbft

接收總線消息module/consensus/tbft/consensus_tbft_impl.go代碼如下

//接收MsgBus總線消息
func (consensus *ConsensusTBFTImpl) OnMessage(message *msgbus.Message) {
    switch message.Topic {
// 獲取提案區(qū)塊 放入通道
    case msgbus.ProposedBlock:
            consensus.proposedBlockC <- proposedBlock
    case msgbus.VerifyResult:
            consensus.verifyResultC <- verifyResult
    case msgbus.RecvConsensusMsg:
            consensus.externalMsgC <- tbftMsg
    case msgbus.BlockInfo:
            consensus.blockHeightC <- blockInfo.Block.Header.BlockHeight
    }
}
//通道處理
func (consensus *ConsensusTBFTImpl) handle() {
    loop := true
    for loop {
        select {
        case proposedBlock := <-consensus.proposedBlockC:
//處理提案區(qū)塊
            consensus.handleProposedBlock(proposedBlock, false)
        case result := <-consensus.verifyResultC:
            consensus.handleVerifyResult(result, false)
        case height := <-consensus.blockHeightC:
            consensus.handleBlockHeight(height)
        case msg := <-consensus.externalMsgC:
            consensus.handleConsensusMsg(msg)
// 共識內部消息
        case msg := <-consensus.internalMsgC:
            consensus.handleConsensusMsg(msg)
        case ti := <-consensus.timeScheduler.GetTimeoutC():
            consensus.handleTimeout(ti, false)
        case <-consensus.closeC:
            loop = false
        }
    }
}

關于共識具體實現(xiàn) 后期寫一篇學習筆記記錄一下,這里先看共識完成后的流程,bft共識最后階段就是commitBlock代碼如下

func (consensus *ConsensusTBFTImpl) commitBlock(block *common.Block) {
    if localconf.ChainMakerConfig.DebugConfig.IsCommitWithoutPublish {
        ...
    } else {
            // 通知總線完成共識
        consensus.msgbus.Publish(msgbus.CommitBlock, block)
    }
}

監(jiān)聽共識完成的消息 是在CoreEnginemodule/core/syncmode/core_syncmode_impl.go代碼如下

// OnMessage consume a message from message bus
func (c *CoreEngine) OnMessage(message *msgbus.Message) {
    // 1. receive proposal status from consensus
    // 2. receive verify block from consensus
    // 3. receive commit block message from consensus
    // 4. receive propose signal from txpool
    // 5. receive build proposal signal from chained bft consensus
    switch message.Topic {
    ...
    case msgbus.CommitBlock:
        if block, ok := message.Payload.(*commonpb.Block); ok {
// 落塊
            if err := c.BlockCommitter.AddBlock(block); err != nil {
                ...
            }
        }
    ...
    }
}

具體的落塊流程 請參考http://www.reibang.com/p/1e169e8c7df4

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末怔昨,一起剝皮案震驚了整個濱河市雀久,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌趁舀,老刑警劉巖赖捌,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異矮烹,居然都是意外死亡越庇,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進店門擂送,熙熙樓的掌柜王于貴愁眉苦臉地迎上來悦荒,“玉大人,你說我怎么就攤上這事嘹吨“嵛叮” “怎么了?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵蟀拷,是天一觀的道長碰纬。 經常有香客問我,道長问芬,這世上最難降的妖魔是什么悦析? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮此衅,結果婚禮上强戴,老公的妹妹穿的比我還像新娘。我一直安慰自己挡鞍,他們只是感情好骑歹,可當我...
    茶點故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著墨微,像睡著了一般道媚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上翘县,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天最域,我揣著相機與錄音,去河邊找鬼锈麸。 笑死镀脂,一個胖子當著我的面吹牛,可吹牛的內容都是我干的忘伞。 我是一名探鬼主播狗热,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼钞馁,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了匿刮?” 一聲冷哼從身側響起僧凰,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎熟丸,沒想到半個月后训措,有當地人在樹林里發(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡光羞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年绩鸣,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纱兑。...
    茶點故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡呀闻,死狀恐怖,靈堂內的尸體忽然破棺而出潜慎,到底是詐尸還是另有隱情捡多,我是刑警寧澤,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布铐炫,位于F島的核電站垒手,受9級特大地震影響,放射性物質發(fā)生泄漏倒信。R本人自食惡果不足惜科贬,卻給世界環(huán)境...
    茶點故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鳖悠。 院中可真熱鬧榜掌,春花似錦、人聲如沸乘综。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瘾带。三九已至,卻和暖如春熟菲,著一層夾襖步出監(jiān)牢的瞬間看政,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工抄罕, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留允蚣,地道東北人。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓呆贿,卻偏偏與公主長得像嚷兔,于是被迫代替她去往敵國和親森渐。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內容