etcd initialization flow
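When etcd starts, it first calls startEtcdOrProxyV2. This function initializes the config and parses the supplied options, then checks whether Dir in the config is empty; if it is, a data dir is generated from the Name specified in the config (by default <name>.etcd, as shown below). On later starts, etcd checks the type of the data dir. There are currently three types, member, proxy and empty, meaning a member, a proxy and an empty directory respectively. Depending on the type, etcd then takes a different branch and calls either startEtcd or startProxy.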
                                                                             +----> startEtcd ---> configurePeerListeners ---> configureClientListeners ---> etcdserver.NewServer
                                                                             |
startEtcdOrProxyV2 ---> newConfig ---> cfg.parse ---> identify data dir ---> |
                                                                             |
                                                                             +----> startProxy
The etcd data dir looks like this:
(ENV) [root@ceph-2 etcd]# ls
10.255.101.74.etcd 10.255.101.74.proxy.etcd etcd.conf etcd-proxy.conf
(ENV) [root@ceph-2 etcd]# tree -h
.
├── [ 20] 10.255.101.74.etcd
│   └── [ 29] member
│       ├── [ 246] snap
│       │   ├── [366K] 0000000000000002-0000000000d5a021.snap
│       │   ├── [366K] 0000000000000002-0000000000d726c2.snap
│       │   ├── [366K] 0000000000000002-0000000000d8ad63.snap
│       │   ├── [366K] 0000000000000002-0000000000da3404.snap
│       │   ├── [362K] 0000000000000002-0000000000dbbaa5.snap
│       │   └── [ 20K] db
│       └── [ 244] wal
│           ├── [ 61M] 000000000000001e-0000000000c0cf3d.wal
│           ├── [ 61M] 000000000000001f-0000000000c775f1.wal
│           ├── [ 61M] 0000000000000020-0000000000ce234b.wal
│           ├── [ 61M] 0000000000000021-0000000000d4ce84.wal
│           ├── [ 61M] 0000000000000022-0000000000db76f5.wal
│           └── [ 61M] 0.tmp
├── [ 19] 10.255.101.74.proxy.etcd
│   └── [ 21] proxy
│       └── [ 70] cluster
├── [3.6K] etcd.conf
└── [ 558] etcd-proxy.conf

6 directories, 15 files
When the etcd server starts, it creates the store. The data dir, wal dir and snap dir are created if they do not exist, and snap/db is used as the backend path. If db already exists, the Backend is built from it. Once the Backend is built, a goroutine is started to run backend.run().
// file: mvcc/backend/backend.go
type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx
Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
// since it can conduct pre-allocation or spare unused space for recycling.
// Use SizeInUse() instead for the actual DB size.
Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
}
Next, a new Transport is created:
// etcdserver/server.go
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
WAL
If the WAL directory exists, all WAL files are opened and the snapshot entries are validated. The WAL is decoded by a decoder, whose structure is as follows:
// wal/decoder.go
type decoder struct {
mu sync.Mutex
brs []*bufio.Reader
// lastValidOff file offset following the last valid decoded record
lastValidOff int64
crc hash.Hash32
}
Here brs holds one Reader per WAL file. Each WAL file is processed in turn:
- Read the first 8 bytes of the frame as a little-endian integer. For example, the first 8 bytes of the WAL file below are 04 00 00 00 00 00 00 84. Because the byte order is little-endian, the low 56 bits hold the record length in bytes, here 4, and the low 3 bits of the high byte hold the padding length: 0x84 is 10000100 in binary, its low three bits give 4, and the set top bit marks that padding is present. A WAL entry can be at most 10MB, and each WAL segment file is 64MB by default (64 * 1000 * 1000 bytes, which tree -h shows as 61M).
0000000 04 00 00 00 00 00 00 84 08 04 10 00 00 00 00 00
0000010 20 00 00 00 00 00 00 00 08 01 10 bf ae e5 db 08
0000020 1a 16 08 e9 e4 bc b2 8f ba fc 88 82 01 10 c4 cf
// file: wal/wal.go
var (
	// SegmentSizeBytes is the preallocated size of each wal segment file.
	// The actual size might be larger than this. In general, the default
	// value should be used, but this is defined as an exported variable
	// so that tests can set a different segment size.
	SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
)
- Read the record bytes plus the padding bytes.
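- Unmarshal the bytes into a Record, whose structure is shown below: a type, a CRC and the data. During validation the decoder feeds the data into its running CRC, which carries over from earlier records, and compares the result with the Record's CRC; a mismatch means the data is corrupted.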
- Collect all Records of type snapshot whose Index is less than or equal to the Commit of the HardState.
// wal/walpb/record.pb.go
type Record struct {
Type int64 `protobuf:"varint,1,opt,name=type" json:"type"`
Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
As shown below, there are five Record types:
// wal/wal.go
const (
metadataType int64 = iota + 1
entryType
stateType
crcType
snapshotType
)
// raft/raftpb/raft.pb.go
type HardState struct {
Term uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
Vote uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
Commit uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
XXX_unrecognized []byte `json:"-"`
}
// raft/raftpb/raft.pb.go
type Entry struct {
Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"`
Index uint64 `protobuf:"varint,3,opt,name=Index" json:"Index"`
Type EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
// wal/walpb/record.pb.go
type Snapshot struct {
Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
XXX_unrecognized []byte `json:"-"`
}
// etcdserver/etcdserverpb/etcdserver.pb.go
type Metadata struct {
NodeID uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
ClusterID uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
XXX_unrecognized []byte `json:"-"`
}
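The snapshot-filtering rule from the list above reduces to a single comparison. A sketch with trimmed, hypothetical copies of walpb.Snapshot and HardState, assuming the latest HardState has already been found while scanning the WAL:

```go
package main

import "fmt"

// walSnap mirrors the Index/Term fields of walpb.Snapshot (hypothetical copy).
type walSnap struct{ Index, Term uint64 }

// hardState mirrors raftpb.HardState (hypothetical copy).
type hardState struct{ Term, Vote, Commit uint64 }

// validSnapshots keeps the snapshot records whose Index does not exceed
// the committed index of the latest HardState found in the WAL.
func validSnapshots(snaps []walSnap, st hardState) []walSnap {
	out := make([]walSnap, 0, len(snaps))
	for _, s := range snaps {
		if s.Index <= st.Commit {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	snaps := []walSnap{{Index: 0}, {Index: 100, Term: 2}, {Index: 200, Term: 2}}
	fmt.Println(validSnapshots(snaps, hardState{Term: 2, Commit: 150}))
	// [{0 0} {100 2}]
}
```

A snapshot newer than the commit index is dropped because its state might not have been agreed on by the cluster yet.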
Layout of a WAL file:
+----------------------------------------------------------------------+
| +------------------------------+----------------------------------+  |
| | record bytes <56 bits>       | padding <lower 3 bits of 8 bits> |  |
| +------------------------------+----------------------------------+  |
| | data                                                            |  |
| +-----------------------------------------------------------------+  |
| ...                                                                  |
+----------------------------------------------------------------------+
Snapshot
The snap directory contains only files ending in .snap, plus the db file. Each .snap file is named %016x-%016x.snap, i.e. term-index.snap, while files in the wal directory are named %016x-%016x.wal, i.e. seq-index.wal.
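Both naming schemes are plain fmt verbs, so formatting and parsing are symmetric. snapName, walName and parseSnapName are illustrative helpers built on the format strings above:

```go
package main

import "fmt"

// snapName formats a snapshot file name as term-index.snap.
func snapName(term, index uint64) string {
	return fmt.Sprintf("%016x-%016x.snap", term, index)
}

// walName formats a WAL segment name as seq-index.wal.
func walName(seq, index uint64) string {
	return fmt.Sprintf("%016x-%016x.wal", seq, index)
}

// parseSnapName reverses snapName using the same verbs; the width 016 makes
// Sscanf consume at most 16 hex digits for each field.
func parseSnapName(name string) (term, index uint64, err error) {
	_, err = fmt.Sscanf(name, "%016x-%016x.snap", &term, &index)
	return term, index, err
}

func main() {
	term, index, err := parseSnapName("0000000000000002-0000000000d5a021.snap")
	fmt.Println(term, index == 0xd5a021, err) // 2 true <nil>
	fmt.Println(walName(0x1e, 0xc0cf3d))      // 000000000000001e-0000000000c0cf3d.wal
}
```

Zero-padding to 16 hex digits also makes lexical order match numeric order, so a plain directory listing is already sorted by term and index.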
A .snap file corresponds to the snappb.Snapshot structure:
// etcdserver/api/snap/snappb/snap.pb.go
type Snapshot struct {
Crc uint32 `protobuf:"varint,1,opt,name=crc" json:"crc"`
Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
The Data field of snappb.Snapshot in turn corresponds to a raftpb.Snapshot:
// raft/raftpb/raft.pb.go
type Snapshot struct {
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
Metadata SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
XXX_unrecognized []byte `json:"-"`
}
// raft/raftpb/raft.pb.go
type SnapshotMetadata struct {
ConfState ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
Index uint64 `protobuf:"varint,2,opt,name=index" json:"index"`
Term uint64 `protobuf:"varint,3,opt,name=term" json:"term"`
XXX_unrecognized []byte `json:"-"`
}
Starting/restarting a node
Whether etcd starts a new node or restarts an existing one depends on whether the WAL directory exists and whether this is a new cluster. The restart path is used as the example below.
1. Read the metadata, the raftpb.HardState and all raftpb.Entry records from the WAL.
// etcdserver/etcdserverpb/etcdserver.pb.go
type Metadata struct {
NodeID uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
ClusterID uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
XXX_unrecognized []byte `json:"-"`
}
2. Create a RaftCluster object; the NodeID and ClusterID in the metadata become the RaftCluster's localID and cid respectively.
// file: etcdserver/api/membership/cluster.go
// RaftCluster is a list of Members that belong to the same raft cluster
type RaftCluster struct {
lg *zap.Logger
localID types.ID
cid types.ID
token string
v2store v2store.Store
be backend.Backend
sync.Mutex // guards the fields below
version *semver.Version
members map[types.ID]*Member
// removed contains the ids of removed members in the cluster.
// removed id cannot be reused.
removed map[types.ID]bool
downgradeInfo *DowngradeInfo
}
3. Create a MemoryStorage, then:
   - apply the snapshot;
   - set the HardState to the value read in step 1;
   - append the entries read in step 1 to the MemoryStorage.
// file: raft/storage.go
// MemoryStorage implements the Storage interface backed by an
// in-memory array.
type MemoryStorage struct {
// Protects access to all fields. Most methods of MemoryStorage are
// run on the raft goroutine, but Append() is run on an application
// goroutine.
sync.Mutex
hardState pb.HardState
snapshot pb.Snapshot
// ents[i] has raft log position i+snapshot.Metadata.Index
ents []pb.Entry
}
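Step 3 can be pictured with a toy storage that copies MemoryStorage's indexing rule: ents[0] is a dummy entry at the snapshot index, so ents[i] sits at log position i + snapshot index. memStorage and the trimmed entry/hardState/snapshotMeta types are hypothetical; unlike raft's MemoryStorage, this sketch has no locking and returns an error instead of panicking on a gap.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed, hypothetical copies of the raftpb types used below.
type entry struct{ Index, Term uint64 }
type hardState struct{ Term, Vote, Commit uint64 }
type snapshotMeta struct{ Index, Term uint64 }

// memStorage keeps a dummy entry at ents[0] carrying the snapshot index/term.
type memStorage struct {
	hardState hardState
	snap      snapshotMeta
	ents      []entry
}

func newMemStorage() *memStorage { return &memStorage{ents: make([]entry, 1)} }

func (ms *memStorage) firstIndex() uint64 { return ms.ents[0].Index + 1 }
func (ms *memStorage) lastIndex() uint64  { return ms.ents[0].Index + uint64(len(ms.ents)) - 1 }

// applySnapshot resets the log so that it starts right after the snapshot.
func (ms *memStorage) applySnapshot(s snapshotMeta) {
	ms.snap = s
	ms.ents = []entry{{Index: s.Index, Term: s.Term}}
}

func (ms *memStorage) setHardState(st hardState) { ms.hardState = st }

// appendEntries drops entries already covered by the snapshot, truncates any
// conflicting suffix and appends the rest. Entries must be contiguous.
func (ms *memStorage) appendEntries(es []entry) error {
	if len(es) == 0 {
		return nil
	}
	first := ms.firstIndex()
	if es[0].Index+uint64(len(es))-1 < first {
		return nil // everything is already compacted
	}
	if first > es[0].Index {
		es = es[first-es[0].Index:]
	}
	offset := es[0].Index - ms.ents[0].Index
	switch {
	case uint64(len(ms.ents)) > offset:
		ms.ents = append(append([]entry{}, ms.ents[:offset]...), es...)
	case uint64(len(ms.ents)) == offset:
		ms.ents = append(ms.ents, es...)
	default:
		return errors.New("missing log entry")
	}
	return nil
}

func main() {
	ms := newMemStorage()
	ms.applySnapshot(snapshotMeta{Index: 100, Term: 2})
	ms.setHardState(hardState{Term: 3, Vote: 1, Commit: 102})
	_ = ms.appendEntries([]entry{{99, 2}, {100, 2}, {101, 3}, {102, 3}, {103, 3}})
	fmt.Println(ms.firstIndex(), ms.lastIndex()) // 101 103
}
```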
4. Restart the Node according to raft.Config. ElectionTick = 10 * HeartbeatTick is the usual recommendation, as it avoids unnecessary leader changes.
// file: raft/rawnode.go
// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
raft *raft
prevSoftSt *SoftState
prevHardSt pb.HardState
}
- Create a raft instance from raft.Config (see raft/raft.go). The Config is validated first.
- Create the raftLog, as shown below. Note that the raftLog's committed and applied both start at firstIndex - 1, and log.unstable.offset is set to lastIndex + 1.
// file: raft/log.go (newLogWithSize)
log := &raftLog{
	storage:         storage,
	logger:          logger,
	maxNextEntsSize: maxNextEntsSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
	panic(err) // TODO(bdarnell)
}
lastIndex, err := storage.LastIndex()
if err != nil {
	panic(err) // TODO(bdarnell)
}
log.unstable.offset = lastIndex + 1
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applied = firstIndex - 1
- Get the HardState and ConfState from the MemoryStorage. As mentioned earlier, the MemoryStorage has already applied the snapshot and holds the HardState recorded in the WAL. The raft struct is then built; by default each message is at most 1MB.
const (
	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
	// Assuming the RTT is around 10ms, 1MB max size is large enough.
	maxSizePerMsg = 1 * 1024 * 1024
	// Never overflow the rafthttp buffer, which is 4096.
	// TODO: a better const?
	maxInflightMsgs = 4096 / 8
)
- Set raft's Vote and Term, and the raftLog's committed, from the HardState obtained in step 3.
- Initialize the node as a follower: assign its step and tick functions, reset Term, set Lead to None and set State to Follower.
// file: raft/raft.go
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
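The index bookkeeping of the raftLog initialization and of the HardState step can be checked with a small numeric sketch: a storage holding a snapshot at index 100 plus entries 101..103 yields firstIndex 101 and lastIndex 103. raftLogPointers, initPointers and loadCommit are hypothetical names; the range check mirrors the one raft performs when loading a HardState, except that raft panics instead of returning an error.

```go
package main

import "fmt"

// raftLogPointers holds the indexes discussed above (hypothetical type).
type raftLogPointers struct {
	committed, applied, unstableOffset, lastIndex uint64
}

// initPointers follows the raftLog initialization shown earlier: committed
// and applied start at firstIndex-1 (the last compaction point), and
// unstable entries begin right after lastIndex.
func initPointers(firstIndex, lastIndex uint64) raftLogPointers {
	return raftLogPointers{
		committed:      firstIndex - 1,
		applied:        firstIndex - 1,
		unstableOffset: lastIndex + 1,
		lastIndex:      lastIndex,
	}
}

// loadCommit installs HardState.Commit after the same range check raft
// applies when loading state: commit must lie in [committed, lastIndex].
func (p *raftLogPointers) loadCommit(commit uint64) error {
	if commit < p.committed || commit > p.lastIndex {
		return fmt.Errorf("state.commit %d is out of range [%d, %d]", commit, p.committed, p.lastIndex)
	}
	p.committed = commit
	return nil
}

func main() {
	// Storage holding a snapshot at index 100 followed by entries 101..103.
	p := initPointers(101, 103)
	fmt.Println(p.committed, p.applied, p.unstableOffset) // 100 100 104
	if err := p.loadCommit(102); err != nil {
		panic(err)
	}
	fmt.Println(p.committed) // 102
}
```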
- Build a RawNode from the raft instance, saving raft's SoftState and HardState in the RawNode's prevSoftSt and prevHardSt respectively.
// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
raft *raft
prevSoftSt *SoftState
prevHardSt pb.HardState
}
- Create the raft node; node is the canonical implementation of the Node interface.
// file: raft/node.go
// Node represents a node in a raft cluster.
type Node interface { /* methods omitted */ }
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
rn *RawNode
}
- Start a goroutine that runs node.run() (see raft/node.go).
// file: raft/raft.go
// StateType represents the role of a node in a cluster.
type StateType uint64
var stmap = [...]string{
"StateFollower",
"StateCandidate",
"StateLeader",
"StatePreCandidate",
}
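node.run() is the single goroutine that owns the RawNode; callers only talk to it through the channels in the node struct above. The heavily reduced, hypothetical miniNode below shows the pattern with just propc, tickc, stop and done; the real loop also handles recvc, confc, readyc/advancec and status, and the real Stop sends on the stop channel rather than closing it.

```go
package main

import "fmt"

// miniNode is a hypothetical, heavily reduced version of raft's node:
// only the run goroutine touches ticks/props, so no mutex is needed.
type miniNode struct {
	propc chan string
	tickc chan struct{}
	stop  chan struct{}
	done  chan struct{}

	ticks int      // stand-in for rn.Tick()
	props []string // stand-in for stepping a proposal into raft
}

func newMiniNode() *miniNode {
	return &miniNode{
		propc: make(chan string),
		tickc: make(chan struct{}),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// run serializes every input through one select loop.
func (n *miniNode) run() {
	defer close(n.done)
	for {
		select {
		case p := <-n.propc:
			n.props = append(n.props, p)
		case <-n.tickc:
			n.ticks++
		case <-n.stop:
			return
		}
	}
}

// Stop asks run to exit and waits until it has.
func (n *miniNode) Stop() {
	close(n.stop)
	<-n.done
}

func main() {
	n := newMiniNode()
	go n.run()
	n.tickc <- struct{}{}
	n.propc <- "put foo bar"
	n.tickc <- struct{}{}
	n.Stop()
	fmt.Println(n.ticks, n.props) // 2 [put foo bar]
}
```

Because the channels are unbuffered, every send above completes only after run has received it, and close(done) orders all of run's writes before the final read in main.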
Transport
Peer
A remote raft node is represented by a peer. The local raft node sends messages to the remote node through that peer, and each peer has two underlying mechanisms for sending messages: stream and pipeline.
etcd mainly uses these two message channels, stream and pipeline. The stream channel keeps a long-lived HTTP connection open and carries small, frequently sent messages; the pipeline channel closes the connection as soon as a transfer completes and carries large, infrequently sent messages, such as snapshot data.
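How a peer chooses between the two channels can be sketched as a pure function. It follows the order used by the pick method of rafthttp's peer: snapshot messages go to the pipeline, MsgApp goes to the msgappv2 stream when it is connected, everything else goes to the message stream, and the pipeline is the fallback. msgType, peerState and the returned labels are hypothetical simplifications.

```go
package main

import "fmt"

// msgType is a hypothetical subset of raftpb.MessageType.
type msgType int

const (
	msgApp msgType = iota
	msgHeartbeat
	msgSnap
)

// peerState is an illustrative stand-in for whether each stream is connected.
type peerState struct {
	msgAppV2Connected bool // stream dedicated to MsgApp
	messageConnected  bool // general-purpose stream
}

// pick chooses a channel for an outgoing message: snapshots always use the
// pipeline so a large transfer cannot block a stream; other messages prefer
// a connected stream and fall back to the pipeline.
func pick(p peerState, t msgType) string {
	switch {
	case t == msgSnap:
		return "pipeline"
	case t == msgApp && p.msgAppV2Connected:
		return "streamMsgAppV2"
	case p.messageConnected:
		return "streamMessage"
	default:
		return "pipeline"
	}
}

func main() {
	up := peerState{msgAppV2Connected: true, messageConnected: true}
	fmt.Println(pick(up, msgApp), pick(up, msgHeartbeat), pick(up, msgSnap))
	// streamMsgAppV2 streamMessage pipeline
	fmt.Println(pick(peerState{}, msgHeartbeat)) // pipeline
}
```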
Handler
/raft --> pipelineHandler
/raft/stream/ --> streamHandler
/raft/snapshot --> snapshotHandler
/raft/probing --> httpHealth
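The route table above maps onto a plain net/http ServeMux. The sketch uses stub handlers that only echo their name; the names are labels for illustration, since in rafthttp the real handlers are registered by the Transport's Handler method. Note that "/raft/stream/" ends with a slash, so it matches the whole subtree (for example /raft/stream/message/<id>), while the other three patterns match only their exact path.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// named returns a stub handler that just writes its own name (illustrative only).
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newRaftMux registers the four peer endpoints listed above.
func newRaftMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/raft", named("pipelineHandler"))
	mux.Handle("/raft/stream/", named("streamHandler"))
	mux.Handle("/raft/snapshot", named("snapshotHandler"))
	mux.Handle("/raft/probing", named("httpHealth"))
	return mux
}

// route reports which stub handled a POST to path.
func route(mux *http.ServeMux, path string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, path, nil))
	return rec.Body.String()
}

func main() {
	mux := newRaftMux()
	for _, p := range []string{"/raft", "/raft/stream/message/1", "/raft/snapshot", "/raft/probing"} {
		fmt.Println(p, "->", route(mux, p))
	}
}
```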
Message encoder/decoder
The Message encoder/decoder wrap an io.Writer and an io.Reader respectively to encode and decode Messages.
+----------------------------------------------------------------------+
| +-----------------------------------------------------------------+  |
| | message size (8 bytes)                                          |  |
| +-----------------------------------------------------------------+  |
| | data                                                            |  |
| +-----------------------------------------------------------------+  |
| ...                                                                  |
+----------------------------------------------------------------------+
When encoding, the 8-byte message size is written first (in big-endian order), followed by the serialized data.
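Decoding is the reverse: first read the 8-byte message size and check whether it exceeds 512MB; if it does, an error is returned immediately. Otherwise the payload is read and unmarshaled into a Message. A caller can also pass its own byte limit; snapshot messages, for example, may be up to 1TB. See etcdserver/api/rafthttp/msg_codec.go for details.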
// messageEncoder is a encoder that can encode all kinds of messages.
// It MUST be used with a paired messageDecoder.
type messageEncoder struct {
w io.Writer
}
func (enc *messageEncoder) encode(m *raftpb.Message) error {
if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
return err
}
_, err := enc.w.Write(pbutil.MustMarshal(m))
return err
}
// messageDecoder is a decoder that can decode all kinds of messages.
type messageDecoder struct {
r io.Reader
}
var (
readBytesLimit uint64 = 512 * 1024 * 1024 // 512 MB
ErrExceedSizeLimit = errors.New("rafthttp: error limit exceeded")
)
func (dec *messageDecoder) decode() (raftpb.Message, error) {
return dec.decodeLimit(readBytesLimit)
}
func (dec *messageDecoder) decodeLimit(numBytes uint64) (raftpb.Message, error) {
var m raftpb.Message
var l uint64
if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
return m, err
}
if l > numBytes {
return m, ErrExceedSizeLimit
}
buf := make([]byte, int(l))
if _, err := io.ReadFull(dec.r, buf); err != nil {
return m, err
}
return m, m.Unmarshal(buf)
}
This article is based on etcd 3.5.0-pre.