Serf下的Memberlist 源碼分析

(一)Memberlist 簡介

(1)Memberlist是用來管理分布式集群內節點發現,節點故障檢測、節點列表發現的軟件。
(2)Memberlist 是基於Gossip協議來傳播消息,該Gossip構建在SWIM協議之上。

(二) Protocol

集群內的廣播
節點通過UDP協議向K個節點發送消息,節點從廣播隊列裡面獲取消息,廣播隊列裡的消息發送失敗超過一定次數後,消息就會被丟棄。發送次數參考Config 裡的 RetransmitMult的注釋。

// gossip sends pending broadcast messages to a random selection of nodes.
// For every selected node it pulls messages from the broadcast queue and
// sends them in one packet: a single message is sent as-is, several messages
// are bundled into a compound message. The whole round stops as soon as the
// queue yields no messages. Send failures are logged and do not abort the
// round.
//
// NOTE(review): this is an excerpt — the node filter body is elided and
// bytesAvail is defined outside the visible code.
func (m *Memberlist) gossip() {
    
    // Pick up to GossipNodes random nodes (filter elided in this excerpt).
    kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
       ......
    })
    for _, node := range kNodes {
        // Fetch pending messages from the broadcast queue.
        msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
        if len(msgs) == 0 {
            // Nothing left to gossip: end the round, not just this iteration.
            return
        }

        addr := node.Address()
        if len(msgs) == 1 {// exactly one message
            // Send the message on its own as a packet (UDP, per the article).
            if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
                m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
            }
        } else {
            // Otherwise create and send a compound message
            compound := makeCompoundMessage(msgs)
            if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
                m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
            }
        }
    }
}

Push/Pull
每隔一個時間間隔,隨機選取一個節點,跟它建立TCP連接,然後將本地的全部節點狀態、用戶數據發送過去,然後對端將其掌握的全部節點狀態、用戶數據發送回來,然後完成2份數據的合併。此動作可以加速集群內信息的收斂速度。

// pushPull runs one push/pull round: it selects a single random node that is
// alive and is not the local node, then performs a full state exchange with
// it. If no such node exists the round is a no-op. A failed exchange is
// logged rather than returned.
func (m *Memberlist) pushPull() {
    // Filter handed to kRandomNodes: reports true for nodes that must not be
    // picked (ourselves, or anything that is not alive).
    skip := func(n *nodeState) bool {
        if n.Name == m.config.Name {
            return true
        }
        return n.State != stateAlive
    }

    // Only the selection needs the node list, so hold the read lock just
    // for that call.
    m.nodeLock.RLock()
    candidates := kRandomNodes(1, m.nodes, skip)
    m.nodeLock.RUnlock()

    if len(candidates) == 0 {
        // No eligible peer.
        return
    }

    peer := candidates[0]
    err := m.pushPullNode(peer.Address(), false)
    if err == nil {
        return
    }
    m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", peer.Name, err)
}

// pushPullNode performs a full state exchange with the node at addr: it sends
// the local state, receives the remote state, and merges what came back.
// join is forwarded unchanged to both steps. The elapsed time of the whole
// call is reported under the "memberlist", "pushPullNode" metric key.
// It returns the first error from either step, or nil.
func (m *Memberlist) pushPullNode(addr string, join bool) error {
    started := time.Now()
    defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, started)

    // Exchange state with the remote node.
    remoteNodes, remoteUserState, err := m.sendAndReceiveState(addr, join)
    if err != nil {
        return err
    }

    // Fold the received node list and user state into the local view.
    mergeErr := m.mergeRemoteState(join, remoteNodes, remoteUserState)
    if mergeErr != nil {
        return mergeErr
    }
    return nil
}

節點的三個狀態

// The three states a node can be in. The values are assigned with iota, so
// stateAlive is 0, stateSuspect is 1 and stateDead is 2.
const (
    // stateAlive: the node is considered reachable.
    stateAlive nodeStateType = iota
    // stateSuspect: probing the node failed; it is suspected but not yet
    // declared dead.
    stateSuspect
    // stateDead: the node has been declared dead.
    stateDead
)

Alive (比較簡單,代碼不分析)
當節點上線的時候,向集群廣播Alive消息。

