死磕以太坊源碼分析之p2p節(jié)點發(fā)現(xiàn)
在閱讀節(jié)點發(fā)現(xiàn)源碼之前必須要理解kadmilia算法坐榆,可以參考:KAD算法詳解。
節(jié)點發(fā)現(xiàn)概述
節(jié)點發(fā)現(xiàn)墓塌,使本地節(jié)點得知其他節(jié)點的信息,進而加入到p2p網(wǎng)絡中垫挨。
以太坊的節(jié)點發(fā)現(xiàn)基于類似的kademlia算法九榔,源碼中有兩個版本哲泊,v4和v5切威。v4適用于全節(jié)點丙号,通過discover.ListenUDP
使用,v5適用于輕節(jié)點通過discv5.ListenUDP
使用喳魏,本文介紹的是v4版本怀薛。
節(jié)點發(fā)現(xiàn)功能主要涉及 Server Table udp 這幾個數(shù)據(jù)結(jié)構,它們有獨自的事件響應循環(huán)三热,節(jié)點發(fā)現(xiàn)功能便是它們互相協(xié)作完成的三幻。其中念搬,每個以太坊客戶端啟動后都會在本地運行一個Server朗徊,并將網(wǎng)絡拓撲中相鄰的節(jié)點視為Node有缆,而Table是Node的容器棚壁,udp則是負責維持底層的連接袖外。這些結(jié)構的關系如下圖:
p2p服務開啟節(jié)點發(fā)現(xiàn)
在P2p的server.go 的start方法中:
if err := srv.setupDiscovery(); err != nil {
return err
}
進入到setupDiscovery
中:
// Discovery V4
var unhandled chan discover.ReadPacket
var sconn *sharedUDPConn
if !srv.NoDiscovery {
...
ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
....
}
discover.ListenUDP
方法即開啟了節(jié)點發(fā)現(xiàn)的功能.
首先解析出監(jiān)聽地址的UDP端口曼验,根據(jù)端口返回與之相連的UDP連接粘姜,之后返回連接的本地網(wǎng)絡地址鬓照,接著設置最后一個UDP-on-IPv4端口。到此為止節(jié)點發(fā)現(xiàn)的一些準備工作做好豺裆,接下下來開始UDP的監(jiān)聽:
ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
然后進行UDP 的監(jiān)聽,下面是監(jiān)聽的過程:
監(jiān)聽UDP
// 監(jiān)聽給定的socket 上的發(fā)現(xiàn)的包
func ListenUDP(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return ListenV4(c, ln, cfg)
}
func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
closeCtx, cancel := context.WithCancel(context.Background())
t := &UDPv4{
conn: c,
priv: cfg.PrivateKey,
netrestrict: cfg.NetRestrict,
localNode: ln,
db: ln.Database(),
gotreply: make(chan reply),
addReplyMatcher: make(chan *replyMatcher),
closeCtx: closeCtx,
cancelCloseCtx: cancel,
log: cfg.Log,
}
if t.log == nil {
t.log = log.Root()
}
tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log) //
if err != nil {
return nil, err
}
t.tab = tab
go tab.loop() //
t.wg.Add(2)
go t.loop() //
go t.readLoop(cfg.Unhandled) //
return t, nil
}
主要做了以下幾件事:
1.新建路由表
tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
新建路由表做了以下幾件事:
- 初始化table對象
- 設置bootnode(setFallbackNodes)
- 節(jié)點第一次啟動的時候留储,節(jié)點會與硬編碼在以太坊源碼中的
bootnode
進行連接,所有的節(jié)點加入幾乎都先連接了它咙轩。連接上bootnode
后,獲取bootnode
部分的鄰居節(jié)點活喊,然后進行節(jié)點發(fā)現(xiàn)丐膝,獲取更多的活躍的鄰居節(jié)點 - nursery 是在 Table 為空并且數(shù)據(jù)庫中沒有存儲節(jié)點時的初始連接節(jié)點(上文中的 6 個節(jié)點),通過 bootnode 可以發(fā)現(xiàn)新的鄰居
- 節(jié)點第一次啟動的時候留储,節(jié)點會與硬編碼在以太坊源碼中的
- tab.seedRand:使用提供的種子值將生成器初始化為確定性狀態(tài)
- loadSeedNodes:加載種子節(jié)點;從保留已知節(jié)點的數(shù)據(jù)庫中隨機的抽取30個節(jié)點帅矗,再加上引導節(jié)點列表中的節(jié)點,放置入k桶中浑此,如果K桶沒有空間凛俱,則假如到替換列表中。
2.測試鄰居節(jié)點連通性
首先知道UDP協(xié)議是沒有連接的概念的原叮,所以需要不斷的ping 來測試對端節(jié)點是否正常,在新建路由表之后,就來到下面的循環(huán)达布,不斷的去做上面的事团甲。
go tab.loop()
定時運行doRefresh
、doRevalidate
黍聂、copyLiveNodes
進行刷新K桶。
以太坊的k桶設置:
const (
alpha = 3 // Kademlia并發(fā)參數(shù), 是系統(tǒng)內(nèi)一個優(yōu)化參數(shù),控制每次從K桶最多取出節(jié)點個數(shù),ethereum取值3
bucketSize = 16 // K桶大小(可容納節(jié)點數(shù))
maxReplacements = 10 // 每桶更換列表的大小
hashBits = len(common.Hash{}) * 8 //每個節(jié)點ID長度,32*8=256, 32位16進制
nBuckets = hashBits / 15 // K桶個數(shù)
)
首先搞清楚這三個定時器運行的時間:
refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
doRefresh
doRefresh對隨機目標執(zhí)行查找以保持K桶已滿身腻。如果表為空(初始引導程序或丟棄的有故障)产还,則插入種子節(jié)點。
主要以下幾步:
-
從數(shù)據(jù)庫加載隨機節(jié)點和引導節(jié)點嘀趟。這應該會產(chǎn)生一些以前見過的節(jié)點
tab.loadSeedNodes()
-
將本地節(jié)點ID作為目標節(jié)點進行查找最近的鄰居節(jié)點
tab.net.lookupSelf()
func (t *UDPv4) lookupSelf() []*enode.Node { return t.newLookup(t.closeCtx, encodePubkey(&t.priv.PublicKey)).run() }
func (t *UDPv4) newLookup(ctx context.Context, targetKey encPubkey) *lookup { ... return t.findnode(n.ID(), n.addr(), targetKey) }) return it }
向這些節(jié)點發(fā)起
findnode
操作查詢離target節(jié)點最近的節(jié)點列表,將查詢得到的節(jié)點進行ping-pong
測試,將測試通過的節(jié)點落庫保存經(jīng)過這個流程后,節(jié)點的K桶就能夠比較均勻地將不同網(wǎng)絡節(jié)點更新到本地K桶中脐区。
unc (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) { t.ensureBond(toid, toaddr) nodes := make([]*node, 0, bucketSize) nreceived := 0 // 設置回應回調(diào)函數(shù),等待類型為neighborsPacket的鄰近節(jié)點包她按,如果類型對牛隅,就執(zhí)行回調(diào)請求 rm := t.pending(toid, toaddr.IP, p_neighborsV4, func(r interface{}) (matched bool, requestDone bool) { reply := r.(*neighborsV4) for _, rn := range reply.Nodes { nreceived++ // 得到一個簡單的node結(jié)構 n, err := t.nodeFromRPC(toaddr, rn) if err != nil { t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err) continue } nodes = append(nodes, n) } return true, nreceived >= bucketSize }) //上面了一個管道事件,下面開始發(fā)送真正的findnode報文酌泰,然后進行等待了 t.send(toaddr, toid, &findnodeV4{ Target: target, Expiration: uint64(time.Now().Add(expiration).Unix()), }) return nodes, <-rm.errc }
-
查找3個隨機的目標節(jié)點
for i := 0; i < 3; i++ { tab.net.lookupRandom() }
doRevalidate
doRevalidate檢查隨機存儲桶中的最后一個節(jié)點是否仍然存在媒佣,如果不是,則替換或刪除該節(jié)點陵刹。
主要以下幾步:
-
返回隨機的非空K桶中的最后一個節(jié)點
last, bi := tab.nodeToRevalidate()
-
對最后的節(jié)點執(zhí)行Ping操作默伍,然后等待Pong
remoteSeq, err := tab.net.ping(unwrapNode(last))
-
如果節(jié)點ping通了的話,將節(jié)點移動到最前面
tab.bumpInBucket(b, last)
-
沒有收到回復,選擇一個替換節(jié)點也糊,或者如果沒有任何替換節(jié)點炼蹦,則刪除該節(jié)點
tab.replace(b, last)
copyLiveNodes
copyLiveNodes將表中的節(jié)點添加到數(shù)據(jù)庫,如果節(jié)點在表中的時間超過了5分鐘。
這部分代碼比較簡單狸剃,就伸展闡述掐隐。
if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime {
tab.db.UpdateNode(unwrapNode(n))
}
3.檢測各類信息
go t.loop()
loop循環(huán)主要監(jiān)聽以下幾類消息:
- case <-t.closeCtx.Done():檢測是否停止
- p := <-t.addReplyMatcher:檢測是否有添加新的待處理消息
- r := <-t.gotreply:檢測是否接收到其他節(jié)點的回復消息
4. 處理UDP數(shù)據(jù)包
go t.readLoop(cfg.Unhandled)
主要有以下兩件事:
-
循環(huán)接收其他節(jié)點發(fā)來的udp消息
nbytes, from, err := t.conn.ReadFromUDP(buf)
-
處理接收到的UDP消息
t.handlePacket(from, buf[:nbytes])
接下來對這兩個函數(shù)進行進一步的解析。
接收UDP消息
接收UDP消息比較的簡單钞馁,就是不斷的從連接中讀取Packet數(shù)據(jù)虑省,它有以下幾種消息:
ping
:用于判斷遠程節(jié)點是否在線。pong
:用于回復ping
消息的響應指攒。findnode
:查找與給定的目標節(jié)點相近的節(jié)點慷妙。neighbors
:用于回復findnode
的響應,與給定的目標節(jié)點相近的節(jié)點列表
處理UDP消息
主要做了以下幾件事:
-
數(shù)據(jù)包解碼
packet, fromKey, hash, err := decodeV4(buf)
-
檢查數(shù)據(jù)包是否有效允悦,是否可以處理
packet.preverify(t, from, fromID, fromKey)
在校驗這一塊膝擂,涉及不同的消息類型不同的校驗,我們來分別對各種消息進行分析隙弛。
①:
ping
- 校驗消息是否過期
- 校驗公鑰是否有效
②:
pong
- 校驗消息是否過期
- 校驗回復是否正確
③:
findNodes
- 校驗消息是否過期
- 校驗節(jié)點是否是最近的節(jié)點
④:
neighbors
- 校驗消息是否過期
- 用于回復
findnode
的響應架馋,校驗回復是否正確
-
處理packet數(shù)據(jù)
packet.handle(t, from, fromID, hash)
相同的,也會有4種消息全闷,但是我們這邊重點講處理findNodes的消息:
func (req *findnodeV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) {
...
}
我們這里就稍微介紹下如何處理`findnode`的消息:
```go
func (req *findnodeV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) {
// 確定最近的節(jié)點
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
t.tab.mutex.Lock()
//最接近的返回表中最接近給定id的n個節(jié)點
closest := t.tab.closest(target, bucketSize, true).entries
t.tab.mutex.Unlock()
// 以每個數(shù)據(jù)包最多maxNeighbors的塊的形式發(fā)送鄰居叉寂,以保持在數(shù)據(jù)包大小限制以下。
p := neighborsV4{Expiration: uint64(time.Now().Add(expiration).Unix())}
var sent bool
for _, n := range closest { //掃描這些最近的節(jié)點列表总珠,然后一個包一個包的發(fā)送給對方
if netutil.CheckRelayIP(from.IP, n.IP()) == nil {
p.Nodes = append(p.Nodes, nodeToRPC(n))
}
if len(p.Nodes) == maxNeighbors {
t.send(from, fromID, &p)//給對方發(fā)送 neighborsPacket 包屏鳍,里面包含節(jié)點列表
p.Nodes = p.Nodes[:0]
sent = true
}
}
if len(p.Nodes) > 0 || !sent {
t.send(from, fromID, &p)
}
}
首先先確定最近的節(jié)點,再一個包一個包的發(fā)給對方局服,并校驗節(jié)點的IP钓瞭,最后把有效的節(jié)點發(fā)送給請求方。
涉及的結(jié)構體:
UDP
- conn :接口淫奔,包括了從UDP中讀取和寫入山涡,關閉UDP連接以及獲取本地地址。
- netrestrict:IP網(wǎng)絡列表
- localNode:本地節(jié)點
- tab:路由表
Table
buckets:所有節(jié)點都加到這個里面唆迁,按照距離
nursery:啟動節(jié)點
rand:隨機來源
ips:跟蹤IP鸭丛,確保IP中最多N個屬于同一網(wǎng)絡范圍
-
net: UDP 傳輸?shù)慕涌?/p>
- 返回本地節(jié)點
- 將enrRequest發(fā)送到給定的節(jié)點并等待響應
- findnode向給定節(jié)點發(fā)送一個findnode請求,并等待該節(jié)點最多發(fā)送了k個鄰居
- 返回查找最近的節(jié)點
- 將ping消息發(fā)送到給定的節(jié)點唐责,然后等待答復
以下是table的結(jié)構圖:
思維導圖
參考文檔
http://mindcarver.cn/ ????????
https://github.com/blockchainGuide/ ????????
https://www.cnblogs.com/xiaolincoding/p/12571184.html
http://qjpcpu.github.io/blog/2018/01/29/shen-ru-ethereumyuan-ma-p2pmo-kuai-ji-chu-jie-gou/
http://www.reibang.com/p/b232c870dcd2