目前在項目的主線上已經(jīng)實現(xiàn)了kad協(xié)議,而kBucket作為存儲節(jié)點的一環(huán),值得進行一次分析。
Kbucket簡要介紹
在kad中热鞍,peer每獲取到一個節(jié)點的信息,會將其存放到自己的KBucket中衔彻。每個peer_id由公鑰經(jīng)過sha2_256運算之后得到薇宠,長度為32個字節(jié)。每個節(jié)點都可以與另外的節(jié)點經(jīng)過異或運算得到最長前綴艰额,即從第一位開始的連續(xù)0的個數(shù)澄港。0越多,代表兩個節(jié)點越接近悴晰,最多可以有32*8個連續(xù)的0。所以對KBucket而言逐工,桶的最大個數(shù)為256個铡溪。
KBucket結(jié)構(gòu)
在rust-libp2p中,每個KBucket內(nèi)部維護了一個Node類型的ArrayVector泪喊,大小為20棕硫,其中Node結(jié)構(gòu)使用key存放peer_id,value存放地址信息袒啼;first_connected_pos作為KBucket的連接標(biāo)記位哈扮,記錄可被清理的節(jié)點下標(biāo);同時還提供了apply_pending的屬性蚓再,存儲預(yù)備插入的節(jié)點
與rust-libp2p不同的是滑肉,libp2p-rs使用了一個新的設(shè)計方法。因為peerstore的存在摘仅,我們不需要在KBucket里面存儲peer所對應(yīng)的地址信息靶庙,相對應(yīng)的,peer相關(guān)的一些連接信息娃属,如最后連接時間六荒,就可以作為新的value被存放到node中护姆。這樣設(shè)計還有一個好處,就是也不需要apply_pending和connect_pos這些屬性了掏击,每個peer可以單獨維護一個自己的連接狀態(tài)信息卵皂,KBucket清理時,可以直接用filter的方式執(zhí)行相關(guān)操作砚亭。綜合以上情況灯变,我們設(shè)計了一個結(jié)構(gòu)體PeerInfo,用來作為新的Value類型
PeerInfo中記錄了三個屬性
/// The information of a peer in Kad routing table.
#[derive(Clone, Debug)]
pub struct PeerInfo {
/// The time instant at which we talk to the remote peer.
/// Sets to `Some` if it is deemed to be alive. Otherwise,
/// it is set to `None`
aliveness: Option<Instant>,
/// The time this peer was added to the routing table.
added_at: Instant,
/// reserved for future use?
replaceable: bool,
}
aliveness表示最后通信時刻钠惩,added_at記錄該節(jié)點被添加到路由表的時刻柒凉,replaceable標(biāo)記這條信息是否可以被替換(目前未啟用)。通過這種方式篓跛,在對KBucket進行增刪操作時膝捞,能夠更加容易判斷相關(guān)peer的狀態(tài)。
代碼分析
下面以KBucketTable的try_add_peer()進行分析:
- 首先判斷是否為已存在節(jié)點愧沟。如果存在蔬咬,且屬于迭代查詢過程中調(diào)用的方法,那就需要更新peer的最后通信時間
- 如果不是已存在節(jié)點沐寺,需要分情況討論
- 如果調(diào)用insert方法能夠成功添加林艘,就需要在peerstore中將該節(jié)點的GC標(biāo)記位設(shè)置為false,防止因為GC導(dǎo)致地址信息被清理混坞,進而重復(fù)進行迭代查詢狐援。
- 如果添加失敗,說明KBucket滿了究孕,需要進行清理啥酱。首先通過filter和min_by找出最久未通信的節(jié)點,將其從KBucket中驅(qū)逐厨诸,同時peerstore中修改為可被GC镶殷。之后再將新的節(jié)點插入到KBucket中,peerstore標(biāo)記不進行GC
fn try_add_peer(&mut self, peer: PeerId, queried: bool) {
let timeout = self.check_kad_peer_interval;
let now = Instant::now();
let key = kbucket::Key::new(peer.clone());
log::debug!(
"trying to add a peer: {:?} bucket-index={:?}, query={}",
peer,
self.kbuckets.bucket_index(&key),
queried
);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry) => {
// already in RT, update the node's aliveness if queried is true
if queried {
entry.value().set_aliveness(Some(Instant::now()));
log::debug!("{:?} updated: {:?}", peer, entry.value());
}
}
kbucket::Entry::Absent(mut entry) => {
let info = PeerInfo::new(queried);
if entry.insert(info.clone()) {
log::debug!("Peer added to routing table: {} {:?}", peer, info);
// pin this peer in PeerStore to prevent GC from recycling multiaddr
if let Some(s) = self.swarm.as_ref() {
s.pin(&peer)
}
} else {
log::debug!("Bucket full, trying to replace an old node for {}", peer);
// try replacing an 'old' peer
let bucket = entry.bucket();
let candidate = bucket
.iter()
.filter(|n| n.value.get_aliveness().map_or(true, |a| now.duration_since(a) > timeout))
.min_by(|x, y| x.value.get_aliveness().cmp(&y.value.get_aliveness()));
if let Some(candidate) = candidate {
let key = candidate.key.clone();
let evicted = bucket.remove(&key);
log::debug!("Bucket full. Peer node added, {} replacing {:?}", peer, evicted);
// unpin the evicted peer
if let Some(s) = self.swarm.as_ref() {
s.unpin(key.preimage())
}
// now try to insert the value again
let _ = entry.insert(info);
// pin this peer in PeerStore to prevent GC from recycling multiaddr
if let Some(s) = self.swarm.as_ref() {
s.pin(&peer)
}
} else {
log::debug!("Bucket full, but can't find an replaced node, give up {}", peer);
}
}
}
_ => {}
}
}
Netwarps 由國內(nèi)資深的云計算和分布式技術(shù)開發(fā)團隊組成微酬,該團隊在金融绘趋、電力、通信及互聯(lián)網(wǎng)行業(yè)有非常豐富的落地經(jīng)驗颗管。Netwarps 目前在深圳陷遮、北京均設(shè)立了研發(fā)中心,團隊規(guī)模30+垦江,其中大部分為具備十年以上開發(fā)經(jīng)驗的技術(shù)人員拷呆,分別來自互聯(lián)網(wǎng)、金融、云計算茬斧、區(qū)塊鏈以及科研機構(gòu)等專業(yè)領(lǐng)域腰懂。
Netwarps 專注于安全存儲技術(shù)產(chǎn)品的研發(fā)與應(yīng)用,主要產(chǎn)品有去中心化文件系統(tǒng)(DFS)项秉、去中心化計算平臺(DCP)绣溜,致力于提供基于去中心化網(wǎng)絡(luò)技術(shù)實現(xiàn)的分布式存儲和分布式計算平臺,具有高可用娄蔼、低功耗和低網(wǎng)絡(luò)的技術(shù)特點怖喻,適用于物聯(lián)網(wǎng)、工業(yè)互聯(lián)網(wǎng)等場景岁诉。