以太坊源碼深入分析(6)-- 以太坊P2P協(xié)議接收廣播的處理和Fetcher源碼分析



type Protocol struct {
    Name string
    Version uint
    Length uint64

    Run func(peer *Peer, rw MsgReadWriter) error

    NodeInfo func() interface{}

    PeerInfo func(id discover.NodeID) interface{}


 manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
            Name:    ProtocolName,
            Version: version,
            Length:  ProtocolLengths[i],
            Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                peer := manager.newPeer(int(version), p, rw)
                select {
                case manager.newPeerCh <- peer:
                    defer manager.wg.Done()
                    return manager.handle(peer)
                case <-manager.quitSync:
                    return p2p.DiscQuitting
            NodeInfo: func() interface{} {
                return manager.NodeInfo()
            PeerInfo: func(id discover.NodeID) interface{} {
                if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                    return p.Info()
                return nil

這三個(gè)函數(shù)指針實(shí)質(zhì)是注入p2p server的回調(diào)邦蜜,用于處理網(wǎng)絡(luò)中其他節(jié)點(diǎn)的廣播通知、獲取本以太坊Node 的info亥至、本地節(jié)點(diǎn)的info悼沈。

func (pm *ProtocolManager) handle(p *peer) error {
    if pm.peers.Len() >= pm.maxPeers {
        return p2p.DiscTooManyPeers
    p.Log().Debug("Ethereum peer connected", "name", p.Name())

    // Execute the Ethereum handshake
    var (
        genesis = pm.blockchain.Genesis()
        head    = pm.blockchain.CurrentHeader()
        hash    = head.Hash()
        number  = head.Number.Uint64()
        td      = pm.blockchain.GetTd(hash, number)
    if err := p.Handshake(pm.networkId, td, hash, genesis.Hash()); err != nil {
        p.Log().Debug("Ethereum handshake failed", "err", err)
        return err
    if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
    // Register the peer locally
    if err := pm.peers.Register(p); err != nil {
        p.Log().Error("Ethereum peer registration failed", "err", err)
        return err
    defer pm.removePeer(p.id)

    // Register the peer in the downloader. If the downloader considers it banned, we disconnect
    if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
        return err
    // Propagate existing transactions. new transactions appearing
    // after this will be sent via broadcasts.

    // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
    if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
        // Request the peer's DAO fork header for extra-data validation
        if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
            return err
        // Start a timer to disconnect if the peer doesn't reply in time
        p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
            p.Log().Debug("Timed out DAO fork-check, dropping")
        // Make sure it's cleaned up if the peer dies off
        defer func() {
            if p.forkDrop != nil {
                p.forkDrop = nil
    // main loop. handle incoming messages.
    for {
        if err := pm.handleMsg(p); err != nil {
            p.Log().Debug("Ethereum message handling failed", "err", err)
            return err


func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error {
    // Send out own handshake in a new thread
    errc := make(chan error, 2)
    var status statusData // safe to read after two values have been received from errc

    go func() {
        errc <- p2p.Send(p.rw, StatusMsg, &statusData{
            ProtocolVersion: uint32(p.version),
            NetworkId:       network,
            TD:              td,
            CurrentBlock:    head,
            GenesisBlock:    genesis,
    go func() {
        errc <- p.readStatus(network, &status, genesis)
    timeout := time.NewTimer(handshakeTimeout)
    defer timeout.Stop()
    for i := 0; i < 2; i++ {
        select {
        case err := <-errc:
            if err != nil {
                return err
        case <-timeout.C:
            return p2p.DiscReadTimeout
    p.td, p.head = status.TD, status.CurrentBlock
    return nil


驗(yàn)證Dao 硬分叉宿崭,如果超時(shí)則從緩存節(jié)點(diǎn)列表中刪除這個(gè)網(wǎng)絡(luò)節(jié)點(diǎn)亲铡。

const (
    // Protocol messages belonging to eth/62
    StatusMsg          = 0x00
    NewBlockHashesMsg  = 0x01
    TxMsg              = 0x02
    GetBlockHeadersMsg = 0x03
    BlockHeadersMsg    = 0x04
    GetBlockBodiesMsg  = 0x05
    BlockBodiesMsg     = 0x06
    NewBlockMsg        = 0x07

    // Protocol messages belonging to eth/63
    GetNodeDataMsg = 0x0d
    NodeDataMsg    = 0x0e
    GetReceiptsMsg = 0x0f
    ReceiptsMsg    = 0x10


我們先看看pm.handleMsg 在收到 NewBlockHashesMsg廣播通知的處理代碼:

case msg.Code == NewBlockHashesMsg:
        var announces newBlockHashesData
        if err := msg.Decode(&announces); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        // Mark the hashes as present at the remote node
        for _, block := range announces {
        // Schedule all the unknown hashes for retrieval
        unknown := make(newBlockHashesData, 0, len(announces))
        for _, block := range announces {
            if !pm.blockchain.HasBlock(block.Hash, block.Number) {
                unknown = append(unknown, block)
        for _, block := range unknown {
            pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)

然后每個(gè)newBlockHashesData調(diào)用pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)方法配椭,除了傳入block的hash值和block的number值,還需要傳入當(dāng)前的時(shí)間戳雹姊,peer.go的兩個(gè)函數(shù)指針股缸。

func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
    headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
    block := &announce{
        hash:        hash,
        number:      number,
        time:        time,
        origin:      peer,
        fetchHeader: headerFetcher,
        fetchBodies: bodyFetcher,
    select {
    case f.notify <- block:
        return nil
    case <-f.quit:
        return errTerminated

Notify()方法把傳進(jìn)來的參數(shù)拼成一個(gè)announce對象,然后send給f.notify吱雏。fetcher的loop()主回路里f.notify receive 到這個(gè)notification, 進(jìn)行處理敦姻。

case notification := <-f.notify:
            // A block was announced, make sure the peer isn't DOSing us

            count := f.announces[notification.origin] + 1
            if count > hashLimit {
                log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
            // If we have a valid block number, check that it's potentially useful
            if notification.number > 0 {
                if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
                    log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
            // All is well, schedule the announce if block's not yet downloading
            if _, ok := f.fetching[notification.hash]; ok {
            if _, ok := f.completing[notification.hash]; ok {
            f.announces[notification.origin] = count
            f.announced[notification.hash] = append(f.announced[notification.hash], notification)
            if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
                f.announceChangeHook(notification.hash, true)
            if len(f.announced) == 1 {

1,將收到的不滿足條件的通知都丟棄掉歧杏,如果在f.fetching 狀態(tài)列表里和f.completing 狀態(tài)列表里镰惦,也直接返回。接著更新notification.origin 這個(gè)節(jié)點(diǎn)的announces 數(shù)量犬绒,添加到f.announced 等待fetch的表里旺入。
2,如果len(f.announced[notification.hash]) == 1 說明f.announced只有這一個(gè)通知凯力,則調(diào)用f.announceChangeHook茵瘾。
3,如果len(f.announced) == 1 也說明只有一個(gè)通知咐鹤,則啟動(dòng)fetchTimer的調(diào)度拗秘。

case <-fetchTimer.C:
            // At least one block's timer ran out, check for needing retrieval
            request := make(map[string][]common.Hash)

            for hash, announces := range f.announced {
                if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
                    // Pick a random peer to retrieve from, reset all others
                    announce := announces[rand.Intn(len(announces))]

                    // If the block still didn't arrive, queue for fetching
                    if f.getBlock(hash) == nil {
                        request[announce.origin] = append(request[announce.origin], hash)
                        f.fetching[hash] = announce
            // Send out all block header requests
            for peer, hashes := range request {
                log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

                // Create a closure of the fetch and schedule in on a new thread
                fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
                go func() {
                    if f.fetchingHook != nil {
                    for _, hash := range hashes {
                        fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
            // Schedule the next fetch if blocks are still pending

3版仔, 這時(shí)候NewBlockHashesMsg 的fetcher處理就結(jié)束了,最后再啟動(dòng)fetchTimer的調(diào)度误墓。

三蛮粮,F(xiàn)etcher分析, 之FilterHeaders()
fetchHeader(hash)方法谜慌,調(diào)用了peer.go 里面的全局方法RequestOneHeader(hash common.Hash) Send給網(wǎng)絡(luò)節(jié)點(diǎn)一個(gè)GetBlockHeadersMsg 消息然想。
然后pm.handleMsg 收到 BlockHashesMsg廣播通知

case msg.Code == BlockHeadersMsg:
        // A batch of headers arrived to one of our previous requests
        var headers []*types.Header
        if err := msg.Decode(&headers); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        // If no headers were received, but we're expending a DAO fork check, maybe it's that
        if len(headers) == 0 && p.forkDrop != nil {
            // Possibly an empty reply to the fork header checks, sanity check TDs
            verifyDAO := true

            // If we already have a DAO header, we can check the peer's TD against it. If
            // the peer's ahead of this, it too must have a reply to the DAO check
            if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
                    verifyDAO = false
            // If we're seemingly on the same chain, disable the drop timer
            if verifyDAO {
                p.Log().Debug("Seems to be on the same side of the DAO fork")
                p.forkDrop = nil
                return nil
        // Filter out any explicitly requested headers, deliver the rest to the downloader
        filter := len(headers) == 1
        if filter {
            // If it's a potential DAO fork check, validate against the rules
            if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
                // Disable the fork drop timer
                p.forkDrop = nil

                // Validate the header and either drop the peer or continue
                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                    p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
                    return err
                p.Log().Debug("Verified to be on the same side of the DAO fork")
                return nil
            // Irrelevant of the fork checks, send the header to the fetcher just in case
            headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
        if len(headers) > 0 || !filter {
            err := pm.downloader.DeliverHeaders(p.id, headers)
            if err != nil {
                log.Debug("Failed to deliver headers", "err", err)

如果不是硬分叉的daoHeader,同時(shí)len(headers) == 1欣范,則執(zhí)行pm.fetcher.FilterHeaders(p.id, headers, time.Now())方法

func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
    log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

    // Send the filter channel to the fetcher
    filter := make(chan *headerFilterTask)

    select {
    case f.headerFilter <- filter:
    case <-f.quit:
        return nil
    // Request the filtering of the header list
    select {
    case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
    case <-f.quit:
        return nil
    // Retrieve the headers remaining after filtering
    select {
    case task := <-filter:
        return task.headers
    case <-f.quit:
        return nil

send 一個(gè)filter 到f.headerFilter变泄,fetcher的loop()主回路里f.headerFilter receive 到這個(gè)filter,進(jìn)行處理恼琼。

case filter := <-f.headerFilter:
            // Headers arrived from a remote peer. Extract those that were explicitly
            // requested by the fetcher, and return everything else so it's delivered
            // to other parts of the system.
            var task *headerFilterTask
            select {
            case task = <-filter:
            case <-f.quit:

            // Split the batch of headers into unknown ones (to return to the caller),
            // known incomplete ones (requiring body retrievals) and completed blocks.
            unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
            for _, header := range task.headers {
                hash := header.Hash()

                // Filter fetcher-requested headers from other synchronisation algorithms
                if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
                    // If the delivered header does not match the promised number, drop the announcer
                    if header.Number.Uint64() != announce.number {
                        log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
                    // Only keep if not imported by other means
                    if f.getBlock(hash) == nil {
                        announce.header = header
                        announce.time = task.time

                        // If the block is empty (header only), short circuit into the final import queue
                        if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
                            log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

                            block := types.NewBlockWithHeader(header)
                            block.ReceivedAt = task.time

                            complete = append(complete, block)
                            f.completing[hash] = announce
                        // Otherwise add to the list of blocks needing completion
                        incomplete = append(incomplete, announce)
                    } else {
                        log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
                } else {
                    // Fetcher doesn't know about it, add to the return list
                    unknown = append(unknown, header)
            select {
            case filter <- &headerFilterTask{headers: unknown, time: task.time}:
            case <-f.quit:
            // Schedule the retrieved headers for body completion
            for _, announce := range incomplete {
                hash := announce.header.Hash()
                if _, ok := f.completing[hash]; ok {
                f.fetched[hash] = append(f.fetched[hash], announce)
                if len(f.fetched) == 1 {
            // Schedule the header-only blocks for import
            for _, block := range complete {
                if announce := f.completing[block.Hash()]; announce != nil {
                    f.enqueue(announce.origin, block)

1妨蛹,遍歷headerFilter里面的各個(gè)header,如果在 f.fetching狀態(tài)列表晴竞,且不在f.fetched狀態(tài)列表和 f.completing狀態(tài)列表蛙卤,就繼續(xù)進(jìn)行過濾,否則塞進(jìn)unknown隊(duì)列 發(fā)送給filter噩死,F(xiàn)ilterHeaders里面task 接收到filter颤难,并作為FilterHeaders的返回值返回。
5简僧,在complete列表里面,同時(shí)也在f.completing狀態(tài)列表雕欺,則調(diào)用f.enqueue(announce.origin, block)方法岛马。

case <-completeTimer.C:
            // At least one header's timer ran out, retrieve everything
            request := make(map[string][]common.Hash)

            for hash, announces := range f.fetched {
                // Pick a random peer to retrieve from, reset all others
                announce := announces[rand.Intn(len(announces))]

                // If the block still didn't arrive, queue for completion
                if f.getBlock(hash) == nil {
                    request[announce.origin] = append(request[announce.origin], hash)
                    f.completing[hash] = announce
            // Send out all block body requests
            for peer, hashes := range request {
                log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

                // Create a closure of the fetch and schedule in on a new thread
                if f.completingHook != nil {
                go f.completing[hashes[0]].fetchBodies(hashes)
            // Schedule the next fetch if blocks are still pending

3溉贿, 這時(shí)候BlockHashesMsg 的fetcher處理就結(jié)束了,最后再啟動(dòng)completeTimer循環(huán)調(diào)度浦旱。

四宇色,F(xiàn)etcher分析, 之FilterBodies() 颁湖,Enqueue()宣蠕,
1,fetchBodies(hash)方法甥捺,調(diào)用了peer.go 里面的全局方法RequestBodies(hashes []common.Hash) Send給網(wǎng)絡(luò)節(jié)點(diǎn)一個(gè)GetBlockBodiesMsg 消息抢蚀。
2,然后pm.handleMsg 會收到 BlockBodiesMsg廣播通知镰禾。
3皿曲,執(zhí)行 pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())唱逢。
5奶陈,在FilterHeaders()和FilterBodies()最后都走到了f.enqueue(announce.origin, block)方法

func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()

    // Ensure the peer isn't DOSing us
    count := f.queues[peer] + 1
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
    // Discard any past or too distant blocks
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
    // Schedule the block for future importing
    if _, ok := f.queued[hash]; !ok {
        op := &inject{
            origin: peer,
            block:  block,
        f.queues[peer] = count
        f.queued[hash] = op
        f.queue.Push(op, -float32(block.NumberU64()))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())

在loop主回路里面遍歷f.queue列表吃粒,并把列表中的block insert到本地的block chain中。

func (f *Fetcher) insert(peer string, block *types.Block) {
    hash := block.Hash()

    // Run the import on a new thread
    log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
    go func() {
        defer func() { f.done <- hash }()

        // If the parent's unknown, abort insertion
        parent := f.getBlock(block.ParentHash())
        if parent == nil {
            log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
        // Quickly validate the header and propagate the block if it passes
        switch err := f.verifyHeader(block.Header()); err {
        case nil:
            // All ok, quickly propagate to our peers
            go f.broadcastBlock(block, true)

        case consensus.ErrFutureBlock:
            // Weird future block, don't fail, but neither propagate

            // Something went very wrong, drop the peer
            log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
        // Run the actual import and log any issues
        if _, err := f.insertChain(types.Blocks{block}); err != nil {
            log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
        // If import succeeded, broadcast the block
        go f.broadcastBlock(block, false)

        // Invoke the testing hook if needed
        if f.importedHook != nil {

然后調(diào)用f.insertChain(types.Blocks{block}) 插入本地區(qū)塊鏈。

fetcher.go 作為以太坊同步區(qū)塊的一個(gè)輔助類遏匆,它的職責(zé)就是層層把關(guān)法挨,層層過濾,抵制無效的區(qū)塊進(jìn)入幅聘,杜絕無用的同步請求凡纳。這塊代碼很多很亂,第一次看可能會有點(diǎn)暈帝蒿,第二次看可能還是很暈荐糜,多看幾次可能還會暈??,不過只要知道它做什么就好了葛超。

