前言
這篇文章從區(qū)塊傳播策略入手冠绢,介紹新區(qū)塊是如何傳播到遠(yuǎn)端節(jié)點(diǎn)雁刷,以及新區(qū)塊加入到遠(yuǎn)端節(jié)點(diǎn)本地鏈的過(guò)程泽艘,同時(shí)會(huì)介紹fetcher模塊猖毫,fetcher的功能是處理Peer通知的區(qū)塊信息台谍。在介紹過(guò)程中,還會(huì)涉及到p2p吁断,eth等模塊趁蕊,不會(huì)專門(mén)介紹,而是專注區(qū)塊的傳播和加入?yún)^(qū)塊鏈的過(guò)程仔役。
當(dāng)前代碼是以太坊Release 1.8掷伙,如果版本不同,代碼上可能存在差異又兵。
總體過(guò)程和傳播策略
本節(jié)從宏觀角度介紹任柜,節(jié)點(diǎn)產(chǎn)生區(qū)塊后,為了傳播給遠(yuǎn)端節(jié)點(diǎn)做了啥沛厨,遠(yuǎn)端節(jié)點(diǎn)收到區(qū)塊后又做了什么乘盼,每個(gè)節(jié)點(diǎn)都連接了很多Peer,它傳播的策略是什么樣的俄烁?
總體流程和策略可以總結(jié)為绸栅,傳播給遠(yuǎn)端Peer節(jié)點(diǎn),Peer驗(yàn)證區(qū)塊無(wú)誤后页屠,加入到本地區(qū)塊鏈粹胯,繼續(xù)傳播新區(qū)塊信息蓖柔。具體過(guò)程如下。
先看總體過(guò)程风纠。產(chǎn)生區(qū)塊后况鸣,miner
模塊會(huì)發(fā)布一個(gè)事件NewMinedBlockEvent
,訂閱事件的協(xié)程收到事件后竹观,就會(huì)把新區(qū)塊的消息镐捧,廣播給它所有的peer,peer收到消息后臭增,會(huì)交給自己的fetcher模塊處理懂酱,fetcher進(jìn)行基本的驗(yàn)證后,區(qū)塊沒(méi)問(wèn)題誊抛,發(fā)現(xiàn)這個(gè)區(qū)塊就是本地鏈需要的下一個(gè)區(qū)塊列牺,則交給blockChain
進(jìn)一步進(jìn)行完整的驗(yàn)證,這個(gè)過(guò)程會(huì)執(zhí)行區(qū)塊所有的交易拗窃,無(wú)誤后把區(qū)塊加入到本地鏈瞎领,寫(xiě)入數(shù)據(jù)庫(kù),這個(gè)過(guò)程就是下面的流程圖随夸,圖1九默。
總體流程圖,能看到有個(gè)分叉宾毒,是因?yàn)楣?jié)點(diǎn)傳播新區(qū)塊是有策略的驼修。它的傳播策略為:
- 假如節(jié)點(diǎn)連接了
N
個(gè)Peer,它只向Peer列表的sqrt(N)
個(gè)Peer廣播完整的區(qū)塊消息伍俘。 - 向所有的Peer廣播只包含區(qū)塊Hash的消息。
收到區(qū)塊Hash的節(jié)點(diǎn)癌瘾,需要從發(fā)送給它消息的Peer那里獲取對(duì)應(yīng)的完整區(qū)塊,獲取區(qū)塊后就會(huì)按照?qǐng)D1的流程饵溅,加入到fetcher隊(duì)列妨退,最終插入本地區(qū)塊鏈后,將區(qū)塊的Hash值廣播給和它相連蜕企,但還不知道這個(gè)區(qū)塊的Peer咬荷。非產(chǎn)生區(qū)塊節(jié)點(diǎn)的策略圖,如圖3轻掩,黃色節(jié)點(diǎn)將區(qū)塊Hash傳播給青色節(jié)點(diǎn):
至此幸乒,可以看出以太坊采用以石擊水的方式,像水紋一樣唇牧,層層擴(kuò)散新產(chǎn)生的區(qū)塊罕扎。
Fetcher模塊是干啥的
fetcher模塊的功能聚唐,就是收集其他Peer通知它的區(qū)塊信息:1)完整的區(qū)塊2)區(qū)塊Hash消息。根據(jù)通知的消息腔召,獲取完整的區(qū)塊杆查,然后傳遞給eth
模塊把區(qū)塊插入?yún)^(qū)塊鏈。
如果是完整區(qū)塊臀蛛,就可以傳遞給eth插入?yún)^(qū)塊亲桦,如果只有區(qū)塊Hash,則需要從其他的Peer獲取此完整的區(qū)塊浊仆,然后再傳遞給eth插入?yún)^(qū)塊客峭。
源碼解讀
本節(jié)介紹區(qū)塊傳播和處理的細(xì)節(jié)東西,方式仍然是先用圖解釋流程氧卧,再是代碼流程桃笙。
產(chǎn)塊節(jié)點(diǎn)的傳播新區(qū)塊
節(jié)點(diǎn)產(chǎn)生區(qū)塊后,廣播的流程可以表示為圖4:
- 發(fā)布事件
- 事件處理函數(shù)選擇要廣播完整的Peer沙绝,然后將區(qū)塊加入到它們的隊(duì)列
- 事件處理函數(shù)把區(qū)塊Hash添加到所有Peer的另外一個(gè)通知隊(duì)列
- 每個(gè)Peer的廣播處理函數(shù)搏明,會(huì)遍歷它的待廣播區(qū)塊隊(duì)列和通知隊(duì)列,把數(shù)據(jù)封裝成消息闪檬,調(diào)用P2P接口發(fā)送出去
再看下代碼上的細(xì)節(jié)星著。
-
worker.wait()
函數(shù)發(fā)布事件NewMinedBlockEvent
。 -
ProtocolManager.minedBroadcastLoop()
是事件處理函數(shù)粗悯。它調(diào)用了2次pm.BroadcastBlock()
虚循。
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range pm.minedBlockSub.Chan() {
switch ev := obj.Data.(type) {
case core.NewMinedBlockEvent:
pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}
}
}
-
pm.BroadcastBlock()
的入?yún)?code>propagate為真時(shí),向部分Peer廣播完整的區(qū)塊样傍,調(diào)用peer.AsyncSendNewBlock()
横缔,否則向所有Peer廣播區(qū)塊頭,調(diào)用peer.AsyncSendNewBlockHash()
衫哥,這2個(gè)函數(shù)就是把數(shù)據(jù)放入隊(duì)列茎刚,此處不再放代碼。
// BroadcastBlock will either propagate a block to a subset of it's peers, or
// will only announce it's availability (depending what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := pm.peers.PeersWithoutBlock(hash)
// If propagation is requested, send to a subset of the peer
// 這種情況撤逢,要把區(qū)塊廣播給部分peer
if propagate {
// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
// 計(jì)算新的總難度
var td *big.Int
if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
} else {
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
return
}
// Send the block to a subset of our peers
// 廣播區(qū)塊給部分peer
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td)
}
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
// Otherwise if the block is indeed in out own chain, announce it
// 把區(qū)塊hash值廣播給所有peer
if pm.blockchain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
peer.AsyncSendNewBlockHash(block)
}
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
}
}
-
peer.broadcase()
是每個(gè)Peer連接的廣播函數(shù)膛锭,它只廣播3種消息:交易、完整的區(qū)塊蚊荣、區(qū)塊的Hash初狰,這樣表明了節(jié)點(diǎn)只會(huì)主動(dòng)廣播這3中類(lèi)型的數(shù)據(jù),剩余的數(shù)據(jù)同步互例,都是通過(guò)請(qǐng)求-響應(yīng)的方式奢入。// broadcast is a write loop that multiplexes block propagations, announcements // and transaction broadcasts into the remote peer. The goal is to have an async // writer that does not lock up node internals. func (p *peer) broadcast() { for { select { // 廣播交易 case txs := <-p.queuedTxs: if err := p.SendTransactions(txs); err != nil { return } p.Log().Trace("Broadcast transactions", "count", len(txs)) // 廣播完整的新區(qū)塊 case prop := <-p.queuedProps: if err := p.SendNewBlock(prop.block, prop.td); err != nil { return } p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) // 廣播區(qū)塊Hash case block := <-p.queuedAnns: if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { return } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) case <-p.term: return } } }
Peer節(jié)點(diǎn)處理新區(qū)塊
本節(jié)介紹遠(yuǎn)端節(jié)點(diǎn)收到2種區(qū)塊同步消息的處理,其中NewBlockMsg
的處理流程比較清晰媳叨,也簡(jiǎn)潔俊马。NewBlockHashesMsg
消息的處理就繞了2繞丁存,從總體流程圖1上能看出來(lái),它需要先從給他發(fā)送消息Peer那里獲取到完整的區(qū)塊柴我,剩下的流程和NewBlockMsg
又一致了解寝。
這部分涉及的模塊多,畫(huà)出來(lái)有種眼花繚亂的感覺(jué)艘儒,但只要抓住上面的主線聋伦,代碼看起來(lái)還是很清晰的。通過(guò)圖5先看下整體流程界睁。
消息處理的起點(diǎn)是ProtocolManager.handleMsg
觉增,NewBlockMsg
的處理流程是藍(lán)色標(biāo)記的區(qū)域,紅色區(qū)域是單獨(dú)的協(xié)程翻斟,是fetcher處理隊(duì)列中區(qū)塊的流程逾礁,如果從隊(duì)列中取出的區(qū)塊是當(dāng)前鏈需要的,校驗(yàn)后访惜,調(diào)用blockchian.InsertChain()
把區(qū)塊插入到區(qū)塊鏈嘹履,最后寫(xiě)入數(shù)據(jù)庫(kù),這是黃色部分债热。最后砾嫉,綠色部分是NewBlockHashesMsg
的處理流程,代碼流程上是比較復(fù)雜的窒篱,為了能通過(guò)圖描述整體流程焕刮,我把它簡(jiǎn)化掉了。
仔細(xì)看看這幅圖墙杯,掌握整體的流程后配并,接下來(lái)看每個(gè)步驟的細(xì)節(jié)。
NewBlockMsg的處理
本節(jié)介紹節(jié)點(diǎn)收到完整區(qū)塊的處理高镐,流程如下:
- 首先進(jìn)行RLP編解碼溉旋,然后標(biāo)記發(fā)送消息的Peer已經(jīng)知道這個(gè)區(qū)塊,這樣本節(jié)點(diǎn)最后廣播這個(gè)區(qū)塊的Hash時(shí)避消,不會(huì)再發(fā)送給該P(yáng)eer低滩。
- 將區(qū)塊存入到fetcher的隊(duì)列召夹,
調(diào)用fetcher.Enqueue
岩喷。 - 更新Peer的Head位置,然后判斷本地鏈?zhǔn)欠衤浜笥赑eer的鏈监憎,如果是纱意,則通過(guò)Peer更新本地鏈。
只看handle.Msg()
的NewBlockMsg
相關(guān)的部分鲸阔。
case msg.Code == NewBlockMsg:
// Retrieve and decode the propagated block
// 收到新區(qū)塊偷霉,解碼迄委,賦值接收數(shù)據(jù)
var request newBlockData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
request.Block.ReceivedAt = msg.ReceivedAt
request.Block.ReceivedFrom = p
// Mark the peer as owning the block and schedule it for import
// 標(biāo)記peer知道這個(gè)區(qū)塊
p.MarkBlock(request.Block.Hash())
// 為啥要如隊(duì)列?已經(jīng)得到完整的區(qū)塊了
// 答:存入fetcher的優(yōu)先級(jí)隊(duì)列类少,fetcher會(huì)從隊(duì)列中選取當(dāng)前高度需要的塊
pm.fetcher.Enqueue(p.id, request.Block)
// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
// 截止到parent區(qū)塊的頭和難度
var (
trueHead = request.Block.ParentHash()
trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty())
)
// Update the peers total difficulty if better than the previous
// 如果收到的塊的難度大于peer之前的叙身,以及自己本地的,就去和這個(gè)peer同步
// 問(wèn)題:就只用了一下塊里的hash指硫狞,為啥不直接使用這個(gè)塊呢信轿,如果這個(gè)塊不能用,干嘛不少發(fā)送些數(shù)據(jù)残吩,減少網(wǎng)絡(luò)負(fù)載呢财忽。
// 答案:實(shí)際上,這個(gè)塊加入到了優(yōu)先級(jí)隊(duì)列中泣侮,當(dāng)fetcher的loop檢查到當(dāng)前下一個(gè)區(qū)塊的高度即彪,正是隊(duì)列中有的,則不再向peer請(qǐng)求
// 該區(qū)塊活尊,而是直接使用該區(qū)塊隶校,檢查無(wú)誤后交給block chain執(zhí)行insertChain
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
p.SetHead(trueHead, trueTD)
// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
// a singe block (as the true TD is below the propagated block), however this
// scenario should easily be covered by the fetcher.
currentBlock := pm.blockchain.CurrentBlock()
if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
go pm.synchronise(p)
}
}
//------------------------ 以上 handleMsg
// Enqueue tries to fill gaps the the fetcher's future import queue.
// 發(fā)給inject通道,當(dāng)前協(xié)程在handleMsg酬凳,通過(guò)通道發(fā)送給fetcher的協(xié)程處理
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
op := &inject{
origin: peer,
block: block,
}
select {
case f.inject <- op:
return nil
case <-f.quit:
return errTerminated
}
}
//------------------------ 以下 fetcher.loop處理inject部分
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
// 區(qū)塊加入隊(duì)列惠况,首先也填入未決的間距
propBroadcastInMeter.Mark(1)
f.enqueue(op.origin, op.block)
//------------------------ 如隊(duì)列函數(shù)
// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
// 把導(dǎo)入的新區(qū)塊放進(jìn)來(lái)
func (f *Fetcher) enqueue(peer string, block *types.Block) {
hash := block.Hash()
// Ensure the peer isn't DOSing us
// 防止peer的DOS攻擊
count := f.queues[peer] + 1
if count > blockLimit {
log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
propBroadcastDOSMeter.Mark(1)
f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
// 高度檢查:未來(lái)太遠(yuǎn)的塊丟棄
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
propBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}
// Schedule the block for future importing
// 塊先加入優(yōu)先級(jí)隊(duì)列,加入鏈之前宁仔,還有很多要做
if _, ok := f.queued[hash]; !ok {
op := &inject{
origin: peer,
block: block,
}
f.queues[peer] = count
f.queued[hash] = op
f.queue.Push(op, -float32(block.NumberU64()))
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
}
log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
}
}
fetcher隊(duì)列處理
本節(jié)我們看看稠屠,區(qū)塊加入隊(duì)列后,fetcher如何處理區(qū)塊翎苫,為何不直接校驗(yàn)區(qū)塊权埠,插入到本地鏈?
由于以太坊又Uncle的機(jī)制煎谍,節(jié)點(diǎn)可能收到老一點(diǎn)的一些區(qū)塊攘蔽。另外,節(jié)點(diǎn)可能由于網(wǎng)絡(luò)原因呐粘,落后了幾個(gè)區(qū)塊满俗,所以可能收到“未來(lái)”的一些區(qū)塊,這些區(qū)塊都不能直接插入到本地鏈作岖。
區(qū)塊入的隊(duì)列是一個(gè)優(yōu)先級(jí)隊(duì)列唆垃,高度低的區(qū)塊會(huì)被優(yōu)先取出來(lái)。fetcher.loop
是單獨(dú)協(xié)程痘儡,不斷運(yùn)轉(zhuǎn)辕万,清理fecther中的事務(wù)和事件。首先會(huì)清理正在fetching
的區(qū)塊,但已經(jīng)超時(shí)渐尿。然后處理優(yōu)先級(jí)隊(duì)列中的區(qū)塊醉途,判斷高度是否是下一個(gè)區(qū)塊,如果是則調(diào)用f.insert()
函數(shù)砖茸,校驗(yàn)后調(diào)用BlockChain.InsertChain()
隘擎,成功插入后,廣播新區(qū)塊的Hash凉夯。
// Loop is the main fetcher loop, checking and processing various notification
// events.
func (f *Fetcher) loop() {
// Iterate the block fetching until a quit is requested
fetchTimer := time.NewTimer(0)
completeTimer := time.NewTimer(0)
for {
// Clean up any expired block fetches
// 清理過(guò)期的區(qū)塊
for hash, announce := range f.fetching {
if time.Since(announce.time) > fetchTimeout {
f.forgetHash(hash)
}
}
// Import any queued blocks that could potentially fit
// 導(dǎo)入隊(duì)列中合適的塊
height := f.chainHeight()
for !f.queue.Empty() {
op := f.queue.PopItem().(*inject)
hash := op.block.Hash()
if f.queueChangeHook != nil {
f.queueChangeHook(hash, false)
}
// If too high up the chain or phase, continue later
// 塊不是鏈需要的下一個(gè)塊嵌屎,再入優(yōu)先級(jí)隊(duì)列,停止循環(huán)
number := op.block.NumberU64()
if number > height+1 {
f.queue.Push(op, -float32(number))
if f.queueChangeHook != nil {
f.queueChangeHook(hash, true)
}
break
}
// Otherwise if fresh and still unknown, try and import
// 高度正好是我們想要的恍涂,并且鏈上也沒(méi)有這個(gè)塊
if number+maxUncleDist < height || f.getBlock(hash) != nil {
f.forgetBlock(hash)
continue
}
// 那么宝惰,塊插入鏈
f.insert(op.origin, op.block)
}
//省略
}
}
func (f *Fetcher) insert(peer string, block *types.Block) {
hash := block.Hash()
// Run the import on a new thread
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
go func() {
defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion
parent := f.getBlock(block.ParentHash())
if parent == nil {
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
return
}
// Quickly validate the header and propagate the block if it passes
// 驗(yàn)證區(qū)塊頭,成功后廣播區(qū)塊
switch err := f.verifyHeader(block.Header()); err {
case nil:
// All ok, quickly propagate to our peers
propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, true)
case consensus.ErrFutureBlock:
// Weird future block, don't fail, but neither propagate
default:
// Something went very wrong, drop the peer
log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
f.dropPeer(peer)
return
}
// Run the actual import and log any issues
// 調(diào)用回調(diào)函數(shù)再沧,實(shí)際是blockChain.insertChain
if _, err := f.insertChain(types.Blocks{block}); err != nil {
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
return
}
// If import succeeded, broadcast the block
propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, false)
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(block)
}
}()
}
NewBlockHashesMsg的處理
本節(jié)介紹NewBlockHashesMsg的處理尼夺,其實(shí),消息處理是簡(jiǎn)單的炒瘸,而復(fù)雜一點(diǎn)的是從Peer哪獲取完整的區(qū)塊淤堵,下節(jié)再看。
流程如下:
- 對(duì)消息進(jìn)行RLP解碼顷扩,然后標(biāo)記Peer已經(jīng)知道此區(qū)塊拐邪。
- 尋找出本地區(qū)塊鏈不存在的區(qū)塊Hash值,把這些未知的Hash通知給fetcher隘截。
-
fetcher.Notify
記錄好通知信息扎阶,塞入notify
通道,以便交給fetcher的協(xié)程婶芭。 -
fetcher.loop()
會(huì)對(duì)notify
中的消息進(jìn)行處理东臀,確認(rèn)區(qū)塊并非DOS攻擊,然后檢查區(qū)塊的高度犀农,判斷該區(qū)塊是否已經(jīng)在fetching
或者comleting(代表已經(jīng)下載區(qū)塊頭惰赋,在下載body)
,如果都沒(méi)有呵哨,則加入到announced
中赁濒,觸發(fā)0s定時(shí)器,進(jìn)行處理孟害。
關(guān)于announced
下節(jié)再介紹拒炎。
// handleMsg()部分
case msg.Code == NewBlockHashesMsg:
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
// Mark the hashes as present at the remote node
for _, block := range announces {
p.MarkBlock(block.Hash)
}
// Schedule all the unknown hashes for retrieval
// 把本地鏈沒(méi)有的塊hash找出來(lái),交給fetcher去下載
unknown := make(newBlockHashesData, 0, len(announces))
for _, block := range announces {
if !pm.blockchain.HasBlock(block.Hash, block.Number) {
unknown = append(unknown, block)
}
}
for _, block := range unknown {
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
}
// Notify announces the fetcher of the potential availability of a new block in
// the network.
// 通知fetcher(自己)有新塊產(chǎn)生纹坐,沒(méi)有塊實(shí)體枝冀,有hash、高度等信息
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
block := &announce{
hash: hash,
number: number,
time: time,
origin: peer,
fetchHeader: headerFetcher,
fetchBodies: bodyFetcher,
}
select {
case f.notify <- block:
return nil
case <-f.quit:
return errTerminated
}
}
// fetcher.loop()的notify通道消息處理
case notification := <-f.notify:
// A block was announced, make sure the peer isn't DOSing us
propAnnounceInMeter.Mark(1)
count := f.announces[notification.origin] + 1
if count > hashLimit {
log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
propAnnounceDOSMeter.Mark(1)
break
}
// If we have a valid block number, check that it's potentially useful
// 高度檢查
if notification.number > 0 {
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
propAnnounceDropMeter.Mark(1)
break
}
}
// All is well, schedule the announce if block's not yet downloading
// 檢查是否已經(jīng)在下載耘子,已下載則忽略
if _, ok := f.fetching[notification.hash]; ok {
break
}
if _, ok := f.completing[notification.hash]; ok {
break
}
// 更新peer已經(jīng)通知給我們的區(qū)塊數(shù)量
f.announces[notification.origin] = count
// 把通知信息加入到announced果漾,供調(diào)度
f.announced[notification.hash] = append(f.announced[notification.hash], notification)
if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
f.announceChangeHook(notification.hash, true)
}
if len(f.announced) == 1 {
// 有通知放入到announced,則重設(shè)0s定時(shí)器谷誓,loop的另外一個(gè)分支會(huì)處理這些通知
f.rescheduleFetch(fetchTimer)
}
fetcher獲取完整區(qū)塊
本節(jié)介紹fetcher獲取完整區(qū)塊的過(guò)程绒障,這也是fetcher最重要的功能,會(huì)涉及到fetcher至少80%的代碼捍歪。單獨(dú)拉放一大節(jié)吧户辱。
Fetcher的大頭
Fetcher最主要的功能就是獲取完整的區(qū)塊,然后在合適的實(shí)際交給InsertChain去驗(yàn)證和插入到本地區(qū)塊鏈糙臼。我們還是從宏觀入手庐镐,看Fetcher是如何工作的,一定要先掌握好宏觀变逃,因?yàn)榇a層面上沒(méi)有這么清晰必逆。
宏觀
首先,看兩個(gè)節(jié)點(diǎn)是如何交互揽乱,獲取完整區(qū)塊名眉,使用時(shí)序圖的方式看一下,見(jiàn)圖6凰棉,流程很清晰不再文字介紹损拢。
再看下獲取區(qū)塊過(guò)程中,fetcher內(nèi)部的狀態(tài)轉(zhuǎn)移撒犀,它使用狀態(tài)來(lái)記錄福压,要獲取的區(qū)塊在什么階段,見(jiàn)圖7或舞。我稍微解釋一下:
- 收到
NewBlockHashesMsg
后隧膏,相關(guān)信息會(huì)記錄到announced
,進(jìn)入announced
狀態(tài)嚷那,代表了本節(jié)點(diǎn)接收了消息胞枕。 -
announced
由fetcher協(xié)程處理,經(jīng)過(guò)校驗(yàn)后魏宽,會(huì)向給他發(fā)送消息的Peer發(fā)送請(qǐng)求腐泻,請(qǐng)求該區(qū)塊的區(qū)塊頭,然后進(jìn)入fetching
狀態(tài)队询。 - 獲取區(qū)塊頭后派桩,如果區(qū)塊頭表示沒(méi)有交易和uncle,則轉(zhuǎn)移到
completing
狀態(tài)蚌斩,并且使用區(qū)塊頭合成完整的區(qū)塊铆惑,加入到queued
優(yōu)先級(jí)隊(duì)列。 - 獲取區(qū)塊頭后,如果區(qū)塊頭表示該區(qū)塊有交易和uncle员魏,則轉(zhuǎn)移到
fetched
狀態(tài)丑蛤,然后發(fā)送請(qǐng)求,請(qǐng)求交易和uncle撕阎,然后轉(zhuǎn)移到completing
狀態(tài)受裹。 - 收到交易和uncle后,使用頭虏束、交易棉饶、uncle這3個(gè)信息,生成完整的區(qū)塊镇匀,加入到隊(duì)列
queued
照藻。
微觀
接下來(lái)就是從代碼角度看如何獲取完整區(qū)塊的流程了,有點(diǎn)多汗侵,看不懂的時(shí)候岩梳,再回顧下上面宏觀的介紹圖。
首先看Fetcher的定義晃择,它存放了通信數(shù)據(jù)和狀態(tài)管理冀值,撿加注釋的看,上文提到的狀態(tài)宫屠,里面都有列疗。
// Fetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
// 積累塊通知,然后調(diào)度獲取這些塊
type Fetcher struct {
// Various event channels
// 收到區(qū)塊hash值的通道
notify chan *announce
// 收到完整區(qū)塊的通道
inject chan *inject
blockFilter chan chan []*types.Block
// 過(guò)濾header的通道的通道
headerFilter chan chan *headerFilterTask
// 過(guò)濾body的通道的通道
bodyFilter chan chan *bodyFilterTask
done chan common.Hash
quit chan struct{}
// Announce states
// Peer已經(jīng)給了本節(jié)點(diǎn)多少區(qū)塊頭通知
announces map[string]int // Per peer announce counts to prevent memory exhaustion
// 已經(jīng)announced的區(qū)塊列表
announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
// 正在fetching區(qū)塊頭的請(qǐng)求
fetching map[common.Hash]*announce // Announced blocks, currently fetching
// 已經(jīng)fetch到區(qū)塊頭浪蹂,還差body的請(qǐng)求抵栈,用來(lái)獲取body
fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
// 已經(jīng)得到區(qū)塊頭的
completing map[common.Hash]*announce // Blocks with headers, currently body-completing
// Block cache
// queue,優(yōu)先級(jí)隊(duì)列坤次,高度做優(yōu)先級(jí)
// queues古劲,統(tǒng)計(jì)peer通告了多少塊
// queued,代表這個(gè)塊如隊(duì)列了缰猴,
queue *prque.Prque // Queue containing the import operations (block number sorted)
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
// Callbacks
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work产艾,驗(yàn)證區(qū)塊頭,包含了PoW驗(yàn)證
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers滑绒,廣播給peer
chainHeight chainHeightFn // Retrieves the current chain's height
insertChain chainInsertFn // Injects a batch of blocks into the chain闷堡,插入?yún)^(qū)塊到鏈的函數(shù)
dropPeer peerDropFn // Drops a peer for misbehaving
// Testing hooks
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
}
NewBlockHashesMsg
消息的處理前面的小節(jié)已經(jīng)講過(guò)了,不記得可向前翻看疑故。這里從announced
的狀態(tài)處理說(shuō)起杠览。loop()
中,fetchTimer
超時(shí)后纵势,代表了收到了消息通知踱阿,需要處理管钳,會(huì)從announced
中選擇出需要處理的通知,然后創(chuàng)建請(qǐng)求软舌,請(qǐng)求區(qū)塊頭才漆,由于可能有很多節(jié)點(diǎn)都通知了它某個(gè)區(qū)塊的Hash,所以隨機(jī)的從這些發(fā)送消息的Peer中選擇一個(gè)Peer葫隙,發(fā)送請(qǐng)求的時(shí)候,為每個(gè)Peer都創(chuàng)建了單獨(dú)的協(xié)程躏仇。
case <-fetchTimer.C:
// At least one block's timer ran out, check for needing retrieval
// 有區(qū)塊通知恋脚,去處理
request := make(map[string][]common.Hash)
for hash, announces := range f.announced {
if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
// Pick a random peer to retrieve from, reset all others
// 可能有很多peer都發(fā)送了這個(gè)區(qū)塊的hash值,隨機(jī)選擇一個(gè)peer
announce := announces[rand.Intn(len(announces))]
f.forgetHash(hash)
// If the block still didn't arrive, queue for fetching
// 本地還沒(méi)有這個(gè)區(qū)塊焰手,創(chuàng)建獲取區(qū)塊的請(qǐng)求
if f.getBlock(hash) == nil {
request[announce.origin] = append(request[announce.origin], hash)
f.fetching[hash] = announce
}
}
}
// Send out all block header requests
// 把所有的request發(fā)送出去
// 為每一個(gè)peer都創(chuàng)建一個(gè)協(xié)程糟描,然后請(qǐng)求所有需要從該peer獲取的請(qǐng)求
for peer, hashes := range request {
log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
// Create a closure of the fetch and schedule in on a new thread
fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
go func() {
if f.fetchingHook != nil {
f.fetchingHook(hashes)
}
for _, hash := range hashes {
headerFetchMeter.Mark(1)
fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
}
}()
}
// Schedule the next fetch if blocks are still pending
f.rescheduleFetch(fetchTimer)
從Notify
的調(diào)用中,可以看出书妻,fetcherHeader()
的實(shí)際函數(shù)是RequestOneHeader()
船响,該函數(shù)使用的消息是GetBlockHeadersMsg
澡谭,可以用來(lái)請(qǐng)求多個(gè)區(qū)塊頭崭歧,不過(guò)fetcher只請(qǐng)求一個(gè)缚态。
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
p.Log().Debug("Fetching single header", "hash", hash)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}
GetBlockHeadersMsg
的處理如下:因?yàn)樗谦@取多個(gè)區(qū)塊頭的弯院,所以處理起來(lái)比較“麻煩”缸兔,還好子刮,fetcher只獲取一個(gè)區(qū)塊頭肛著,其處理在20行~33行游两,獲取下一個(gè)區(qū)塊頭的處理邏輯篷帅,這里就不看了史侣,最后調(diào)用SendBlockHeaders()
將區(qū)塊頭發(fā)送給請(qǐng)求的節(jié)點(diǎn),消息是BlockHeadersMsg
魏身。
// handleMsg()
// Block header query, collect the requested headers and reply
case msg.Code == GetBlockHeadersMsg:
// Decode the complex header query
var query getBlockHeadersData
if err := msg.Decode(&query); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
hashMode := query.Origin.Hash != (common.Hash{})
// Gather headers until the fetch or network limits is reached
// 收集區(qū)塊頭惊橱,直到達(dá)到限制
var (
bytes common.StorageSize
headers []*types.Header
unknown bool
)
// 自己已知區(qū)塊 && 少于查詢的數(shù)量 && 大小小于2MB && 小于能下載的最大數(shù)量
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
// Retrieve the next header satisfying the query
// 獲取區(qū)塊頭
var origin *types.Header
if hashMode {
// fetcher 使用的模式
origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
} else {
origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
break
}
number := origin.Number.Uint64()
headers = append(headers, origin)
bytes += estHeaderRlpSize
// Advance to the next header of the query
// 下一個(gè)區(qū)塊頭的獲取,不同策略箭昵,方式不同
switch {
case query.Origin.Hash != (common.Hash{}) && query.Reverse:
// ...
}
}
return p.SendBlockHeaders(headers)
BlockHeadersMsg
的處理很有意思税朴,因?yàn)?code>GetBlockHeadersMsg并不是fetcher獨(dú)占的消息,downloader也可以調(diào)用家制,所以掉房,響應(yīng)消息的處理需要分辨出是fetcher請(qǐng)求的,還是downloader請(qǐng)求的慰丛。它的處理邏輯是:fetcher先過(guò)濾收到的區(qū)塊頭卓囚,如果fetcher不要的,那就是downloader的诅病,在調(diào)用fetcher.FilterHeaders
的時(shí)候哪亿,fetcher就將自己要的區(qū)塊頭拿走了粥烁。
// handleMsg()
case msg.Code == BlockHeadersMsg:
// A batch of headers arrived to one of our previous requests
var headers []*types.Header
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// If no headers were received, but we're expending a DAO fork check, maybe it's that
// 檢查是不是當(dāng)前DAO的硬分叉
if len(headers) == 0 && p.forkDrop != nil {
// Possibly an empty reply to the fork header checks, sanity check TDs
verifyDAO := true
// If we already have a DAO header, we can check the peer's TD against it. If
// the peer's ahead of this, it too must have a reply to the DAO check
if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
verifyDAO = false
}
}
// If we're seemingly on the same chain, disable the drop timer
if verifyDAO {
p.Log().Debug("Seems to be on the same side of the DAO fork")
p.forkDrop.Stop()
p.forkDrop = nil
return nil
}
}
// Filter out any explicitly requested headers, deliver the rest to the downloader
// 過(guò)濾是不是fetcher請(qǐng)求的區(qū)塊頭,去掉fetcher請(qǐng)求的區(qū)塊頭再交給downloader
filter := len(headers) == 1
if filter {
// If it's a potential DAO fork check, validate against the rules
// 檢查是否硬分叉
if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
// Disable the fork drop timer
p.forkDrop.Stop()
p.forkDrop = nil
// Validate the header and either drop the peer or continue
if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
return err
}
p.Log().Debug("Verified to be on the same side of the DAO fork")
return nil
}
// Irrelevant of the fork checks, send the header to the fetcher just in case
// 使用fetcher過(guò)濾區(qū)塊頭
headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
}
// 剩下的區(qū)塊頭交給downloader
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
if err != nil {
log.Debug("Failed to deliver headers", "err", err)
}
}
FilterHeaders()
是一個(gè)很有大智慧的函數(shù)蝇棉,看起來(lái)耐人尋味讨阻,但實(shí)在妙。它要把所有的區(qū)塊頭篡殷,都傳遞給fetcher協(xié)程钝吮,還要獲取fetcher協(xié)程處理后的結(jié)果。fetcher.headerFilter
是存放通道的通道板辽,而filter
是存放包含區(qū)塊頭過(guò)濾任務(wù)的通道奇瘦。它先把filter
傳遞給了headerFilter
,這樣fetcher
協(xié)程就在另外一段等待了劲弦,而后將headerFilterTask
傳入filter
耳标,fetcher就能讀到數(shù)據(jù)了,處理后邑跪,再將數(shù)據(jù)寫(xiě)回filter
而剛好被FilterHeaders
函數(shù)處理了次坡,該函數(shù)實(shí)際運(yùn)行在handleMsg()
的協(xié)程中。
每個(gè)Peer都會(huì)分配一個(gè)ProtocolManager然后處理該P(yáng)eer的消息画畅,但fetcher
只有一個(gè)事件處理協(xié)程砸琅,如果不創(chuàng)建一個(gè)filter
,fetcher哪知道是誰(shuí)發(fā)給它的區(qū)塊頭呢轴踱?過(guò)濾之后明棍,該如何發(fā)回去呢?
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
// 尋找出fetcher請(qǐng)求的區(qū)塊頭
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
// Send the filter channel to the fetcher
// 任務(wù)通道
filter := make(chan *headerFilterTask)
select {
// 任務(wù)通道發(fā)送到這個(gè)通道
case f.headerFilter <- filter:
case <-f.quit:
return nil
}
// Request the filtering of the header list
// 創(chuàng)建過(guò)濾任務(wù)寇僧,發(fā)送到任務(wù)通道
select {
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
case <-f.quit:
return nil
}
// Retrieve the headers remaining after filtering
// 從任務(wù)通道摊腋,獲取過(guò)濾的結(jié)果并返回
select {
case task := <-filter:
return task.headers
case <-f.quit:
return nil
}
}
接下來(lái)要看f.headerFilter
的處理,這段代碼有90行嘁傀,它做了一下幾件事:
- 從
f.headerFilter
取出filter
兴蒸,然后取出過(guò)濾任務(wù)task
。 - 它把區(qū)塊頭分成3類(lèi):
unknown
這不是分是要返回給調(diào)用者的细办,即handleMsg()
,incomplete
存放還需要獲取body的區(qū)塊頭橙凳,complete
存放只包含區(qū)塊頭的區(qū)塊。遍歷所有的區(qū)塊頭笑撞,填到到對(duì)應(yīng)的分類(lèi)中岛啸,具體的判斷可看18行的注釋,記住宏觀中將的狀態(tài)轉(zhuǎn)移圖茴肥。 - 把
unknonw
中的區(qū)塊返回給handleMsg()
坚踩。 - 把
incomplete
的區(qū)塊頭獲取狀態(tài)移動(dòng)到fetched
狀態(tài),然后觸發(fā)定時(shí)器瓤狐,以便去處理complete
的區(qū)塊瞬铸。 - 把
compelete
的區(qū)塊加入到queued
批幌。
// fetcher.loop()
case filter := <-f.headerFilter:
// Headers arrived from a remote peer. Extract those that were explicitly
// requested by the fetcher, and return everything else so it's delivered
// to other parts of the system.
// 收到從遠(yuǎn)端節(jié)點(diǎn)發(fā)送的區(qū)塊頭,過(guò)濾出fetcher請(qǐng)求的
// 從任務(wù)通道獲取過(guò)濾任務(wù)
var task *headerFilterTask
select {
case task = <-filter:
case <-f.quit:
return
}
headerFilterInMeter.Mark(int64(len(task.headers)))
// Split the batch of headers into unknown ones (to return to the caller),
// known incomplete ones (requiring body retrievals) and completed blocks.
// unknown的不是fetcher請(qǐng)求的嗓节,complete放沒(méi)有交易和uncle的區(qū)塊荧缘,有頭就夠了,incomplete放
// 還需要獲取uncle和交易的區(qū)塊
unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
// 遍歷所有收到的header
for _, header := range task.headers {
hash := header.Hash()
// Filter fetcher-requested headers from other synchronisation algorithms
// 是正在獲取的hash拦宣,并且對(duì)應(yīng)請(qǐng)求的peer截粗,并且未fetched,未completing鸵隧,未queued
if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
// If the delivered header does not match the promised number, drop the announcer
// 高度校驗(yàn)绸罗,竟然不匹配,擾亂秩序掰派,peer肯定是壞蛋从诲。
if header.Number.Uint64() != announce.number {
log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
f.dropPeer(announce.origin)
f.forgetHash(hash)
continue
}
// Only keep if not imported by other means
// 本地鏈沒(méi)有當(dāng)前區(qū)塊
if f.getBlock(hash) == nil {
announce.header = header
announce.time = task.time
// If the block is empty (header only), short circuit into the final import queue
// 如果區(qū)塊沒(méi)有交易和uncle左痢,加入到complete
if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
block := types.NewBlockWithHeader(header)
block.ReceivedAt = task.time
complete = append(complete, block)
f.completing[hash] = announce
continue
}
// Otherwise add to the list of blocks needing completion
// 否則就是不完整的區(qū)塊
incomplete = append(incomplete, announce)
} else {
log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
f.forgetHash(hash)
}
} else {
// Fetcher doesn't know about it, add to the return list
// 沒(méi)請(qǐng)求過(guò)的header
unknown = append(unknown, header)
}
}
// 把未知的區(qū)塊頭靡羡,再傳遞會(huì)filter
headerFilterOutMeter.Mark(int64(len(unknown)))
select {
case filter <- &headerFilterTask{headers: unknown, time: task.time}:
case <-f.quit:
return
}
// Schedule the retrieved headers for body completion
// 把未完整的區(qū)塊加入到fetched,跳過(guò)已經(jīng)在completeing中的俊性,然后觸發(fā)completeTimer定時(shí)器
for _, announce := range incomplete {
hash := announce.header.Hash()
if _, ok := f.completing[hash]; ok {
continue
}
f.fetched[hash] = append(f.fetched[hash], announce)
if len(f.fetched) == 1 {
f.rescheduleComplete(completeTimer)
}
}
// Schedule the header-only blocks for import
// 把只有頭的區(qū)塊入隊(duì)列
for _, block := range complete {
if announce := f.completing[block.Hash()]; announce != nil {
f.enqueue(announce.origin, block)
}
}
跟隨狀態(tài)圖的轉(zhuǎn)義略步,剩下的工作是fetched
轉(zhuǎn)移到completing
,上面的流程已經(jīng)觸發(fā)了completeTimer
定時(shí)器定页,超時(shí)后就會(huì)處理趟薄,流程與請(qǐng)求Header類(lèi)似,不再贅述典徊,此時(shí)發(fā)送的請(qǐng)求消息是GetBlockBodiesMsg
杭煎,實(shí)際調(diào)的函數(shù)是RequestBodies
。
// fetcher.loop()
case <-completeTimer.C:
// At least one header's timer ran out, retrieve everything
// 至少有1個(gè)header已經(jīng)獲取完了
request := make(map[string][]common.Hash)
// 遍歷所有待獲取body的announce
for hash, announces := range f.fetched {
// Pick a random peer to retrieve from, reset all others
// 隨機(jī)選一個(gè)Peer發(fā)送請(qǐng)求卒落,因?yàn)榭赡芤呀?jīng)有很多Peer通知它這個(gè)區(qū)塊了
announce := announces[rand.Intn(len(announces))]
f.forgetHash(hash)
// If the block still didn't arrive, queue for completion
// 如果本地沒(méi)有這個(gè)區(qū)塊羡铲,則放入到completing,創(chuàng)建請(qǐng)求
if f.getBlock(hash) == nil {
request[announce.origin] = append(request[announce.origin], hash)
f.completing[hash] = announce
}
}
// Send out all block body requests
// 發(fā)送所有的請(qǐng)求儡毕,獲取body也切,依然是每個(gè)peer一個(gè)單獨(dú)協(xié)程
for peer, hashes := range request {
log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
// Create a closure of the fetch and schedule in on a new thread
if f.completingHook != nil {
f.completingHook(hashes)
}
bodyFetchMeter.Mark(int64(len(hashes)))
go f.completing[hashes[0]].fetchBodies(hashes)
}
// Schedule the next fetch if blocks are still pending
f.rescheduleComplete(completeTimer)
handleMsg()
處理該消息也是干凈利落,直接獲取RLP格式的body腰湾,然后發(fā)送響應(yīng)消息雷恃。
// handleMsg()
case msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather blocks until the fetch or network limits is reached
var (
hash common.Hash
bytes int
bodies []rlp.RawValue
)
// 遍歷所有請(qǐng)求
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
// Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested block body, stopping if enough was found
// 獲取body,RLP格式
if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
bodies = append(bodies, data)
bytes += len(data)
}
}
return p.SendBlockBodiesRLP(bodies)
響應(yīng)消息BlockBodiesMsg
的處理與處理獲取header的處理原理相同费坊,先交給fetcher過(guò)濾倒槐,然后剩下的才是downloader的。需要注意一點(diǎn)附井,響應(yīng)消息里只包含交易列表和叔塊列表导犹。
// handleMsg()
case msg.Code == BlockBodiesMsg:
// A batch of block bodies arrived to one of our previous requests
var request blockBodiesData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver them all to the downloader for queuing
// 傳遞給downloader去處理
transactions := make([][]*types.Transaction, len(request))
uncles := make([][]*types.Header, len(request))
for i, body := range request {
transactions[i] = body.Transactions
uncles[i] = body.Uncles
}
// Filter out any explicitly requested bodies, deliver the rest to the downloader
// 先讓fetcher過(guò)濾去fetcher請(qǐng)求的body唱凯,剩下的給downloader
filter := len(transactions) > 0 || len(uncles) > 0
if filter {
transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
}
// 剩下的body交給downloader
if len(transactions) > 0 || len(uncles) > 0 || !filter {
err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
if err != nil {
log.Debug("Failed to deliver bodies", "err", err)
}
}
過(guò)濾函數(shù)的原理也與Header相同。
// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
// 過(guò)去出fetcher請(qǐng)求的body谎痢,返回它沒(méi)有處理的磕昼,過(guò)程類(lèi)型header的處理
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
// Send the filter channel to the fetcher
filter := make(chan *bodyFilterTask)
select {
case f.bodyFilter <- filter:
case <-f.quit:
return nil, nil
}
// Request the filtering of the body list
select {
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
case <-f.quit:
return nil, nil
}
// Retrieve the bodies remaining after filtering
select {
case task := <-filter:
return task.transactions, task.uncles
case <-f.quit:
return nil, nil
}
}
實(shí)際過(guò)濾body的處理瞧一下,這和Header的處理是不同的节猿。直接看不點(diǎn):
- 它要的區(qū)塊票从,單獨(dú)取出來(lái)存到
blocks
中,它不要的繼續(xù)留在task
中滨嘱。 - 判斷是不是fetcher請(qǐng)求的方法:如果交易列表和叔塊列表計(jì)算出的hash值與區(qū)塊頭中的一樣峰鄙,并且消息來(lái)自請(qǐng)求的Peer,則就是fetcher請(qǐng)求的太雨。
- 將
blocks
中的區(qū)塊加入到queued
吟榴,終結(jié)。
case filter := <-f.bodyFilter:
// Block bodies arrived, extract any explicitly requested blocks, return the rest
var task *bodyFilterTask
select {
case task = <-filter:
case <-f.quit:
return
}
bodyFilterInMeter.Mark(int64(len(task.transactions)))
blocks := []*types.Block{}
// 獲取的每個(gè)body的txs列表和uncle列表
// 遍歷每個(gè)區(qū)塊的txs列表和uncle列表囊扳,計(jì)算hash后判斷是否是當(dāng)前fetcher請(qǐng)求的body
for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
// Match up a body to any possible completion request
matched := false
// 遍歷所有保存的請(qǐng)求吩翻,因?yàn)閠x和uncle,不知道它是屬于哪個(gè)區(qū)塊的锥咸,只能去遍歷所有的請(qǐng)求狭瞎,通常量不大,所以遍歷沒(méi)有性能影響
for hash, announce := range f.completing {
if f.queued[hash] == nil {
// 把傳入的每個(gè)塊的hash和unclehash和它請(qǐng)求出去的記錄進(jìn)行對(duì)比搏予,匹配則說(shuō)明是fetcher請(qǐng)求的區(qū)塊body
txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
uncleHash := types.CalcUncleHash(task.uncles[i])
if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
// Mark the body matched, reassemble if still unknown
matched = true
// 如果當(dāng)前鏈還沒(méi)有這個(gè)區(qū)塊熊锭,則收集這個(gè)區(qū)塊,合并成新區(qū)塊
if f.getBlock(hash) == nil {
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
block.ReceivedAt = task.time
blocks = append(blocks, block)
} else {
f.forgetHash(hash)
}
}
}
}
// 從task中移除fetcher請(qǐng)求的數(shù)據(jù)
if matched {
task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
i--
continue
}
}
// 將剩余的數(shù)據(jù)返回
bodyFilterOutMeter.Mark(int64(len(task.transactions)))
select {
case filter <- task:
case <-f.quit:
return
}
// Schedule the retrieved blocks for ordered import
// 把收集的區(qū)塊加入到隊(duì)列
for _, block := range blocks {
if announce := f.completing[block.Hash()]; announce != nil {
f.enqueue(announce.origin, block)
}
}
}
至此雪侥,fetcher獲取完整區(qū)塊的流程講完了碗殷,fetcher模塊中80%的代碼也都貼出來(lái)了,還有2個(gè)值得看看的函數(shù):
-
forgetHash(hash common.Hash)
:用于清空指定hash指的記/狀態(tài)錄信息速缨。 -
forgetBlock(hash common.Hash)
:用于從隊(duì)列中移除一個(gè)區(qū)塊锌妻。
最后了,再回到開(kāi)始看看fetcher模塊和新區(qū)塊的傳播流程鸟廓,有沒(méi)有豁然開(kāi)朗从祝。