Probe
當節點啟動之後,每隔一定的時間間隔,會選取一個節點對其發送一個PING(UDP)消息,當PING消息失敗後,會隨機選取IndirectChecks個節點發起間接的PING(UDP)請求,並直接再發起一個TCP PING請求。收到間接PING請求的節點會根據請求中的地址發起一個PING消息,將PING的結果返回給間接請求的源節點。如果在探測超時時間內,本節點沒有收到任何一個要探測節點的ACK消息,則標記要探測的節點狀態為suspect。

// probe selects the next node from m.nodes (by m.probeIndex) and runs a
// single round of failure detection against it via probeNode. The local node
// and nodes already marked dead are skipped.
//
// NOTE(review): this is an excerpt — the START label, the lock acquisition
// and the declarations of node, skip and numCheck are in the elided part.
func (m *Memberlist) probe() {
......
    // Copy the node by value so it can still be used after the read lock is
    // released below.
    node = *m.nodes[m.probeIndex]
    if node.Name == m.config.Name {
        // Never probe ourselves.
        skip = true
    } else if node.State == stateDead {
        // Dead nodes are not probed.
        skip = true
    }

    // Potentially skip
    m.nodeLock.RUnlock()
    m.probeIndex++
    if skip {
        // Count the skipped candidate and try the next one.
        numCheck++
        goto START
    }

    // Probe the specific node
    m.probeNode(&node)
}

// probeNode handles a single round of failure checking on a node.
//
// Visible flow: send a direct ping and wait up to ProbeTimeout for an ack;
// if that does not complete, ask up to IndirectChecks random live peers to
// ping the node on our behalf and, when enabled, start a TCP fallback ping
// in parallel; if nothing confirms the node, mark it as suspect.
//
// NOTE(review): this is an excerpt — setup (addr, ping, sent, deadline,
// ackCh), the nack handling and the function's closing brace are elided.
func (m *Memberlist) probeNode(node *nodeState) {
    ......
    if node.State == stateAlive {
        // Send a direct ping message.
        if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
            m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
            return
        }
    }else{
        ......
    }

    // Wait for the result of the direct ping.
    select {
    case v := <-ackCh:
        // The ping succeeded.
        if v.Complete == true {
            if m.config.Ping != nil {
                // Report the round-trip time to the optional ping delegate.
                rtt := v.Timestamp.Sub(sent)
                m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
            }
            return
        }
        ......
    case <-time.After(m.config.ProbeTimeout):
        // The direct ping timed out; fall through to the indirect checks.
        m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
    }

    // Get some random live nodes.
    m.nodeLock.RLock()
    kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
        return n.Name == m.config.Name ||
            n.Name == node.Name ||
            n.State != stateAlive
    })
    m.nodeLock.RUnlock()

    // Issue the indirect pings through the selected peers.
    expectedNacks := 0
    ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
    for _, peer := range kNodes {
        // We only expect nack to be sent from peers who understand
        // version 4 of the protocol.
        if ind.Nack = peer.PMax >= 4; ind.Nack {
            expectedNacks++
        }

        if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
            m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
        }
    }

    // fallbackCh carries the result of the TCP fallback ping. It has a buffer
    // of one so the goroutine's send does not block, and it is always closed
    // (by the goroutine, or immediately when the fallback is disabled) so the
    // range loop further down terminates.
    fallbackCh := make(chan bool, 1)
    if (!m.config.DisableTcpPings) && (node.PMax >= 3) {// issue the TCP fallback ping
        go func() {
            defer close(fallbackCh)
            didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
            if err != nil {
                m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
            } else {
                fallbackCh <- didContact
            }
        }()
    } else {
        close(fallbackCh)
    }
    
    // Block until something arrives on ackCh; only a Complete ack ends the
    // probe successfully.
    // NOTE(review): there is no timeout case here — presumably a timeout is
    // delivered on ackCh as a non-Complete value; confirm against the elided
    // ack-handler setup.
    select {
    case v := <-ackCh:
        if v.Complete == true {// the ping (direct or indirect) succeeded
            return
        }
    }
    
    for didContact := range fallbackCh {// UDP probes timed out; check the TCP fallback result
        if didContact {
            m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
            return
        }
    }
    .....
    // Every probe failed: mark the node as suspect.
    s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
    m.suspectNode(&s)
    

