p2p網(wǎng)絡(luò)從底層到上層可以分為3層,地址 連接 節(jié)點(diǎn)稠集,每一層都有自己的功能
聲明:文章代碼和源碼有不一致地方
btcd的p2p網(wǎng)絡(luò)之地址
主要有四個(gè)結(jié)構(gòu)體,兩兩對(duì)應(yīng)
AddrManager
serializedAddrManager
KnownAddress
serializedKnownAddress
我們先看peer.json
中的內(nèi)容,這個(gè)就是和serializedAddrManager
的內(nèi)容對(duì)應(yīng)的削茁。
//peers.json
{
"Version": 1,
"Key": [233,19,87,131,183,155,......,231,78,82,150,10,102],
"Addresses": [
{
"Addr": "109.157.120.169:8333",
"Src": "104.172.5.90:8333",
"Attempts": 0,
"TimeStamp": 1514967959,
"LastAttempt": -62135596800,
"LastSuccess": -62135596800
},
......
],
"NewBuckets": [
[
"[2001:0:9d38:78cf:3cb1:bb2:ab6f:e8b4]:8333",
"196.209.239.229:8333",
......
"65.130.177.198:8333"
],
......
[
"125.227.159.115:8333",
......
"alhlegtjkdmbqsvt.onion:8333",
......
"79.250.188.226:8333"
]
],
"TriedBuckets": [
[
"5.9.165.181:8333",
......
"5.9.17.24:8333"
],
[
"95.79.50.90:8333",
......
"[2a02:c207:2008:9136::1]:8333"
]
]
}
start()
start()先從peer.json中加載撑教,然后開(kāi)啟一個(gè)goroutine定期進(jìn)行跟蹤和添加地址厕怜。
func (a *AddrManager) Start(){
//alter stared ?
if atomic.AddInt32(&a.started,1) != 1{
log.Trace("AddrManager has stared")
return
}
log.Trace("Starting address manager")
// load peers we already know about from file
// 從文件中加載peers
a.loadPeers()
// start the address ticker to save addresses periodically(定期).
a.wg.Add(1) // 在下面的goroutine中锦溪,a.wg.Done()之后,start才會(huì)結(jié)束
go a.addressHandler()
}
我們?cè)倏?code>a.loadPeers(),主要是調(diào)用deserializePeers
,就是把peer.json中的文件反序列化综苔。主要是把Addresses``NewBuckets``TriedBuckets
,解析到AddrManager
的 addrIndex
addrNew
addrTried
func (a *AddrManager) deserializePeers(filePath string) error {
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return nil
}
r, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("opening file:%v error:%v", filePath, err)
}
defer r.Close()
var sam serializedAddrManager
dec := json.NewDecoder(r)
err = dec.Decode(&sam)
if err != nil {
return fmt.Errorf("error reading %s: %v", filePath, err)
}
// 檢查版本
if sam.Version != serialisationVersion {
return fmt.Errorf("unknown version %v in serialized "+
"addrmanager", sam.Version)
}
copy(a.key[:], sam.Key[:])
// 先解析Addresses
for _, v := range sam.Addresses {
ka := new(KnownAddress)
ka.na, err = a.DeserializeNetAddress(v.Addr)
if err != nil {
return fmt.Errorf("failed to deserialize netaddress "+
"%s: %v", v.Addr, err)
}
ka.srcAddr, err = a.DeserializeNetAddress(v.Src)
if err != nil {
return fmt.Errorf("failed to deserialize netaddress "+
"%s: %v", v.Src, err)
}
ka.attempts = v.Attempts
ka.lastattempt = time.Unix(v.LastAttempt, 0)
ka.lastsuccess = time.Unix(v.LastAttempt, 0)
// 遍歷sam.Addresses中的每一項(xiàng),然后辨析成一個(gè)KnownAddress后位岔,
// 添加到addrManager.addrIndex,是一個(gè)map
a.addrIndex[NetAddressKey(ka.na)] = ka
}
// 遍歷解析Newbuckets
for i := range sam.NewBuckets {
for _, val := range sam.NewBuckets[i] {
// 先看看addrIndex中有沒(méi)有這個(gè)ip:port
ka, ok := a.addrIndex[val]
if !ok {
return fmt.Errorf("newbucket contains %s but "+
"none in address list", val)
}
// addrIndex中已經(jīng)有了
if ka.refs == 0 {
a.nNew ++
}
ka.refs ++
a.addrNew[i][val] = ka
}
}
// 遍歷解析TriedBuckets
for i := range sam.TriedBuckets {
for _, val := range sam.TriedBuckets[i] {
ka, ok := a.addrIndex[val]
if !ok {
return fmt.Errorf("Newbucket contains %s but "+
"none in address list", val)
}
ka.tried = true
a.nTried++
// 一個(gè)bucket到這里就成了一個(gè)數(shù)組
a.addrTried[i].PushBack(ka)
}
}
// 檢查,保證一個(gè)地址要么在 NewBuckets中如筛,要么在TriedBuckets
for k, v := range a.addrIndex {
if v.refs == 0 && !v.tried {
return fmt.Errorf("address %s after serialisation "+
"with no references", k)
}
if v.refs > 0 && v.tried {
return fmt.Errorf("address %s after serialisation "+
"which is both new and tried!", k)
}
}
return nil
}
然后調(diào)用a.addressHandler(),主要邏輯就是定期保存地址
// addressHandler is the main handler for the address manager. It must be run
// as a goroutine.
func (a *AddrManager) addressHandler() {
dumpAddressTicker := time.NewTicker(dumpAddressInterval)
defer dumpAddressTicker.Stop()
out:
for {
select {
case <-dumpAddressTicker.C:
a.savePeers()
case <-a.quit:
break out
}
}
a.savePeers()
a.wg.Done() // start方法中已經(jīng)有wg.Add(1)在等著了
log.Trace("Address handler done")
}
a.savePeers(),大家可以查看源碼
updateAddress
節(jié)點(diǎn)直接交換etaddr和addr消息時(shí)抒抬,就會(huì)收到addr信息杨刨,調(diào)用AddAddress()
是實(shí)際上都是調(diào)用updateAddress
如果我們已經(jīng)有了,就進(jìn)行一些修改
如果我們沒(méi)有擦剑,就添加妖胀,如果滿(mǎn)了的話(huà),就把老的給移除掉
NewBucket到TriedBucket
在節(jié)點(diǎn)獲取地址惠勒,并建立peer連接成功后赚抡,會(huì)調(diào)用Good方法。就是說(shuō)這個(gè)地址是好的纠屋,可以從NewBucket移到TriedBucket了涂臣。
GetAddress
這個(gè)是供外面獲取地址的方法
按照50%的記錄隨機(jī)從NewBucket或者TriedBucket中選擇。隨機(jī)選擇bucket后售担,從中隨機(jī)選擇地址赁遗。選擇出來(lái)的地址要判斷一下
randval := a.rand.Intn(large)
if float64(randval) < (factor * ka.chance() * float64(large)) {
log.Tracef("Selected %v from new bucket",NetAddressKey(ka.na))
return ka
}
決定是不是用這個(gè)選中地址,主要是由factor和ka.chance()決定族铆。
facket依次遞增
而ka.chance()則和這個(gè)地址的其他屬性密切相關(guān)岩四。比如上次嘗試到現(xiàn)在間隔,失敗的次數(shù)哥攘。
我們完全可以去除這個(gè)包剖煌,設(shè)置幾個(gè)固定的地址,但是這幾個(gè)地址不能用后逝淹,我們的節(jié)點(diǎn)也就沒(méi)法同步了末捣。就不是p2p了。搞這么復(fù)雜创橄,主要還是為了防止攻擊箩做。
AddLocalAddress
// AddLocalAddress adds na to the list of known local addresses to advertise with the given priority.
// 地址會(huì)先加到AddrManager.localAddresses
func (a *AddrManager) AddLocalAddress(na *wire.NetAddress,priority AddressPriority)error{
// TODO 判斷不是是公網(wǎng)路由
a.lamtx.Lock()
defer a.lamtx.Unlock()
// 得到特定格式的ip:端口
key := NetAddressKey(na)
// 判斷l(xiāng)ocalAddresses有沒(méi)有這個(gè)ip
la,ok := a.localAddresses[key]
if !ok || la.score < priority{
if ok {
la.score = priority + 1
}else{
a.localAddresses[key] = &localAddress{
na:na,
score:priority,
}
}
}
return nil
}