
https://zhuanlan.zhihu.com/p/38425656 給出了專業的解讀。

// newNodeDB creates a new node database for storing and retrieving infos about
// known peers in the network. If no path is given, an in-memory, temporary
// database is constructed.
func newNodeDB(path string, version int, self NodeID) (*nodeDB, error) {
if path == "" {
return newMemoryNodeDB(self)
return newPersistentNodeDB(path, version, self)

// newMemoryNodeDB creates a new in-memory node database without a persistent
// backend.
func newMemoryNodeDB(self NodeID) (*nodeDB, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
return &nodeDB{
lvl: db,
self: self,
quit: make(chan struct{}),
}, nil

// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentNodeDB(path string, version int, self NodeID) (nodeDB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(
errors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
if err != nil {
return nil, err
// The nodes contained in the cache correspond to a certain protocol version.
// Flush all nodes if the version doesn't match.
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, int64(version))]

blob, err := db.Get(nodeDBVersionKey, nil)
switch err {
case leveldb.ErrNotFound:
    // Version not found (i.e. empty cache), insert it
    if err := db.Put(nodeDBVersionKey, currentVer, nil); err != nil {
        return nil, err
case nil:
    // Version present, flush if different
    if !bytes.Equal(blob, currentVer) {
        if err = os.RemoveAll(path); err != nil {
            return nil, err
        return newPersistentNodeDB(path, version, self)
return &nodeDB{
    lvl:  db,
    self: self,
    quit: make(chan struct{}),
}, nil


// node retrieves a node with a given id from the database.
func (db *nodeDB) node(id NodeID) *Node {
blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil)
if err != nil {
return nil
node := new(Node)
if err := rlp.DecodeBytes(blob, node); err != nil {
log.Error("Failed to decode node RLP", "err", err)
return nil
node.sha = crypto.Keccak256Hash(node.ID[:])
return node

// updateNode inserts - potentially overwriting - a node into the peer database.
func (db *nodeDB) updateNode(node *Node) error {
blob, err := rlp.EncodeToBytes(node)
if err != nil {
return err
return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil)

// deleteNode deletes all information/keys associated with a node.
func (db *nodeDB) deleteNode(id NodeID) error {
deleter := db.lvl.NewIterator(util.BytesPrefix(makeKey(id, "")), nil)
for deleter.Next() {
if err := db.lvl.Delete(deleter.Key(), nil); err != nil {
return err
return nil

type Node struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP, TCP uint16 // port numbers
ID NodeID // the node's public key
// This is a cached copy of sha3(ID) which is used for node
// distance calculations. This is part of Node in order to make it
// possible to write tests that need a node at a certain distance.
// In those tests, the content of sha will not actually correspond
// with ID.
sha common.Hash
// whether this node is currently being pinged in order to replace
// it in a bucket
contested bool

// ensureExpirer is a small helper method ensuring that the data expiration
// mechanism is running. If the expiration goroutine is already running, this
// method simply returns.
// The goal is to start the data evacuation only after the network successfully
// bootstrapped itself (to prevent dumping potentially useful seed nodes). Since
// it would require significant overhead to exactly trace the first successful
// convergence, it's simpler to "ensure" the correct state when an appropriate
// condition occurs (i.e. a successful bonding), and discard further events.
func (db *nodeDB) ensureExpirer() {
db.runner.Do(func() { go db.expirer() })

// expirer should be started in a go routine, and is responsible for looping ad
// infinitum and dropping stale data from the database.
func (db *nodeDB) expirer() {
tick := time.NewTicker(nodeDBCleanupCycle)
defer tick.Stop()
for {
select {
case <-tick.C:
if err := db.expireNodes(); err != nil {
log.Error("Failed to expire nodedb items", "err", err)
case <-db.quit:

// expireNodes iterates over the database and deletes all nodes that have not
// been seen (i.e. received a pong from) for some allotted time.
func (db *nodeDB) expireNodes() error {
threshold := time.Now().Add(-nodeDBNodeExpiration)
// Find discovered nodes that are older than the allowance
it := db.lvl.NewIterator(nil, nil)
defer it.Release()
for it.Next() {
// Skip the item if not a discovery node
id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
// Skip the node if not expired yet (and not self)
if !bytes.Equal(id[:], db.self[:]) {
if seen := db.lastPong(id); seen.After(threshold) {
// Otherwise delete all associated information
return nil

Some state-update methods:
// lastPingReceived retrieves the time of the last ping packet sent by the remote node.
func (db *nodeDB) lastPingReceived(id NodeID) time.Time {
return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPing)), 0)

// updateLastPing updates the last time remote node pinged us.
func (db *nodeDB) updateLastPingReceived(id NodeID, instance time.Time) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverPing), instance.Unix())

// lastPongReceived retrieves the time of the last successful pong from remote node.
func (db *nodeDB) lastPongReceived(id NodeID) time.Time {
return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPong)), 0)

// hasBond reports whether the given node is considered bonded.
func (db *nodeDB) hasBond(id NodeID) bool {
return time.Since(db.lastPongReceived(id)) < nodeDBNodeExpiration

// updateLastPongReceived updates the last pong time of a node.
func (db *nodeDB) updateLastPongReceived(id NodeID, instance time.Time) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverPong), instance.Unix())

// findFails retrieves the number of findnode failures since bonding.
func (db *nodeDB) findFails(id NodeID) int {
return int(db.fetchInt64(makeKey(id, nodeDBDiscoverFindFails)))

// updateFindFails updates the number of findnode failures since bonding.
func (db *nodeDB) updateFindFails(id NodeID, fails int) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))

// querySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db nodeDB) querySeeds(n int, maxAge time.Duration) []Node {
var (
now = time.Now()
nodes = make([]*Node, 0, n)
it = db.lvl.NewIterator(nil, nil)
id NodeID
defer it.Release()

for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
// Seek to a random entry. The first byte is incremented by a
// random amount each time in order to increase the likelihood
// of hitting all existing nodes in very small databases.
ctr := id[0]
id[0] = ctr + id[0]%16
it.Seek(makeKey(id, nodeDBDiscoverRoot))

    n := nextNode(it)
    if n == nil {
        id[0] = 0
        continue seek // iterator exhausted
    if n.ID == db.self {
        continue seek
    if now.Sub(db.lastPongReceived(n.ID)) > maxAge {
        continue seek
    for i := range nodes {
        if nodes[i].ID == n.ID {
            continue seek // duplicate
    nodes = append(nodes, n)
return nodes


// reads the next node record from the iterator, skipping over other
// database entries.
func nextNode(it iterator.Iterator) *Node {
for end := false; !end; end = !it.Next() {
id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
var n Node
if err := rlp.DecodeBytes(it.Value(), &n); err != nil {
log.Warn("Failed to decode node RLP", "id", id, "err", err)
return &n
return nil

table.go:kademlia協議實現
const (
alpha = 3 // Kademlia concurrency factor
bucketSize = 16 // Kademlia bucket size
hashBits = len(common.Hash{}) * 8
nBuckets = hashBits + 1 // Number of buckets

maxBondingPingPongs = 16
maxFindnodeFailures = 5

autoRefreshInterval = 1 * time.Hour
seedCount           = 30
seedMaxAge          = 5 * 24 * time.Hour


type Table struct {
mutex sync.Mutex // protects buckets, their content, and nursery
buckets [nBuckets]bucket // index of known nodes by distance
nursery []
Node // bootstrap nodes
db *nodeDB // database of known nodes

refreshReq chan chan struct{}
closeReq   chan struct{}
closed     chan struct{}

bondmu    sync.Mutex
bonding   map[NodeID]*bondproc
bondslots chan struct{} // limits total number of active bonding processes

nodeAddedHook func(*Node) // for testing

net  transport
self *Node // metadata of the local node


func newTable(t transport, ourID NodeID, ourAddr net.UDPAddr, nodeDBPath string, bootnodes []Node) (*Table, error) {
// If no node database was given, use an in-memory one
db, err := newNodeDB(nodeDBPath, nodeDBVersion, ourID)
if err != nil {
return nil, err
tab := &Table{
net: t,
db: db,
self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
// Start the background expiration goroutine after loading seeds so that the search for
// seed nodes also considers older nodes that would otherwise be removed by the
// expiration.
go tab.loop()
return tab, nil

go tab.loop(),這個函數主要完成以下的工作:
// loop schedules refresh, revalidate runs and coordinates shutdown.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
revalidateDone = make(chan struct{})
refreshDone = make(chan struct{}) // where doRefresh reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// Start initial refresh.
go tab.doRefresh(refreshDone)
for {
select {
case <-refresh.C:
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
case req := <-tab.refreshReq:
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
case <-refreshDone:
for _, ch := range waiting {
waiting, refreshDone = nil, nil
case <-revalidate.C:
go tab.doRevalidate(revalidateDone)
case <-revalidateDone:
case <-copyNodes.C:
go tab.copyLiveNodes()
case <-tab.closeReq:
break loop
if tab.net != nil {
if refreshDone != nil {
for _, ch := range waiting {

// doRefresh performs a lookup for a random target to keep buckets
// full. seed nodes are inserted if the table is empty (initial
// bootstrap or discarded faulty peers).
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
// Load nodes from the database and insert
// them. This should yield a few previously seen nodes that are
// (hopefully) still alive.
// Run self lookup to discover new neighbor nodes.
tab.lookup(tab.self.ID, false)
// The Kademlia paper specifies that the bucket refresh should
// perform a lookup in the least recently used bucket. We cannot
// adhere to this because the findnode target is a 512bit value
// (not hash-sized) and it is not easily possible to generate a
// sha3 preimage that falls into a chosen bucket.
// We perform a few lookups with a random target instead.
//不容易生成sha3-preimage落入選定的存儲桶中,所以這里lookup 3個隨機(jī)的
for i := 0; i < 3; i++ {
var target NodeID
tab.lookup(target, false)

func (tab Table) lookup(targetID NodeID, refreshIfEmpty bool) []Node {
var (
target = crypto.Keccak256Hash(targetID[:])
asked = make(map[NodeID]bool)
seen = make(map[NodeID]bool)
reply = make(chan []*Node, alpha)
pendingQueries = 0
result *nodesByDistance
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self.ID] = true
for {
// generate initial result set
result = tab.closest(target, bucketSize)
if len(result.entries) > 0 || !refreshIfEmpty {
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
refreshIfEmpty = false
for {
// ask the alpha closest nodes that we haven't asked yet
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] {
asked[n.ID] = true
go tab.findnode(n, targetID, reply)
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
// wait for the next reply
for _, n := range <-reply {
if n != nil && !seen[n.ID] {
seen[n.ID] = true
result.push(n, bucketSize)
return result.entries

findnode基於一個查詢失敗記錄來查詢節點,如果找到,將nodes添加到table中,同時向reply channel發送nodes。
func (tab *Table) findnode(n Node, targetID NodeID, reply chan<- []Node) {
fails := tab.db.findFails(n.ID)
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil || len(r) == 0 {
tab.db.updateFindFails(n.ID, fails)
log.Trace("Findnode failed", "id", n.ID, "failcount", fails, "err", err)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
} else if fails > 0 {
tab.db.updateFindFails(n.ID, fails-1)

// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
// just remove those again during revalidation.
for _, n := range r {
reply <- r


add查詢節點是否在table中,如果沒有的話,分成兩種情況:第一是bucket有空間,則直接添加;第二是bucket沒有空間,則先添加進replacement list,替換掉bucket的活躍節點中最久沒有收到ping包的節點。
func (tab *Table) add(n *Node) {
defer tab.mutex.Unlock()

b := tab.bucket(n.sha)
if !tab.bumpOrAdd(b, n) {
    // Node is not in table. Add it to the replacement list.
    tab.addReplacement(b, n)


result.push方法:這個方法會根據所有節點對於target的距離進行排序,按照從近到遠的方式決定新節點的插入位置(隊列中最多會包含16個元素)。這樣會導致隊列裡面的元素和target的距離越來越近,距離相對遠的會被踢出隊列。
// nodesByDistance is a list of nodes, ordered by
// distance to target.
type nodesByDistance struct {
entries []*Node
target common.Hash

// push adds the given node to the list, keeping the total size below maxElems.
func (h *nodesByDistance) push(n *Node, maxElems int) {
ix := sort.Search(len(h.entries), func(i int) bool {
return distcmp(h.target, h.entries[i].sha, n.sha) > 0
if len(h.entries) < maxElems {
h.entries = append(h.entries, n)
if ix == len(h.entries) {
// farther away than all nodes we already have.
// if there was room for it, the node is now the last element.
} else {
// slide existing entries down to make room
// this will overwrite the entry we just appended.
copy(h.entries[ix+1:], h.entries[ix:])
h.entries[ix] = n

