(一)Memberlist 簡介
(1)Memberlist是用來管理分布式集群內節點發現、節點故障檢測、節點列表發現的軟件。
(2)Memberlist 基于Gossip協議來傳播消息,該Gossip協議構建在SWIM協議之上。
(二) Protocol
集群內的廣播
節點通過udp協議向K個隨機選取的節點(K由Config.GossipNodes指定)發送消息,要發送的消息從廣播隊列里面獲取。廣播隊列里的每條消息被取出發送超過一定次數后就會被丟棄(與發送是否成功無關,發送失敗只會記錄日誌)。發送次數上限參考Config 里 RetransmitMult 的注釋。
// gossip runs one round of packet-based gossip: it picks up to
// m.config.GossipNodes random peers and sends each of them the pending
// broadcast messages obtained from getBroadcasts.
//
// NOTE(review): abridged excerpt. The kRandomNodes filter body is elided
// ("......"), and bytesAvail is not declared in the visible lines — it
// presumably comes from omitted code that sizes the packet payload; confirm
// against the full source.
func (m *Memberlist) gossip() {
// Randomly pick K nodes (K = m.config.GossipNodes). The filter appears to
// return true for nodes that must be excluded — confirm in the elided body.
kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
......
})
for _, node := range kNodes {
// Take messages from the broadcast queue. Assumption (mirrors the
// Delegate.GetBroadcasts contract): compoundOverhead is the per-message
// overhead and bytesAvail the total byte budget — TODO confirm.
msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
// Queue is empty: end the whole round, not just this peer.
if len(msgs) == 0 {
return
}
addr := node.Address()
if len(msgs) == 1 {// only one message: send it as-is
// Send the message in a single packet (UDP); failures are only logged.
if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
} else {
// Several messages: bundle them into one compound message so they
// travel in a single packet.
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
}
}
}
Push/Pull
每隔一個時間間隔,隨機選取一個存活的節點(不包括自己),跟它建立tcp連接,然后將本地的全部節點狀態、用戶數據發送過去,然后對端將其掌握的全部節點狀態、用戶數據發送回來,最后完成2份數據的合并。此動作可以加快集群內信息的收斂。
// pushPull runs one anti-entropy round. It chooses a single random peer that
// is alive and is not the local node, then performs a full state exchange
// with it through pushPullNode (join=false). If no such peer exists the round
// is skipped; an exchange error is logged rather than returned.
func (m *Memberlist) pushPull() {
	// Exclude ourselves and every node that is not currently alive.
	ineligible := func(n *nodeState) bool {
		return n.Name == m.config.Name || n.State != stateAlive
	}

	m.nodeLock.RLock()
	candidates := kRandomNodes(1, m.nodes, ineligible)
	m.nodeLock.RUnlock()

	// Nobody to sync with this time around.
	if len(candidates) == 0 {
		return
	}

	peer := candidates[0]
	err := m.pushPullNode(peer.Address(), false)
	if err != nil {
		m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", peer.Name, err)
	}
}
// pushPullNode performs a complete state exchange with the node at addr.
// It sends the local state and receives the remote node states plus the
// remote user state via sendAndReceiveState, then merges them into the local
// view via mergeRemoteState. join is forwarded to both steps. The first
// error encountered is returned unchanged; the call duration is recorded
// under the "memberlist.pushPullNode" metric.
func (m *Memberlist) pushPullNode(addr string, join bool) error {
	started := time.Now()
	defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, started)

	remote, userState, err := m.sendAndReceiveState(addr, join)
	if err != nil {
		return err
	}

	// Fold everything the peer told us into our own membership state.
	return m.mergeRemoteState(join, remote, userState)
}
節點的三個狀態
// Node states tracked by memberlist. A node is alive while it answers
// probes, becomes suspect after failed probes, and is eventually declared
// dead if the suspicion is not refuted. Values are spelled out explicitly.
const (
	stateAlive   nodeStateType = 0 // answering probes normally
	stateSuspect nodeStateType = 1 // failed probing; awaiting refutation or confirmation
	stateDead    nodeStateType = 2 // declared failed
)
Alive (比較簡單,代碼不分析)
當節點上線的時候,向集群廣播Alive消息。
Probe
當節點啟動之后,每隔一定的時間間隔,會依次選取一個節點(跳過自己和已經dead的節點)對其發送一個PING(UDP)消息。當PING消息失敗后,會隨機選取IndirectChecks個節點發起間接的PING(UDP)請求,同時直接再發起一個TCP PING請求(前提是沒有設置DisableTcpPings,且對端的協議版本支持)。收到間接PING請求的節點會根據請求中的地址發起一個PING消息,并將PING的結果返回給間接請求的源節點。如果在探測超時時間內,本節點沒有收到任何一個關於要探測節點的ACK消息,則標記要探測的節點狀態為suspect。
// probe is used to perform a single round of failure detection: it takes the
// node at m.probeIndex, skips it if it is the local node or already dead,
// and otherwise probes it with probeNode.
//
// NOTE(review): abridged excerpt — the "......" section presumably acquires
// m.nodeLock.RLock(), bounds-checks m.probeIndex and declares node, skip,
// numCheck and the START label; confirm against the full source.
func (m *Memberlist) probe() {
......
// Copy the nodeState by value, presumably so it stays usable after the
// read lock is released below.
node = *m.nodes[m.probeIndex]
// Never probe ourselves, and do not probe nodes already marked dead.
if node.Name == m.config.Name {
skip = true
} else if node.State == stateDead {
skip = true
}
// Release the lock and advance the round-robin index whether or not this
// node is skipped.
m.nodeLock.RUnlock()
m.probeIndex++
if skip {
// Try the next node; numCheck presumably bounds how many nodes can be
// skipped in one round (checked in the elided code).
numCheck++
goto START
}
// Probe the specific node
m.probeNode(&node)
}
// probeNode handles a single round of failure checking on a node.
//
// Visible sequence:
//  1. send a direct ping (only if the node is alive) and wait for an ack or
//     m.config.ProbeTimeout;
//  2. on failure, ask up to m.config.IndirectChecks other alive peers to ping
//     the node for us, and — unless DisableTcpPings is set or node.PMax < 3 —
//     also try a direct TCP ping in a goroutine;
//  3. if neither an ack nor the TCP fallback succeeds, the node is (presumably,
//     after the elided "....." logic) marked suspect.
//
// NOTE(review): abridged excerpt — addr, ping, sent, ackCh and deadline are
// defined in elided code ("......"), and the function's closing brace is not
// part of this excerpt.
func (m *Memberlist) probeNode(node *nodeState) {
......
if node.State == stateAlive {
// Send a direct ping to the target; a send error aborts this probe round.
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
return
}
}else{
......
}
// Wait for the ping result (ack) or for ProbeTimeout to elapse.
select {
case v := <-ackCh:
// Ping succeeded: report the round-trip time to the Ping delegate if one
// is configured (sent is recorded in elided code), then stop.
if v.Complete == true {
if m.config.Ping != nil {
rtt := v.Timestamp.Sub(sent)
m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
}
return
}
......
case <-time.After(m.config.ProbeTimeout):
// Direct ping timed out: fall through to indirect probing below.
m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
}
// Pick up to IndirectChecks random alive peers, excluding ourselves and the
// node being probed.
m.nodeLock.RLock()
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != stateAlive
})
m.nodeLock.RUnlock()
// Send indirect ping requests: each selected peer is asked to ping the
// target on our behalf, reusing the same sequence number as our own ping.
// expectedNacks is consumed in elided code.
expectedNacks := 0
ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
}
if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
}
}
// fallbackCh carries at most one TCP-fallback result and is always closed,
// so the range loop further down terminates.
fallbackCh := make(chan bool, 1)
if (!m.config.DisableTcpPings) && (node.PMax >= 3) {// direct TCP ping in parallel with the indirect probes
go func() {
defer close(fallbackCh)
didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
if err != nil {
// On error nothing is sent; the deferred close still ends the receiver's loop.
m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
} else {
fallbackCh <- didContact
}
}()
} else {
// No TCP fallback: close now so the range loop below is a no-op.
close(fallbackCh)
}
// Wait for an ack from the UDP probes. This single-case select is a plain
// blocking receive; presumably ackCh is fed (possibly with Complete=false)
// when the probe deadline passes, set up in elided code — confirm.
select {
case v := <-ackCh:
if v.Complete == true {// ping succeeded (direct or via an indirect peer)
return
}
}
for didContact := range fallbackCh {// no ack: consult the TCP fallback result, if any
if didContact {
// TCP reached the node although the UDP probes failed.
m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
return
}
}
.....
// Every probe failed: mark the node as suspect, on our own authority.
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
m.suspectNode(&s)
suspect
(1)當節點被標記為suspect,本地啟動一個定時器,并發出一個suspect廣播。在此期間如果收到其他節點發送過來的suspect消息,就將本地的suspect確認數加1;當確認數達到要求之后并且該節點依舊不是alive狀態,會將該節點標記為dead。
(2)如果本地節點收到一個針對本地節點的suspect消息,本地節點會發送一個alive廣播,修正本節點在其他節點上的狀態。
dead
和suspect過程類似,這里就不再贅述。
(三)Delegate
Memberlist對消息的封裝和邏輯處理并不提供具體的實現,只提供了一個Delegate interface,Delegate interface里面具體的方法由serf層來封裝和實現,包括集群的管理、UserEvent和Query等等。
// Delegate is the set of hooks a higher layer (serf, for example) implements
// to plug its own messages and state into memberlist. Memberlist only moves
// the bytes these methods produce and consume; it does not interpret them.
type Delegate interface {
	// NodeMeta returns metadata about the local node, attached to the alive
	// messages it broadcasts. The result must not exceed limit bytes. Other
	// members can read it from the Node structure.
	NodeMeta(limit int) []byte

	// NotifyMsg is invoked for each user-data message received. It runs
	// inside the UDP packet receive loop, so it must not block. msg may be
	// modified once the call returns, so copy it if it must be retained.
	NotifyMsg(msg []byte)

	// GetBroadcasts is asked for user-data messages to broadcast. Each
	// returned buffer should allow for the given per-buffer overhead, and
	// the combined size of everything returned must stay within limit.
	// Like NotifyMsg, it must not block the UDP packet receive loop.
	GetBroadcasts(overhead, limit int) [][]byte

	// LocalState returns arbitrary data sent to the remote side alongside
	// the membership information during a TCP push/pull. join is true when
	// the exchange is part of a join rather than a regular push/pull. The
	// peer receives this payload through MergeRemoteState.
	LocalState(join bool) []byte

	// MergeRemoteState receives the remote side's LocalState payload once a
	// TCP push/pull has completed. join has the same meaning as in
	// LocalState.
	MergeRemoteState(buf []byte, join bool)
}
~
~
NodeMeta
在廣播alive消息時被調用,返回本節點的元數據;其長度不能超過給定的limit字節,這些元數據可以在Node結構中獲取。
NotifyMsg
NotifyMsg是serf中整個消息中間件的核心接口,gossip協議層收到的所有用戶數據消息都會回調到NotifyMsg。
// NotifyMsg is serf's implementation of the memberlist Delegate callback for
// received user-data messages. The first byte of buf is the serf message
// type; the remainder is decoded and dispatched to the matching handler.
//
// NOTE(review): abridged excerpt — the other message types and the code
// after the switch are elided ("......"). That tail presumably uses
// rebroadcast / rebroadcastQueue to re-broadcast buf; confirm against the
// full source.
func (d *delegate) NotifyMsg(buf []byte) {
// If we didn't actually receive any data, then ignore it.
if len(buf) == 0 {
return
}
// Record the size of every received message.
metrics.AddSample([]string{"serf", "msgs", "received"}, float32(len(buf)))
// Defaults: no rebroadcast, general serf broadcast queue. The handlers
// below override both for their own message type.
rebroadcast := false
rebroadcastQueue := d.serf.broadcasts
// The message type is encoded in the first byte.
t := messageType(buf[0])
switch t {
......
case messageUserEventType:
var event messageUserEvent
// Decode the payload after the type byte. On failure, log and leave
// the switch (break exits only the switch) with rebroadcast still false.
if err := decodeMessage(buf[1:], &event); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding user event message: %s", err)
break
}
d.serf.logger.Printf("[DEBUG] serf: messageUserEventType: %s", event.Name)
// handleUserEvent returns true only for events it has not seen before,
// so only new events are marked for rebroadcast, on the event queue.
rebroadcast = d.serf.handleUserEvent(&event)
rebroadcastQueue = d.serf.eventBroadcasts
case messageQueryType:
var query messageQuery
if err := decodeMessage(buf[1:], &query); err != nil {
d.serf.logger.Printf("[ERR] serf: Error decoding query message: %s", err)
break
}
d.serf.logger.Printf("[DEBUG] serf: messageQueryType: %s", query.Name)
// Same pattern as user events, but on the query broadcast queue
// (handleQuery presumably also reports "new" vs "already seen").
rebroadcast = d.serf.handleQuery(&query)
rebroadcastQueue = d.serf.queryBroadcasts
default:
d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
}
......
}
NotifyMsg包含了多種消息類型的處理,這里我們只分析UserEvent和Query,如果對其他消息的處理有興趣可以閱讀源碼。其中UserEvent實現了集群中不需要反饋的單向通信,Query和QueryResponse實現了集群中的雙向通信。
// handleUserEvent processes a user event received from the cluster and
// reports whether it was new.
//
// It first advances the local event Lamport clock with the event's time. It
// then rejects the event (returning false) if the event is:
//   - older than s.eventMinTime,
//   - too old to fit in the recent-event ring buffer, or
//   - an exact duplicate of one already recorded for its Lamport time.
//
// Otherwise it records the event, bumps the event metrics, forwards it to
// s.config.EventCh when that channel is set, and returns true. The send on
// EventCh happens while s.eventLock is held.
func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
	// The sender may have observed a later time than we have.
	s.eventClock.Witness(eventMsg.LTime)

	s.eventLock.Lock()
	defer s.eventLock.Unlock()

	if eventMsg.LTime < s.eventMinTime {
		return false
	}

	// eventBuffer is used as a ring indexed by LTime modulo its length, so
	// anything further back than one full ring can no longer be de-duplicated.
	now := s.eventClock.Time()
	window := LamportTime(len(s.eventBuffer))
	if now > window && eventMsg.LTime < now-window {
		s.logger.Printf(
			"[WARN] serf: received old event %s from time %d (current: %d)",
			eventMsg.Name,
			eventMsg.LTime,
			s.eventClock.Time())
		return false
	}

	slot := eventMsg.LTime % window
	incoming := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload}
	bucket := s.eventBuffer[slot]
	if bucket == nil || bucket.LTime != eventMsg.LTime {
		// Empty slot, or it holds events for a different Lamport time:
		// start a fresh bucket for this time.
		bucket = &userEvents{LTime: eventMsg.LTime}
		s.eventBuffer[slot] = bucket
	} else {
		// Same Lamport time: drop exact duplicates.
		for _, prev := range bucket.Events {
			if prev.Equals(&incoming) {
				return false
			}
		}
	}
	bucket.Events = append(bucket.Events, incoming)

	metrics.IncrCounter([]string{"serf", "events"}, 1)
	metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)

	// Hand the event up to the agent layer, if a channel is configured.
	if s.config.EventCh != nil {
		s.config.EventCh <- UserEvent{
			LTime:    eventMsg.LTime,
			Name:     eventMsg.Name,
			Payload:  eventMsg.Payload,
			Coalesce: eventMsg.CC,
		}
	}
	return true
}
Query消息的處理機制和UserEvent類似,這里不再贅述。
其他接口比較簡單,這里就不再做分析了。
(四)Transport
Memberlist的內部包含了TCP和UDP的通信過程,所以需要一個Transport來實現底層的api。
Memberlist內部默認提供了Transport的實現,當然也可以通過Config來配置定制的Transport。
Transport的接口如下
// Transport abstracts the network layer memberlist runs on. It offers two
// modes of communication:
//   - connectionless packets, used e.g. for probes;
//   - connection-oriented streams, used for heavier, less frequent work such
//     as push/pull anti-entropy and fallback probes.
//
// A default implementation is used unless a custom one is supplied via
// Config.Transport.
type Transport interface {
	// FinalAdvertiseAddr takes the IP and port the user configured (either
	// may be empty) and returns the IP and port that should be advertised
	// to the rest of the cluster.
	FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)

	// WriteTo fires payload b at addr without establishing a connection.
	// The address is a plain string, as with Dial, to stay network-neutral;
	// it is usually "host:port". The returned timestamp should be as close
	// as possible to the moment the packet was transmitted, so that RTTs
	// measured during probes are accurate. The method resembles
	// net.PacketConn, but exposes only the minimum memberlist relies on.
	WriteTo(b []byte, addr string) (time.Time, error)

	// PacketCh yields packets received from other peers. How the
	// implementation listens for them is up to the implementation.
	PacketCh() <-chan *Packet

	// DialTimeout opens a two-way connection to a peer, giving up after
	// timeout. Streams cost more than packets, so they are reserved for
	// infrequent operations such as anti-entropy, or a fallback probe when
	// the packet-based probe failed.
	DialTimeout(addr string, timeout time.Duration) (net.Conn, error)

	// StreamCh yields incoming stream connections from other peers. How the
	// implementation listens for them is up to the implementation.
	StreamCh() <-chan net.Conn

	// Shutdown is called while memberlist shuts down, giving the transport
	// a chance to clean up its listeners.
	Shutdown() error
}
在newMemberlist里面會判斷是否使用默認的Transport,這部分代碼如下:
func newMemberlist(conf *Config) (*Memberlist, error) {
......
// Set up a network transport by default if a custom one wasn't given
// by the config.
transport := conf.Transport
if transport == nil {
nc := &NetTransportConfig{
BindAddrs: []string{conf.BindAddr},
BindPort: conf.BindPort,
Logger: logger,
}
// See comment below for details about the retry in here.
// makeNetRetry tries up to limit times to create the default NetTransport
// from nc and returns the first one that succeeds. Every failed attempt is
// retried; "address already in use" bind errors are additionally logged at
// DEBUG level. If all attempts fail, the last error is included in the
// returned error.
makeNetRetry := func(limit int) (*NetTransport, error) {
	var err error
	for try := 0; try < limit; try++ {
		var nt *NetTransport
		// Assign with = (not :=) so err outlives the loop for the final message.
		if nt, err = NewNetTransport(nc); err == nil {
			return nt, nil
		}
		if strings.Contains(err.Error(), "address already in use") {
			logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
		}
	}
	return nil, fmt.Errorf("failed to obtain an address: %v", err)
}
......
}