suspect
(1)當節點被標記為suspect,本地啟動一個定時器,發出一個suspect廣播,在此期間如果收到其他節點發送過來的suspect消息,就將本地的suspect確認數加1,當確認數達到要求之後並且該節點依舊不是alive狀態,會將該節點標記為dead。
(2)如果本地節點收到一個針對本地節點的suspect消息,本地節點會發送一個alive廣播,修正本節點在其他節點上的狀態。

dead
和suspect過程類似,這裡就不再贅述。

(三)Delegate

Memberlist對消息的封裝和邏輯處理並不提供具體的邏輯實現,只提供了一個Delegate interface,Delegate interface裡面具體的方法由serf層來封裝和邏輯實現,包括集群的管理和UserEvent和Query等等。

// Delegate is the set of callbacks through which memberlist hands message
// content and application state over to its user; memberlist itself does not
// interpret that data.
type Delegate interface {
    // NodeMeta returns the meta-data describing the current node. It is
    // queried when an alive message is broadcast, and the result must be no
    // longer than limit bytes. The meta-data is exposed through the Node
    // structure.
    NodeMeta(limit int) []byte

    // NotifyMsg is invoked for every user-data message that arrives.
    // Implementations must not block: doing so stalls the whole UDP packet
    // receive loop. The slice may be modified once the call returns, so
    // copy it if it has to be retained.
    NotifyMsg([]byte)

    // GetBroadcasts is invoked when there is room to broadcast user data.
    // It returns the buffers to send. Each buffer is assumed to cost
    // overhead extra bytes, and the total size of the data to send must not
    // exceed limit. Implementations must not block, for the same reason as
    // NotifyMsg.
    GetBroadcasts(overhead, limit int) [][]byte

    // LocalState returns the data that is sent to the remote side during a
    // TCP Push/Pull, in addition to the membership information. The content
    // is arbitrary; see MergeRemoteState for the receiving end. join is true
    // when the exchange is part of a join rather than a periodic push/pull.
    LocalState(join bool) []byte

    // MergeRemoteState is invoked after a TCP Push/Pull with the state
    // received from the remote side, i.e. the result of the remote side's
    // LocalState call. join is true when the exchange is part of a join
    // rather than a periodic push/pull.
    MergeRemoteState(buf []byte, join bool)
}
~                                                                                                                                                       
~                                                     

NodeMeta

NotifyMsg

NotifyMsg是serf中整個消息中間件的核心接口,gossip協議層所有的消息都會回調到NotifyMsg。

// NotifyMsg handles one received message. The first byte of buf is the
// message type; the remainder is decoded into the matching message struct
// and passed to the corresponding serf handler. Each handler reports whether
// the message should be rebroadcast, and each message type selects its own
// rebroadcast queue. Empty input is ignored; decode failures and unknown
// types are only logged.
//
// NOTE(review): this is an excerpt — further switch cases and the code that
// consumes rebroadcast/rebroadcastQueue are elided.
func (d *delegate) NotifyMsg(buf []byte) {
    // If we didn't actually receive any data, then ignore it.
    if len(buf) == 0 {
        return
    }
    // Record the size of the received message.
    metrics.AddSample([]string{"serf", "msgs", "received"}, float32(len(buf)))

    // Defaults: do not rebroadcast, and use the general broadcast queue.
    rebroadcast := false
    rebroadcastQueue := d.serf.broadcasts
    t := messageType(buf[0])
    switch t {
     ......
    case messageUserEventType:
        var event messageUserEvent
        if err := decodeMessage(buf[1:], &event); err != nil {
            d.serf.logger.Printf("[ERR] serf: Error decoding user event message: %s", err)
            // Leaves the switch only; rebroadcast stays false.
            break
        }

        d.serf.logger.Printf("[DEBUG] serf: messageUserEventType: %s", event.Name)
        rebroadcast = d.serf.handleUserEvent(&event)
        rebroadcastQueue = d.serf.eventBroadcasts

    case messageQueryType:
        var query messageQuery
        if err := decodeMessage(buf[1:], &query); err != nil {
            d.serf.logger.Printf("[ERR] serf: Error decoding query message: %s", err)
            // Leaves the switch only; rebroadcast stays false.
            break
        }

        d.serf.logger.Printf("[DEBUG] serf: messageQueryType: %s", query.Name)
        rebroadcast = d.serf.handleQuery(&query)
        rebroadcastQueue = d.serf.queryBroadcasts

     

    default:
        d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
    }

    ......
}

