前言
Zookeeper作為一種應(yīng)用協(xié)調(diào)系統(tǒng)兆沙,有著廣泛的應(yīng)用诚些,其中一種就是作為服務(wù)注冊中心,比如:Zookeeper+Dubbo+Spring實(shí)現(xiàn)服務(wù)注冊與發(fā)現(xiàn)氢哮。
目前袋毙,遇到了這樣一個(gè)問題,用Go語言編寫的服務(wù)A冗尤,依賴于Zookeeper實(shí)現(xiàn)服務(wù)發(fā)現(xiàn)與注冊听盖,Master選舉胀溺,資源分配等功能。這些功能很大程度上依賴于對Zookeeper節(jié)點(diǎn)監(jiān)聽的操作皆看,監(jiān)聽要具有實(shí)時(shí)性和穩(wěn)定性仓坞,并要針對go-zookeeper——封裝了Zookeeper操作的API做優(yōu)化,實(shí)現(xiàn)對Zookeeper的永久監(jiān)聽功能腰吟。
本文以Master選舉為例无埃,實(shí)現(xiàn)對Zookeeper的/master下屬子節(jié)點(diǎn)的循環(huán)監(jiān)聽
go-zookeeper監(jiān)聽分析
Master節(jié)點(diǎn)只能有一個(gè),在api中毛雇,我們發(fā)現(xiàn)ChildrenW可以滿足對/master子節(jié)點(diǎn)監(jiān)聽的需求
// 1. 返回通道
func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
if err := validatePath(path, false); err != nil {
return nil, nil, nil, err
}
var ech <-chan Event
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchTypeChild)
}
})
if err != nil {
return nil, nil, nil, err
}
return res.Children, &res.Stat, ech, err
}
func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()
// 2. 通道大小為1
ch := make(chan Event, 1)
wpt := watchPathType{path, watchType}
c.watchers[wpt] = append(c.watchers[wpt], ch)
return ch
}
在源碼中我們可以看到
ChildrenW會(huì)返回一個(gè)通道嫉称,這個(gè)通道用來傳遞節(jié)點(diǎn)監(jiān)聽的結(jié)果,結(jié)果類型為Event
通道大小為1灵疮,且每調(diào)用一次ChildrenW就會(huì)創(chuàng)建一個(gè)通道织阅,故監(jiān)聽只能生效一次
我們需要自定義函數(shù),用來接收Event傳遞而來的結(jié)果震捣,這個(gè)函數(shù)必然是異步的
如果我們想實(shí)現(xiàn)循環(huán)監(jiān)聽
就要不斷的循環(huán)ChildrenW和他的Event響應(yīng)函數(shù)
這兩者還都是異步的荔棉,因?yàn)橐WC監(jiān)聽的實(shí)時(shí)性,在接收到Event后要立即異步開啟新的ChildrenW和其響應(yīng)函數(shù)
如果簡單的使用循環(huán)與協(xié)程可能會(huì)導(dǎo)致
代碼丑陋
會(huì)開啟大量協(xié)程
無法控制協(xié)程執(zhí)行順序蒿赢,導(dǎo)致程序運(yùn)行結(jié)果異常
循環(huán)監(jiān)聽
// 使用專屬通道接收event
// 通道大小由服務(wù)并發(fā)量決定润樱,越大處理越快
MasterChan = make(chan zk.Event, 1)
// zookeeper master節(jié)點(diǎn)監(jiān)聽函數(shù),需要異步調(diào)用
func masterObserver(conn *zk.Conn, path string) {
// 開啟event響應(yīng)協(xié)程
go masterChanProcess(conn)
for {
_, _, event, err := conn.ChildrenW(path)
if err != nil {
log.Printf("Start to watch children path %s failed, err: %s", path, err.Error())
}
log.Printf("Start to watch children path %s successful!", path)
select {
case e := <-event:
// 將監(jiān)聽得到的event放入其專屬通道內(nèi)
MasterChan <- e
}
}
}
// event響應(yīng)函數(shù)羡棵,需要異步調(diào)用
func masterChanProcess(conn *zk.Conn) {
for {
select {
// 專屬通道響應(yīng)
case event := <-MasterChan:
log.Printf("Master alteration has been detected, path is %s", event.Path)
/** event響應(yīng)過程祥国,your code */
}
}
}
func main() {
// zk初始化
go masterChanProcess(conn)
}
優(yōu)點(diǎn)
僅開啟兩個(gè)協(xié)程,資源消耗小
通道保證高并發(fā)性能晾腔,以及event的響應(yīng)順序
代碼簡潔