以太坊源碼(3)——節(jié)點發(fā)現(xiàn)協(xié)議甸祭,KAD協(xié)議

ethereum協(xié)議

3缕碎、P2P節(jié)點發(fā)現(xiàn)

(1)分布式哈希表(DHT)

DHT全稱叫分布式哈希表(Distributed Hash Table),是一種分布式存儲方法池户。在不需要服務器的情況下咏雌,每個客戶端負責一個小范圍的路由,并負責存儲一小部分數(shù)據(jù)校焦,從而實現(xiàn)整個DHT網(wǎng)絡的尋址和存儲赊抖。DHT技術(shù)的應用來源于p2p網(wǎng)絡發(fā)展的需要。第二代p2p文件共享系統(tǒng)正是由于查找節(jié)點十分困難且耗費網(wǎng)絡資源而促進了第三代系統(tǒng)引入了DHT技術(shù)寨典,用以快速的查找節(jié)點以及資源氛雪。

分布式哈希表與哈希表的共同之處在于能夠?qū)崿F(xiàn)快速的查找。它與上面哈希表的不同在于:1)哈希表通常是本地的耸成,用于在本地快速的插入和查找數(shù)據(jù)报亩。而分布式哈希表相當于將哈希表中的bucket(桶)分散到不同的節(jié)點計算機中。2)哈希表增添墓猎、刪除桶會導致所有的數(shù)據(jù)需要重新hash,但分布式哈希表支持動態(tài)的節(jié)點的數(shù)目赚楚,節(jié)點可以隨意的進入或退出毙沾。

在以太坊中,DHT使用的是KAD協(xié)議宠页。
引自:https://blog.csdn.net/lj900911/article/details/83861438

在Kad網(wǎng)絡中左胞,所有節(jié)點都被當作一顆二叉樹的葉子寇仓,并且每一個節(jié)點的位置都由其ID值的最短前綴唯一確定。

ID值是512位公鑰經(jīng)過Hash出來的256位地址烤宙。

1遍烦、如何將ID映射到二叉樹

如何把節(jié)點映射到二叉樹?

1)先把key(nodeID)以二進制的形式表示躺枕,進行“最短唯一前綴”來處理服猪;
2)二進制的第n位代表二叉樹的第n層,這樣一個子樹的每個節(jié)點連起來就是完整的id二進制表示拐云;
3)“1”代表進入左子樹罢猪,“0”代表進入右子樹(反過來也行)
4)按上面的步驟處理后得到到最后的葉子節(jié)點,就是該“key”對應的節(jié)點叉瘩。

在以太坊中膳帕,KAD協(xié)議的核心邏輯由Discover/table.go中進行實現(xiàn)。KAD協(xié)議中薇缅,有四種RPC類型危彩,包括PING、STORE泳桦、FINDNODE汤徽、FINDVALUE。以太坊的KAD只實現(xiàn)了PING和FindNode蓬痒。

首先是新建Table

func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error) {
    //新建 參數(shù)包括transport:KAD的兩個操作泻骤, db 以及引導節(jié)點 
    tab := &Table{
        net:        t,
        db:         db,
        refreshReq: make(chan chan struct{}),
        initDone:   make(chan struct{}),
        closeReq:   make(chan struct{}),
        closed:     make(chan struct{}),
        rand:       mrand.New(mrand.NewSource(0)),
        ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
    }
    //加載引導節(jié)點
    if err := tab.setFallbackNodes(bootnodes); err != nil {
        return nil, err
    }
    //對每一個bucket[i]創(chuàng)建bucket對象
    for i := range tab.buckets {
        tab.buckets[i] = &bucket{
            ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
        }
    }
    //產(chǎn)生隨機種子 后面用于讀取table里面的randomnode 代碼見下面的代碼塊。
    tab.seedRand()
    //讀取SeedNodes
    tab.loadSeedNodes()
    //goroutine 負責刷新table以及關(guān)閉
    go tab.loop()
    return tab, nil
}

