以太坊源碼閱讀-網(wǎng)絡(luò)處理-P2P

Kademlia協(xié)議
https://zhuanlan.zhihu.com/p/38425656 給出了專業(yè)的解讀。

discover/discv5節(jié)點發(fā)現(xiàn)
database.go實現(xiàn)了節(jié)點的持久化,使用了單獨的leveldb實例已添。
// newNodeDB creates a new node database for storing and retrieving infos about
// known peers in the network. If no path is given, an in-memory, temporary
// database is constructed.
func newNodeDB(path string, version int, self NodeID) (*nodeDB, error) {
if path == "" {
return newMemoryNodeDB(self)
}
return newPersistentNodeDB(path, version, self)
}

// newMemoryNodeDB creates a new in-memory node database without a persistent
// backend.
func newMemoryNodeDB(self NodeID) (*nodeDB, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
}
return &nodeDB{
lvl: db,
self: self,
quit: make(chan struct{}),
}, nil
}

// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentNodeDB(path string, version int, self NodeID) (nodeDB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(
errors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
}
if err != nil {
return nil, err
}
// The nodes contained in the cache correspond to a certain protocol version.
// Flush all nodes if the version doesn't match.
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, int64(version))]

blob, err := db.Get(nodeDBVersionKey, nil)
switch err {
case leveldb.ErrNotFound:
    // Version not found (i.e. empty cache), insert it
    if err := db.Put(nodeDBVersionKey, currentVer, nil); err != nil {
        db.Close()
        return nil, err
    }
case nil:
    // Version present, flush if different
    if !bytes.Equal(blob, currentVer) {
        db.Close()
        if err = os.RemoveAll(path); err != nil {
            return nil, err
        }
        return newPersistentNodeDB(path, version, self)
    }
}
return &nodeDB{
    lvl:  db,
    self: self,
    quit: make(chan struct{}),
}, nil

}

node的存儲、查詢和刪除
// node retrieves a node with a given id from the database.
func (db *nodeDB) node(id NodeID) *Node {
blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil)
if err != nil {
return nil
}
node := new(Node)
if err := rlp.DecodeBytes(blob, node); err != nil {
log.Error("Failed to decode node RLP", "err", err)
return nil
}
node.sha = crypto.Keccak256Hash(node.ID[:])
return node
}

// updateNode inserts - potentially overwriting - a node into the peer database.
func (db *nodeDB) updateNode(node *Node) error {
blob, err := rlp.EncodeToBytes(node)
if err != nil {
return err
}
return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil)
}

// deleteNode deletes all information/keys associated with a node.
func (db *nodeDB) deleteNode(id NodeID) error {
deleter := db.lvl.NewIterator(util.BytesPrefix(makeKey(id, "")), nil)
for deleter.Next() {
if err := db.lvl.Delete(deleter.Key(), nil); err != nil {
return err
}
}
return nil
}

node的數(shù)據(jù)結(jié)構(gòu)
type Node struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP, TCP uint16 // port numbers
ID NodeID // the node's public key
// This is a cached copy of sha3(ID) which is used for node
// distance calculations. This is part of Node in order to make it
// possible to write tests that need a node at a certain distance.
// In those tests, the content of sha will not actually correspond
// with ID.
sha common.Hash
// whether this node is currently being pinged in order to replace
// it in a bucket
contested bool
}

// ensureExpirer is a small helper method ensuring that the data expiration
// mechanism is running. If the expiration goroutine is already running, this
// method simply returns.
//
// The goal is to start the data evacuation only after the network successfully
// bootstrapped itself (to prevent dumping potentially useful seed nodes). Since
// it would require significant overhead to exactly trace the first successful
// convergence, it's simpler to "ensure" the correct state when an appropriate
// condition occurs (i.e. a successful bonding), and discard further events.
func (db *nodeDB) ensureExpirer() {
db.runner.Do(func() { go db.expirer() })
}

// expirer should be started in a go routine, and is responsible for looping ad
// infinitum and dropping stale data from the database.
func (db *nodeDB) expirer() {
tick := time.NewTicker(nodeDBCleanupCycle)
defer tick.Stop()
for {
select {
case <-tick.C:
if err := db.expireNodes(); err != nil {
log.Error("Failed to expire nodedb items", "err", err)
}
case <-db.quit:
return
}
}
}

