在“以太坊源碼深入分析(2)”一文中憔四,我們提到Ethereum作為一個service,被Node 注冊進(jìn)去颗圣。Node start的時候會啟動其注冊的所有服務(wù)利朵,Ethereum service也是一樣。
一逞壁,ethereum service的初始化和啟動
初始化方法
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
}
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
chainDb, err := CreateDB(ctx, config, "chaindata")
if err != nil {
return nil, err
}
stopDbUpgrade := upgradeDeduplicateData(chainDb)
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
shutdownChan: make(chan bool),
stopDbUpgrade: stopDbUpgrade,
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
bcVersion := core.GetBlockChainVersion(chainDb)
if bcVersion != core.BlockChainVersion && bcVersion != 0 {
return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
}
core.WriteBlockChainVersion(chainDb, core.BlockChainVersion)
}
var (
vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
eth.bloomIndexer.Start(eth.blockchain)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
eth.ApiBackend = &EthApiBackend{eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice
}
eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams)
return eth, nil
}
1流济,如果config.SyncMode 是 downloader.LightSync,走的是les/backend.go的初始化方法腌闯。
2绳瘟,chainDb, err := CreateDB(ctx, config, "chaindata")打開leveldb,leveldb是eth存儲數(shù)據(jù)庫姿骏。
3糖声,stopDbUpgrade := upgradeDeduplicateData(chainDb) 檢查chainDb版本,如果需要的話分瘦,啟動后臺進(jìn)程進(jìn)行升級蘸泻。
4,chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)裝載創(chuàng)世區(qū)塊嘲玫。 根據(jù)節(jié)點條件判斷是從數(shù)據(jù)庫里面讀取悦施,還是從默認(rèn)配置文件讀取,還是從自定義配置文件讀取去团,或者是從代碼里面獲取默認(rèn)值抡诞。并返回區(qū)塊鏈的config和創(chuàng)世塊的hash穷蛹。
5,裝載Etherum struct的各個成員沐绒。eventMux和accountManager 是Node 啟動 eth service的時候傳入的俩莽。eventMux可以認(rèn)為是一個全局的事件多路復(fù)用器,accountManager認(rèn)為是一個全局的賬戶管理器乔遮。engine創(chuàng)建共識引擎扮超。etherbase 配置此Etherum的主賬號地址。初始化bloomRequests 通道和bloom過濾器蹋肮。
6出刷,判斷客戶端版本號和數(shù)據(jù)庫版本號是否一致
7,eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) 初始化eth的blockchain坯辩,也就是eth的區(qū)塊鏈
8馁龟,eth.blockchain.SetHead(compat.RewindTo) 根據(jù)創(chuàng)始區(qū)塊設(shè)置區(qū)塊頭
9,eth.bloomIndexer.Start(eth.blockchain)啟動bloomIndexer
10漆魔,eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) 初始化eth 區(qū)塊鏈的交易池坷檩,存儲本地生產(chǎn)的和P2P網(wǎng)絡(luò)同步過來的交易。
11改抡,eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb)初始化以太坊協(xié)議管理器矢炼,用于區(qū)塊鏈P2P通訊
12, miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) 初始化礦工
13阿纤,eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams) 創(chuàng)建預(yù)言最新gasprice的預(yù)言機(jī)句灌。
ethereum service的初始化配置了不少東西,基本上涉及到了以太坊區(qū)塊鏈系統(tǒng)的所有內(nèi)容欠拾,后續(xù)一一分解各個模塊胰锌。
啟動方法
func (s *Ethereum) Start(srvr *p2p.Server) error {
// Start the bloom bits servicing goroutines
s.startBloomHandlers()
// Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
// Figure out a max peers count based on the server limits
maxPeers := srvr.MaxPeers
if s.config.LightServ > 0 {
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
return nil
}
首先啟動bloom過濾器
eth 的net 相關(guān)Api 加入RPC 服務(wù)。
s.protocolManager.Start(maxPeers) 設(shè)置最大同步節(jié)點數(shù)藐窄,并啟動eth P2P通訊资昧。
如果ethereum service 出問題了才會啟動lesServer。
二枷邪,ProtocolManager 以太坊P2P通訊協(xié)議管理
首先分析一下同在eth目錄下的eth/handler.go榛搔。
ProtocolManager 的初始化方法
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkId: networkId,
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
mode = downloader.FullSync
}
if mode == downloader.FastSync {
manager.fastSync = uint32(1)
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
if mode == downloader.FastSync && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
version := version // Closure for the run
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
if len(manager.SubProtocols) == 0 {
return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
}
heighter := func() uint64 {
return blockchain.CurrentBlock().NumberU64()
}
inserter := func(blocks types.Blocks) (int, error) {
// If fast sync is running, deny importing weird blocks
if atomic.LoadUint32(&manager.fastSync) == 1 {
log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
return manager.blockchain.InsertChain(blocks)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
return manager, nil
}
1,peers 為以太坊臨近的同步網(wǎng)絡(luò)節(jié)點东揣,newPeerCh践惑、noMorePeers、txsyncCh嘶卧、quitSync對應(yīng)同步的通知
2尔觉,manager.SubProtocols 創(chuàng)建以太坊 P2P server 的 通訊協(xié)議,通常只有一個值芥吟。manager.SubProtocols侦铜,在Node start的時候傳給以太坊 P2P server并同時start P2P server专甩。協(xié)議里面三個函數(shù)指針(Run、NodeInfo钉稍、PeerInfo)非常重要涤躲,后面會用到。
3贡未,manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
創(chuàng)建了一個下載器种樱,從遠(yuǎn)程網(wǎng)絡(luò)節(jié)點中獲取hashes和blocks。
4俊卤,manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)收集網(wǎng)絡(luò)其他以太坊節(jié)點發(fā)過來的同步通知嫩挤,進(jìn)行驗證,并做出相應(yīng)的處理消恍。初始化傳入的幾個參數(shù) 都是用于處理同步區(qū)塊鏈數(shù)據(jù)的函數(shù)指針
Ethereum service 啟動的時候會同時啟動ProtocolManager岂昭,ProtocolManager的start()方法:
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txCh = make(chan core.TxPreEvent, txChanSize)
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
}
1,創(chuàng)建一個新交易的訂閱通道狠怨,并啟動交易廣播的goroutine
2约啊,創(chuàng)建一個挖坑的訂閱通道,并啟動挖坑廣播的goroutine
注:同為訂閱通道為什么pm.txSub和pm.minedBlockSub的實現(xiàn)不一樣佣赖?深入代碼會發(fā)現(xiàn)pm.txSub用的是event/feed通知方式棍苹,pm.minedBlockSub用的是event/TypeMuxEvent通知方式,event/TypeMuxEvent方式將要被Deprecated茵汰。
3,pm.syncer() 啟動同步goroutine孽鸡,定時的和網(wǎng)絡(luò)其他節(jié)點同步蹂午,并處理網(wǎng)絡(luò)節(jié)點的相關(guān)通知
4,pm.txsyncLoop() 啟動交易同步goroutine彬碱,把新的交易均勻的同步給網(wǎng)路節(jié)點
三豆胸,ProtocolManager主動向網(wǎng)絡(luò)節(jié)點廣播
ProtocolManager Start()方法里面的4個goroutine都是處理ProtocolManager向以太坊網(wǎng)絡(luò)節(jié)點進(jìn)行廣播的。
1巷疼,pm.txBroadcastLoop()方法
func (self *ProtocolManager) txBroadcastLoop() {
for {
select {
case event := <-self.txCh:
self.BroadcastTx(event.Tx.Hash(), event.Tx)
// Err() channel will be closed when unsubscribing.
case <-self.txSub.Err():
return
}
}
}
core/tx_pool.go 產(chǎn)生新的交易的時候會send self.txCh晚胡,這時候會激活
self.BroadcastTx(event.Tx.Hash(), event.Tx)
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.SendTransactions(types.Transactions{tx})
}
log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}
向緩存的沒有這個交易hash的網(wǎng)絡(luò)節(jié)點廣播此次交易。
2嚼沿,pm.minedBroadcastLoop()方法
// Mined broadcast loop
func (self *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range self.minedBlockSub.Chan() {
switch ev := obj.Data.(type) {
case core.NewMinedBlockEvent:
self.BroadcastBlock(ev.Block, true) // First propagate block to peers
self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}
}
}
收到miner.go 里面NewMinedBlockEvent 挖到新區(qū)塊的事件通知估盘,激活self.BroadcastBlock(ev.Block, true)
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := pm.peers.PeersWithoutBlock(hash)
// If propagation is requested, send to a subset of the peer
if propagate {
// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
var td *big.Int
if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
} else {
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
return
}
// Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer {
peer.SendNewBlock(block, td)
}
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
// Otherwise if the block is indeed in out own chain, announce it
if pm.blockchain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
}
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
}
}
如果propagate為true 向網(wǎng)絡(luò)節(jié)點廣播整個挖到的block,為false 只廣播挖到的區(qū)塊的hash值和number值骡尽。廣播的區(qū)塊還包括這個區(qū)塊打包的所有交易遣妥。
3,pm.syncer() 方法
func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms
pm.fetcher.Start()
defer pm.fetcher.Stop()
defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations
forceSync := time.NewTicker(forceSyncCycle)
defer forceSync.Stop()
for {
select {
case <-pm.newPeerCh:
// Make sure we have peers to select from, then sync
if pm.peers.Len() < minDesiredPeerCount {
break
}
go pm.synchronise(pm.peers.BestPeer())
case <-forceSync.C:
// Force a sync even if not enough peers are present
go pm.synchronise(pm.peers.BestPeer())
case <-pm.noMorePeers:
return
}
}
}
pm.fetcher.Start()啟動 fetcher攀细,輔助同步區(qū)塊數(shù)據(jù)
當(dāng)P2P server執(zhí)行 ProtocolManager 的p2p.Protocol 的Run指針的時候會send pm.newPeerCh箫踩,這時候選擇最優(yōu)的網(wǎng)絡(luò)節(jié)點(TD 總難度最大的)啟動pm.synchronise(pm.peers.BestPeer()) goroutine爱态。
// synchronise tries to sync up our local block chain with a remote peer.
func (pm *ProtocolManager) synchronise(peer *peer) {
// Short circuit if no peers are available
if peer == nil {
return
}
// Make sure the peer's TD is higher than our own
currentBlock := pm.blockchain.CurrentBlock()
td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
pHead, pTd := peer.Head()
if pTd.Cmp(td) <= 0 {
return
}
// Otherwise try to sync with the downloader
mode := downloader.FullSync
if atomic.LoadUint32(&pm.fastSync) == 1 {
// Fast sync was explicitly requested, and explicitly granted
mode = downloader.FastSync
} else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
// The database seems empty as the current block is the genesis. Yet the fast
// block is ahead, so fast sync was enabled for this node at a certain point.
// The only scenario where this can happen is if the user manually (or via a
// bad block) rolled back a fast sync node below the sync point. In this case
// however it's safe to reenable fast sync.
atomic.StoreUint32(&pm.fastSync, 1)
mode = downloader.FastSync
}
// Run the sync cycle, and disable fast sync if we've went past the pivot block
if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
return
}
if atomic.LoadUint32(&pm.fastSync) == 1 {
log.Info("Fast sync complete, auto disabling")
atomic.StoreUint32(&pm.fastSync, 0)
}
atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
// We've completed a sync cycle, notify all peers of new state. This path is
// essential in star-topology networks where a gateway node needs to notify
// all its out-of-date peers of the availability of a new block. This failure
// scenario will most often crop up in private and hackathon networks with
// degenerate connectivity, but it should be healthy for the mainnet too to
// more reliably update peers or the local TD state.
go pm.BroadcastBlock(head, false)
}
}
如果最優(yōu)的網(wǎng)絡(luò)節(jié)點的TD值大于本地最新區(qū)塊的TD值,調(diào)用pm.downloader.Synchronise(peer.id, pHead, pTd, mode)進(jìn)行同步境钟。同步完成后再屌用go pm.BroadcastBlock(head, false)锦担,把自己最新的區(qū)塊狀態(tài)廣播出去。
4慨削,pm.txsyncLoop()方法
func (pm *ProtocolManager) txsyncLoop() {
var (
pending = make(map[discover.NodeID]*txsync)
sending = false // whether a send is active
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
)
// send starts a sending a pack of transactions from the sync.
send := func(s *txsync) {
// Fill pack with transactions up to the target size.
size := common.StorageSize(0)
pack.p = s.p
pack.txs = pack.txs[:0]
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
pack.txs = append(pack.txs, s.txs[i])
size += s.txs[i].Size()
}
// Remove the transactions that will be sent.
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
delete(pending, s.p.ID())
}
// Send the pack in the background.
s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
sending = true
go func() { done <- pack.p.SendTransactions(pack.txs) }()
}
// pick chooses the next pending sync.
pick := func() *txsync {
if len(pending) == 0 {
return nil
}
n := rand.Intn(len(pending)) + 1
for _, s := range pending {
if n--; n == 0 {
return s
}
}
return nil
}
for {
select {
case s := <-pm.txsyncCh:
pending[s.p.ID()] = s
if !sending {
send(s)
}
case err := <-done:
sending = false
// Stop tracking peers that cause send failures.
if err != nil {
pack.p.Log().Debug("Transaction send failed", "err", err)
delete(pending, pack.p.ID())
}
// Schedule the next send.
if s := pick(); s != nil {
send(s)
}
case <-pm.quitSync:
return
}
}
}
當(dāng)從網(wǎng)絡(luò)節(jié)點同步過來最新的交易數(shù)據(jù)后洞渔,本地也會把新同步下來的交易數(shù)據(jù)廣播給網(wǎng)絡(luò)中的其他節(jié)點。
總結(jié)一下
這四個goroutine 基本上就在不停的做廣播區(qū)塊理盆、廣播交易痘煤,同步到區(qū)塊、同步到交易猿规,再廣播區(qū)塊衷快、廣播交易。