consul
相信大家已經(jīng)知道了逛揩,在日常的開發(fā)以及運維中也會常常聽到 consul
這個詞季稳,但是不是所有的人都知道它是什么?它在運維中扮演了什么樣的角色呢辐啄?
首先甥绿,我們來看下 consul
的官網(wǎng)中是怎么形容自己的:
Service Discovery And Configuration Make Easy
讓服務(wù)發(fā)現(xiàn)以及配置變得更簡單,這個就是 consul
在 micro service
橫行的今天想要為我們解決的問題则披。
在微服務(wù)的世界中共缕,運維人員變得越來越累了,以往可能 1臺物理機部署 3~4 個應(yīng)用士复,就可以將提供完整的服務(wù)图谷,而且維護也變得極為容易,寫腳本就是了阱洪。但如今便贵,一旦進入的 micro service
的世界,提供相同的服務(wù)可能就需要 20~40 個應(yīng)用了冗荸,如此一來承璃,運維人員的工作量以及壓力大大增加,但是卻得不到肉眼可見的好處蚌本。
先來說下 consul
為我們提供的四大組件:
Service Discovery: 當(dāng)某個應(yīng)用可用的時候盔粹,可以向
consul
客戶端注冊自己隘梨,或者讓consul
客戶端通過配置發(fā)現(xiàn)自己,這樣舷嗡,如果有需要這個應(yīng)用的其他應(yīng)用就可以通過 consul 快速找到一個可用的應(yīng)用了轴猎。Health Check:
consul
客戶端提供任意數(shù)量的健康檢查,包括對應(yīng)用保持心跳进萄、主機物理資源監(jiān)控等捻脖。健康檢查可以被operator
檢測并操作,防止流量進入不健康的主機中鼠。KV Store: 應(yīng)用按需使用
consul
的 KV存儲 可婶,可以用于動態(tài)配置、功能標(biāo)記援雇、協(xié)調(diào)扰肌、領(lǐng)袖選舉等,通過客戶端的 HTTP 接口可以靈活方便使用熊杨。Multi Datacenter:
consul
提供開箱即用的多數(shù)據(jù)中心曙旭,這意味著用戶不需要擔(dān)心需要建立額外的抽象層讓業(yè)務(wù)擴展到多個區(qū)域。
我們可以發(fā)現(xiàn)晶府,微服務(wù)治理 的所有解決方案在 consul
的四大組件下都能得到很好的解決桂躏。
為了深入了解 consul
,我們先來看下幾個專有名詞:
Agent: 代理是
consul
集群中每個成員的基本單位川陆,他們以守護進程的形式存在剂习,代理有 客戶端 以及 服務(wù)端 兩種角色運行。所有的節(jié)點都必須運行一個代理较沪,它們相互之間通過 DNS 或者 HTTP 接口保持健康檢查并同步數(shù)據(jù)鳞绕。Client: 客戶端 是代理的其中一種角色,它會將所有的 RPC 請求轉(zhuǎn)發(fā)到 服務(wù)端 代理尸曼,而 客戶端 本身是 無狀態(tài) 的们何,而且只會占用非常少的資源并消耗少量的 網(wǎng)絡(luò)帶寬 ,建議每個應(yīng)用節(jié)點都運行一個 客戶端 控轿。
Server: 服務(wù)端 相對而言是一個非常重的代理冤竹,它的主要工作包括參與 raft仲裁 、 維護集群狀態(tài) 茬射、 響應(yīng)RPC查詢 鹦蠕、與其他的數(shù)據(jù)中心 交換數(shù)據(jù)、將 查詢轉(zhuǎn)發(fā)給Leader / 其他數(shù)據(jù)中心 等在抛≈硬。可以說,服務(wù)端是
consul
集群中最重要的角色,所以建議將其放置在相對獨立的主機上肠阱,并且一個集群(數(shù)據(jù)中心)中至少需要 3 個以上的 服務(wù)端 才能保證 最終一致性 票唆。Datacenter: 數(shù)據(jù)中心 很好理解,其實就是一個 低延遲辖所、高帶寬 的私有網(wǎng)絡(luò)環(huán)境,一個穩(wěn)定的數(shù)據(jù)中心環(huán)境對
raft協(xié)議
來說非常重要磨德,否則及其可能出現(xiàn)數(shù)據(jù)不同步缘回、服務(wù)質(zhì)量下降等情況。Gossip: 一種保證 最終一致性 的分布式協(xié)議典挑,常用于 點對點通信 酥宴,模擬人與人之間的交流從而達到理想中的 最終一致性 。而
consul
通過 UDP 使用該協(xié)議提供 成員管理 您觉、失敗檢測 拙寡、事件廣播 等功能。Raft: 是一種保證 強一致性 的分布式協(xié)議琳水,
consul
使用該協(xié)議提供 服務(wù)端 們的數(shù)據(jù)一致性肆糕,所以我們會在consul
源碼解析中會重點講述這個算法是如何被應(yīng)用的。
???
// agent/agent.go
// delegate 定義了代理的公共接口
type delegate interface {
Encrypted() bool // 數(shù)據(jù)是否進行了加密
GetLANCoordinate() (lib.CoordinateSet, error) // 獲取局域網(wǎng)內(nèi)為 Coordinate 角色的服務(wù)端
Leave() error // 離開集群
LANMembers() []serf.Member // 局域網(wǎng)內(nèi)的所有成員
LANMemberAllSegments() ([]serf.Member, error) // 局域網(wǎng)內(nèi)所有段區(qū)的成員
LANSegmentMembers(segment string) ([]serf.Member, error) // 局域網(wǎng)內(nèi)某個段區(qū)的所有成員
LocalMember() serf.Member // 本機成員
JoinLAN(addrs []string) (n int, err error) // 加入一個或多個段區(qū)
RemoveFailedNode(node string) error // 嘗試移除某個異常的節(jié)點
RPC(method string, args interface{}, reply interface{}) error // 遠(yuǎn)程調(diào)用
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReqlyFn) error // 發(fā)起快照存檔的遠(yuǎn)程調(diào)用
Shutdown() error // 關(guān)閉代理
Stats() map[string]map[string]string // 用于獲取應(yīng)用當(dāng)前狀態(tài)
}
// Agent 代理
type Agent struct {
// 代理的運行時配置在孝,支持 hot reload
config *config.RuntimeConfig
// ... 日志相關(guān)
// 內(nèi)存中收集到的應(yīng)用诚啃、主機等狀態(tài)信息
MemSink *metrics.InmemSink
// 代理的公共接口,而配置項則決定代理的角色
delegate delegate
// 本地策略執(zhí)行的管理者
acls *aclManager
// 保證策略執(zhí)行者的權(quán)限私沮,可實時更新始赎,覆蓋本地配置文件
tokens *token.Store
// 存儲本地節(jié)點、應(yīng)用仔燕、心跳的狀態(tài)造垛,用于反熵
State *local.State
// 負(fù)責(zé)維持本地與遠(yuǎn)端的狀態(tài)同步
sync *ae.StateSyncer
// ...各種心跳包(Monitor/HTTP/TCP/TTL/Docker 等)
// 用于接受其他節(jié)點發(fā)送過來的事件
eventCh chan serf.UserEvent
// 用環(huán)形隊列存儲接受到的所有事件,用 index 指向下一個插入的節(jié)點
// 使用讀寫鎖保證數(shù)據(jù)安全晰搀,當(dāng)一個事件被插入時五辽,會通知 group 中所有的訂閱者
eventBuf []*UserEvent
eventIndex int
eventLock sync.RWMutex
eventNotify NotifyGroup
// 重啟,并返回通知是否重啟成功
reloadCh chan chan error
// 關(guān)閉代理前的操作
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
// 添加到局域網(wǎng)成功的回調(diào)函數(shù)
joinLANNotifier notifier
// 返回重試加入局域網(wǎng)失敗的錯誤信息
retryJoinCh chan error
// 并發(fā)安全的存儲當(dāng)前所有節(jié)點的唯一名稱外恕,用于 RPC傳輸
endpoints map[string]string
endpointsLock sync.RWMutex
// ...為代理提供 DNS/HTTP 的API
// 追蹤當(dāng)前代理正在運行的所有監(jiān)控
watchPlans []*watch.Plan
}
// 決定當(dāng)前啟動的是 server 還是 client 的關(guān)鍵代碼在于
func (a *Agent) Start() error {
// ...
if c.ServerMode {
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
// error handler
a.delegate = server // 主要差別在這里
} else {
client, err := consul.NewClientLogger(consulCfg, a.logger, a.tokens)
// error handler
a.delegate = client // 主要差別在這里
}
a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) }
// ...
}
因為相對來說奔脐, 客戶端 是比較輕量的代理,所以我們先來先看 客戶端 的結(jié)構(gòu)與實現(xiàn):
// agent/consul/client.go
type Client struct {
config *Config
// 連接 服務(wù)端 的連接池吁讨,用 TCP 協(xié)議
connPool *pool.ConnPool
// 用于選擇和維護客戶端用于 RPC 請求的服務(wù)端
routers *router.Manager
// 用于限制從客戶端到服務(wù)器的 RPC 總數(shù)
rpcLimiter *rate.Limiter
// 負(fù)責(zé)接送來自同一個數(shù)據(jù)中心的事件
eventCh chan serf.Event
logger *log.Logger
// 存儲當(dāng)前數(shù)據(jù)中心的 serf 集群信息
serf *serf.Serf
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
而 Client 所實現(xiàn)的 delegate
接口幾乎都是直接調(diào)用的 serf
提供的接口髓迎,如:
func (c *Client) JoinLAN(addrs []string) (int, error) {
return c.serf.Join(addrs, true)
}
func (c *Client) Leave() error {
c.logger.Printf("[INFO] consul: client starting leave")
// Leave the LAN pool
if c.serf != nil {
if err := c.serf.Leave(); err != nil {
c.logger.Printf("[ERR] consul: Failed to leave LAN Serf cluster: %v", err)
}
}
return nil
}
func (c *Client) KeyManagerLAN() *serf.KeyManager {
return c.serf.KeyManager()
}
而 Server
的由于涉及到 raft 協(xié)議,所以其實現(xiàn)復(fù)雜的多 (44 個 fields
)建丧,先來看下結(jié)構(gòu):
// agent/consul/server.go
type Server struct {
// 哨兵接口排龄,負(fù)責(zé)處理xxx TODO
sentinel sentinel.Evaluator
// 負(fù)責(zé)策略執(zhí)行的權(quán)限緩存 TODO
aclAuthCache *acl.Cache
// 負(fù)責(zé)策略執(zhí)行的非權(quán)限緩存 TODO
aclCache *aclCache
// 自動更新當(dāng)前權(quán)限的有效性
autopilotPolicy AutopilotPolicy
// 負(fù)責(zé)觸發(fā)移除無用的應(yīng)用的檢查
autopilotRemoveDeadCh chan struct{}
// 負(fù)責(zé)停止自動更新權(quán)限有效的代碼
autopilotShutdownCh chan struct{}
// 阻塞直至停止自動更新權(quán)限
autopilotWaitGroup sync.WaitGroup
// 存儲當(dāng)前集群的健康狀態(tài)
clusterHealth structs.OperatorHealthReply
clusterHealthLock sync.RWMutex
config *Config
// 連接其他服務(wù)端的連接池
connPool *pool.ConnPool
// ...
// 保證強一致性的 raft 狀態(tài)機
fsm *fsm.FSM
// 在相同數(shù)據(jù)中心的 consul 節(jié)點使用 raft 協(xié)議保證操作的強一致性
raft *raft.Raft
raftLayer *RaftLayer
raftStore *raftboltdb.BoltStore // 提供 ACID 的高性能 KV數(shù)據(jù)庫
raftTransport *raft.NetworkTransport
raftInmem *raft.InmemStore
// 通過 setupRaft() 設(shè)置,確保server能收到Leader更換通知
raftNotifyCh <-chan bool
// 判斷 Leader 是否準(zhǔn)備好提供一致性讀取
readyForConsistentReads int32
// 使用信號通知該服務(wù)端需要退出集群,并嘗試將 RPC 轉(zhuǎn)發(fā)到其他的服務(wù)端上橄维。
leaveCh chan struct{}
// 用于路由公域網(wǎng)的服務(wù)端或者由用戶定義的段區(qū)域
router *router.Router
// 用于接受進來的請求連接
Listener net.Listener
rpcServer *rpc.Server
rpcTLS *tls.Config
serfLAN *serf.Serf
// 通過段名指引到不同的 serf 集群中
segmentLAN map[string]*serf.Serf
// 在同一個數(shù)據(jù)中心中尺铣,由服務(wù)端組成的 serf 集群
serfWAN *serf.Serf
// 在當(dāng)前的數(shù)據(jù)中心進行服務(wù)端追蹤,提供 id 與 address 相互轉(zhuǎn)換
serverLookup *ServerLookup
// 通知廣播争舞,讓公域網(wǎng)的 serf 實例知道局域網(wǎng)的服務(wù)端發(fā)生的變化(退出)
floodLock sync.RWMutex
floodCh []chan struct{}
// ...
// 通知需要存儲快照凛忿,并重新發(fā)起 Leader 選舉
reassertLeaderCh chan chan error
// tombstone 算法的 GC 調(diào)優(yōu)參數(shù)
tombstoneGC *state.TombstoneGC
//
aclReplicationStatus structs.ACLReplicationStatus
aclReplicationStatusLock sync.RWMutex
}
下面來看看 consul
作為微服務(wù)治理的基礎(chǔ)架構(gòu)圖是怎么樣的:
毫無疑問,raft 協(xié)議是維持整個 consul
生態(tài)中最重要的一個協(xié)議竞川,下面重點來講下 raft 協(xié)議:
作為基于 Paxos 的一種變種算法店溢,它簡化了狀態(tài),通過加入 時間差 委乌、 領(lǐng)導(dǎo)選舉 等概念使一致性算法變得更簡單床牧、易懂,先來看看其中的專有名詞:
-
Leader election: 領(lǐng)導(dǎo)選舉 是目前落地的一致性算法不可避免的一個步驟(CASPaxos 好像不需要遭贸,不過還沒有深入研究)戈咳,為了達到一致性,必須要已一個共同認(rèn)可的節(jié)點做出所有操作壕吹。而 raft 協(xié)議的節(jié)點都會處于三種狀態(tài)之一:
Leader: 操作的真正處理者著蛙,它負(fù)責(zé)將改動確認(rèn)生效,并將其同步到其他節(jié)點耳贬。
Follower: 負(fù)責(zé)將來自 Leader 的改動請求寫入本地 Log册踩,返回成功。
Candidate: 如果 Leader 發(fā)生了故障效拭,沒有收到心跳的 Followers 們會進入 Candidate 狀態(tài)暂吉,并開始選主,并保持該狀態(tài)直到選主結(jié)束缎患。
Log Replication: 當(dāng) Leader 被選舉出來以后慕的,所有的寫請求都必須要到 Leader 執(zhí)行,Leader 會將其作為 Log entry 追加到日志中挤渔,然后給其他 Follower 發(fā)送請求肮街,當(dāng)絕大部分的 (
(N/2)+1
)的 Follower replicated 成功后,就代表該寫入事件成功了判导,就會返回結(jié)果狀態(tài)到客戶端嫉父。Log Compaction: 在實際的應(yīng)用場景中,因為磁盤空間是有限的眼刃,日志不能無限地增長绕辖,否則即使系統(tǒng)需要重啟也需要耗費大量的時間。所以擂红, raft 會對節(jié)點進行
snapshot
操作仪际,執(zhí)行成功后,會將snapshot
之前的日志丟棄掉。
為了更快速理解 raft 算法树碱,我們可以帶著3個問題觀看 動畫 :
Leader 在什么時候肯适,如何進行選舉?
Log Replication 作用是什么成榜,如何實現(xiàn)框舔?
節(jié)點數(shù)量發(fā)生變化(增加/減少)時如何處理?
結(jié)合 consul
代碼來理解下 raft 協(xié)議:
可以看到 raft 協(xié)議中赎婚,節(jié)點分別處于 3種狀態(tài) :
// raft/state.go
type RaftState uint32
const (
Follower RaftState = iota
Candidate
Leader
// 這個狀態(tài)是 hashicorp/raft 特有的刘绣,表示節(jié)點下線
Shutdown
)
// 獲取當(dāng)前 raft 節(jié)點狀態(tài)
func (r *raftState) getState() RaftState {
stateAddr := (*uint32)(&r.state)
return RaftState(atomic.LoadUint32(stateAddr))
}
先來看看在 consul
里面是如何創(chuàng)建一個 raft node
的:
// raft/api.go
// NewRaft 新建一個 raft 節(jié)點,傳入?yún)?shù)除了 config/transport 以外惑淳,
// 還有 fsm/logs/store/snapshot 额港,這是為了可以通過傳遞參數(shù)饺窿,使崩掉
// 的節(jié)點能夠快速恢復(fù)
func NewRaft(config *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
// TODO ...
r = &Raft{
// ...
}
// 默認(rèn)為 Follower 角色
r.setState(Follower)
// 該處只應(yīng)該在測試的時候被調(diào)用歧焦,
// 不是必須的主流程,所以單獨拎出來
if conf.StartAsLeader {
r.setState(Leader)
r.setLeader(r.localAddr)
}
r.goFunc(r.run) // 管理 Server 狀態(tài)肚医,運行不同的代碼
r.goFunc(r.runFSM) // 管理狀態(tài)機
r.goFunc(r.runSnapshots) // 管理快照
return r, nil
}
// raft/raft.go
func (r *Raft) run() {
for {
select {
case <-r.shutdownCh: // 會被 close(r.shutdownCh) 觸發(fā)
r.setLeader("") // 不聯(lián)系 Leader 節(jié)點了
return
default:
}
switch r.getState() {
case Follower:
r.runFollower()
case Candidate:
r.runCandidate()
case Leader:
r.runLeader()
// 問題:為什么沒有判斷 Shutdown 的狀態(tài)呢绢馍?
}
}
}
// 只有 Follower 狀態(tài)的節(jié)點會運行該段代碼
func (r *Raft) runFollower() {
// ...
for {
select {
case rpc := <-rpcCh:
r.processRPC(rpc) // 根據(jù) rpc 的類型進行 附加日志/備份快照 等操作
// ...
case v := <-r.verifyCh:
// Reject any operations since we are not the leader
v.respond(ErrNotLeader)
case <-heartbeatTimer: // 重頭戲,還記得動畫里面的演示嗎肠套?
// 重設(shè)一個隨機的定時器
heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
// 獲取上一次從 Leader 節(jié)點得到聯(lián)系的時間
lastContact := r.LastContact()
if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
continue // 還能聯(lián)系上 Leader 舰涌,繼續(xù)下一次操作
}
// 從這里開始就發(fā)現(xiàn)聯(lián)系不上 Leader 了,需要重新開始投票D阒伞4砂摇!
lastLeader := r.Leader() // 先存一下之前的 Leader
r.setLeader("") // 離開這個 Leader 的集群
if r.configurations.lastestIndex == 0 { // 沒有其他可通信節(jié)點了刁赖,終止選舉8橥础!宇弛!
// ...
} else if { // 關(guān)于選舉權(quán)的配置鸡典,可以忽略
//...
} else {
// Logger
r.setState(Candidate)
return
}
case <-r.shutdownCh:
return
}
}
}
除了代理的正常維護自己的功能,它還必須處理日志枪芒,所以這里使用了 有限狀態(tài)自動機 (FSM)的方式避免了影響正常節(jié)點功能彻况。
擁有相同日志序列的應(yīng)用必須歸結(jié)于相同的狀態(tài),意味著行為必須是確定性的舅踪。
// raft/fsm.go
// FSM 的接口定義了一些能夠操作日志的有限狀態(tài)機的行為
type FSM interface {
// 提交日志條目纽甘,如果該 FSM 在 Leader 節(jié)點上運行
// 將返回一個 logFuture 用于等待直至日志被認(rèn)為提交成功
Apply(*Log) interface{}
// 將當(dāng)前日志進行快照備份,需要注意的是本函數(shù)不能與 Apply
// 同時在多個線程中被調(diào)用抽碌,但允許在同一線程中并發(fā)運行
Snapshot() (FSMSnapshot, error)
// 將用于從快照中恢復(fù) FSM 狀態(tài)贷腕,一旦該函數(shù)被調(diào)用,F(xiàn)SM 必須
// 暫停其他所有調(diào)用與狀態(tài),直至恢復(fù)完畢
Restore(io.ReadCloser) error
}
func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64
// commit 提交日志
commit := func(req *commitTuple) {
var resp interface{}
if req.log.Type == LogCommand {
resp = r.fsm.Apply(req.log) // 提交日志條目
}
// 更新索引為提交日志的值
lastIndex = req.log.Index
lastTerm = req.log.Term
// Leader 節(jié)點需要等待日志 commit
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}
// restore 恢復(fù)快照
restore := func(req *restoreFuture) {
meta, source, err := r.snapshots.Open(req.ID) // 讀取快照
// error handle
if err := r.fsm.Restore(source); err != nil {
// error handle
}
source.Close()
// 更新索引為快照中的值
lastIndex = meta.Index
lastTerm = meta.Term
req.respond(nil)
}
// snapshot 生成快照備份
snapshot := func(req *reqSnapshotFuture) {
if lastIndex == 0 {
req.respond(ErrNothingNewToSnapshot)
return
}
snap, err := r.fsm.Snapshot()
// 返回當(dāng)前的索引給請求
req.index = lastIndex
req.term = lastTerm
req.snapshot = snap
req.respond(err)
}
for {
select {
case ptr := <-r.fsmMutateCh: // 變化通道
switch req := ptr.(type) {
case *commitTuple:
commit(req)
case *restoreFuture:
restore(req)
default:
// panic here...
}
case req := <-r.fsmSnapshotCh: // 快照通道
snapshot(req)
case <-r.shutdownCh:
return
}
}
}
到此泽裳,基本上 raft 協(xié)議的主干實現(xiàn)已經(jīng)講解完全了瞒斩,此外還有如日志、快照的 存儲引擎 涮总、請求的 通信協(xié)議 等可以留給有興趣的童鞋自行去讀一下胸囱。