fabric gossip 源碼解析

fabric 中的 gossip 接口酌予,最底層通信接口恳啥,實(shí)際只有兩個(gè)操作查描,所有的 Gossip相關(guān)操作都是在這兩個(gè)接口上堆砌出來的宵蛀,這兩個(gè)接口定義在
fabric/protos/gossip/message.proto

// Gossip
service Gossip {

    // GossipStream is the gRPC stream used for sending and receiving messages
    rpc GossipStream (stream Envelope) returns (stream Envelope) {}

    // Ping is used to probe a remote peer's aliveness
    rpc Ping (Empty) returns (Empty) {}
}

  1. GossipStream 用來通信
  2. Ping用來判斷節(jié)點(diǎn)狀態(tài)

可以看出來 fabric 通信方式使用了 grpc 的 stream痴鳄,沒有采用udp方式瘟斜,在特定情況下可能會(huì)對(duì)性能產(chǎn)生一定的影響

fabric/gossip/comm/comm_impl.go 實(shí)現(xiàn)了最底層接口,我們來分析一下主要實(shí)現(xiàn)痪寻,只關(guān)注核心收發(fā)消息螺句,不關(guān)注其他細(xì)節(jié)

消息的接受

commImpl作為Gossip模塊的基本收發(fā)實(shí)現(xiàn),我們只分析如何收發(fā)的橡类,其實(shí)已經(jīng)給出了 proto 文件我們應(yīng)該很容易猜到蛇尚,接受消息只需要實(shí)現(xiàn) proto 兩個(gè)接口的 server 端, 發(fā)送消息 只需要實(shí)現(xiàn) proto 的client 就可以了顾画。分別看一下收發(fā)細(xì)節(jié)

收消息 comm_impl.go GossipStream 函數(shù)取劫,在這個(gè)函數(shù)中,前面都是做了一下準(zhǔn)備工作亲雪,收發(fā)消息關(guān)鍵代碼有兩處

    // fabric/gossip/comm/comm_impl.go GossipStream

    h := func(m *proto.SignedGossipMessage) {
        c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
            conn:                conn,
            lock:                conn,
            SignedGossipMessage: m,
            connInfo:            connInfo,
        })
    }
    conn.handler = h
    return conn.serviceConnection()

對(duì)消息進(jìn)行分發(fā)的 handler 勇凭,會(huì)在接下來的serviceConnection函數(shù)里面調(diào)用

// fabric/gossip/comm/conn.go

func (conn *connection) serviceConnection() error {
    errChan := make(chan error, 1)
    msgChan := make(chan *proto.SignedGossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
    defer close(msgChan)

    // Call stream.Recv() asynchronously in readFromStream(),
    // and wait for either the Recv() call to end,
    // or a signal to close the connection, which exits
    // the method and makes the Recv() call to fail in the
    // readFromStream() method
    go conn.readFromStream(errChan, msgChan)

    go conn.writeToStream()

    for !conn.toDie() {
        select {
        case stop := <-conn.stopChan:
            conn.logger.Debug("Closing reading from stream")
            conn.stopChan <- stop
            return nil
        case err := <-errChan:
            return err
        case msg := <-msgChan:
            conn.handler(msg)
        }
    }
    return nil
}

開啟了兩個(gè)goroutine, 分別進(jìn)行收發(fā), go conn.readFromStream(errChan, msgChan),這個(gè)函數(shù)相當(dāng)于在 stream 上不停的接收消息义辕,收到了 就給 msgChan, 在下面這個(gè)for循環(huán)中會(huì)調(diào)用 conn.handler(msg) 來處理收到的消息, go conn.writeToStream() 是寫消息虾标,后續(xù)再講

conn.handler(msg) 實(shí)際就是之前說過的 c.msgPublisher.DeMultiplex 函數(shù),那么我們接下來分析一下 這個(gè)函數(shù)對(duì)消息做了什么

// fabric/gossip/comm/demux.go

// DeMultiplex broadcasts the message to all channels that were returned
// by AddChannel calls and that hold the respected predicates.
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) {
    defer func() {
        recover()
    }() // recover from sending on a closed channel

    if m.isClosed() {
        return
    }

    m.lock.RLock()
    channels := m.channels
    m.lock.RUnlock()

    for _, ch := range channels {
        if ch.pred(msg) {
            ch.ch <- msg
        }
    }
}