func (tab *Table) seedRand() {
    var b [8]byte
    crand.Read(b[:])

    tab.mutex.Lock()
    tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:])))
    tab.mutex.Unlock()
}

func (tab *Table) loadSeedNodes() {
    seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
    seeds = append(seeds, tab.nursery...)
    for i := range seeds {
        seed := seeds[i]
        age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
        log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
        //將種子節(jié)點加入到addSeenNode中
        tab.addSeenNode(seed)
    }
}

看一下tab.loop()梧奢。


// loop schedules refresh, revalidate runs and coordinates shutdown.
func (tab *Table) loop() {
    var (
        revalidate     = time.NewTimer(tab.nextRevalidateTime())
        refresh        = time.NewTicker(refreshInterval)
        copyNodes      = time.NewTicker(copyNodesInterval)
        refreshDone    = make(chan struct{})           // where doRefresh reports completion
        revalidateDone chan struct{}                   // where doRevalidate reports completion
        waiting        = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
    )
    defer refresh.Stop()
    defer revalidate.Stop()
    defer copyNodes.Stop()

    // Start initial refresh.
    go tab.doRefresh(refreshDone)

loop:
    for {
        select {
            //每一個小時執(zhí)行的刷新節(jié)點
        case <-refresh.C:
            tab.seedRand()
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
            //收到刷新請求也刷新
        case req := <-tab.refreshReq:
            waiting = append(waiting, req)
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
            //刷新完 關(guān)閉channel
        case <-refreshDone:
            for _, ch := range waiting {
                close(ch)
            }
            waiting, refreshDone = nil, nil
            //驗證bucket最后一個的節(jié)點是不是還存貨
        case <-revalidate.C:
            revalidateDone = make(chan struct{})
            go tab.doRevalidate(revalidateDone)
            //驗證完了 重置驗證時間
        case <-revalidateDone:
            revalidate.Reset(tab.nextRevalidateTime())
            revalidateDone = nil
            //如果存活節(jié)點存貨時間比seedMinTableTime長狱掂,存入db
        case <-copyNodes.C:
            go tab.copyLiveNodes()
            //關(guān)閉接收請求 跳出循環(huán)
        case <-tab.closeReq:
            break loop
        }
    }

    if refreshDone != nil {
        <-refreshDone
    }
    for _, ch := range waiting {
        close(ch)
    }
    if revalidateDone != nil {
        <-revalidateDone
    }
    close(tab.closed)
}

看一下負責刷新的doRefresh函數(shù)。

func (tab *Table) doRefresh(done chan struct{}) {
    defer close(done)

    // Load nodes from the database and insert
    // them. This should yield a few previously seen nodes that are
    // (hopefully) still alive.
    tab.loadSeedNodes()

    // Run self lookup to discover new neighbor nodes.
    // We can only do this if we have a secp256k1 identity.
    var key ecdsa.PublicKey
    if err := tab.self().Load((*enode.Secp256k1)(&key)); err == nil {
        tab.lookup(encodePubkey(&key), false)
    }

    // The Kademlia paper specifies that the bucket refresh should
    // perform a lookup in the least recently used bucket. We cannot
    // adhere to this because the findnode target is a 512bit value
    // (not hash-sized) and it is not easily possible to generate a
    // sha3 preimage that falls into a chosen bucket.
    // We perform a few lookups with a random target instead.
    for i := 0; i < 3; i++ {
        var target encPubkey
        crand.Read(target[:])
        tab.lookup(target, false)
    }
}

主要是用tab.lookup()進行節(jié)點查找的亲轨∏鞑遥看一下lookup函數(shù)。

