go-ethereum/whisper
先從whipserv6
看起查吊。
Whisper
定義
Whisper
代表了在以太坊網(wǎng)絡(luò)中的一個(gè)隱秘(dark
)通信接口霹娄,使用的是以太坊自有的P2P
通信層翎苫。所謂dark
兔院,意思是沒(méi)有可靠的方法可以來(lái)追蹤數(shù)據(jù)包(見(jiàn)于Specs)
// whisper/whisperv6/whisper.go
type Whisper struct {
// 協(xié)議描述和參數(shù)
protocol p2p.Protocol
// 訂閱功能安裝的消息過(guò)濾器
filters *Filters
// 私鑰存儲(chǔ)
privateKeys map[string]*ecdsa.PrivateKey
// 對(duì)稱密鑰存儲(chǔ)
symKeys map[string][]byte
// 和密鑰存儲(chǔ)相關(guān)的互斥鎖
keyMu sync.RWMutex
// 互斥地同步消息和過(guò)期池
poolMu sync.RWMutex
// 當(dāng)前由此節(jié)點(diǎn)跟蹤的信封(envelopes)池
envelopes map[common.Hash]*Envelope
// 消息過(guò)期池
expirations map[uint32]*set.SetNonTS
// 互斥地同步活躍節(jié)點(diǎn)集合
peerMu sync.RWMutex
// 當(dāng)前活躍節(jié)點(diǎn)的集合
peers map[*Peer]struct{}
// 普通whisper消息的消息隊(duì)列
messageQueue chan *Envelope
// 點(diǎn)對(duì)點(diǎn)消息的消息隊(duì)列(不會(huì)再被做任何轉(zhuǎn)發(fā)的消息)
p2pMsgQueue chan *Envelope
// 用于溫和的退出的channel
quit chan struct{}
// 存儲(chǔ)了配置項(xiàng)的設(shè)置信息,以便可以動(dòng)態(tài)修改谆吴。
settings syncmap.Map
// 允許處理whisper相關(guān)消息的最大時(shí)長(zhǎng)(以秒為單位)
syncAllowance int
// 指示這個(gè)節(jié)點(diǎn)是否是純粹的輕客戶端(不轉(zhuǎn)發(fā)任何消息)
lightClient bool
// 用于保護(hù)stats
statsMu sync.Mutex
// whisper節(jié)點(diǎn)的統(tǒng)計(jì)信息
stats Statistics
// MailServer接口
mailServer MailServer
}
Whisper
節(jié)點(diǎn)的配置Config
由于新建一個(gè)Whisper
實(shí)例倒源,需要接收Config
參數(shù),所以先來(lái)看看Config
的定義句狼,見(jiàn)于config.go
文件中相速。
// whisper/whisperv6/config.go
// Config描述了一個(gè)whisper節(jié)點(diǎn)的配置狀態(tài)
type Config struct {
// 最大消息長(zhǎng)度
MaxMessageSize uint32 `toml:",omitempty"`
// 接受工作量證明的最小值
MinimumAcceptedPOW float64 `toml:",omitempty"`
}
這里面提到了一個(gè)MinimumAcceptedPOW
,是關(guān)于工作量證明的鲜锚,我在Proof of Work找到了這個(gè)設(shè)計(jì)的初衷突诬。
之所以用到
POW
,主要是為了防止垃圾消息芜繁,也同樣是為了緩解網(wǎng)絡(luò)的壓力旺隙。計(jì)算POW
的成本可以理解為“你想讓網(wǎng)絡(luò)將你的消息存儲(chǔ)一段時(shí)間(即TTL
時(shí)間段),因此需要分配資源骏令,那么你就需要為此支付價(jià)格”蔬捷。所需的POW
應(yīng)該與消息大小和TTL
成正比。
代碼中還提供了一個(gè)默認(rèn)配置樣例DefaultConfig
榔袋。
// whisper/whisperv6/config.go
var DefaultConfig = Config{
MaxMessageSize: DefaultMaxMessageSize, // uint32(1024 * 1024)
MinimumAcceptedPOW: DefaultMinimumPoW, // 0.2
}
新建一個(gè)Whisper客戶端
搞明白了Config周拐,開(kāi)始新建一個(gè)Whisper客戶端,whisper.go中的New()方法會(huì)新建一個(gè)Whisper客戶端凰兑,該客戶端可以在以太坊P2P網(wǎng)絡(luò)中進(jìn)行通信妥粟。
// whisper/whisperv6/whisper.go
func New(cfg *Config) *Whisper {
// 如果傳入的cfg是nil,則使用上面提到的DefaultConfig
if cfg == nil {
cfg = &DefaultConfig
}
// 初始化Whisper結(jié)構(gòu)體
whisper := &Whisper{
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[common.Hash]*Envelope),
expirations: make(map[uint32]*set.SetNonTS),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}),
// 最長(zhǎng)處理時(shí)間默認(rèn)為10s
syncAllowance: DefaultSyncAllowance,
}
// 使用NewFilters為這個(gè)whisper新建過(guò)濾器
whisper.filters = NewFilters(whisper)
// 將<minPowIdx, cfg.MinimumAcceptedPOW>放入settings內(nèi)存中
whisper.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW)
// 將<maxMsgSizeIdx, cfg.MaxMessageSize>放入settings內(nèi)存中
whisper.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize)
// 指示消息隊(duì)列是否溢出
whisper.settings.Store(overflowIdx, false)
// p2p whisper子協(xié)議規(guī)則
whisper.protocol = p2p.Protocol{
Name: ProtocolName, // "shh"
Version: uint(ProtocolVersion), // uint(uint64(6))
Length: NumberOfMessageCodes, // 128
// 啟動(dòng)運(yùn)行
Run: whisper.HandlePeer,
// 節(jié)點(diǎn)信息
NodeInfo: func() interface{} {
return map[string]interface{}{
"version": ProtocolVersionStr, // "6.0"
// 讀取whisper.settings[maxMsgSizeIdx]
"maxMessageSize": whisper.MaxMessageSize(),
// 讀取whisper.settings[minPowIdx]
// 如果為空或出錯(cuò)吏够,則返回DefaultMinimumPoW勾给,即0.2
"minimumPoW": whisper.MinPow(),
}
},
}
return whisper
}
總體來(lái)說(shuō),New()
主要是創(chuàng)建了一個(gè)空的whisper{}
結(jié)構(gòu)體锅知,并盡力初始化播急,比較重要的地方,就是初始化whisper.protocol
子協(xié)議字段售睹,這里面涉及到很多魔數(shù)桩警。
關(guān)于版本號(hào)6
,wiki上有這么一段話:
Tor
系統(tǒng)有一個(gè)協(xié)議昌妹,可以在兩個(gè)節(jié)點(diǎn)之間建立連接捶枢,但它們互相并不知道對(duì)方在哪里沉噩,這是用于隱藏服務(wù)的Rendezvous協(xié)議。隱藏的服務(wù)(相當(dāng)于TCP/IP
中一個(gè)監(jiān)聽(tīng)端口的服務(wù))會(huì)選擇隨機(jī)數(shù)量的”介紹人”節(jié)點(diǎn)(我想柱蟀,這個(gè)隨機(jī)數(shù)量一般都是6
)。為了做到這一點(diǎn)蚜厉,它會(huì)和每一個(gè)介紹人都建立標(biāo)準(zhǔn)的3跳鏈(3-hop chain
)长已,當(dāng)一個(gè)用戶想和一個(gè)隱藏服務(wù)建立連接時(shí),它們就會(huì)傳播一個(gè)請(qǐng)求去連接與特定公鑰相關(guān)的隱藏服務(wù)昼牛。
關(guān)于消息代號(hào)數(shù)量128
术瓮,p2p.Protocol
結(jié)構(gòu)體中的Lenght
字段表示某一子協(xié)議中消息代號(hào)的總數(shù)量,而在whisper/whisperv6/doc.go
的常量表中可以看到根據(jù)EIP-627
提案贰健,whisper
協(xié)議的消息代號(hào)共有128
種胞四,故為128
HandlePeer
:發(fā)現(xiàn)新節(jié)點(diǎn)后做什么
在新建whisper
客戶端時(shí),whisper.protocol
結(jié)構(gòu)體用HandlerPeer()
填充了Run
字段伶椿,當(dāng)和一個(gè)節(jié)點(diǎn)協(xié)商該協(xié)議時(shí)辜伟,會(huì)在一個(gè)新的goroutine
上調(diào)用Run
代表的方法。在whisper
這里脊另,當(dāng)whisper
子協(xié)議連接被協(xié)商的時(shí)候导狡,下面的P2P
層會(huì)調(diào)用HandlePeer
。
// whisper/whisperv6/whisper.go
func (whisper *Whisper) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
// 新建一個(gè)whisper peer實(shí)例偎痛,接下來(lái)開(kāi)始追蹤它
whisperPeer := newPeer(whisper, peer, rw)
// 同步的將這個(gè)peer實(shí)例添加到追蹤列表中
whisper.peerMu.Lock()
whisper.peers[whisperPeer] = struct{}{}
whisper.peerMu.Unlock()
// 當(dāng)退出時(shí)旱捧,將該peer從追蹤列表中刪除
defer func() {
whisper.peerMu.Lock()
delete(whisper.peers, whisperPeer)
whisper.peerMu.Unlock()
}()
// 握手,狀態(tài)更新
if err := whisperPeer.handshake(); err != nil {
return err
}
whisperPeer.start() // 啟動(dòng)通信
defer whisperPeer.stop() // 退出時(shí)停止通信
return whisper.runMessageLoop(whisperPeer, rw) // 開(kāi)始消息循環(huán)處理
}
這里主要是在發(fā)現(xiàn)一個(gè)新的節(jié)點(diǎn)時(shí)踩麦,whisper
協(xié)議所做的操作枚赡,大部分操作都是和Peer
有關(guān)。下面就來(lái)看看whisper
的Peer
谓谦。
whisper
中的Peer
在peer.go
中有關(guān)于Peer
的定義贫橙,Peer
代表了在whisper
協(xié)議中的一個(gè)節(jié)點(diǎn)連接。
// whisper/whisperv6/peer.go
type Peer struct {
// 本地whisper節(jié)點(diǎn)
host *Whisper
// 遠(yuǎn)程whisper節(jié)點(diǎn)
peer *p2p.Peer
// 消息讀寫句柄
ws p2p.MsgReadWriter
// 遠(yuǎn)程節(jié)點(diǎn)是否可信
trusted bool
// 遠(yuǎn)程節(jié)點(diǎn)要求的POW值
powRequirement float64
// 布隆過(guò)濾器鎖
bloomMu sync.Mutex
// 布隆過(guò)濾器
bloomFilter []byte
// 是否是全節(jié)點(diǎn)過(guò)濾器
fullNode bool
// 存儲(chǔ)該節(jié)點(diǎn)已知的消息反粥,以避免浪費(fèi)帶寬
known *set.Set // Messages already known by the peer to avoid wasting bandwidth
// 優(yōu)雅的退出連接
quit chan struct{}
}
新建一個(gè)遠(yuǎn)程節(jié)點(diǎn)的本地代理Peer
// whisper/whisperv6/peer.go
// 新建一個(gè)whisper協(xié)議下的peer對(duì)象料皇,但這個(gè)方法并不會(huì)去握手。
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
return &Peer{
host: host,
peer: remote,
ws: rw,
trusted: false, // 默認(rèn)不可信
powRequirement: 0.0, // 初始化為0
known: set.New(),
quit: make(chan struct{}),
bloomFilter: MakeFullNodeBloom(), //創(chuàng)建一個(gè)長(zhǎng)度為64的布隆過(guò)濾器星压,并且所有位初始化為0xFF
fullNode: true, // 已經(jīng)初始化為全節(jié)點(diǎn)過(guò)濾器
}
}
和遠(yuǎn)程節(jié)點(diǎn)握手:handshake()
handshake()向遠(yuǎn)程節(jié)點(diǎn)發(fā)送協(xié)議初始化狀態(tài)信息践剂,同時(shí)也會(huì)驗(yàn)證遠(yuǎn)程節(jié)點(diǎn)的狀態(tài)。狀態(tài)消息有以下幾點(diǎn)要素:
消息代號(hào)為statusCode娜膘,即0
消息的payload是一個(gè)列表:[whisper協(xié)議版本號(hào)(即6), 本地節(jié)點(diǎn)需要的最小POW值, 本地節(jié)點(diǎn)感興趣的消息過(guò)濾器]
只有版本號(hào)是強(qiáng)制驗(yàn)證的逊脯,后續(xù)2個(gè)參數(shù)都是可選的,即本地節(jié)點(diǎn)要求的最小POW和本地節(jié)點(diǎn)感興趣的消息過(guò)濾器
// whisper/whisperv6/peer.go
func (peer *Peer) handshake() error {
// 異步地發(fā)送握手狀態(tài)消息
errc := make(chan error, 1) // error channel
go func() {
pow := peer.host.MinPow() // 獲取本地節(jié)點(diǎn)(自己)的POW最小需求值
powConverted := math.Float64bits(pow) // 將float64轉(zhuǎn)化為uint64
// BloomFilter()為本地節(jié)點(diǎn)所有感興趣的話題返回一個(gè)集成的布隆過(guò)濾器
// 要求遠(yuǎn)程節(jié)點(diǎn)只能發(fā)送被通告的布隆過(guò)濾器中匹配的消息
// 如果不匹配竣贪,則被認(rèn)為是垃圾消息巩螃,并將與該遠(yuǎn)程節(jié)點(diǎn)斷開(kāi)連接。
bloom := peer.host.BloomFilter()
// 通過(guò)讀寫句柄向遠(yuǎn)程節(jié)點(diǎn)發(fā)送狀態(tài)消息(狀態(tài)碼為0匕争,表示這是一個(gè)狀態(tài)消息)
// 這個(gè)消息包含了[whisper的協(xié)議版本號(hào),POW值甘桑,消息相關(guān)的布隆過(guò)濾器]
errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom)
}()
// 獲取遠(yuǎn)程節(jié)點(diǎn)狀態(tài)數(shù)據(jù)跑杭,并且驗(yàn)證協(xié)議是否匹配
packet, err := peer.ws.ReadMsg() // 從讀寫句柄中讀取遠(yuǎn)程消息
if err != nil {
return err
}
// 正常情況下,第一個(gè)數(shù)據(jù)包的代號(hào)應(yīng)該是狀態(tài)碼窄做,否則就報(bào)錯(cuò)
if packet.Code != statusCode {
return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code)
}
// 數(shù)據(jù)包是rlp序列化格式愧驱,需要進(jìn)行解碼
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
_, err = s.List()
if err != nil {
return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err)
}
// 讀取前8個(gè)字節(jié)冯键,作為協(xié)議版本號(hào)
//(上面也看到了,狀態(tài)消息的格式為一個(gè)列表:[whisper的協(xié)議版本號(hào)庸汗,POW值惫确,消息相關(guān)的布隆過(guò)濾器])
peerVersion, err := s.Uint()
if err != nil {
return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err)
}
// 如果獲取到的協(xié)議版本號(hào)不是uint64(6),那么說(shuō)明協(xié)議不匹配蚯舱。
if peerVersion != ProtocolVersion {
return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion)
}
// 只有版本號(hào)是強(qiáng)制要求的改化,后續(xù)的參數(shù)都是可選的
powRaw, err := s.Uint() // 繼續(xù)讀取8個(gè)字節(jié),作為POW值
// 如果沒(méi)出錯(cuò)枉昏,則驗(yàn)證pow和bloomfilter陈肛;如果出錯(cuò),也沒(méi)什么兄裂,繼續(xù)往下走句旱,即后續(xù)參數(shù)是可選的
if err == nil {
pow := math.Float64frombits(powRaw) // uint64 -> float64
// pow無(wú)窮大,不是數(shù)字晰奖,小于0谈撒,都會(huì)報(bào)錯(cuò)
if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID())
}
// 用pow更新peer.powRequirement
peer.powRequirement = pow
var bloom []byte
err = s.Decode(&bloom) //將剩余的部分全部解碼,并存至bloom[]中
if err == nil {
sz := len(bloom)
// 初始化時(shí)匾南,布隆過(guò)濾器的長(zhǎng)度都是BloomFilterSize啃匿,即64,否則就驗(yàn)證報(bào)錯(cuò)。
if sz != BloomFilterSize && sz != 0 {
return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
}
// 用解碼后的bloom更新peer.bloomFilter,以及peer.fullNode
peer.setBloomFilter(bloom)
}
}
// 阻塞等待通道是否返回錯(cuò)誤溯乒,如果出錯(cuò)夹厌,則報(bào)錯(cuò)。
if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
}
return nil
}
開(kāi)始通信:start()
start()初始化