The previous article covered the initialization and startup of the Ethereum service and of ProtocolManager, the module that speaks the Ethereum p2p protocol, as well as how the protocol broadcasts to other network peers.
This article looks at how the protocol handles the broadcast messages it receives, and at how the fetcher works.
一他匪,ProtocolManager接收網(wǎng)絡(luò)節(jié)點(diǎn)廣播消息
First, a look at the structure of p2p.Protocol:
type Protocol struct {
    Name     string // official protocol name, e.g. "eth"
    Version  uint   // protocol version number
    Length   uint64 // number of message codes this protocol uses
    Run      func(peer *Peer, rw MsgReadWriter) error // called when a peer connects using this protocol
    NodeInfo func() interface{}                       // returns metadata about the local (host) node
    PeerInfo func(id discover.NodeID) interface{}     // returns metadata about a specific peer
}
When ProtocolManager is initialized (see the previous article), it instantiates a p2p.Protocol and fills in these three value fields and three function fields:
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
    Name:    ProtocolName,
    Version: version,
    Length:  ProtocolLengths[i],
    Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
        peer := manager.newPeer(int(version), p, rw)
        select {
        case manager.newPeerCh <- peer:
            manager.wg.Add(1)
            defer manager.wg.Done()
            return manager.handle(peer)
        case <-manager.quitSync:
            return p2p.DiscQuitting
        }
    },
    NodeInfo: func() interface{} {
        return manager.NodeInfo()
    },
    PeerInfo: func(id discover.NodeID) interface{} {
        if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
            return p.Info()
        }
        return nil
    },
})
These three functions are in essence callbacks injected into the p2p server: Run handles the connection to (and thus the messages broadcast by) a remote peer, NodeInfo returns the local Ethereum node's information, and PeerInfo returns the information of a specific connected peer.
Run matters most: whenever the p2p server sets up a connection with a new peer it invokes Run, which in turn calls manager.handle(peer).
func (pm *ProtocolManager) handle(p *peer) error {
    if pm.peers.Len() >= pm.maxPeers {
        return p2p.DiscTooManyPeers
    }
    p.Log().Debug("Ethereum peer connected", "name", p.Name())
    // Execute the Ethereum handshake
    var (
        genesis = pm.blockchain.Genesis()
        head    = pm.blockchain.CurrentHeader()
        hash    = head.Hash()
        number  = head.Number.Uint64()
        td      = pm.blockchain.GetTd(hash, number)
    )
    if err := p.Handshake(pm.networkId, td, hash, genesis.Hash()); err != nil {
        p.Log().Debug("Ethereum handshake failed", "err", err)
        return err
    }
    if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
        rw.Init(p.version)
    }
    // Register the peer locally
    if err := pm.peers.Register(p); err != nil {
        p.Log().Error("Ethereum peer registration failed", "err", err)
        return err
    }
    defer pm.removePeer(p.id)
    // Register the peer in the downloader. If the downloader considers it banned, we disconnect
    if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
        return err
    }
    // Propagate existing transactions. new transactions appearing
    // after this will be sent via broadcasts.
    pm.syncTransactions(p)
    // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
    if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
        // Request the peer's DAO fork header for extra-data validation
        if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
            return err
        }
        // Start a timer to disconnect if the peer doesn't reply in time
        p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
            p.Log().Debug("Timed out DAO fork-check, dropping")
            pm.removePeer(p.id)
        })
        // Make sure it's cleaned up if the peer dies off
        defer func() {
            if p.forkDrop != nil {
                p.forkDrop.Stop()
                p.forkDrop = nil
            }
        }()
    }
    // main loop. handle incoming messages.
    for {
        if err := pm.handleMsg(p); err != nil {
            p.Log().Debug("Ethereum message handling failed", "err", err)
            return err
        }
    }
}
handle() first shakes hands with the remote peer via the Handshake() method:
func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error {
    // Send out own handshake in a new thread
    errc := make(chan error, 2)
    var status statusData // safe to read after two values have been received from errc
    go func() {
        errc <- p2p.Send(p.rw, StatusMsg, &statusData{
            ProtocolVersion: uint32(p.version),
            NetworkId:       network,
            TD:              td,
            CurrentBlock:    head,
            GenesisBlock:    genesis,
        })
    }()
    go func() {
        errc <- p.readStatus(network, &status, genesis)
    }()
    timeout := time.NewTimer(handshakeTimeout)
    defer timeout.Stop()
    for i := 0; i < 2; i++ {
        select {
        case err := <-errc:
            if err != nil {
                return err
            }
        case <-timeout.C:
            return p2p.DiscReadTimeout
        }
    }
    p.td, p.head = status.TD, status.CurrentBlock
    return nil
}
Handshake() sends the local node's status to the remote peer, including ProtocolVersion, NetworkId, TD, CurrentBlock and GenesisBlock, then reads the status returned by the peer and compares it with the local values; only if everything matches does the handshake succeed.
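The comparison lives in readStatus(); the following is a close paraphrase of that method from eth/peer.go of the same era (exact error-constant names may differ between versions). It rejects the peer unless the genesis hash, network id and protocol version all match:
func (p *peer) readStatus(network uint64, status *statusData, genesis common.Hash) error {
    msg, err := p.rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Code != StatusMsg {
        return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
    }
    if msg.Size > ProtocolMaxMsgSize {
        return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }
    // Decode the handshake and make sure everything matches
    if err := msg.Decode(&status); err != nil {
        return errResp(ErrDecode, "msg %v: %v", msg, err)
    }
    if status.GenesisBlock != genesis {
        return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock[:8], genesis[:8])
    }
    if status.NetworkId != network {
        return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, network)
    }
    if int(status.ProtocolVersion) != p.version {
        return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version)
    }
    return nil
}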
The peer is then added to the cached peer set with pm.peers.Register(p) and registered with the downloader.
Locally pending (not yet mined) transactions are propagated to the new peer (see the syncTransactions sketch below).
The DAO hard fork is validated: the peer's DAO fork header is requested, and if the peer does not answer in time it is removed from the cached peer set.
Finally handle() enters its main loop, repeatedly calling pm.handleMsg(p) to read and process every message the peer sends.
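For reference, syncTransactions in eth/sync.go of this vintage looks roughly like the following paraphrase (the txsync/txsyncCh plumbing belongs to a separate send loop not shown here):
func (pm *ProtocolManager) syncTransactions(p *peer) {
    // Gather all currently pending transactions from the local pool
    var txs types.Transactions
    pending, _ := pm.txpool.Pending()
    for _, batch := range pending {
        txs = append(txs, batch...)
    }
    if len(txs) == 0 {
        return
    }
    // Hand them to the tx-sync loop, which pushes them to the peer in chunks
    select {
    case pm.txsyncCh <- &txsync{p, txs}:
    case <-pm.quitSync:
    }
}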
Currently an Ethereum node accepts the following protocol messages:
const (
    // Protocol messages belonging to eth/62
    StatusMsg          = 0x00
    NewBlockHashesMsg  = 0x01
    TxMsg              = 0x02
    GetBlockHeadersMsg = 0x03
    BlockHeadersMsg    = 0x04
    GetBlockBodiesMsg  = 0x05
    BlockBodiesMsg     = 0x06
    NewBlockMsg        = 0x07

    // Protocol messages belonging to eth/63
    GetNodeDataMsg = 0x0d
    NodeDataMsg    = 0x0e
    GetReceiptsMsg = 0x0f
    ReceiptsMsg    = 0x10
)
pm.handleMsg(p) is long, so it won't be analyzed case by case; it is simply one switch case per message code. (Arguably a module this central deserves a refactor; its readability and extensibility leave much to be desired.)
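Its overall shape, trimmed here to a sketch (case bodies elided, only the dispatch skeleton kept; based on eth/handler.go of this era), is:
func (pm *ProtocolManager) handleMsg(p *peer) error {
    // Read the next message from the remote peer and ensure it's fully consumed
    msg, err := p.rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Size > ProtocolMaxMsgSize {
        return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }
    defer msg.Discard()

    // Handle the message depending on its code
    switch {
    case msg.Code == StatusMsg:
        // Status messages should never arrive after the handshake
        return errResp(ErrExtraStatusMsg, "uncontrolled status message")
    case msg.Code == GetBlockHeadersMsg:
        // decode the query, read headers from the local chain, reply with BlockHeadersMsg
    case msg.Code == NewBlockHashesMsg:
        // hand the announced hashes to the fetcher (shown below)
    // ... one case per message code listed above ...
    default:
        return errResp(ErrInvalidMsgCode, "%v", msg.Code)
    }
    return nil
}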
二沾凄,F(xiàn)etcher分析,之Notify()
The fetcher assists block synchronization: it tracks the retrieval state of announced block headers and bodies, but it does not download block data itself; real downloading is left to the downloader. So how exactly does the fetcher work?
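Before going further it helps to see the state the fetcher keeps. Below is a trimmed, annotated sketch of the Fetcher struct from eth/fetcher/fetcher.go (field comments are mine and many fields are omitted; exact layout varies by version):
type Fetcher struct {
    // Channels feeding the loop() goroutine
    notify       chan *announce              // block-hash announcements from peers
    inject       chan *inject                // fully propagated blocks (NewBlockMsg)
    headerFilter chan chan *headerFilterTask // used by FilterHeaders
    bodyFilter   chan chan *bodyFilterTask   // used by FilterBodies

    // Announce states (keyed by block hash, except the per-peer counters)
    announces  map[string]int              // per-peer announce counts, DOS protection
    announced  map[common.Hash][]*announce // announced, waiting to be fetched
    fetching   map[common.Hash]*announce   // header request in flight
    fetched    map[common.Hash][]*announce // header received, waiting for body retrieval
    completing map[common.Hash]*announce   // body request in flight

    // Block cache for import
    queues map[string]int          // per-peer block counts, DOS protection
    queued map[common.Hash]*inject // blocks already queued for import
    queue  *prque.Prque            // import queue, ordered by block number

    // ... plus chain accessors, timers and testing hooks used below ...
}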
Let's first look at how pm.handleMsg handles an incoming NewBlockHashesMsg announcement:
case msg.Code == NewBlockHashesMsg:
    var announces newBlockHashesData
    if err := msg.Decode(&announces); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    // Mark the hashes as present at the remote node
    for _, block := range announces {
        p.MarkBlock(block.Hash)
    }
    // Schedule all the unknown hashes for retrieval
    unknown := make(newBlockHashesData, 0, len(announces))
    for _, block := range announces {
        if !pm.blockchain.HasBlock(block.Hash, block.Number) {
            unknown = append(unknown, block)
        }
    }
    for _, block := range unknown {
        pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
    }
The announcement decodes into a newBlockHashesData list; each entry carries only a block hash and a block number.
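The type itself (from eth/protocol.go) is just a slice of hash/number pairs:
// newBlockHashesData is the network packet for block announcements under eth/62+.
type newBlockHashesData []struct {
    Hash   common.Hash // hash of one particular block being announced
    Number uint64      // number of one particular block being announced
}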
For every entry the local chain does not yet have, pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) is called, passing not just the block hash and number but also the current timestamp and two function pointers from peer.go.
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
    headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
    block := &announce{
        hash:        hash,
        number:      number,
        time:        time,
        origin:      peer,
        fetchHeader: headerFetcher,
        fetchBodies: bodyFetcher,
    }
    select {
    case f.notify <- block:
        return nil
    case <-f.quit:
        return errTerminated
    }
}
Notify() packs the arguments into an announce object and sends it on the f.notify channel. In the fetcher's main loop(), f.notify receives the notification and processes it:
case notification := <-f.notify:
    // A block was announced, make sure the peer isn't DOSing us
    propAnnounceInMeter.Mark(1)
    count := f.announces[notification.origin] + 1
    if count > hashLimit {
        log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
        propAnnounceDOSMeter.Mark(1)
        break
    }
    // If we have a valid block number, check that it's potentially useful
    if notification.number > 0 {
        if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
            log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
            propAnnounceDropMeter.Mark(1)
            break
        }
    }
    // All is well, schedule the announce if block's not yet downloading
    if _, ok := f.fetching[notification.hash]; ok {
        break
    }
    if _, ok := f.completing[notification.hash]; ok {
        break
    }
    f.announces[notification.origin] = count
    f.announced[notification.hash] = append(f.announced[notification.hash], notification)
    if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
        f.announceChangeHook(notification.hash, true)
    }
    if len(f.announced) == 1 {
        f.rescheduleFetch(fetchTimer)
    }
1. Notifications that fail the sanity checks are dropped (too many outstanding announces from that peer, or a block number too far from the local chain head); if the hash is already in the f.fetching or f.completing state map, processing also stops there. Otherwise the announce count of notification.origin is updated and the announcement is appended to f.announced, the waiting-to-fetch map.
2. If len(f.announced[notification.hash]) == 1, this is the only pending announcement for that hash, and f.announceChangeHook is invoked.
3. If len(f.announced) == 1, i.e. there is only one hash waiting overall, the fetchTimer is scheduled via rescheduleFetch (sketched below).
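rescheduleFetch is tiny; paraphrased from fetcher.go, it arms the timer to fire when the oldest pending announcement reaches arriveTimeout:
func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
    // Short circuit if no blocks are announced
    if len(f.announced) == 0 {
        return
    }
    // Otherwise find the earliest expiring announcement and arm the timer for it
    earliest := time.Now()
    for _, announces := range f.announced {
        if earliest.After(announces[0].time) {
            earliest = announces[0].time
        }
    }
    fetch.Reset(arriveTimeout - time.Since(earliest))
}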
case <-fetchTimer.C:
    // At least one block's timer ran out, check for needing retrieval
    request := make(map[string][]common.Hash)
    for hash, announces := range f.announced {
        if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
            // Pick a random peer to retrieve from, reset all others
            announce := announces[rand.Intn(len(announces))]
            f.forgetHash(hash)
            // If the block still didn't arrive, queue for fetching
            if f.getBlock(hash) == nil {
                request[announce.origin] = append(request[announce.origin], hash)
                f.fetching[hash] = announce
            }
        }
    }
    // Send out all block header requests
    for peer, hashes := range request {
        log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
        // Create a closure of the fetch and schedule in on a new thread
        fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
        go func() {
            if f.fetchingHook != nil {
                f.fetchingHook(hashes)
            }
            for _, hash := range hashes {
                headerFetchMeter.Mark(1)
                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
            }
        }()
    }
    // Schedule the next fetch if blocks are still pending
    f.rescheduleFetch(fetchTimer)
1. f.announced is traversed first; for each hash whose announcement is older than arriveTimeout-gatherSlack, all of that hash's state in the fetcher is cleared.
A random announce is picked from the slice. Why random? Every announce under the same hash refers to the same block, so any of them is equivalent; the random pick simply spreads requests over the peers that announced it.
If the block for that hash still has not arrived by the deadline, the announce is added to the request map and put back into the f.fetching state map.
2. The request map is then traversed: for every block hash coming from each peer, fetchHeader(hash) is called to retrieve the header. This fetchHeader is the function pointer passed in via pm.fetcher.Notify, i.e. a method of peer defined in peer.go.
3. At this point the fetcher's handling of NewBlockHashesMsg is complete; finally the fetchTimer is scheduled again.
三蛮粮,F(xiàn)etcher分析, 之FilterHeaders()
fetchHeader(hash) is the RequestOneHeader(hash common.Hash) method from peer.go, which sends a GetBlockHeadersMsg to the remote peer.
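For reference, RequestOneHeader is a thin wrapper (paraphrased from peer.go of this era) that issues a header query for exactly one hash:
func (p *peer) RequestOneHeader(hash common.Hash) error {
    p.Log().Debug("Fetching single header", "hash", hash)
    return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{
        Origin:  hashOrNumber{Hash: hash},
        Amount:  uint64(1),
        Skip:    uint64(0),
        Reverse: false,
    })
}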
The peer replies, and pm.handleMsg receives a BlockHeadersMsg:
case msg.Code == BlockHeadersMsg:
    // A batch of headers arrived to one of our previous requests
    var headers []*types.Header
    if err := msg.Decode(&headers); err != nil {
        return errResp(ErrDecode, "msg %v: %v", msg, err)
    }
    // If no headers were received, but we're expending a DAO fork check, maybe it's that
    if len(headers) == 0 && p.forkDrop != nil {
        // Possibly an empty reply to the fork header checks, sanity check TDs
        verifyDAO := true
        // If we already have a DAO header, we can check the peer's TD against it. If
        // the peer's ahead of this, it too must have a reply to the DAO check
        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
            if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
                verifyDAO = false
            }
        }
        // If we're seemingly on the same chain, disable the drop timer
        if verifyDAO {
            p.Log().Debug("Seems to be on the same side of the DAO fork")
            p.forkDrop.Stop()
            p.forkDrop = nil
            return nil
        }
    }
    // Filter out any explicitly requested headers, deliver the rest to the downloader
    filter := len(headers) == 1
    if filter {
        // If it's a potential DAO fork check, validate against the rules
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
            // Disable the fork drop timer
            p.forkDrop.Stop()
            p.forkDrop = nil
            // Validate the header and either drop the peer or continue
            if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
                return err
            }
            p.Log().Debug("Verified to be on the same side of the DAO fork")
            return nil
        }
        // Irrelevant of the fork checks, send the header to the fetcher just in case
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
    }
    if len(headers) > 0 || !filter {
        err := pm.downloader.DeliverHeaders(p.id, headers)
        if err != nil {
            log.Debug("Failed to deliver headers", "err", err)
        }
    }
If this is not the DAO hard-fork header check and len(headers) == 1, pm.fetcher.FilterHeaders(p.id, headers, time.Now()) is executed:
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
    log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
    // Send the filter channel to the fetcher
    filter := make(chan *headerFilterTask)
    select {
    case f.headerFilter <- filter:
    case <-f.quit:
        return nil
    }
    // Request the filtering of the header list
    select {
    case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
    case <-f.quit:
        return nil
    }
    // Retrieve the headers remaining after filtering
    select {
    case task := <-filter:
        return task.headers
    case <-f.quit:
        return nil
    }
}
FilterHeaders sends a filter channel into f.headerFilter and then pushes the task into that channel; in the fetcher's main loop(), f.headerFilter receives the filter and processes it:
case filter := <-f.headerFilter:
    // Headers arrived from a remote peer. Extract those that were explicitly
    // requested by the fetcher, and return everything else so it's delivered
    // to other parts of the system.
    var task *headerFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }
    headerFilterInMeter.Mark(int64(len(task.headers)))

    // Split the batch of headers into unknown ones (to return to the caller),
    // known incomplete ones (requiring body retrievals) and completed blocks.
    unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
    for _, header := range task.headers {
        hash := header.Hash()
        // Filter fetcher-requested headers from other synchronisation algorithms
        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
            // If the delivered header does not match the promised number, drop the announcer
            if header.Number.Uint64() != announce.number {
                log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
                f.dropPeer(announce.origin)
                f.forgetHash(hash)
                continue
            }
            // Only keep if not imported by other means
            if f.getBlock(hash) == nil {
                announce.header = header
                announce.time = task.time
                // If the block is empty (header only), short circuit into the final import queue
                if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
                    log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
                    block := types.NewBlockWithHeader(header)
                    block.ReceivedAt = task.time
                    complete = append(complete, block)
                    f.completing[hash] = announce
                    continue
                }
                // Otherwise add to the list of blocks needing completion
                incomplete = append(incomplete, announce)
            } else {
                log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
                f.forgetHash(hash)
            }
        } else {
            // Fetcher doesn't know about it, add to the return list
            unknown = append(unknown, header)
        }
    }
    headerFilterOutMeter.Mark(int64(len(unknown)))
    select {
    case filter <- &headerFilterTask{headers: unknown, time: task.time}:
    case <-f.quit:
        return
    }
    // Schedule the retrieved headers for body completion
    for _, announce := range incomplete {
        hash := announce.header.Hash()
        if _, ok := f.completing[hash]; ok {
            continue
        }
        f.fetched[hash] = append(f.fetched[hash], announce)
        if len(f.fetched) == 1 {
            f.rescheduleComplete(completeTimer)
        }
    }
    // Schedule the header-only blocks for import
    for _, block := range complete {
        if announce := f.completing[block.Hash()]; announce != nil {
            f.enqueue(announce.origin, block)
        }
    }
1妨蛹,遍歷headerFilter里面的各個(gè)header,如果在 f.fetching狀態(tài)列表晴竞,且不在f.fetched狀態(tài)列表和 f.completing狀態(tài)列表蛙卤,就繼續(xù)進(jìn)行過濾,否則塞進(jìn)unknown隊(duì)列 發(fā)送給filter噩死,F(xiàn)ilterHeaders里面task 接收到filter颤难,并作為FilterHeaders的返回值返回。
2已维,如果發(fā)現(xiàn)這個(gè)header的number和從f.fetching狀態(tài)列表取到的announce的number不一樣乐严,說明有可能收到一個(gè)偽造的區(qū)塊通知,此時(shí)就要把這個(gè)可能的偽造節(jié)點(diǎn)和可能的偽造的hash拋棄衣摩,另可錯(cuò)殺昂验,不能放過捂敌。
3,如果本節(jié)點(diǎn)已經(jīng)有這個(gè)hash的block既琴,則放棄這個(gè)hash占婉。如果這個(gè)block里面沒有任何交易也沒有任何叔區(qū)塊,則把這個(gè)hash放入complete列表同時(shí)加入f.completing狀態(tài)列表甫恩,否則放入incomplete列表逆济。
4,在incomplete列表里面磺箕,且不在f.completing狀態(tài)列表里奖慌,則加入f.fetched狀態(tài)列表,啟動(dòng)completeTimer的調(diào)度松靡。
5简僧,在complete列表里面,同時(shí)也在f.completing狀態(tài)列表雕欺,則調(diào)用f.enqueue(announce.origin, block)方法岛马。
case <-completeTimer.C:
    // At least one header's timer ran out, retrieve everything
    request := make(map[string][]common.Hash)
    for hash, announces := range f.fetched {
        // Pick a random peer to retrieve from, reset all others
        announce := announces[rand.Intn(len(announces))]
        f.forgetHash(hash)
        // If the block still didn't arrive, queue for completion
        if f.getBlock(hash) == nil {
            request[announce.origin] = append(request[announce.origin], hash)
            f.completing[hash] = announce
        }
    }
    // Send out all block body requests
    for peer, hashes := range request {
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
        // Create a closure of the fetch and schedule in on a new thread
        if f.completingHook != nil {
            f.completingHook(hashes)
        }
        bodyFetchMeter.Mark(int64(len(hashes)))
        go f.completing[hashes[0]].fetchBodies(hashes)
    }
    // Schedule the next fetch if blocks are still pending
    f.rescheduleComplete(completeTimer)
1. f.fetched is traversed first and each hash's state in the fetcher is cleared; again a random announce is picked per hash.
If the block for that hash still has not arrived, the announce is added to the request map and put into the f.completing state map.
2. The request map is then traversed: for each peer, fetchBodies(hashes) is called with all of that peer's block hashes to retrieve the block bodies. This fetchBodies is the other function pointer passed in via Notify, i.e. a method of peer defined in peer.go (see the sketch after this list).
3. At this point the fetcher's handling of BlockHeadersMsg is complete; finally the completeTimer is scheduled again.
四宇色,F(xiàn)etcher分析, 之FilterBodies() 颁湖,Enqueue()宣蠕,
1. fetchBodies(hashes) calls the RequestBodies(hashes []common.Hash) method from peer.go, which sends a GetBlockBodiesMsg to the remote peer.
2. pm.handleMsg then receives the BlockBodiesMsg reply.
3. It executes pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()).
From here the flow mirrors FilterHeaders(): another round of validation, another round of state transitions, another round of channel hand-offs.
4. Thankfully, once FilterBodies() is done the work is finished: no more timer scheduling and no more network requests.
5. Both FilterHeaders() and FilterBodies() ultimately end up in the f.enqueue(announce.origin, block) method:
func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()
    // Ensure the peer isn't DOSing us
    count := f.queues[peer] + 1
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
        propBroadcastDOSMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Discard any past or too distant blocks
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
        propBroadcastDropMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Schedule the block for future importing
    if _, ok := f.queued[hash]; !ok {
        op := &inject{
            origin: peer,
            block:  block,
        }
        f.queues[peer] = count
        f.queued[hash] = op
        f.queue.Push(op, -float32(block.NumberU64()))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        }
        log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
    }
}
enqueue() discards blocks that are too far from the local chain head (and throttles peers that propagate too many), then pushes the block into the f.queue priority queue and records it in f.queued.
In the main loop, f.queue is drained in block-number order and each block is handed to insert(), which imports it into the local blockchain (a trimmed sketch of that drain follows).
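That drain, trimmed and paraphrased from the top of Fetcher.loop(), looks roughly like this; note it never runs more than one block ahead of the local chain head, so a child waits for its parent to be imported first:
// At the start of every loop() iteration:
height := f.chainHeight()
for !f.queue.Empty() {
    op := f.queue.PopItem().(*inject)
    if f.queueChangeHook != nil {
        f.queueChangeHook(op.block.Hash(), false)
    }
    // If the block is too far ahead, put it back and wait for the parent to arrive
    number := op.block.NumberU64()
    if number > height+1 {
        f.queue.Push(op, -float32(number))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        }
        break
    }
    // Otherwise, if it's still fresh and unknown locally, try to import it
    hash := op.block.Hash()
    if number+maxUncleDist < height || f.getBlock(hash) != nil {
        f.forgetBlock(hash)
        continue
    }
    f.insert(op.origin, op.block)
}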
func (f *Fetcher) insert(peer string, block *types.Block) {
    hash := block.Hash()
    // Run the import on a new thread
    log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
    go func() {
        defer func() { f.done <- hash }()
        // If the parent's unknown, abort insertion
        parent := f.getBlock(block.ParentHash())
        if parent == nil {
            log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
            return
        }
        // Quickly validate the header and propagate the block if it passes
        switch err := f.verifyHeader(block.Header()); err {
        case nil:
            // All ok, quickly propagate to our peers
            propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
            go f.broadcastBlock(block, true)
        case consensus.ErrFutureBlock:
            // Weird future block, don't fail, but neither propagate
        default:
            // Something went very wrong, drop the peer
            log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
            f.dropPeer(peer)
            return
        }
        // Run the actual import and log any issues
        if _, err := f.insertChain(types.Blocks{block}); err != nil {
            log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
            return
        }
        // If import succeeded, broadcast the block
        propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
        go f.broadcastBlock(block, false)
        // Invoke the testing hook if needed
        if f.importedHook != nil {
            f.importedHook(block)
        }
    }()
}
insert() first calls into the consensus engine through f.verifyHeader(block.Header()) to validate the block header.
If that passes, the full block is immediately propagated to our peers, announcing that the local chain has a new block.
Then f.insertChain(types.Blocks{block}) imports it into the local blockchain.
If the import succeeds, the block is broadcast once more (rather self-congratulatory), this time only as a hash announcement.
Summary
fetcher.go is a helper for block synchronization in Ethereum. Its job is to gate and filter at every stage, keeping invalid blocks out and cutting off useless sync requests. The code is long and messy: the first read may leave you dizzy, the second probably will too, and a few more readings might as well. As long as you know what it does, that is enough.