根據(jù)代碼可以看出, 其實(shí)就是 將所有的 channel 拿出來璧函,每個(gè) 調(diào)用一下 ch.pred(msg) 如果返回值為真傀蚌,就將消息 發(fā)送給channel,其實(shí)到這里已經(jīng)比較清楚了蘸吓,相當(dāng)于 哪個(gè)channel 想接受消息善炫,就調(diào)用 AddChannel ,然后就可以接收到消息了库继,我們先看一下 AddChannel 函數(shù)

// fabric/gossip/comm/demux.go

// AddChannel registers a channel with a certain predicate
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{} {
    m.lock.Lock()
    defer m.lock.Unlock()
    ch := &channel{ch: make(chan interface{}, 10), pred: predicate}
    m.channels = append(m.channels, ch)
    return ch.ch
}

根據(jù)代碼可以很清晰的看出來箩艺,這個(gè)函數(shù)值需要一個(gè) common.MessageAcceptor 函數(shù)類型的參數(shù),返回 接收到的消息 chan (上面已經(jīng)分析過了返回的這個(gè) chan 就是接收到的消息的 chan), 對(duì)于 common.MessageAcceptor 也很容易看出來就是一個(gè)消息過濾器宪萄,可以自定義規(guī)則想接收哪些消息艺谆, 全文搜索一下 AddChannel 一下,可以很容易發(fā)現(xiàn) 就是實(shí)現(xiàn) gossip service 的 Accept

到這里已經(jīng)很清晰了拜英,接收消息總過就進(jìn)行如下幾個(gè)過程,

  1. 實(shí)現(xiàn)message.proto 的 GossipStream 接口静汤,啟動(dòng)一個(gè)goroutine 不停的在 grpc stream 上面接收消息(go conn.readFromStream(errChan, msgChan))
  2. 接收到消息以后,使用 DeMultiplex 函數(shù) 像 注冊(cè)過的Channel中分發(fā)(也就是調(diào)用了 AddChannel) 的分發(fā)
  3. 在 AddChannel的時(shí)候給消息添加了一個(gè)過濾器

消息的發(fā)送

發(fā)送代碼很明顯在下面這個(gè)函數(shù)

// fabric/gossip/comm/comm_impl.go

func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
    if c.isStopping() || len(peers) == 0 {
        return
    }

    c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")

    for _, peer := range peers {
        go func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
            c.sendToEndpoint(peer, msg)
        }(peer, msg)
    }
}

這個(gè)函數(shù)很簡(jiǎn)單就是 msg, peers 兩個(gè)參數(shù)居凶,將 msg 發(fā)給 所有的 peer虫给,有一點(diǎn)需要注意下 在發(fā)送使用了 go ,這樣可以提高發(fā)送效率侠碧,相當(dāng)于同時(shí)給 所有 peer 發(fā)送抹估。接下來 看看 sendToEndpoint 函數(shù)

// fabric/gossip/comm/comm_impl.go

func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) {
    if c.isStopping() {
        return
    }
    c.logger.Debug("Entering, Sending to", peer.Endpoint, ", msg:", msg)
    defer c.logger.Debug("Exiting")
    var err error

    conn, err := c.connStore.getConnection(peer)
    if err == nil {
        disConnectOnErr := func(err error) {
            c.logger.Warning(peer, "isn't responsive:", err)
            c.disconnect(peer.PKIID)
        }
        conn.send(msg, disConnectOnErr)
        return
    }
    c.logger.Warning("Failed obtaining connection for", peer, "reason:", err)
    c.disconnect(peer.PKIID)
}