NotifyMsg包含了多種消息類型的處理,這裡我們只分析UserEvent和Query,如果對其他消息的處理有興趣可以閱讀源碼。其中UserEvent實現了集群中不需要反饋的單向通信,Query和QueryResponse實現了集群中的雙向通信。

// handleUserEvent processes a user event message received from the cluster.
// It advances the event clock, discards events that are older than the
// minimum event time or that fall outside the window covered by the event
// buffer, de-duplicates against the events already recorded for the same
// Lamport time, records the event, bumps the event metrics and, when an
// event channel is configured, delivers the event on it.
//
// It returns true when the event was new (the caller uses this to decide
// whether to rebroadcast) and false when it was dropped or already seen.
func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
    // Let the event clock observe the (possibly newer) time of this message.
    s.eventClock.Witness(eventMsg.LTime)

    s.eventLock.Lock()
    defer s.eventLock.Unlock()

    // Events before our minimum event time are ignored.
    if eventMsg.LTime < s.eventMinTime {
        return false
    }

    // The buffer holds one slot per Lamport time, so its length is also the
    // width of the window of times we can still de-duplicate.
    window := LamportTime(len(s.eventBuffer))
    now := s.eventClock.Time()
    if now > window && eventMsg.LTime < now-window {
        // Too old to be tracked by the buffer any more.
        s.logger.Printf(
            "[WARN] serf: received old event %s from time %d (current: %d)",
            eventMsg.Name,
            eventMsg.LTime,
            s.eventClock.Time())
        return false
    }

    // Locate the slot for this Lamport time and check whether the event has
    // already been received.
    slot := eventMsg.LTime % window
    bucket := s.eventBuffer[slot]
    incoming := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload}
    if bucket == nil || bucket.LTime != eventMsg.LTime {
        // Empty slot, or one holding a different time: start a fresh bucket.
        bucket = &userEvents{LTime: eventMsg.LTime}
        s.eventBuffer[slot] = bucket
    } else {
        for _, prior := range bucket.Events {
            if prior.Equals(&incoming) {
                return false
            }
        }
    }

    // Remember the event so later duplicates are recognised.
    bucket.Events = append(bucket.Events, incoming)

    // Count the event, both overall and per event name.
    metrics.IncrCounter([]string{"serf", "events"}, 1)
    metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)

    // Hand the event to the upper (agent) layer if it supplied a channel.
    if ch := s.config.EventCh; ch != nil {
        ch <- UserEvent{
            LTime:    eventMsg.LTime,
            Name:     eventMsg.Name,
            Payload:  eventMsg.Payload,
            Coalesce: eventMsg.CC,
        }
    }
    return true
}

Query消息的處理機制和UserEvent類似,這裡不再贅述。

其他接口比較簡單,這裡就不再做分析了。

(四)Transport

Memberlist的內部包含了TCP和UDP的通信過程,所以需要一個Transport來實現底層的API。
Memberlist內部默認提供了Transport的實現,當然也可以通過Config來配置定制的Transport。
Transport的接口如下:

// Transport abstracts the packet-oriented and stream-oriented communication
// memberlist performs with other peers.
type Transport interface {
    // FinalAdvertiseAddr takes the values configured by the user (which may
    // be empty) and returns the IP and port that should be advertised to the
    // rest of the cluster.
    FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)

    // WriteTo sends the payload b to addr in a connectionless fashion. The
    // returned time stamp should be as close as possible to the moment the
    // packet was transmitted, so that RTT measurements made during probes
    // are accurate.
    //
    // The method resembles net.PacketConn, but the full set of methods that
    // interface requires is deliberately not exposed, to keep assumptions
    // about the underlying plumbing to a minimum. As with Dial, the address
    // is a plain string so it stays network neutral; usually it has the form
    // "host:port".
    WriteTo(b []byte, addr string) (time.Time, error)

    // PacketCh returns the channel from which packets arriving from other
    // peers can be read. How listening is set up is left to the concrete
    // transport implementation.
    PacketCh() <-chan *Packet

    // DialTimeout opens a connection for two-way communication with a peer.
    // This is generally more expensive than sending packets, so it is used
    // for less frequent operations such as anti-entropy, or fallback probes
    // after a packet-oriented probe has failed.
    DialTimeout(addr string, timeout time.Duration) (net.Conn, error)

    // StreamCh returns the channel from which stream connections opened by
    // other peers can be read. How listening is set up is left to the
    // concrete transport implementation.
    StreamCh() <-chan net.Conn

    // Shutdown is called while memberlist shuts down, giving the transport
    // the opportunity to clean up any listeners.
    Shutdown() error
}

