/* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items
   identified by string numbers.
   The protocol is as follows:
   1) The Initiator sends a Hello message with a specific NONCE to a set of remote peers.
   2) Each remote peer responds with a digest of its messages and returns that NONCE.
   3) The initiator checks the validity of the NONCEs received, aggregates the digests,
      and crafts a request containing specific item ids it wants to receive from each remote peer and then
      sends each request to its corresponding peer.
   4) Each peer sends back the response containing the items requested (if it still holds them),
      together with the NONCE.

        Other peer                                               Initiator
         O    <-------- Hello <NONCE> -------------------------      O
        /|\   --------- Digest <[3,5,8, 10...], NONCE> -------->    /|\
         |    <-------- Request <[3,8], NONCE> -----------------     |
        / \   --------- Response <[item3, item8], NONCE>------->    / \
*/
- 先看下官方的解釋好了。
- 還記得之前的RemoteStateRequest篇么?那個似乎也是去拉取block啊,這里似乎也是啊。下面我們具體看下,為什么要有這兩種機制,他們所面對的場景是什么。
Hello
發起點
// NewPullEngineWithFilter creates a PullEngine that wraps the given PullAdapter
// and passes digests through the filter df. The digest, request and response
// wait times are read from the peer.gossip.* configuration keys, falling back
// to the package defaults.
//
// A background goroutine starts one pull round (initiatePull) every sleepTime
// until the engine's stop flag is observed via toDie.
func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, df DigestFilter) *PullEngine {
	// stopFlag, acceptingDigests and acceptingResponses start at their zero value.
	engine := &PullEngine{
		PullAdapter:      participant,
		state:            util.NewSet(),
		item2owners:      make(map[string][]string),
		peers2nonces:     make(map[string]uint64),
		nonces2peers:     make(map[uint64]string),
		incomingNONCES:   util.NewSet(),
		outgoingNONCES:   util.NewSet(),
		digFilter:        df,
		digestWaitTime:   util.GetDurationOrDefault("peer.gossip.digestWaitTime", defDigestWaitTime),
		requestWaitTime:  util.GetDurationOrDefault("peer.gossip.requestWaitTime", defRequestWaitTime),
		responseWaitTime: util.GetDurationOrDefault("peer.gossip.responseWaitTime", defResponseWaitTime),
	}

	go func() {
		for {
			if engine.toDie() {
				return
			}
			time.Sleep(sleepTime)
			// The engine may have been stopped while we were sleeping.
			if engine.toDie() {
				return
			}
			engine.initiatePull()
		}
	}()

	return engine
}
// initiatePull starts one pull round. While holding engine.lock it:
//   - opens the digest-acceptance window (acceptDigests),
//   - sends a Hello with a fresh nonce to every peer returned by SelectPeers,
//     recording the nonce in outgoingNONCES and the nonce<->peer mapping
//     (nonces2peers / peers2nonces) so replies can be traced back to a peer,
//   - schedules processIncomingDigests to run after digestWaitTime.
func (engine *PullEngine) initiatePull() {
	engine.lock.Lock()
	defer engine.lock.Unlock()

	engine.acceptDigests()
	for _, dest := range engine.SelectPeers() {
		n := engine.newNONCE()
		engine.outgoingNONCES.Add(n)
		engine.nonces2peers[n] = dest
		engine.peers2nonces[dest] = n
		engine.Hello(dest, n)
	}

	// When the digest window elapses, turn the collected digests into requests.
	time.AfterFunc(engine.digestWaitTime, engine.processIncomingDigests)
}
- acceptDigests表示正式開始一次基于hello-digest的流程,串行執行。
- 當PullEngine啟動的時候,每隔一定時間會隨機選取節點發起Hello請求
- 這里需要注意的是,會記錄Hello請求的Nonce和peer節點的關系。應該是后面會以這個來跟蹤執行情況。
- 發出的Hello會記錄到outgoingNONCES里面
- 超時的情況,之后再講
請求
// Hello sends a Hello message carrying nonce to the peer identified by dest,
// opening the Hello/Digest exchange of a pull round. The message is signed
// with NoopSign; if signing fails the error is logged and nothing is sent.
func (p *pullMediatorImpl) Hello(dest string, nonce uint64) {
	hello := &proto.GossipHello{
		Nonce:    nonce,
		Metadata: nil,
		MsgType:  p.config.MsgType,
	}
	helloMsg := &proto.GossipMessage{
		Channel: p.config.Channel,
		Tag:     p.config.Tag,
		Content: &proto.GossipMessage_Hello{Hello: hello},
	}

	p.logger.Debug("Sending", p.config.MsgType, "hello to", dest)

	signed, err := helloMsg.NoopSign()
	if err != nil {
		p.logger.Errorf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
		return
	}
	p.Sndr.Send(signed, p.peersWithEndpoints(dest)...)
}
- 沒什么好講的,注意下nonce就好。下面看下對方收到hello怎么處理。
處理
// OnHello handles an incoming Hello carrying nonce.
//
// The nonce is remembered in incomingNONCES for requestWaitTime, which is the
// window in which OnReq will honour a Request with that nonce. The items
// currently held in engine.state that pass the digest filter for context are
// sent back as a digest. Nothing is sent when no item passes the filter.
func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
	engine.incomingNONCES.Add(nonce)
	time.AfterFunc(engine.requestWaitTime, func() {
		engine.incomingNONCES.Remove(nonce)
	})

	held := engine.state.ToArray()
	allowed := engine.digFilter(context)

	var digest []string
	for _, v := range held {
		if id := v.(string); allowed(id) {
			digest = append(digest, id)
		}
	}

	if len(digest) > 0 {
		engine.SendDigest(digest, nonce, context)
	}
}
- 這里也是一樣,會記錄收到的hello請求的nonce到本地incomingNONCES
- 將本地的state里面的blocknums序號集組裝成Digest返回。這里以blockpullengine為例,每次收到block消息的時候,都會將block序號冗余到engine的state里面。
- 下面我們看下Digest的情況
Digest
請求
// SendDigest replies to the Hello received in context with a DataDigest that
// lists digest and echoes the Hello's nonce back unchanged. No new nonce is
// generated here. context is expected to be a proto.ReceivedMessage; the type
// assertion panics otherwise.
func (p *pullMediatorImpl) SendDigest(digest []string, nonce uint64, context interface{}) {
	dig := &proto.DataDigest{
		MsgType: p.config.MsgType,
		Nonce:   nonce,
		Digests: util.StringsToBytes(digest),
	}
	digMsg := &proto.GossipMessage{
		Channel: p.config.Channel,
		Tag:     p.config.Tag,
		Nonce:   0,
		Content: &proto.GossipMessage_DataDig{DataDig: dig},
	}

	received := context.(proto.ReceivedMessage)
	remotePeer := received.GetConnectionInfo()
	if p.logger.IsEnabledFor(zapcore.DebugLevel) {
		p.logger.Debug("Sending", p.config.MsgType, "digest:", digMsg.GetDataDig().FormattedDigests(), "to", remotePeer)
	}
	received.Respond(digMsg)
}
- 不會再生成新的Nonce,這里會沿用之前的nonce。
處理
// OnDigest handles a Digest reply to one of our Hellos. For every item listed
// in digest that is not already held in engine.state, it records the peer the
// matching Hello was sent to (nonces2peers[nonce]) as an owner of that item
// in item2owners. processIncomingDigests later picks one owner per item to
// request it from.
//
// The digest is ignored when the engine is outside the digest-acceptance
// window of a pull round, or when nonce is not one this engine sent out.
func (engine *PullEngine) OnDigest(digest []string, nonce uint64, context interface{}) {
	// Cheap pre-check without the lock so stray digests are dropped quickly.
	if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) {
		return
	}

	engine.lock.Lock()
	defer engine.lock.Unlock()

	// Re-check under the lock. processIncomingDigests calls ignoreDigests()
	// before taking engine.lock, so it may have closed the window and already
	// built its requests from item2owners while we waited. Recording owners
	// now would attach them to a round that has already been processed.
	if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) {
		return
	}

	owner := engine.nonces2peers[nonce]
	for _, n := range digest {
		if engine.state.Exists(n) {
			continue // already held locally, nothing to pull
		}
		// append on a missing (nil) entry allocates, so no explicit init is needed.
		engine.item2owners[n] = append(engine.item2owners[n], owner)
	}
}
- 基本就是統計本地沒有的block序號,將序號所屬的peer節點記錄下來。還記得發hello的時候的nonces2peers么?當時記錄了hello是發給哪個節點的。
- 這里需要注意的是item2owners,這里最終會組成一組映射:擁有某個block的peer列表
小結
好了,至此,整個流程處理了一半,發起點也收到了節點反饋的Digest數據,知道了自己和對方差異的部分。那么什么時候開始正式拉取數據呢?下面我們繼續往下深入。
Request
發起點
// Excerpt of initiatePull (the full version appears earlier in this article);
// the "..." elides the Hello-sending part.
// After digestWaitTime elapses, processIncomingDigests runs: it stops accepting
// digests and sends the Requests, so this timer is where the Request phase starts.
func (engine *PullEngine) initiatePull() {
...
time.AfterFunc(engine.digestWaitTime, func() {
engine.processIncomingDigests()
})
}
- 還記得這部分代碼么?就是hello的超時處理的部分。很明顯,Request的發起點就是等待digestWaitTime超時。
// processIncomingDigests ends the Hello/Digest phase of a pull round and starts
// the Request/Response phase.
//
// It stops accepting digests. For each item in item2owners it picks one of the
// peers that advertised it at random. It then opens the response window and
// sends each chosen peer a single Request listing all items assigned to it,
// tagged with the nonce of the Hello sent to that peer. Finally it schedules
// endPull to run after responseWaitTime.
func (engine *PullEngine) processIncomingDigests() {
	engine.ignoreDigests()

	engine.lock.Lock()
	defer engine.lock.Unlock()

	// Group the wanted items by the peer randomly chosen to serve them.
	perPeer := make(map[string][]string)
	for item, owners := range engine.item2owners {
		chosen := owners[util.RandomInt(len(owners))]
		perPeer[chosen] = append(perPeer[chosen], item)
	}

	engine.acceptResponses()
	for peer, wanted := range perPeer {
		engine.SendReq(peer, wanted, engine.peers2nonces[peer])
	}
	time.AfterFunc(engine.responseWaitTime, engine.endPull)
}
- ignoreDigests表示hello-digest結束,開始request-response流程
- 統計item2owners,計算本地沒有的block部分,準備去擁有方拉取
請求
// SendReq asks dest for the items listed in items. The DataRequest is tagged
// with nonce, which is the nonce of the Hello previously sent to dest. The
// message is signed with NoopSign; if signing fails a warning is logged and
// nothing is sent.
func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) {
	dataReq := &proto.DataRequest{
		MsgType: p.config.MsgType,
		Nonce:   nonce,
		Digests: util.StringsToBytes(items),
	}
	req := &proto.GossipMessage{
		Channel: p.config.Channel,
		Tag:     p.config.Tag,
		Nonce:   0,
		Content: &proto.GossipMessage_DataReq{DataReq: dataReq},
	}

	if p.logger.IsEnabledFor(zapcore.DebugLevel) {
		p.logger.Debug("Sending", req.GetDataReq().FormattedDigests(), "to", dest)
	}

	signed, err := req.NoopSign()
	if err != nil {
		p.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
		return
	}
	p.Sndr.Send(signed, p.peersWithEndpoints(dest)...)
}
只需要注意nonce,其他沒什么好講的。這說明整個過程中雙方都會密切關心這個流程的狀態。
處理
// OnReq serves a Request from a remote initiator.
//
// The request is honoured only if nonce is still in incomingNONCES, i.e. it
// matches a Hello this engine answered within requestWaitTime. Of the
// requested items, only those held in engine.state and allowed by the digest
// filter for context are sent back. SendRes runs on a separate goroutine.
// Nothing is sent when no requested item qualifies.
func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{}) {
	if !engine.incomingNONCES.Exists(nonce) {
		return
	}

	engine.lock.Lock()
	defer engine.lock.Unlock()

	allowed := engine.digFilter(context)
	var toSend []string
	for _, id := range items {
		// The filter is consulted only for items we actually hold.
		if !engine.state.Exists(id) || !allowed(id) {
			continue
		}
		toSend = append(toSend, id)
	}

	if len(toSend) > 0 {
		go engine.SendRes(toSend, context, nonce)
	}
}
- 基本上就是拿到對方發來的請求,來滿足對方的需求。馬上來看看Response發送會組裝些什么?
Response
請求
// SendRes answers the Request received in context with a DataUpdate. The
// update carries the stored envelopes of the requested items and echoes nonce
// back. Requested items no longer present in itemID2Msg are skipped.
//
// The read lock is held only while copying envelopes out of itemID2Msg, not
// while responding. Respond performs network I/O, and holding the lock across
// it would block writers of itemID2Msg (such as the DataUpdate handler storing
// freshly pulled items) for the whole send.
//
// context is expected to be a proto.ReceivedMessage; the type assertion
// panics otherwise.
func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce uint64) {
	items2return := func() []*proto.Envelope {
		p.RLock()
		defer p.RUnlock()
		envs := make([]*proto.Envelope, 0, len(items))
		for _, item := range items {
			if msg, exists := p.itemID2Msg[item]; exists {
				envs = append(envs, msg.Envelope)
			}
		}
		return envs
	}()

	returnedUpdate := &proto.GossipMessage{
		Channel: p.config.Channel,
		Tag:     p.config.Tag,
		Nonce:   0,
		Content: &proto.GossipMessage_DataUpdate{
			DataUpdate: &proto.DataUpdate{
				MsgType: p.config.MsgType,
				Nonce:   nonce,
				Data:    items2return,
			},
		},
	}

	received := context.(proto.ReceivedMessage)
	p.logger.Debug("Sending", len(items2return), p.config.MsgType, "items to", received.GetConnectionInfo())
	received.Respond(returnedUpdate)
}
- itemID2Msg,這里暫存了收到的block
- 所以這里就是根據收到的block序號列表,然后組裝items2return返回給對方。
處理
// Excerpt from the pull mediator's message handler: processing a DataUpdate,
// i.e. the Response of a pull round. itemIDs, items and pullMsgType are
// declared by the enclosing function, which is not shown here.
if res := msg.GetDataUpdate(); res != nil {
itemIDs = make([]string, len(res.Data))
items = make([]*proto.SignedGossipMessage, len(res.Data))
pullMsgType = ResponseMsgType
for i, pulledMsg := range res.Data {
// NOTE: this msg shadows the outer msg for the rest of the loop body.
msg, err := pulledMsg.ToGossipMessage()
if err != nil {
// NOTE(review): returning here abandons the rest of the update, but the
// messages before index i were already passed to MsgCons and stored in
// itemID2Msg, and engine.OnRes is never called for them.
p.logger.Warningf("Data update contains an invalid message: %+v", errors.WithStack(err))
return
}
// Hand the pulled message to the adapter's consumer: gc.DeMultiplex for
// the block puller, certConsumer for the certificate puller.
p.MsgCons(msg)
itemIDs[i] = p.IdExtractor(msg)
items[i] = msg
// Cache the message under its ID; SendRes serves later Requests from itemID2Msg.
p.Lock()
p.itemID2Msg[itemIDs[i]] = msg
p.Unlock()
}
p.engine.OnRes(itemIDs, res.Nonce)
}
- MsgCons(msg)這里會將拉取的block push到payloads里面,等待commit到本地,并同步最新的height給好朋友們。至于怎么到payloads的,細節這里就不提了,有興趣的可以自己研究。
- 剩下的就是為Pull機制服務的,將收到的數據為下次pull做準備:加入本地state和itemID2Msg什么的,都跟前面的做一些呼應。
補充
下面我們看下gossip里面是怎么使用這個PullEngine的,總的來說分為兩塊:一是block,前面也提到了;另一個就是cert。
BlockPuller
// createBlockPuller builds the pull mediator that pulls blocks on this channel.
//
// Items are identified by the block's sequence number rendered in decimal.
// Pulled messages are handed to gc.DeMultiplex. Incoming digests are trimmed
// to sequence numbers at or above the current ledger height, since anything
// below it is already held locally.
func (gc *gossipChannel) createBlockPuller() pull.Mediator {
	conf := pull.Config{
		MsgType:           proto.PullMsgType_BLOCK_MSG,
		Channel:           []byte(gc.chainID),
		ID:                gc.GetConf().ID,
		PeerCountToSelect: gc.GetConf().PullPeerNum,
		PullInterval:      gc.GetConf().PullInterval,
		Tag:               proto.GossipMessage_CHAN_AND_ORG,
	}

	// seqNumFromMsg extracts the item ID (block sequence number) of a data message,
	// or "" when the message carries no data payload.
	seqNumFromMsg := func(msg *proto.SignedGossipMessage) string {
		dataMsg := msg.GetDataMsg()
		if dataMsg != nil && dataMsg.Payload != nil {
			return fmt.Sprintf("%d", dataMsg.Payload.SeqNum)
		}
		gc.logger.Warning("Non-data block or with no payload")
		return ""
	}

	adapter := &pull.PullAdapter{
		Sndr:        gc,
		MemSvc:      gc.memFilter,
		IdExtractor: seqNumFromMsg,
		MsgCons: func(msg *proto.SignedGossipMessage) {
			gc.DeMultiplex(msg)
		},
	}

	// Keep only digests whose sequence number is >= our ledger height; the
	// incoming DataDigest is rewritten in place and returned.
	adapter.IngressDigFilter = func(digestMsg *proto.DataDigest) *proto.DataDigest {
		gc.RLock()
		height := gc.ledgerHeight
		gc.RUnlock()

		incoming := digestMsg.Digests
		digestMsg.Digests = nil
		for _, d := range incoming {
			seqNum, err := strconv.ParseUint(string(d), 10, 64)
			if err != nil {
				gc.logger.Warningf("Can't parse digest %s : %+v", d, errors.WithStack(err))
				continue
			}
			if seqNum < height {
				continue
			}
			digestMsg.Digests = append(digestMsg.Digests, d)
		}
		return digestMsg
	}

	return pull.NewPullMediator(conf, adapter)
}
- 這里決定了首先接收的是PullMsgType_BLOCK_MSG類型消息
- 然后會在DataMessage消息中提取SeqNum,也就是blocknum
- 過濾掉收到的digest中序號低于本地賬本高度的部分:既然本地已經有了,就沒有意義了。
- 這里的MsgCons也就是消息的consumer,最終會將收到的block提交到本地賬本
- 之后就是走engine的標(biāo)準(zhǔn)流程了
CertPuller
// createCertStorePuller builds the pull mediator that exchanges peer identity
// (certificate) messages.
//
// Items are keyed by the identity's PKI-ID. Outgoing digests pass through
// g.sameOrgOrOurOrgPullFilter. Each pulled identity is stored in g.idMapper
// by certConsumer.
func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
	conf := pull.Config{
		MsgType:           proto.PullMsgType_IDENTITY_MSG,
		Channel:           []byte(""),
		ID:                g.conf.InternalEndpoint,
		PeerCountToSelect: g.conf.PullPeerNum,
		PullInterval:      g.conf.PullInterval,
		Tag:               proto.GossipMessage_EMPTY,
	}
	// pkiIDFromMsg returns the PKI-ID of an identity message as the item ID,
	// or "" if the message carries no identity / PKI-ID.
	pkiIDFromMsg := func(msg *proto.SignedGossipMessage) string {
		identityMsg := msg.GetPeerIdentity()
		if identityMsg == nil || identityMsg.PkiId == nil {
			return ""
		}
		// A direct conversion; formatting through fmt.Sprintf("%s", ...) adds nothing.
		return string(identityMsg.PkiId)
	}
	// certConsumer stores a pulled identity's PKI-ID -> certificate mapping.
	certConsumer := func(msg *proto.SignedGossipMessage) {
		idMsg := msg.GetPeerIdentity()
		if idMsg == nil || idMsg.Cert == nil || idMsg.PkiId == nil {
			g.logger.Warning("Invalid PeerIdentity:", idMsg)
			return
		}
		err := g.idMapper.Put(common.PKIidType(idMsg.PkiId), api.PeerIdentityType(idMsg.Cert))
		if err != nil {
			g.logger.Warningf("Failed associating PKI-ID with certificate: %+v", errors.WithStack(err))
			// Do not report the certificate as learned when storing it failed.
			return
		}
		g.logger.Debug("Learned of a new certificate:", idMsg.Cert)
	}
	adapter := &pull.PullAdapter{
		Sndr:            g.comm,
		MemSvc:          g.disc,
		IdExtractor:     pkiIDFromMsg,
		MsgCons:         certConsumer,
		EgressDigFilter: g.sameOrgOrOurOrgPullFilter,
	}
	return pull.NewPullMediator(conf, adapter)
}
- 這里決定了接收的是PullMsgType_IDENTITY_MSG類型消息
- 然后會在PeerIdentity消息中提取PkiId
- certConsumer這里是消息的訂閱者,最終會將收到的消息存入idMapper
- 之后就是走engine的標(biāo)準(zhǔn)流程了
總結
- 至此,基本上你能區分RemoteStateRequest和DataRequest了。DataRequest是用來給本地的block進行查漏補缺,而RemoteStateRequest是block同步的主力,主要是跟leader的最新的block保持一致。
- PullEngine不光是用來同步block,也用來同步peer節點的證書,分別叫BlockPullEngine和CertPullEngine。