這個(gè)函數(shù)我們只分析兩句代碼 getConnection, conn.send(), 一個(gè)是獲取conn, 一個(gè)是發(fā)送消息

getConnection

// fabric/gossip/comm/conn.go

func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
    cs.RLock()
    isClosing := cs.isClosing
    cs.RUnlock()

    if isClosing {
        return nil, errors.New("Shutting down")
    }

    pkiID := peer.PKIID
    endpoint := peer.Endpoint

    cs.Lock()
    destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]
    if !hasConnected {
        destinationLock = &sync.RWMutex{}
        cs.destinationLocks[string(pkiID)] = destinationLock
    }
    cs.Unlock()

    destinationLock.Lock()

    cs.RLock()
    conn, exists := cs.pki2Conn[string(pkiID)]
    if exists {
        cs.RUnlock()
        destinationLock.Unlock()
        return conn, nil
    }
    cs.RUnlock()

    createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)

    destinationLock.Unlock()

    cs.RLock()
    isClosing = cs.isClosing
    cs.RUnlock()
    if isClosing {
        return nil, errors.New("ConnStore is closing")
    }

    cs.Lock()
    delete(cs.destinationLocks, string(pkiID))
    defer cs.Unlock()

    // check again, maybe someone connected to us during the connection creation?
    conn, exists = cs.pki2Conn[string(pkiID)]

    if exists {
        if createdConnection != nil {
            createdConnection.close()
        }
        return conn, nil
    }

    // no one connected to us AND we failed connecting!
    if err != nil {
        return nil, err
    }

    // at this point in the code, we created a connection to a remote peer
    conn = createdConnection
    cs.pki2Conn[string(createdConnection.pkiID)] = conn

    go conn.serviceConnection()

    return conn, nil
}

這個(gè)函數(shù)代碼很多,對(duì)于主邏輯只有兩句舆床,一個(gè)是 createConnection棋蚌,一個(gè)是 conn.serviceConnection(前文已經(jīng)分析過,就是打開兩個(gè)goroutine挨队,分別監(jiān)聽接收谷暮,發(fā)送) ,其余的都是優(yōu)化盛垦,邏輯完成性的代碼湿弦,無需關(guān)心, 我們看看 createConnection

// fabric/gossip/comm/comm_impl.go

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
    var err error
    var cc *grpc.ClientConn
    var stream proto.Gossip_GossipStreamClient
    var pkiID common.PKIidType
    var connInfo *proto.ConnectionInfo

    c.logger.Debug("Entering", endpoint, expectedPKIID)
    defer c.logger.Debug("Exiting")

    if c.isStopping() {
        return nil, errors.New("Stopping")
    }
    cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...)
    if err != nil {
        return nil, err
    }

    cl := proto.NewGossipClient(cc)

    if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {
        cc.Close()
        return nil, err
    }

    if stream, err = cl.GossipStream(context.Background()); err == nil {
        connInfo, err = c.authenticateRemotePeer(stream)
        if err == nil {
            pkiID = connInfo.ID
            if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
                // PKIID is nil when we don't know the remote PKI id's
                c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
                return nil, errors.New("Authentication failure")
            }
            conn := newConnection(cl, cc, stream, nil)
            conn.pkiID = pkiID
            conn.info = connInfo
            conn.logger = c.logger

            h := func(m *proto.SignedGossipMessage) {
                c.logger.Debug("Got message:", m)
                c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
                    conn:                conn,
                    lock:                conn,
                    SignedGossipMessage: m,
                    connInfo:            connInfo,
                })
            }
            conn.handler = h
            return conn, nil
        }
    }
    cc.Close()
    return nil, err
}

