In Fabric, the lowest-level communication interface of the gossip module has only two operations. Every other gossip-related operation is built on top of them. Both are defined in fabric/protos/gossip/message.proto:
```protobuf
// Gossip
service Gossip {
    // GossipStream is the gRPC stream used for sending and receiving messages
    rpc GossipStream (stream Envelope) returns (stream Envelope) {}

    // Ping is used to probe a remote peer's aliveness
    rpc Ping (Empty) returns (Empty) {}
}
```
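- GossipStream is used for communication (both sending and receiving messages)
- Ping is used to check whether a node is alive

Fabric therefore communicates over gRPC streams rather than UDP. In some situations this may affect performance.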
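fabric/gossip/comm/comm_impl.go implements this lowest-level interface. Let's walk through the main implementation. We focus only on the core logic for sending and receiving messages and skip the other details.
Receiving messages
commImpl is the gossip module's basic send/receive implementation, and we only look at how it sends and receives. Given the proto file, this is easy to guess. To receive messages, it only needs to implement the server side of the two proto RPCs. To send messages, it only needs to use the proto client. Let's look at each side in turn.
Receiving happens in the GossipStream function in comm_impl.go. The first part of that function is setup work. The key message-handling code is the following two pieces: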
```go
// fabric/gossip/comm/comm_impl.go GossipStream
h := func(m *proto.SignedGossipMessage) {
	c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
		conn:                conn,
		lock:                conn,
		SignedGossipMessage: m,
		connInfo:            connInfo,
	})
}
conn.handler = h
return conn.serviceConnection()
```
h is the handler that dispatches messages. It is called inside the serviceConnection function that follows:
```go
// fabric/gossip/comm/conn.go
func (conn *connection) serviceConnection() error {
	errChan := make(chan error, 1)
	msgChan := make(chan *proto.SignedGossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
	defer close(msgChan)

	// Call stream.Recv() asynchronously in readFromStream(),
	// and wait for either the Recv() call to end,
	// or a signal to close the connection, which exits
	// the method and makes the Recv() call to fail in the
	// readFromStream() method
	go conn.readFromStream(errChan, msgChan)

	go conn.writeToStream()

	for !conn.toDie() {
		select {
		case stop := <-conn.stopChan:
			conn.logger.Debug("Closing reading from stream")
			conn.stopChan <- stop
			return nil
		case err := <-errChan:
			return err
		case msg := <-msgChan:
			conn.handler(msg)
		}
	}
	return nil
}
```
serviceConnection starts two goroutines, one for receiving and one for sending. go conn.readFromStream(errChan, msgChan) keeps receiving messages on the stream and passes each one to msgChan. The for loop below then calls conn.handler(msg) to process it. go conn.writeToStream() writes messages, and we come back to it later.
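To make the receive loop concrete, here is a minimal, self-contained sketch of the same pattern. A reader goroutine feeds a channel, and a select loop dispatches each message to a handler. It is not Fabric's code.

- Message and fakeStream are hypothetical stand-ins for *proto.SignedGossipMessage and the gRPC stream.
- Stopping is signalled by closing a channel, not by Fabric's stopChan hand-off.
- msgChan is unbuffered so that the demo's ordering is deterministic. Fabric sizes it from peer.gossip.recvBuffSize.

```go
package main

import "errors"

// Message is a hypothetical stand-in for *proto.SignedGossipMessage.
type Message string

// errStreamClosed is what the fake stream returns once it runs dry,
// playing the role of a failed stream.Recv().
var errStreamClosed = errors.New("stream closed")

// fakeStream is a hypothetical stand-in for the gRPC stream.
type fakeStream struct{ msgs []Message }

func (s *fakeStream) Recv() (Message, error) {
	if len(s.msgs) == 0 {
		return "", errStreamClosed
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

// readFromStream keeps calling Recv, forwards each message to msgChan and
// reports the first error on errChan (the role of conn.readFromStream).
func readFromStream(s *fakeStream, errChan chan<- error, msgChan chan<- Message, stop <-chan struct{}) {
	for {
		m, err := s.Recv()
		if err != nil {
			errChan <- err
			return
		}
		select {
		case msgChan <- m:
		case <-stop:
			return
		}
	}
}

// serviceConnection dispatches messages to handler until the stream fails
// or stop is closed (the role of the select loop in conn.serviceConnection).
func serviceConnection(s *fakeStream, handler func(Message), stop chan struct{}) error {
	errChan := make(chan error, 1)
	msgChan := make(chan Message) // unbuffered only to keep the demo deterministic
	go readFromStream(s, errChan, msgChan, stop)
	for {
		select {
		case <-stop:
			return nil
		case err := <-errChan:
			return err
		case m := <-msgChan:
			handler(m)
		}
	}
}
```

Try calling serviceConnection with a fakeStream holding "a" and "b" and a handler that appends to a slice. The handler sees both messages in order, and then the call returns errStreamClosed. Because the handler runs on the loop's own goroutine, it needs no locking.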
conn.handler(msg) is the c.msgPublisher.DeMultiplex call shown earlier. Next, let's see what that function does with the message:
```go
// fabric/gossip/comm/demux.go
// DeMultiplex broadcasts the message to all channels that were returned
// by AddChannel calls and that hold the respected predicates.
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) {
	defer func() {
		recover()
	}() // recover from sending on a closed channel

	if m.isClosed() {
		return
	}

	m.lock.RLock()
	channels := m.channels
	m.lock.RUnlock()

	for _, ch := range channels {
		if ch.pred(msg) {
			ch.ch <- msg
		}
	}
}
```
The code takes all registered channels and calls ch.pred(msg) on each one. If the result is true, it sends the message to that channel. This already makes things fairly clear. Any component that wants to receive messages calls AddChannel, then receives them on the returned channel. Let's look at AddChannel first:
```go
// fabric/gossip/comm/demux.go
// AddChannel registers a channel with a certain predicate
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} {
	m.lock.Lock()
	defer m.lock.Unlock()
	ch := &channel{ch: make(chan interface{}, 10), pred: predicate}
	m.channels = append(m.channels, ch)
	return ch.ch
}
```
This function takes a single argument of the function type common.MessageAcceptor. It returns a chan of received messages: as shown above, matching messages are delivered on this returned chan. common.MessageAcceptor is a message filter that lets the caller define which messages it wants. Searching the codebase for AddChannel quickly shows that the gossip service's Accept is built on it.
With that, the picture is clear. Receiving a message involves the following steps:
- Implement the GossipStream RPC from message.proto and start a goroutine that keeps receiving messages on the gRPC stream (go conn.readFromStream(errChan, msgChan)).
- Once a message arrives, DeMultiplex dispatches it to the channels that were registered through AddChannel.
- Each channel's message filter is attached when AddChannel is called.
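This dispatch step can be sketched as a tiny publish/subscribe demultiplexer. It is a simplified, hypothetical stand-in for ChannelDeMultiplexer and assumes messages are plain interface{} values. It leaves out Fabric's closed-state check and the recover() guard.

```go
package main

import "sync"

// MessageAcceptor plays the role of common.MessageAcceptor: a predicate that
// decides whether a subscriber wants a given message.
type MessageAcceptor func(msg interface{}) bool

type subscriber struct {
	ch   chan interface{}
	pred MessageAcceptor
}

// Demux is a simplified, hypothetical ChannelDeMultiplexer.
type Demux struct {
	mu   sync.RWMutex
	subs []*subscriber
}

// AddChannel registers a filter and returns the channel on which matching
// messages will be delivered (buffer of 10, as in the original).
func (d *Demux) AddChannel(pred MessageAcceptor) <-chan interface{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	s := &subscriber{ch: make(chan interface{}, 10), pred: pred}
	d.subs = append(d.subs, s)
	return s.ch
}

// DeMultiplex delivers msg to every subscriber whose predicate accepts it.
func (d *Demux) DeMultiplex(msg interface{}) {
	d.mu.RLock()
	subs := d.subs // snapshot under the read lock
	d.mu.RUnlock()
	for _, s := range subs {
		if s.pred(msg) {
			s.ch <- msg // blocks if this subscriber's buffer is full
		}
	}
}
```

For example, a subscriber that only wants int messages registers `func(m interface{}) bool { _, ok := m.(int); return ok }` and reads from the returned channel. As in the original, one slow subscriber whose buffer is full blocks DeMultiplex for every other subscriber.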
Sending messages
The sending code is in the following function:
```go
// fabric/gossip/comm/comm_impl.go
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
	if c.isStopping() || len(peers) == 0 {
		return
	}

	c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")

	for _, peer := range peers {
		go func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
			c.sendToEndpoint(peer, msg)
		}(peer, msg)
	}
}
```
This function is simple. Given msg and peers, it sends msg to every peer. Note that each send runs in its own goroutine (go). All peers are sent to concurrently, which makes sending more efficient. Next, let's look at sendToEndpoint:
```go
// fabric/gossip/comm/comm_impl.go
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) {
	if c.isStopping() {
		return
	}
	c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
	defer c.logger.Debug("Exiting")
	var err error

	conn, err := c.connStore.getConnection(peer)
	if err == nil {
		disConnectOnErr := func(err error) {
			c.logger.Warning(peer, "isn't responsive:", err)
			c.disconnect(peer.PKIID)
		}
		conn.send(msg, disConnectOnErr)
		return
	}
	c.logger.Warning("Failed obtaining connection for", peer, "reason:", err)
	c.disconnect(peer.PKIID)
}
```
In this function we only need to look at two calls. getConnection obtains the connection, and conn.send() sends the message.
getConnection
```go
// fabric/gossip/comm/conn.go
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
	cs.RLock()
	isClosing := cs.isClosing
	cs.RUnlock()

	if isClosing {
		return nil, errors.New("Shutting down")
	}

	pkiID := peer.PKIID
	endpoint := peer.Endpoint

	cs.Lock()
	destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
	if !hasConnected {
		destinationLock = &sync.RWMutex{}
		cs.destinationLocks[string(pkiID)] = destinationLock
	}
	cs.Unlock()

	destinationLock.Lock()

	cs.RLock()
	conn, exists := cs.pki2Conn[string(pkiID)]
	if exists {
		cs.RUnlock()
		destinationLock.Unlock()
		return conn, nil
	}
	cs.RUnlock()

	createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)

	destinationLock.Unlock()

	cs.RLock()
	isClosing = cs.isClosing
	cs.RUnlock()
	if isClosing {
		return nil, errors.New("ConnStore is closing")
	}

	cs.Lock()
	delete(cs.destinationLocks, string(pkiID))
	defer cs.Unlock()

	// check again, maybe someone connected to us during the connection creation?
	conn, exists = cs.pki2Conn[string(pkiID)]

	if exists {
		if createdConnection != nil {
			createdConnection.close()
		}
		return conn, nil
	}

	// no one connected to us AND we failed connecting!
	if err != nil {
		return nil, err
	}

	// at this point in the code, we created a connection to a remote peer
	conn = createdConnection
	cs.pki2Conn[string(createdConnection.pkiID)] = conn

	go conn.serviceConnection()

	return conn, nil
}
```
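This function is long, but only two statements matter for the main logic. The first is createConnection. The second is conn.serviceConnection(), analyzed earlier: it starts two goroutines, one for receiving and one for sending. The rest, including the per-destination locks and the second lookup after the connection is created, exists for efficiency and for correctness under concurrency, and can be skipped for now. Let's look at createConnection: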
```go
// fabric/gossip/comm/comm_impl.go
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
	var err error
	var cc *grpc.ClientConn
	var stream proto.Gossip_GossipStreamClient
	var pkiID common.PKIidType
	var connInfo *proto.ConnectionInfo

	c.logger.Debug("Entering", endpoint, expectedPKIID)
	defer c.logger.Debug("Exiting")

	if c.isStopping() {
		return nil, errors.New("Stopping")
	}
	cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...)
	if err != nil {
		return nil, err
	}

	cl := proto.NewGossipClient(cc)

	if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {
		cc.Close()
		return nil, err
	}

	if stream, err = cl.GossipStream(context.Background()); err == nil {
		connInfo, err = c.authenticateRemotePeer(stream)
		if err == nil {
			pkiID = connInfo.ID
			if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
				// PKIID is nil when we don't know the remote PKI id's
				c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
				return nil, errors.New("Authentication failure")
			}
			conn := newConnection(cl, cc, stream, nil)
			conn.pkiID = pkiID
			conn.info = connInfo
			conn.logger = c.logger

			h := func(m *proto.SignedGossipMessage) {
				c.logger.Debug("Got message:", m)
				c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
					conn:                conn,
					lock:                conn,
					SignedGossipMessage: m,
					connInfo:            connInfo,
				})
			}
			conn.handler = h
			return conn, nil
		}
	}
	cc.Close()
	return nil, err
}
```
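The code is long, but a quick look is enough. It is exactly what we guessed at the beginning: sending messages just means using the client side of message.proto. Everything else (the Ping, opening the stream, authenticating the remote peer) wraps that client and needs little attention for understanding the main flow.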
That leaves only conn.send():
```go
// fabric/gossip/comm/conn.go
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error)) {
	conn.Lock()
	defer conn.Unlock()

	if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) {
		go onErr(errSendOverflow)
		return
	}

	m := &msgSending{
		envelope: msg.Envelope,
		onErr:    onErr,
	}

	conn.outBuff <- m
}
```
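The code is very simple. It puts msg into conn.outBuff, or reports errSendOverflow through onErr if the buffer is full. This means something else must be waiting to take messages out of conn.outBuff and write them. As the attentive reader has guessed, that is the go conn.writeToStream() started in serviceConnection: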
```go
// fabric/gossip/comm/conn.go
func (conn *connection) writeToStream() {
	for !conn.toDie() {
		stream := conn.getStream()
		if stream == nil {
			conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
			return
		}
		select {
		case m := <-conn.outBuff:
			err := stream.Send(m.envelope)
			if err != nil {
				go m.onErr(err)
				return
			}
			break
		case stop := <-conn.stopChan:
			conn.logger.Debug("Closing writing to stream")
			conn.stopChan <- stop
			return
		}
	}
}
```
The code is straightforward: it writes messages to the gRPC stream. That covers the lowest-level message sending and receiving in Fabric's gossip module. All of the gossip module's functionality is built on top of this code.
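Taken together, conn.send and writeToStream form a bounded producer/consumer queue. The sketch below reproduces that pairing with hypothetical names (Conn, outMsg, write). It differs from Fabric in two ways:

- It does not take conn.Lock() and compare len(conn.outBuff) with peer.gossip.sendBuffSize. Instead it uses a select with a default case, the idiomatic non-blocking channel send in Go, which does the full-buffer check and the enqueue in one step.
- Stopping is done by closing stop, not by Fabric's stopChan hand-off.

```go
package main

import "errors"

// errSendOverflow mirrors the error Fabric reports when the send buffer is full.
var errSendOverflow = errors.New("send buffer overflow")

// outMsg plays the role of msgSending: a payload plus its error callback.
type outMsg struct {
	payload string
	onErr   func(error)
}

// Conn is a hypothetical, simplified connection; write stands in for stream.Send.
type Conn struct {
	outBuff chan outMsg   // bounded queue, like conn.outBuff
	stop    chan struct{} // closed to stop the writer
	write   func(payload string) error
}

// send enqueues without blocking. If the buffer is full, the message is
// dropped and onErr is called asynchronously with errSendOverflow.
func (c *Conn) send(payload string, onErr func(error)) {
	select {
	case c.outBuff <- outMsg{payload: payload, onErr: onErr}:
	default:
		go onErr(errSendOverflow)
	}
}

// writeToStream drains outBuff and writes each message until stop is closed
// or a write fails, in which case that message's onErr is called.
func (c *Conn) writeToStream() {
	for {
		select {
		case m := <-c.outBuff:
			if err := c.write(m.payload); err != nil {
				go m.onErr(err)
				return
			}
		case <-c.stop:
			return
		}
	}
}
```

Because send never blocks, a stalled peer cannot hold up the goroutine that Send started for it. Once the buffer fills, further messages fail fast with errSendOverflow. In sendToEndpoint, that error leads to a disconnect.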
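Summary
I still haven't found a particularly good way to read code. My current approach is to skim the code first to understand its overall structure, then read it layer by layer, starting from the bottom. While reading, don't get bogged down in details. Understand the main thread first, then look into the other parts you care about bit by bit.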