// expireNodes iterates over the database and deletes all nodes that have not
// been seen (i.e. received a pong from) for some allotted time.
//這個方法遍歷所有的節(jié)點隘谣,如果某個節(jié)點最后接收消息超過指定值吼虎,那么就刪除這個節(jié)點。
func (db *nodeDB) expireNodes() error {
threshold := time.Now().Add(-nodeDBNodeExpiration)
// Find discovered nodes that are older than the allowance
it := db.lvl.NewIterator(nil, nil)
defer it.Release()
for it.Next() {
// Skip the item if not a discovery node
id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
continue
}
// Skip the node if not expired yet (and not self)
if !bytes.Equal(id[:], db.self[:]) {
if seen := db.lastPong(id); seen.After(threshold) {
continue
}
}
// Otherwise delete all associated information
db.deleteNode(id)
}
return nil
}

some state-update method
// lastPingReceived retrieves the time of the last ping packet sent by the remote node.
func (db *nodeDB) lastPingReceived(id NodeID) time.Time {
return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPing)), 0)
}

// updateLastPing updates the last time remote node pinged us.
func (db *nodeDB) updateLastPingReceived(id NodeID, instance time.Time) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverPing), instance.Unix())
}

// lastPongReceived retrieves the time of the last successful pong from remote node.
func (db *nodeDB) lastPongReceived(id NodeID) time.Time {
return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPong)), 0)
}

// hasBond reports whether the given node is considered bonded.
func (db *nodeDB) hasBond(id NodeID) bool {
return time.Since(db.lastPongReceived(id)) < nodeDBNodeExpiration
}

// updateLastPongReceived updates the last pong time of a node.
func (db *nodeDB) updateLastPongReceived(id NodeID, instance time.Time) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverPong), instance.Unix())
}

// findFails retrieves the number of findnode failures since bonding.
func (db *nodeDB) findFails(id NodeID) int {
return int(db.fetchInt64(makeKey(id, nodeDBDiscoverFindFails)))
}

// updateFindFails updates the number of findnode failures since bonding.
func (db *nodeDB) updateFindFails(id NodeID, fails int) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))
}

// querySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db nodeDB) querySeeds(n int, maxAge time.Duration) []Node {
var (
now = time.Now()
nodes = make([]*Node, 0, n)
it = db.lvl.NewIterator(nil, nil)
id NodeID
)
defer it.Release()

seek:
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
// Seek to a random entry. The first byte is incremented by a
// random amount each time in order to increase the likelihood
// of hitting all existing nodes in very small databases.
ctr := id[0]
rand.Read(id[:])
id[0] = ctr + id[0]%16
it.Seek(makeKey(id, nodeDBDiscoverRoot))

    n := nextNode(it)
    if n == nil {
        id[0] = 0
        continue seek // iterator exhausted
    }
    if n.ID == db.self {
        continue seek
    }
    if now.Sub(db.lastPongReceived(n.ID)) > maxAge {
        continue seek
    }
    for i := range nodes {
        if nodes[i].ID == n.ID {
            continue seek // duplicate
        }
    }
    nodes = append(nodes, n)
}
return nodes

}

// reads the next node record from the iterator, skipping over other
// database entries.
func nextNode(it iterator.Iterator) *Node {
for end := false; !end; end = !it.Next() {
id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
continue
}
var n Node
if err := rlp.DecodeBytes(it.Value(), &n); err != nil {
log.Warn("Failed to decode node RLP", "id", id, "err", err)
continue
}
return &n
}
return nil
}

table.go kademlia協(xié)議實現(xiàn)
基本數(shù)據(jù)結(jié)構(gòu)
const (
alpha = 3 // Kademlia concurrency factor
bucketSize = 16 // Kademlia bucket size
hashBits = len(common.Hash{}) * 8
nBuckets = hashBits + 1 // Number of buckets

maxBondingPingPongs = 16
maxFindnodeFailures = 5

autoRefreshInterval = 1 * time.Hour
seedCount           = 30
seedMaxAge          = 5 * 24 * time.Hour

)