代碼很長,不過我們只需要簡(jiǎn)單看一下腾夯,可以很明顯的看出來就是我們文章開頭猜測(cè)的颊埃,發(fā)送消息就是 實(shí)現(xiàn) message.proto 里面的client,其余的都是對(duì)這個(gè)client的封裝蝶俱,對(duì)于理解代碼而言不需要太關(guān)注

接下來我們只剩下一個(gè) conn.send()

// fabric/gossip/comm/conn.go

func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error)) {
    conn.Lock()
    defer conn.Unlock()

    if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) {
        go onErr(errSendOverflow)
        return
    }

    m := &msgSending{
        envelope: msg.Envelope,
        onErr:    onErr,
    }

    conn.outBuff <- m
}

代碼異常簡(jiǎn)單就是 將msg 傳送給 conn.outBuff, 其實(shí)我們看到這里已經(jīng)可以確定肯定有另外一個(gè)地方在 等著 conn.outBuff 發(fā)送過來的消息班利,然后寫進(jìn)去,聰明的讀者已經(jīng)想到了榨呆,沒錯(cuò)罗标,就是前面 serviceConnection 中的 go writeToStream()

func (conn *connection) writeToStream() {
    for !conn.toDie() {
        stream := conn.getStream()
        if stream == nil {
            conn.logger.Error(conn.pkiID, "Stream is nil, aborting!")
            return
        }
        select {
        case m := <-conn.outBuff:
            err := stream.Send(m.envelope)
            if err != nil {
                go m.onErr(err)
                return
            }
            break
        case stop := <-conn.stopChan:
            conn.logger.Debug("Closing writing to stream")
            conn.stopChan <- stop
            return
        }
    }
}

代碼清晰明了, 利用 grpc stream 進(jìn)行寫消息,至此 我們已經(jīng)把 fabric 中 gossip 模塊的最底層收發(fā)消息分析清楚了,gossip 模塊所有的功能都是在這個(gè)代碼基礎(chǔ)上進(jìn)行的

總結(jié)

讀代碼我一直沒有找到特別好的辦法闯割,我現(xiàn)在的方法就是 大致瀏覽一下代碼彻消,知道代碼結(jié)構(gòu),然后從底層一層一層的讀宙拉,讀代碼的時(shí)候一定不能糾結(jié)細(xì)節(jié)宾尚,先讀懂主線,對(duì)于其他的關(guān)心的在一點(diǎn)點(diǎn)看谢澈。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末煌贴,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子锥忿,更是在濱河造成了極大的恐慌崔步,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缎谷,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡灶似,警方通過查閱死者的電腦和手機(jī)列林,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來酪惭,“玉大人希痴,你說我怎么就攤上這事〈焊校” “怎么了砌创?”我有些...
    開封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鲫懒。 經(jīng)常有香客問我嫩实,道長,這世上最難降的妖魔是什么窥岩? 我笑而不...
    開封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任甲献,我火速辦了婚禮,結(jié)果婚禮上颂翼,老公的妹妹穿的比我還像新娘晃洒。我一直安慰自己,他們只是感情好朦乏,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開白布球及。 她就那樣靜靜地躺著,像睡著了一般呻疹。 火紅的嫁衣襯著肌膚如雪吃引。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音际歼,去河邊找鬼惶翻。 笑死,一個(gè)胖子當(dāng)著我的面吹牛鹅心,可吹牛的內(nèi)容都是我干的吕粗。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼旭愧,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼颅筋!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起输枯,我...
    開封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤议泵,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后桃熄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體先口,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年瞳收,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了碉京。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡螟深,死狀恐怖谐宙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情界弧,我是刑警寧澤凡蜻,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站垢箕,受9級(jí)特大地震影響划栓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜条获,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一茅姜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧月匣,春花似錦钻洒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至萍悴,卻和暖如春头遭,著一層夾襖步出監(jiān)牢的瞬間寓免,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來泰國打工计维, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留袜香,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓鲫惶,卻偏偏與公主長得像蜈首,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子欠母,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容