func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
    var (
        target         = enode.ID(crypto.Keccak256Hash(targetKey[:]))
        asked          = make(map[enode.ID]bool)
        seen           = make(map[enode.ID]bool)
        reply          = make(chan []*node, alpha)
        pendingQueries = 0
        result         *nodesByDistance
    )
    // don't query further if we hit ourself.
    // unlikely to happen often in practice.
    asked[tab.self().ID()] = true

    for {
        tab.mutex.Lock()
        // generate initial result set
        //返回和target最近的集合惦蚊,最多bucketSize=16個
        result = tab.closest(target, bucketSize)
        tab.mutex.Unlock()
        if len(result.entries) > 0 || !refreshIfEmpty {
            break
        }
        // The result set is empty, all nodes were dropped, refresh.
        // We actually wait for the refresh to complete here. The very
        // first query will hit this case and run the bootstrapping
        // logic.
        <-tab.refresh()
        refreshIfEmpty = false
    }

    for {
        // ask the alpha closest nodes that we haven't asked yet
        //從result set 中的α=3個節(jié)點發(fā)起findnode請求器虾,詢問其離target最近的節(jié)點集合
        for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
            n := result.entries[i]
            if !asked[n.ID()] {
                asked[n.ID()] = true
                pendingQueries++
                //執(zhí)行findnode方法
                go tab.findnode(n, targetKey, reply)
            }
        }
        //上面詢問過程的節(jié)點都問過了
        if pendingQueries == 0 {
            // we have asked all closest nodes, stop the search
            break
        }
        select {
            //nodes放入表
        case nodes := <-reply:
            for _, n := range nodes {
                if n != nil && !seen[n.ID()] {
                    seen[n.ID()] = true
                    //按照距離排序放入
                    result.push(n, bucketSize)
                }
            }
            //關(guān)閉請求
        case <-tab.closeReq:
            return nil // shutdown, no need to continue.
        }
        pendingQueries--
    }
    return result.entries
}

看一下findnode

func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
    //findnode失敗次數(shù)
    fails := tab.db.FindFails(n.ID(), n.IP())
    //udp發(fā)送findnode請求
    r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)
    if err == errClosed {
        // Avoid recording failures on shutdown.
        reply <- nil
        return
    } else if len(r) == 0 {
        fails++
        tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
        log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
        //大于失敗次數(shù),從table中刪去節(jié)點
        if fails >= maxFindnodeFailures {
            log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
            tab.delete(n)
        }
    } else if fails > 0 {
        tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
    }

    // Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
    // just remove those again during revalidation.

    //盡可能將節(jié)點添加到Table中
    for _, n := range r {
        tab.addSeenNode(n)
    }
    reply <- r
}

再看看table.addSeenNode()

// addSeenNode adds a node which may or may not be live to the end of a bucket. If the
// bucket has space available, adding the node succeeds immediately. Otherwise, the node is
// added to the replacements list.
//
// The caller must not hold tab.mutex.
func (tab *Table) addSeenNode(n *node) {
    //等于自己ID蹦锋,就不加了
    if n.ID() == tab.self().ID() {
        return
    }

    tab.mutex.Lock()
    defer tab.mutex.Unlock()

    
    b := tab.bucket(n.ID())
    //b在bucket里兆沙,就不加了
    if contains(b.entries, n.ID()) {
        // Already in bucket, don't add.
        return
    }
    //bucket滿了,可能作為替代添加莉掂,舍棄頭部節(jié)點
    if len(b.entries) >= bucketSize {
        // Bucket full, maybe add as replacement.
        tab.addReplacement(b, n)
        return
    }
    if !tab.addIP(b, n.IP()) {
        // Can't add: IP limit reached.

        return
    }
    // Add to end of bucket:
    //直接在尾部加葛圃,將n從replacement中刪除
    b.entries = append(b.entries, n)
    b.replacements = deleteNode(b.replacements, n)
    n.addedAt = time.Now()
    if tab.nodeAddedHook != nil {
        tab.nodeAddedHook(n)
    }
}