type Table struct {
mutex sync.Mutex // protects buckets, their content, and nursery
buckets [nBuckets]bucket // index of known nodes by distance
nursery []
Node // bootstrap nodes
db *nodeDB // database of known nodes

refreshReq chan chan struct{}
closeReq   chan struct{}
closed     chan struct{}

bondmu    sync.Mutex
bonding   map[NodeID]*bondproc
bondslots chan struct{} // limits total number of active bonding processes

nodeAddedHook func(*Node) // for testing

net  transport
self *Node // metadata of the local node

}

func newTable(t transport, ourID NodeID, ourAddr net.UDPAddr, nodeDBPath string, bootnodes []Node) (*Table, error) {
// If no node database was given, use an in-memory one
db, err := newNodeDB(nodeDBPath, nodeDBVersion, ourID)
if err != nil {
return nil, err
}
tab := &Table{
net: t,
db: db,
self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
tab.seedRand()
tab.loadSeedNodes()
// Start the background expiration goroutine after loading seeds so that the search for
// seed nodes also considers older nodes that would otherwise be removed by the
// expiration.
tab.db.ensureExpirer()
go tab.loop()
return tab, nil
}

go tab.loop()篷扩,這個函數(shù)主要完成以下的工作:
1.每半小時進(jìn)行一次刷新工作(autoRefreshInterval)
2.如果接收到refreshReq/revalidate/copyNode等請求。那么進(jìn)行刷新/重新生效/復(fù)制等工作茉盏。
3.如果接收到關(guān)閉消息鉴未。那么進(jìn)行關(guān)閉。
// loop schedules refresh, revalidate runs and coordinates shutdown.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
revalidateDone = make(chan struct{})
refreshDone = make(chan struct{}) // where doRefresh reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// Start initial refresh.
go tab.doRefresh(refreshDone)
loop:
for {
select {
case <-refresh.C:
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq:
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case <-refreshDone:
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
case <-revalidate.C:
go tab.doRevalidate(revalidateDone)
case <-revalidateDone:
revalidate.Reset(tab.nextRevalidateTime())
case <-copyNodes.C:
go tab.copyLiveNodes()
case <-tab.closeReq:
break loop
}
}
if tab.net != nil {
tab.net.close()
}
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
tab.db.close()
close(tab.closed)
}

// doRefresh performs a lookup for a random target to keep buckets
// full. seed nodes are inserted if the table is empty (initial
// bootstrap or discarded faulty peers).
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
// Load nodes from the database and insert
// them. This should yield a few previously seen nodes that are
// (hopefully) still alive.
tab.loadSeedNodes()
// Run self lookup to discover new neighbor nodes.
tab.lookup(tab.self.ID, false)
// The Kademlia paper specifies that the bucket refresh should
// perform a lookup in the least recently used bucket. We cannot
// adhere to this because the findnode target is a 512bit value
// (not hash-sized) and it is not easily possible to generate a
// sha3 preimage that falls into a chosen bucket.
// We perform a few lookups with a random target instead.
//我們不可以堅持這個鸠姨,因為findnode目標(biāo)是512位值(不是哈希大型选)并且
//不容易生成sha3-preimage落入選定的存儲桶中,所以這里lookup 3個隨機(jī)的
//目標(biāo)
for i := 0; i < 3; i++ {
var target NodeID
crand.Read(target[:])
tab.lookup(target, false)
}
}

lookup函數(shù)從table中最近的16個節(jié)點開始查找并結(jié)果排序讶迁,取錢16個節(jié)點连茧。
func (tab Table) lookup(targetID NodeID, refreshIfEmpty bool) []Node {
var (
target = crypto.Keccak256Hash(targetID[:])
asked = make(map[NodeID]bool)
seen = make(map[NodeID]bool)
reply = make(chan []*Node, alpha)
pendingQueries = 0
result *nodesByDistance
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self.ID] = true
for {
tab.mutex.Lock()
// generate initial result set
result = tab.closest(target, bucketSize)
tab.mutex.Unlock()
if len(result.entries) > 0 || !refreshIfEmpty {
break
}
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
<-tab.refresh()
refreshIfEmpty = false
}
for {
// ask the alpha closest nodes that we haven't asked yet
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] {
asked[n.ID] = true
pendingQueries++
go tab.findnode(n, targetID, reply)
}
}
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
for _, n := range <-reply {
if n != nil && !seen[n.ID] {
seen[n.ID] = true
result.push(n, bucketSize)
}
}
pendingQueries--
}
return result.entries
}

