是配置共享和服務(wù)發(fā)現(xiàn)的鍵值存儲系統(tǒng)
1 應(yīng)用程序可以在集群中共享信息哨坪、配置或作服務(wù)發(fā)現(xiàn)握截,etcd 會在集群的各個節(jié)點中復(fù)制這些數(shù)據(jù)并保證這些數(shù)據(jù)始終正確崖媚。
2 服務(wù)注冊的場景在微服務(wù)架構(gòu)設(shè)計中告抄,被用來統(tǒng)一管理各個服務(wù)的服務(wù)地址,客戶端和網(wǎng)關(guān)可以通過注冊中心查詢要訪問的目標服務(wù)地址淳附,實現(xiàn)動態(tài)服務(wù)訪問,并在此基礎(chǔ)上實現(xiàn)服務(wù)負 載均衡蠢古。
3 有點類似Redis的操作奴曙,首先會啟動一個服務(wù),然后利用命令行來操作草讶,它是一個KV存儲的格式
1 get put watch del 等命令!
- 4 watch 是來監(jiān)控相關(guān)的操作的
- 5 etcd 存儲了歷史信息洽糟,也就是有個機制緩存了,這是基于B+樹來執(zhí)行的堕战。(B+是隨機順序的@だ!)
- 6 etcd 里的Leaded 會為每個Key創(chuàng)建一個索引,每個索引對應(yīng)著一個B+樹
- 7 每個B+樹里存儲了Key的歷史版本信息践啄。
- 8
etcd 實現(xiàn)強一致性的核心在于 Raft算法
- 1 服務(wù)器下載地址:https://github.com/etcd-io/etcd/releases/tag/v3.5.5 下載完畢直接啟動etcd.exe 服務(wù)器就行浇雹。
- 2 etcd 是一個k,v 存儲系統(tǒng)
- 3 put 命令是寫數(shù)據(jù):.\etcdctl.exe put key
- 4 .\etcdctl.exe get key 是獲取key的值
- 5 .\etcdctl.exe watch key 是監(jiān)聽這個key
- 6 .\etcdctl.exe txn -i 可以進入事務(wù)操作
{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":8,"raft_term":4},"kvs":[{"key":"a2V5","create_revision":5,"mod_revision":8,"version":4,"value":"eXV5dQ=="}],"count":1}
- 7 revision 字段是Key的全局版本號,在內(nèi)存中B樹會維護一個Key-->對應(yīng)的revison屿讽,而在磁盤中B+樹會針對revison版本號來修改對應(yīng)的value (映射關(guān)系!)
- 8 .\etcdctl.exe lease grant xxx 設(shè)置一個過期時間
.\etcdctl.exe lease grant 50
.\etcdctl.exe lease put key sos
.\etcdctl.exe lease put key sos2 --lease=xxxxx字符串
.\etcdctl.exe get key //等過期時間一過昭灵,在獲取的時候就會失敗!
etcd一致性
- 1 先寫日志在寫磁盤,涉及到leader的選舉機制伐谈,利用Raft共識算法達成數(shù)據(jù)同步烂完。
demo
- 1 wins本地啟動服務(wù)器: etcd.exe
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"time"
)
func main() {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Println("connect to etcd is failed")
return
}
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
res, _ := client.Get(ctx, "name")
cancel()
for index, v := range res.Kvs {
fmt.Printf("下標為:%v 鍵是:%s 值是:%s \n", index, v.Key, v.Value)
}
}
Raft.go
import (
"fmt"
"math/rand"
"sync"
"time"
)
type NodeInfo struct {
ID string
Port string
}
type Message struct {
MsgBody string
MsgID int
}
type Raft struct {
//本節(jié)點信息
node *NodeInfo
//本節(jié)點獲得的投票數(shù)
vote int
//互斥鎖
lock sync.Mutex
//本節(jié)點編號
me string
//當前任期
currentTerm int
//為哪個節(jié)點投票
votedFor string
//當前節(jié)點狀態(tài)0 follower 1 candidate 2 leader
state int
//發(fā)送最后一條消息的時間
lastSendMessageTime int64
//發(fā)送最后一次心跳的時間
lastSendHeartBeatTime int64
//當前節(jié)點的領(lǐng)導(dǎo)
currentLeader string
//心跳超時時間(秒)
heartBeatTimeout int
//接收投票成功通道
voteChan chan bool
//心跳信號
heartChan chan bool
}
func NewRaft(id, port string) *Raft {
rf := new(Raft)
rf.node = &NodeInfo{
ID: id,
Port: port,
}
//當前節(jié)點獲得票數(shù)
rf.setVote(0)
//編號
rf.me = id
//給0 1 2三個節(jié)點投票,給誰都不投
rf.setVoteFor("-1")
//設(shè)置節(jié)點狀態(tài) 0 follower
rf.setStatus(0)
//最后一次心跳檢測時間
rf.lastSendHeartBeatTime = 0
//心跳超時時間
rf.heartBeatTimeout = heartBeatTimeout
//最初沒有領(lǐng)導(dǎo)
rf.setCurrentLeader("-1")
//設(shè)置任期
rf.setTerm(0)
//投票通道
rf.voteChan = make(chan bool)
//心跳通道
rf.heartChan = make(chan bool)
return rf
}
//設(shè)置投票數(shù)量
func (rf *Raft) setVote(num int) {
rf.lock.Lock()
rf.vote = num
rf.lock.Unlock()
}
//設(shè)置為誰投票
func (rf *Raft) setVoteFor(id string) {
rf.lock.Lock()
rf.votedFor = id
rf.lock.Unlock()
}
//設(shè)置當前節(jié)點狀態(tài)
func (rf *Raft) setStatus(state int) {
rf.lock.Lock()
rf.state = state
rf.lock.Unlock()
}
//設(shè)置當前領(lǐng)導(dǎo)者
func (rf *Raft) setCurrentLeader(leader string) {
rf.lock.Lock()
rf.currentLeader = leader
rf.lock.Unlock()
}
//設(shè)置任期
func (rf *Raft) setTerm(term int) {
rf.lock.Lock()
rf.currentTerm = term
rf.lock.Unlock()
}
//投票累加
func (rf *Raft) voteAdd() {
rf.lock.Lock()
rf.vote++
rf.lock.Unlock()
}
//任期累加
func (rf *Raft) termAdd() {
rf.lock.Lock()
rf.currentTerm++
rf.lock.Unlock()
}
//獲取當前時間的毫秒數(shù)
func millisecond() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}
//產(chǎn)生隨機值
func randRange(min, max int64) int64 {
//用于心跳信號的時間
rand.Seed(time.Now().UnixNano())
return rand.Int63n(max-min) + min
}
//恢復(fù)默認設(shè)置
func (rf *Raft) reDefault() {
rf.setVote(0)
rf.setVoteFor("-1")
rf.setStatus(0)
}
//給跟隨者節(jié)點發(fā)送心跳包
func (rf *Raft) sendHeartPacket() {
//如果收到通道開啟的消息诵棵,將會向其他節(jié)點進行固定頻率的心跳檢測
<-rf.heartChan //沒有收到channel就會阻塞等待
for {
fmt.Println("本節(jié)點開始發(fā)送心跳檢測")
rf.broadcast("Raft.HeartBeatResponse", rf.node, func(ok bool) {
fmt.Println("收到心跳檢測", ok)
})
//最后一次心跳的時間
rf.lastSendHeartBeatTime = millisecond()
//休眠 --》心跳檢測頻率的時間
time.Sleep(time.Second * time.Duration(heartBeatRate))
}
}
//修改節(jié)點為候選人狀態(tài)
func (rf *Raft) becomeCandidate() bool {
r := randRange(1500, 5000)
//休眠隨機時間后抠蚣,再開始成為候選人
time.Sleep(time.Duration(r) * time.Millisecond)
//如果當前節(jié)點是跟隨者,并且沒有領(lǐng)導(dǎo)履澳,也沒有為別人投票
if rf.state == 0 && rf.currentLeader == "-1" && rf.votedFor == "-1" {
//將節(jié)點狀態(tài)變成候選者
rf.setStatus(1)
//設(shè)置為自己投了票
rf.setVoteFor(rf.me)
//自己的投票數(shù)量增加
rf.voteAdd()
//節(jié)點任期加1
rf.termAdd()
fmt.Println("本節(jié)點已經(jīng)變成候選人狀態(tài)")
fmt.Printf("當前獲得的票數(shù):%d\n", rf.vote)
//開啟選舉通道
return true
}
return false
}
//進行選舉
func (rf *Raft) election() bool {
fmt.Println("開始進行領(lǐng)導(dǎo)者選舉嘶窄,向其他節(jié)點進行廣播")
go rf.broadcast("Raft.Vote", rf.node, func(ok bool) {
rf.voteChan <- ok
})
for {
select {
//選舉超時
case <-time.After(time.Second * time.Duration(electionTimeout)):
fmt.Println("領(lǐng)導(dǎo)者選舉超時怀跛,節(jié)點變更為追隨者狀態(tài)")
rf.reDefault()
return false
case ok := <-rf.voteChan:
if ok {
rf.voteAdd()
fmt.Printf("獲得來自其他節(jié)點的投票,當前得票數(shù):%d\n", rf.vote)
}
if rf.vote >= nodeCount/2+1 && rf.currentLeader == "-1" {
fmt.Println("獲得大多數(shù)節(jié)點的同意柄冲,本節(jié)點被選舉成為了leader")
//節(jié)點狀態(tài)變?yōu)?吻谋,代表leader
rf.setStatus(2)
//當前領(lǐng)導(dǎo)者為自己
rf.setCurrentLeader(rf.me)
fmt.Println("向其他節(jié)點進行廣播本節(jié)點成為了leader...")
go rf.broadcast("Raft.ConfirmationLeader", rf.node, func(ok bool) {
fmt.Println("其他節(jié)點:是否同意[", rf.node.ID, "]為領(lǐng)導(dǎo)者", ok)
})
//有l(wèi)eader了,可以發(fā)送心跳包了
rf.heartChan <- true
return true
}
}
}
}
//嘗試成為候選人并選舉
func (rf *Raft) tryToBeCandidateWithElection() {
for {
//嘗試成為候選人節(jié)點
if rf.becomeCandidate() {
//成為后選人節(jié)點后 向其他節(jié)點要選票來進行選舉
if rf.election() {
break
} else {
continue //領(lǐng)導(dǎo)者選舉超時,重新稱為候選人進行選舉
}
} else {
//沒有變成候選人现横,則退出
//不是跟隨者漓拾,或者有領(lǐng)導(dǎo),或者為別人投票
break
}
}
}
//心跳超時檢測
func (rf *Raft) heartTimeoutDetection() {
for {
//0.5秒檢測一次
time.Sleep(time.Millisecond * 5000)
//心跳超時
if rf.lastSendHeartBeatTime != 0 && (millisecond()-rf.lastSendHeartBeatTime) > int64(rf.heartBeatTimeout*1000) {
fmt.Printf("心跳檢測超時戒祠,已超過%d秒\n", rf.heartBeatTimeout)
fmt.Println("即將重新開啟選舉")
rf.reDefault()
rf.setCurrentLeader("-1")
rf.lastSendHeartBeatTime = 0
go rf.tryToBeCandidateWithElection()
}
}
}