至此KAD的邏輯處理已經(jīng)完成,還有部分細節(jié)代碼沒有看,但總體流程如上库正。
具體的PINGPONG曲楚、FINDNODE請求在Discover/udp.go中,有空再看褥符。
這里只分析KAD節(jié)點發(fā)現(xiàn)龙誊。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市喷楣,隨后出現(xiàn)的幾起案子趟大,更是在濱河造成了極大的恐慌,老刑警劉巖抡蛙,帶你破解...
    沈念sama閱讀 222,681評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件护昧,死亡現(xiàn)場離奇詭異,居然都是意外死亡粗截,警方通過查閱死者的電腦和手機惋耙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來熊昌,“玉大人绽榛,你說我怎么就攤上這事⌒鲆伲” “怎么了灭美?”我有些...
    開封第一講書人閱讀 169,421評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長昂利。 經(jīng)常有香客問我届腐,道長,這世上最難降的妖魔是什么蜂奸? 我笑而不...
    開封第一講書人閱讀 60,114評論 1 300
  • 正文 為了忘掉前任犁苏,我火速辦了婚禮,結(jié)果婚禮上扩所,老公的妹妹穿的比我還像新娘围详。我一直安慰自己,他們只是感情好祖屏,可當我...
    茶點故事閱讀 69,116評論 6 398
  • 文/花漫 我一把揭開白布助赞。 她就那樣靜靜地躺著,像睡著了一般袁勺。 火紅的嫁衣襯著肌膚如雪雹食。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,713評論 1 312
  • 那天期丰,我揣著相機與錄音群叶,去河邊找鬼漠嵌。 笑死,一個胖子當著我的面吹牛盖呼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播化撕,決...
    沈念sama閱讀 41,170評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼几晤,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了植阴?” 一聲冷哼從身側(cè)響起蟹瘾,我...
    開封第一講書人閱讀 40,116評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎掠手,沒想到半個月后憾朴,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,651評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡喷鸽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,714評論 3 342
  • 正文 我和宋清朗相戀三年众雷,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片做祝。...
    茶點故事閱讀 40,865評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡砾省,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出混槐,到底是詐尸還是另有隱情编兄,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布声登,位于F島的核電站狠鸳,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏悯嗓。R本人自食惡果不足惜件舵,卻給世界環(huán)境...
    茶點故事閱讀 42,211評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望绅作。 院中可真熱鬧芦圾,春花似錦、人聲如沸俄认。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽眯杏。三九已至夜焦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間岂贩,已是汗流浹背茫经。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人卸伞。 一個月前我還...
    沈念sama閱讀 49,299評論 3 379
  • 正文 我出身青樓抹镊,卻偏偏與公主長得像,于是被迫代替她去往敵國和親荤傲。 傳聞我的和親對象是個殘疾皇子垮耳,可洞房花燭夜當晚...
    茶點故事閱讀 45,870評論 2 361

推薦閱讀更多精彩內(nèi)容

  • 一些概念 數(shù)據(jù)結(jié)構(gòu)就是研究數(shù)據(jù)的邏輯結(jié)構(gòu)和物理結(jié)構(gòu)以及它們之間相互關(guān)系,并對這種結(jié)構(gòu)定義相應的運算遂黍,而且確保經(jīng)過這...
    Winterfell_Z閱讀 5,854評論 0 13
  • 1终佛、P2P原理及協(xié)議概述 P2P 主要存在四種不同的網(wǎng)絡模型,也代表著 P2P 技術(shù)的四個發(fā)展階段:集中式雾家、純分布...
    牧碼人愛跑馬閱讀 14,623評論 1 18
  • 一铃彰、概念 常見的P2P網(wǎng)絡主要分為兩種類型:結(jié)構(gòu)化網(wǎng)絡和非結(jié)構(gòu)化網(wǎng)絡。 非結(jié)構(gòu)化的P2P網(wǎng)絡:并不給節(jié)點的連接覆...
    麻臉大叔閱讀 3,434評論 0 3
  • 分布式系統(tǒng)面臨的第一個問題就是數(shù)據(jù)分布芯咧,即將數(shù)據(jù)均勻地分布到多個存儲節(jié)點牙捉。另外,為了保證可靠性和可用性敬飒,需要將數(shù)據(jù)...
    olostin閱讀 4,589評論 2 26
  • --- layout: post title: "如果有人問你關(guān)系型數(shù)據(jù)庫的原理鹃共,叫他看這篇文章(轉(zhuǎn))" date...
    藍墜星閱讀 796評論 0 3