ethereum協議
3、P2P節點發現
(1)分布式哈希表(DHT)
DHT全稱叫分布式哈希表(Distributed Hash Table),是一種分布式存儲方法。在不需要服務器的情況下,每個客戶端負責一個小范圍的路由,并負責存儲一小部分數據,從而實現整個DHT網絡的尋址和存儲。DHT技術的應用來源于p2p網絡發展的需要。第二代p2p文件共享系統正是由于查找節點十分困難且耗費網絡資源而促進了第三代系統引入了DHT技術,用以快速的查找節點以及資源。
分布式哈希表與哈希表的共同之處在于能夠實現快速的查找。它與上面哈希表的不同在于:1)哈希表通常是本地的,用于在本地快速的插入和查找數據。而分布式哈希表相當于將哈希表中的bucket(桶)分散到不同的節點計算機中。2)哈希表增添、刪除桶會導致所有的數據需要重新hash,但分布式哈希表支持動態的節點的數目,節點可以隨意的進入或退出。
在以太坊中,DHT使用的是KAD協議。
引自:https://blog.csdn.net/lj900911/article/details/83861438
在Kad網絡中,所有節點都被當作一棵二叉樹的葉子,并且每一個節點的位置都由其ID值的最短前綴唯一確定。
ID值是512位公鑰經過Hash出來的256位地址。
1、如何將ID映射到二叉樹
如何把節點映射到二叉樹?
1)先把key(nodeID)以二進制的形式表示,進行“最短唯一前綴”來處理;
2)二進制的第n位代表二叉樹的第n層,這樣一個子樹的每個節點連起來就是完整的id二進制表示;
3)“1”代表進入左子樹,“0”代表進入右子樹(反過來也行)
4)按上面的步驟處理后得到的最后的葉子節點,就是該“key”對應的節點。
在以太坊中,KAD協議的核心邏輯在Discover/table.go中實現。KAD協議中,有四種RPC類型,包括PING、STORE、FINDNODE、FINDVALUE。以太坊的KAD只實現了PING和FindNode。
首先是新建Table
// newTable builds a Table on top of the given transport, node database and
// bootstrap nodes, populates it with seed nodes and starts its background loop.
//
// It returns an error only when setFallbackNodes rejects the bootstrap nodes.
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error) {
	tab := &Table{
		net:        t,
		db:         db,
		refreshReq: make(chan chan struct{}),
		initDone:   make(chan struct{}),
		closeReq:   make(chan struct{}),
		closed:     make(chan struct{}),
		// Created with a fixed seed; seedRand below reseeds it.
		rand: mrand.New(mrand.NewSource(0)),
		ips:  netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
	}

	// Register the bootstrap nodes.
	err := tab.setFallbackNodes(bootnodes)
	if err != nil {
		return nil, err
	}

	// Give every bucket slot its own bucket object with a per-bucket IP limit.
	for idx := range tab.buckets {
		tab.buckets[idx] = &bucket{
			ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
		}
	}

	// Reseed the table's random source (per the original note it is later
	// used when picking random nodes from the table).
	tab.seedRand()
	// Pull seed nodes into the table.
	tab.loadSeedNodes()

	// The background goroutine handles refresh, revalidation and shutdown.
	go tab.loop()
	return tab, nil
}
// seedRand reseeds tab.rand from 8 bytes obtained via crand.Read, interpreted
// as a big-endian integer. The generator is mutated while holding tab.mutex.
// The error result of crand.Read is not checked.
func (tab *Table) seedRand() {
	var raw [8]byte
	crand.Read(raw[:])
	seed := int64(binary.BigEndian.Uint64(raw[:]))

	tab.mutex.Lock()
	defer tab.mutex.Unlock()
	tab.rand.Seed(seed)
}
// loadSeedNodes queries the database for seed nodes, appends tab.nursery to
// them, and hands every candidate to addSeenNode, tracing each one.
func (tab *Table) loadSeedNodes() {
	candidates := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
	candidates = append(candidates, tab.nursery...)

	for _, candidate := range candidates {
		// Per-iteration copy: the lazy age closure below captures it.
		seed := candidate
		age := log.Lazy{Fn: func() interface{} {
			return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP()))
		}}
		log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
		// Insert the seed node into the table.
		tab.addSeenNode(seed)
	}
}
看一下tab.loop()。
// loop schedules refresh, revalidate runs and coordinates shutdown.
//
// It starts an initial doRefresh and then serves timer ticks and refresh
// requests until tab.closeReq fires. A new doRefresh is only started while
// refreshDone is nil, so at most one refresh runs at a time. On shutdown it
// waits for an in-flight refresh and revalidation, releases all waiting
// callers, and finally closes tab.closed.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
refreshDone = make(chan struct{}) // where doRefresh reports completion
revalidateDone chan struct{} // where doRevalidate reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// Start initial refresh.
go tab.doRefresh(refreshDone)
loop:
for {
select {
// Periodic refresh tick (period is refreshInterval): reseed the random
// source and start a refresh unless one is already running.
case <-refresh.C:
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
// An explicit refresh request also triggers a refresh; the requester's
// channel is queued so it gets closed when the refresh completes.
case req := <-tab.refreshReq:
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
// Refresh finished: close the channels of all waiting callers and mark
// that no refresh is running.
case <-refreshDone:
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
// Revalidation timer fired: run doRevalidate in the background (per the
// original note, it checks whether the last node of a bucket is still alive).
case <-revalidate.C:
revalidateDone = make(chan struct{})
go tab.doRevalidate(revalidateDone)
// Revalidation finished: re-arm the timer for the next run.
case <-revalidateDone:
revalidate.Reset(tab.nextRevalidateTime())
revalidateDone = nil
// Periodic copy tick: run copyLiveNodes in the background (per the original
// note, it stores nodes that have been live longer than seedMinTableTime in the db).
case <-copyNodes.C:
go tab.copyLiveNodes()
// Close request received: leave the select loop.
case <-tab.closeReq:
break loop
}
}
// Wait for an in-flight refresh before releasing the remaining waiters.
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
// Wait for an in-flight revalidation before signalling that the loop is done.
if revalidateDone != nil {
<-revalidateDone
}
close(tab.closed)
}
看一下負責刷新的doRefresh函數。
// doRefresh performs one table refresh: it reloads seed nodes, looks up the
// local node's own key when a secp256k1 identity can be loaded, and then runs
// three lookups for random targets. done is closed when the refresh returns.
func (tab *Table) doRefresh(done chan struct{}) {
	defer close(done)

	// Re-insert nodes from the database. This should yield a few previously
	// seen nodes that are (hopefully) still alive.
	tab.loadSeedNodes()

	// A self lookup discovers new neighbor nodes. It is only possible when
	// the local node has a secp256k1 identity.
	var pub ecdsa.PublicKey
	if tab.self().Load((*enode.Secp256k1)(&pub)) == nil {
		tab.lookup(encodePubkey(&pub), false)
	}

	// The Kademlia paper specifies that the bucket refresh should perform a
	// lookup in the least recently used bucket. That is not possible here
	// because the findnode target is a 512bit value (not hash-sized) and it
	// is not easily possible to generate a sha3 preimage that falls into a
	// chosen bucket. A few lookups with random targets are done instead.
	for remaining := 3; remaining > 0; remaining-- {
		var target encPubkey
		crand.Read(target[:])
		tab.lookup(target, false)
	}
}
主要是用tab.lookup()進行節點查找的。看一下lookup函數。
// lookup searches the network for nodes close to the Keccak256 hash of
// targetKey. It starts from the closest nodes in the local table, queries up
// to alpha not-yet-asked nodes concurrently via tab.findnode, and merges the
// replies into the result set (capped at bucketSize entries) until no query
// is pending and no unasked node remains.
//
// If refreshIfEmpty is true and the local table yields no nodes, it waits for
// one table refresh and retries once. It returns the final result entries, or
// nil if the table is closed while waiting for replies.
func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
var (
target = enode.ID(crypto.Keccak256Hash(targetKey[:]))
asked = make(map[enode.ID]bool)
seen = make(map[enode.ID]bool)
reply = make(chan []*node, alpha)
pendingQueries = 0
result *nodesByDistance
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self().ID()] = true
for {
tab.mutex.Lock()
// generate initial result set
// closest returns the nodes nearest to target, at most bucketSize of them
// (16 per the original note).
result = tab.closest(target, bucketSize)
tab.mutex.Unlock()
if len(result.entries) > 0 || !refreshIfEmpty {
break
}
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
<-tab.refresh()
refreshIfEmpty = false
}
for {
// ask the alpha closest nodes that we haven't asked yet
// Send findnode requests to up to alpha nodes of the result set (alpha is 3
// per the original note), asking each for its nodes closest to the target.
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID()] {
asked[n.ID()] = true
pendingQueries++
// Run the findnode query in its own goroutine; it answers on reply.
go tab.findnode(n, targetKey, reply)
}
}
// No query in flight and nothing new to ask: every candidate was asked.
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
select {
// A query answered: merge its nodes into the result set.
case nodes := <-reply:
for _, n := range nodes {
if n != nil && !seen[n.ID()] {
seen[n.ID()] = true
// Insert into the result set, keeping at most bucketSize entries
// (presumably ordered by distance to target, per the original note).
result.push(n, bucketSize)
}
}
// The table is closing.
case <-tab.closeReq:
return nil // shutdown, no need to continue.
}
pendingQueries--
}
return result.entries
}
看一下findnode
// findnode sends a findnode request for targetKey to node n, updates n's
// failure counter in the database, adds all returned nodes to the table and
// delivers them on reply.
//
// Exactly one value is sent on reply per call: nil when the transport reports
// errClosed, otherwise the (possibly empty) response.
func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
	// Number of findnode failures recorded so far for this node.
	fails := tab.db.FindFails(n.ID(), n.IP())
	// Issue the findnode request through the transport.
	r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)

	switch {
	case err == errClosed:
		// Avoid recording failures on shutdown.
		reply <- nil
		return
	case len(r) == 0:
		fails++
		tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
		log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
		// Once the failure limit is reached, remove the node from the table.
		if fails >= maxFindnodeFailures {
			log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
			tab.delete(n)
		}
	case fails > 0:
		// A successful response reduces the recorded failure count by one.
		tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
	}

	// Grab as many nodes as possible. Some of them might not be alive anymore,
	// but those are removed again during revalidation.
	for _, found := range r {
		tab.addSeenNode(found)
	}
	reply <- r
}
再看看table.addSeenNode()
// addSeenNode adds a node which may or may not be live to the end of a bucket.
// When the bucket has room the node is appended right away; when the bucket is
// full the node is offered to the replacements list instead. Nothing happens
// for the local node, for a node already in the bucket, or when the IP limit
// check rejects the node.
//
// The caller must not hold tab.mutex.
func (tab *Table) addSeenNode(n *node) {
	// Never add ourselves.
	if n.ID() == tab.self().ID() {
		return
	}

	tab.mutex.Lock()
	defer tab.mutex.Unlock()

	bkt := tab.bucket(n.ID())
	// Cases are evaluated in order, so addIP is only reached when the node
	// is new to the bucket and the bucket still has room.
	switch {
	case contains(bkt.entries, n.ID()):
		// Already in bucket, don't add.
		return
	case len(bkt.entries) >= bucketSize:
		// Bucket full, maybe add as replacement.
		tab.addReplacement(bkt, n)
		return
	case !tab.addIP(bkt, n.IP()):
		// Can't add: IP limit reached.
		return
	}

	// Append to the end of the bucket and drop the node from the
	// replacements list.
	bkt.entries = append(bkt.entries, n)
	bkt.replacements = deleteNode(bkt.replacements, n)
	n.addedAt = time.Now()
	if tab.nodeAddedHook != nil {
		tab.nodeAddedHook(n)
	}
}
至此KAD的邏輯處理已經完成,還有部分細節代碼沒有看,但總體流程如上。
具體的PING/PONG、FINDNODE請求在Discover/udp.go中,有空再看。
這里只分析KAD節點發現。