findnode基于一個查詢失敗記錄來查詢節(jié)點,如果找到,將nodes添加到table中啸驯,同時向reply channel發(fā)送nodes客扎。
func (tab *Table) findnode(n Node, targetID NodeID, reply chan<- []Node) {
fails := tab.db.findFails(n.ID)
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil || len(r) == 0 {
fails++
tab.db.updateFindFails(n.ID, fails)
log.Trace("Findnode failed", "id", n.ID, "failcount", fails, "err", err)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
} else if fails > 0 {
tab.db.updateFindFails(n.ID, fails-1)
}

// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
// just remove those again during revalidation.
for _, n := range r {
    tab.add(n)
}
reply <- r

}

add查詢節(jié)點是否在table中,如果沒有的話罚斗,分成兩種情況徙鱼,第一是bucket有空間,則直接添加针姿;第二是bucket沒有空間袱吆,則先添加進(jìn)replacement list,替換掉bucket的活躍節(jié)點中最久沒有受到ping包的節(jié)點距淫。
func (tab *Table) add(n *Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()

b := tab.bucket(n.sha)
if !tab.bumpOrAdd(b, n) {
    // Node is not in table. Add it to the replacement list.
    tab.addReplacement(b, n)
}

}

result.push方法绞绒,這個方法會根據(jù) 所有的節(jié)點對于target的距離進(jìn)行排序。 按照從近到遠(yuǎn)的方式?jīng)Q定新節(jié)點的插入順序榕暇。(隊列中最大會包含16個元素)蓬衡。 這樣會導(dǎo)致隊列里面的元素和target的距離越來越近。距離相對遠(yuǎn)的會被踢出隊列拐揭。
// nodesByDistance is a list of nodes, ordered by
// distance to target.
type nodesByDistance struct {
entries []*Node
target common.Hash
}

// push adds the given node to the list, keeping the total size below maxElems.
func (h *nodesByDistance) push(n *Node, maxElems int) {
ix := sort.Search(len(h.entries), func(i int) bool {
return distcmp(h.target, h.entries[i].sha, n.sha) > 0
})
if len(h.entries) < maxElems {
h.entries = append(h.entries, n)
}
if ix == len(h.entries) {
// farther away than all nodes we already have.
// if there was room for it, the node is now the last element.
} else {
// slide existing entries down to make room
// this will overwrite the entry we just appended.
copy(h.entries[ix+1:], h.entries[ix:])
h.entries[ix] = n
}
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末撤蟆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子堂污,更是在濱河造成了極大的恐慌家肯,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件盟猖,死亡現(xiàn)場離奇詭異讨衣,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)式镐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進(jìn)店門反镇,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人娘汞,你說我怎么就攤上這事歹茶。” “怎么了你弦?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵惊豺,是天一觀的道長。 經(jīng)常有香客問我禽作,道長尸昧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任旷偿,我火速辦了婚禮烹俗,結(jié)果婚禮上爆侣,老公的妹妹穿的比我還像新娘。我一直安慰自己幢妄,他們只是感情好兔仰,可當(dāng)我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著磁浇,像睡著了一般斋陪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上置吓,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天,我揣著相機(jī)與錄音缔赠,去河邊找鬼衍锚。 笑死,一個胖子當(dāng)著我的面吹牛嗤堰,可吹牛的內(nèi)容都是我干的戴质。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼踢匣,長吁一口氣:“原來是場噩夢啊……” “哼告匠!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起离唬,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤后专,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后输莺,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體戚哎,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年嫂用,在試婚紗的時候發(fā)現(xiàn)自己被綠了型凳。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡嘱函,死狀恐怖甘畅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情往弓,我是刑警寧澤疏唾,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站亮航,受9級特大地震影響荸实,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜缴淋,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一准给、第九天 我趴在偏房一處隱蔽的房頂上張望泄朴。 院中可真熱鬧,春花似錦露氮、人聲如沸祖灰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽局扶。三九已至,卻和暖如春叁扫,著一層夾襖步出監(jiān)牢的瞬間三妈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工莫绣, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留畴蒲,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓对室,卻偏偏與公主長得像模燥,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子掩宜,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容