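My company uses Consul for service discovery, and I noticed that it has a watch mechanism. That made me curious: I happened to be reading the etcd-raft code, and etcd has a similar watch mechanism, so I struck while the iron was hot and spent the weekend studying how etcd's watch is implemented in the source.
Before reading the source, let's walk through a simple example of how etcd's watch is used.
- First, write a key-value pair to etcd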
curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value="神蛋使者"
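- Watch this key-value pair (in the v2 API, a watch is a GET with wait=true)
curl "http://127.0.0.1:2379/v2/keys/name?wait=true"
If everything is working, this request now blocks.
- Open a new terminal and modify the stored pair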
curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value=神蛋使者1號
- The blocked request returns with the watched result
{
    "action": "set",
    "node": {
        "key": "/name",
        "value": "神蛋使者1號",
        "modifiedIndex": 25,
        "createdIndex": 25
    },
    "prevNode": {
        "key": "/name",
        "value": "神蛋使者",
        "modifiedIndex": 24,
        "createdIndex": 24
    }
}
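To make the shape of that response concrete, here is a minimal sketch of decoding it in Go with only the standard library. The type and function names (v2Node, v2WatchResult, decodeWatchResult) are my own for illustration and are not part of any etcd client; the field names come straight from the JSON above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// v2Node mirrors the "node" and "prevNode" objects in the response.
// Hypothetical type, named for this example only.
type v2Node struct {
	Key           string `json:"key"`
	Value         string `json:"value"`
	ModifiedIndex uint64 `json:"modifiedIndex"`
	CreatedIndex  uint64 `json:"createdIndex"`
}

// v2WatchResult mirrors the top-level response object.
type v2WatchResult struct {
	Action   string  `json:"action"`
	Node     *v2Node `json:"node"`
	PrevNode *v2Node `json:"prevNode"`
}

// decodeWatchResult parses one watch response body.
func decodeWatchResult(body []byte) (*v2WatchResult, error) {
	var r v2WatchResult
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, err
	}
	return &r, nil
}

// sampleBody is the response shown in the walkthrough.
const sampleBody = `{
  "action": "set",
  "node": {"key": "/name", "value": "神蛋使者1號", "modifiedIndex": 25, "createdIndex": 25},
  "prevNode": {"key": "/name", "value": "神蛋使者", "modifiedIndex": 24, "createdIndex": 24}
}`

func main() {
	r, err := decodeWatchResult([]byte(sampleBody))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%s %s: %q -> %q\n", r.Action, r.Node.Key, r.PrevNode.Value, r.Node.Value)
}
```

PrevNode is a pointer because a response for a newly created key would presumably have no previous node to report.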
That is roughly what using it looks like. Now on to the source.
Interface definition
type Watcher interface {
    // Watch watches on a key or prefix. The watched events will be returned
    // through the returned channel.
    // If the watch is slow or the required rev is compacted, the watch request
    // might be canceled from the server-side and the chan will be closed.
    // 'opts' can be: 'WithRev' and/or 'WithPrefix'.
    Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
    // Close closes the watcher and cancels all watch requests.
    Close() error
}
The interface defines two methods, Watch and Close.
Watch returns a value of type WatchChan, which is a receive-only channel defined as follows:
type WatchChan <-chan WatchResponse
The channel carries values of type WatchResponse:
type WatchResponse struct {
    Header pb.ResponseHeader
    Events []*Event
    // CompactRevision is the minimum revision the watcher may receive.
    CompactRevision int64
    // Canceled is used to indicate watch failure.
    // If the watch failed and the stream was about to close, before the channel is closed,
    // the channel sends a final response that has Canceled set to true with a non-nil Err().
    Canceled bool
    // Created is used to indicate the creation of the watcher.
    Created bool
    closeErr error
}
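The comment on Canceled describes a contract for the consumer: read until the channel closes, and treat a response with Canceled set as the last one, carrying the reason. Below is a small sketch of a consumer written against that contract. It uses simplified stand-in types (watchResponse, watchChan) and a fake producer (fakeWatch) that I made up so the example runs without etcd; the real types live in clientv3.

```go
package main

import (
	"errors"
	"fmt"
)

// watchResponse is a simplified, hypothetical stand-in for WatchResponse.
type watchResponse struct {
	Events   []string
	Canceled bool
	closeErr error
}

// Err reports why the watch ended, if it did.
func (wr watchResponse) Err() error { return wr.closeErr }

// watchChan mirrors the receive-only channel type.
type watchChan <-chan watchResponse

// errCompacted is an illustrative error value, not an etcd error.
var errCompacted = errors.New("required revision has been compacted")

// fakeWatch emits the given batches, then a final canceled response, then closes.
func fakeWatch(batches [][]string, finalErr error) watchChan {
	ch := make(chan watchResponse, len(batches)+1)
	for _, b := range batches {
		ch <- watchResponse{Events: b}
	}
	ch <- watchResponse{Canceled: true, closeErr: finalErr}
	close(ch)
	return ch
}

// drain reads until the channel closes and returns all events plus the final error.
func drain(ch watchChan) (events []string, err error) {
	for wr := range ch {
		if wr.Canceled {
			err = wr.Err()
			continue // in this model the channel is closed right after this response
		}
		events = append(events, wr.Events...)
	}
	return events, err
}

func main() {
	ch := fakeWatch([][]string{{"PUT /name"}, {"DELETE /name"}}, errCompacted)
	events, err := drain(ch)
	fmt.Println(events, err)
}
```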
Event is a message type generated from the protobuf definitions used by gRPC:
type Event struct {
    // type is the kind of event. If type is a PUT, it indicates
    // new data has been stored to the key. If type is a DELETE,
    // it indicates the key was deleted.
    Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"`
    // kv holds the KeyValue for the event.
    // A PUT event contains current kv pair.
    // A PUT event with kv.Version=1 indicates the creation of a key.
    // A DELETE/EXPIRE event contains the deleted key with
    // its modification revision set to the revision of deletion.
    Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
    // prev_kv holds the key-value pair before the event happens.
    PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
Next, the watcher type, which implements the Watcher interface:
// watcher implements the Watcher interface
type watcher struct {
    remote pb.WatchClient
    // mu protects the grpc streams map
    mu sync.RWMutex
    // streams holds all the active grpc streams keyed by ctx value.
    streams map[string]*watchGrpcStream
}
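The watcher struct is simple, with only 3 fields. remote abstracts the client that issues watch requests. streams is a map of the active gRPC streams, keyed by context. mu is a read-write lock that keeps reads and writes of that map safe under concurrency.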
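As a sketch of how a map guarded by a lock and keyed by the formatted context can be used, here is a find-or-create lookup. The names registry, grpcStream, streamFor and rootCtx are hypothetical and chosen for this example; only the idea of keying by fmt.Sprintf("%v", ctx) comes from the Watch method shown later in this post.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// grpcStream is a hypothetical placeholder for watchGrpcStream.
type grpcStream struct{ ctxKey string }

// registry models the mu + streams pair of fields.
type registry struct {
	mu      sync.RWMutex
	streams map[string]*grpcStream
}

func newRegistry() *registry {
	return &registry{streams: make(map[string]*grpcStream)}
}

// streamFor returns the stream for ctx, creating it on first use.
// It returns nil if the registry has been shut down (streams == nil).
func (r *registry) streamFor(ctx context.Context) *grpcStream {
	key := fmt.Sprintf("%v", ctx)
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.streams == nil {
		return nil
	}
	s := r.streams[key]
	if s == nil {
		s = &grpcStream{ctxKey: key}
		r.streams[key] = s
	}
	return s
}

// count reports the number of live streams; readers only need the read lock.
func (r *registry) count() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.streams)
}

// rootCtx is the context shared by the two lookups in main.
var rootCtx = context.Background()

func main() {
	r := newRegistry()
	a, b := r.streamFor(rootCtx), r.streamFor(rootCtx)
	fmt.Println(a == b, r.count())
}
```

One consequence of this keying: since the key is just the formatted string, any two contexts that print identically end up on the same stream.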
The watchGrpcStream type stored in streams holds all the state for one such stream. It is defined as follows:
type watchGrpcStream struct {
    owner  *watcher
    remote pb.WatchClient
    // ctx controls internal remote.Watch requests
    ctx context.Context
    // ctxKey is the key used when looking up this stream's context
    ctxKey string
    cancel context.CancelFunc
    // substreams holds all active watchers on this grpc stream
    substreams map[int64]*watcherStream
    // resuming holds all resuming watchers on this grpc stream
    resuming []*watcherStream
    // reqc sends a watch request from Watch() to the main goroutine
    reqc chan *watchRequest
    // respc receives data from the watch client
    respc chan *pb.WatchResponse
    // donec closes to broadcast shutdown
    donec chan struct{}
    // errc transmits errors from grpc Recv to the watch stream reconn logic
    errc chan error
    // closingc gets the watcherStream of closing watchers
    closingc chan *watcherStream
    // wg is Done when all substream goroutines have exited
    wg sync.WaitGroup
    // resumec closes to signal that all substreams should begin resuming
    resumec chan struct{}
    // closeErr is the error that closed the watch stream
    closeErr error
}
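Interestingly, watchGrpcStream has an owner field of type *watcher, so watcher and watchGrpcStream can each reach the other. It also declares remote again, although watcher already has that field, and not as a pointer. pb.WatchClient is an interface type, which is why no pointer is needed, but I don't quite see why the stream keeps its own copy.
A few other fields deserve attention. One is substreams; here are its definition and comment: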
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
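A map keyed by watch id suggests how a single gRPC stream can serve many logical watches: each response carries an id and is routed to the matching entry. The following is only a sketch of that routing idea with made-up types (response, substream) and a made-up dispatch function; it is not etcd's code.

```go
package main

import "fmt"

// response is a hypothetical, pared-down server response tagged with a watch id.
type response struct {
	WatchID int64
	Event   string
}

// substream stands in for watcherStream: one logical watch on the shared stream.
type substream struct {
	id    int64
	recvc chan response
}

// dispatch routes one response to the substream registered under its watch id.
// It reports false when no such substream exists.
func dispatch(substreams map[int64]*substream, resp response) bool {
	ws, ok := substreams[resp.WatchID]
	if !ok {
		return false
	}
	ws.recvc <- resp
	return true
}

func main() {
	subs := map[int64]*substream{
		1: {id: 1, recvc: make(chan response, 4)},
		2: {id: 2, recvc: make(chan response, 4)},
	}
	incoming := []response{
		{WatchID: 1, Event: "PUT /a"},
		{WatchID: 2, Event: "PUT /b"},
		{WatchID: 1, Event: "DELETE /a"},
	}
	for _, r := range incoming {
		dispatch(subs, r)
	}
	fmt.Println(len(subs[1].recvc), len(subs[2].recvc))
}
```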
Now the definition of the watcherStream type:
// watcherStream represents a registered watcher
type watcherStream struct {
    // initReq is the request that initiated this request
    initReq watchRequest
    // outc publishes watch responses to subscriber
    outc chan WatchResponse
    // recvc buffers watch responses before publishing
    recvc chan *WatchResponse
    // donec closes when the watcherStream goroutine stops.
    donec chan struct{}
    // closing is set to true when stream should be scheduled to shutdown.
    closing bool
    // id is the registered watch id on the grpc stream
    id int64
    // buf holds all events received from etcd but not yet consumed by the client
    buf []*WatchResponse
}
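The comments on recvc, buf and outc describe a buffering stage: responses arrive on one channel, wait in a slice, and are published on another. One common way to build such a stage in Go is a select loop that disables the send case with a nil channel while the buffer is empty. The sketch below shows that technique with a hypothetical forward function over strings; I have not checked it against etcd's actual goroutine, so treat it as an illustration of the idea only.

```go
package main

import "fmt"

// forward copies values from recvc to outc without blocking the sender on a
// slow consumer: anything not yet delivered is parked in buf.
// It closes outc once recvc is closed and buf has been flushed.
func forward(recvc <-chan string, outc chan<- string) {
	defer close(outc)
	var buf []string
	for recvc != nil || len(buf) > 0 {
		// A nil channel is never ready in a select, so the send case
		// is disabled while buf is empty.
		var out chan<- string
		var next string
		if len(buf) > 0 {
			out, next = outc, buf[0]
		}
		select {
		case out <- next:
			buf = buf[1:]
		case v, ok := <-recvc:
			if !ok {
				recvc = nil // stop receiving, keep flushing buf
				continue
			}
			buf = append(buf, v)
		}
	}
}

func main() {
	recvc := make(chan string)
	outc := make(chan string)
	go forward(recvc, outc)
	for _, v := range []string{"rev 1", "rev 2", "rev 3"} {
		recvc <- v // accepted even though nobody is reading outc yet
	}
	close(recvc)
	for v := range outc {
		fmt.Println(v)
	}
}
```

The buffer here is unbounded, which keeps the sender fast but lets memory grow if the consumer never catches up.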
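To sort out how these types relate: a watcher holds many watchGrpcStream values (keyed by context), and each watchGrpcStream holds many watcherStream values (keyed by watch id).
Next, how watcher implements the Watch method: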
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    // apply the options
    ow := opWatch(key, opts...)
    var filters []pb.WatchCreateRequest_FilterType
    if ow.filterPut {
        filters = append(filters, pb.WatchCreateRequest_NOPUT)
    }
    if ow.filterDelete {
        filters = append(filters, pb.WatchCreateRequest_NODELETE)
    }
    // build the watch request from the arguments
    wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
    }
    ok := false
    // format the request context as a string
    ctxKey := fmt.Sprintf("%v", ctx)
    // find or allocate appropriate grpc watch stream
    // (the streams map is shared, so this must happen under the lock)
    w.mu.Lock()
    // streams is nil once Close() has run; in that case
    // return an already-closed channel
    if w.streams == nil {
        // closed
        w.mu.Unlock()
        ch := make(chan WatchResponse)
        close(ch)
        return ch
    }
    // as noted earlier, streams is a map keyed by the request context;
    // if there is no stream for this context yet, create one
    wgs := w.streams[ctxKey]
    if wgs == nil {
        wgs = w.newWatcherGrpcStream(ctx)
        w.streams[ctxKey] = wgs
    }
    donec := wgs.donec
    reqc := wgs.reqc
    w.mu.Unlock()
    // couldn't create channel; return closed channel
    // The buffer of 1 is most likely there for the two
    //   closeCh <- WatchResponse{closeErr: wgs.closeErr}
    // sends below: nobody is receiving on closeCh yet when they run,
    // so a send on an unbuffered channel would block forever.
    closeCh := make(chan WatchResponse, 1)
    // submit request
    select {
    // send the watch request built above to the stream
    case reqc <- wr:
        ok = true
    // the request context is done (this should cover every way
    // the caller can give up on the request)
    case <-wr.ctx.Done():
    // the stream is done; this handles abnormal termination,
    // note the retry logic below
    case <-donec:
        if wgs.closeErr != nil {
            // the stream was closed with an error rather than dropped
            // because it had no contexts left, so do not retry
            closeCh <- WatchResponse{closeErr: wgs.closeErr}
            break
        }
        // retry; may have dropped stream from no ctxs
        return w.Watch(ctx, key, opts...)
    }
    // receive channel
    // only reached if the initial request was sent successfully
    if ok {
        select {
        case ret := <-wr.retc:
            return ret
        case <-ctx.Done():
        case <-donec:
            if wgs.closeErr != nil {
                closeCh <- WatchResponse{closeErr: wgs.closeErr}
                break
            }
            // retry; may have dropped stream from no ctxs
            return w.Watch(ctx, key, opts...)
        }
    }
    close(closeCh)
    return closeCh
}
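The closeCh handling at the end of Watch relies on two properties of Go channels: a send on a channel with spare buffer capacity completes without a receiver, and a closed channel still delivers whatever is buffered before reporting that it is closed. Here is a minimal sketch of just that pattern. The names watchResponse, failedWatch and errStreamClosed are stand-ins I made up for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// watchResponse is a stand-in carrying only the close error.
type watchResponse struct{ closeErr error }

// errStreamClosed is an illustrative error value, not an etcd error.
var errStreamClosed = errors.New("watch stream closed")

// failedWatch returns a channel that yields one final response carrying err
// (if err is non-nil) and is then closed.
func failedWatch(err error) <-chan watchResponse {
	// Capacity 1 lets the send below complete with no receiver present;
	// on an unbuffered channel it would block forever.
	ch := make(chan watchResponse, 1)
	if err != nil {
		ch <- watchResponse{closeErr: err}
	}
	close(ch) // buffered values are still delivered after close
	return ch
}

func main() {
	for wr := range failedWatch(errStreamClosed) {
		fmt.Println("final response:", wr.closeErr)
	}
	_, ok := <-failedWatch(nil)
	fmt.Println("open after nil error:", ok)
}
```

So the caller always gets a channel that terminates, and it sees the error first when there is one.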
And the other method of the Watcher interface, Close:
func (w *watcher) Close() (err error) {
    // Under the lock, first set the streams field to nil;
    // then, outside the lock, close the streams one by one.
    // This way, no matter which stream fails to close, the link
    // between the watcher and those streams is already cut.
    w.mu.Lock()
    streams := w.streams
    w.streams = nil
    w.mu.Unlock()
    for _, wgs := range streams {
        if werr := wgs.Close(); werr != nil {
            err = werr
        }
    }
    // etcd, too, returns just a single error (the last non-nil one),
    // even though the loop above may produce several
    return err
}
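To see what this shape of Close guarantees, here is a small runnable model of it. The types closer and streamSet and the error errCloseFailed are hypothetical stand-ins; the body of Close follows the method above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for watchGrpcStream.
type closer struct {
	err    error
	closed bool
}

func (c *closer) Close() error {
	c.closed = true
	return c.err
}

// streamSet models the mu + streams fields of watcher.
type streamSet struct {
	mu      sync.Mutex
	streams map[string]*closer
}

// errCloseFailed is an illustrative error value.
var errCloseFailed = errors.New("close failed")

// Close detaches the map under the lock, then closes each stream outside it.
func (s *streamSet) Close() (err error) {
	s.mu.Lock()
	streams := s.streams
	s.streams = nil
	s.mu.Unlock()
	for _, c := range streams {
		if cerr := c.Close(); cerr != nil {
			err = cerr // a later failure overwrites an earlier one
		}
	}
	return err
}

func main() {
	s := &streamSet{streams: map[string]*closer{
		"a": {},
		"b": {err: errCloseFailed},
	}}
	fmt.Println(s.Close()) // the one failing stream's error
	fmt.Println(s.Close()) // nothing left to close
}
```

Because the map is swapped out before any stream is closed, a second call simply ranges over a nil map and returns nil.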
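With that, watcher implements the Watcher interface. This covers the general approach; the rest of the code mostly wraps logic around the other related data structures.
From reading just this small part of the etcd source, I noticed three things that count as good practice in Go, or in programming generally:
- Expose only a public interface outside the package and let an unexported struct inside the package implement it, as with the Watcher interface and the watcher struct here. This has two benefits: the code is decoupled, and it spares you the agony of naming things.
- The way comments are written. A large share of the comments in the etcd source sit directly above variable and field definitions, and the names themselves are clear.
- Well-judged abstraction. This is not unique to etcd; any good open-source project abstracts its code well. It reminds me of the junk code I have written %>_<%
Finally, to sum up etcd's watch mechanism: at bottom, it is built on gRPC stream multiplexing, which in turn relies on HTTP/2. So this post may have strayed from the point; to really understand etcd's watch mechanism, HTTP/2 is what should be studied.
Consider that a hole I have dug for myself to fill in later.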