在newMemberlist裡面會判斷是否使用默認的Transport,這部分代碼如下:

// newMemberlist constructs a Memberlist from conf. The part shown here picks
// the transport: a Transport supplied through conf is used as-is, otherwise a
// default network transport bound to conf.BindAddr/conf.BindPort is created.
//
// NOTE(review): this is an excerpt — the code before and after the transport
// setup (including the end of the `if transport == nil` block) is elided.
func newMemberlist(conf *Config) (*Memberlist, error) {
    ......
    // Set up a network transport by default if a custom one wasn't given
    // by the config.
    transport := conf.Transport
    if transport == nil {
        nc := &NetTransportConfig{
            BindAddrs: []string{conf.BindAddr},
            BindPort:  conf.BindPort,
            Logger:    logger,
        }

        // See comment below for details about the retry in here.
        //
        // makeNetRetry tries to create the network transport up to limit
        // times and returns the first one that succeeds. If every attempt
        // fails, the last error is embedded in the returned error.
        makeNetRetry := func(limit int) (*NetTransport, error) {
            var err error
            for try := 0; try < limit; try++ {
                var nt *NetTransport
                // Fixed: the constructor name was corrupted to
                // "NewNetzzTransport" in the original excerpt.
                if nt, err = NewNetTransport(nc); err == nil {
                    return nt, nil
                }
                // Bind conflicts are logged before retrying. Other errors
                // are retried as well, just without a log line.
                if strings.Contains(err.Error(), "address already in use") {
                    logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
                    continue
                }
            }

            return nil, fmt.Errorf("failed to obtain an address: %v", err)
        }

    ......
}

©著作權歸作者所有,轉載或內容合作請聯繫作者
  • 序言:七十年代末肃弟,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子茸炒,更是在濱河造成了極大的恐慌愕乎,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件壁公,死亡現(xiàn)場離奇詭異感论,居然都是意外死亡,警方通過查閱死者的電腦和手機紊册,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門比肄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來快耿,“玉大人,你說我怎么就攤上這事芳绩∠坪ィ” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵妥色,是天一觀的道長搪花。 經(jīng)常有香客問我,道長嘹害,這世上最難降的妖魔是什么撮竿? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮笔呀,結(jié)果婚禮上幢踏,老公的妹妹穿的比我還像新娘。我一直安慰自己许师,他們只是感情好房蝉,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著微渠,像睡著了一般搭幻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上敛助,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天粗卜,我揣著相機與錄音屋确,去河邊找鬼纳击。 笑死,一個胖子當(dāng)著我的面吹牛攻臀,可吹牛的內(nèi)容都是我干的焕数。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼刨啸,長吁一口氣:“原來是場噩夢啊……” “哼堡赔!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起设联,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤善已,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后离例,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體换团,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年宫蛆,在試婚紗的時候發(fā)現(xiàn)自己被綠了艘包。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖想虎,靈堂內(nèi)的尸體忽然破棺而出卦尊,到底是詐尸還是另有隱情,我是刑警寧澤舌厨,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布岂却,位于F島的核電站,受9級特大地震影響裙椭,放射性物質(zhì)發(fā)生泄漏淌友。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一骇陈、第九天 我趴在偏房一處隱蔽的房頂上張望震庭。 院中可真熱鬧,春花似錦你雌、人聲如沸器联。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拨拓。三九已至,卻和暖如春氓栈,著一層夾襖步出監(jiān)牢的瞬間渣磷,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工授瘦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留醋界,地道東北人。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓提完,卻偏偏與公主長得像形纺,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子徒欣,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容