Etcd watch源碼閱讀

公司的業(yè)務(wù)里面使用了Consul做服務(wù)發(fā)現(xiàn), 發(fā)現(xiàn)其有一個watch機制.這個watch機制引起我的好奇, 因為剛好在看Etcd-raft的代碼, Etcd也有類似的watch機制, 所以趁熱打鐵, 趕緊趁周末研究下etcd watch機制源碼的實現(xiàn).

在看源碼之前, 我們通過一個簡單的例子, 看看Etcd的watch是如何使用的.

  1. 先往Etcd寫入一對KV

curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value="神蛋使者"

  1. Watch這對KV

curl http://127.0.0.1:2379/v2/keys/name?wait=true

如果一切正常, 這時候請求會被阻塞住.

  1. 新開一個終端, 修改存進去的KV

curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value=神蛋使者1號

  1. 阻塞的那個請求返回watch到的結(jié)果
{
  "action":"set",
  "node":{ 
      "key":"/name",
      "value":"神蛋使者1號",
      "modifiedIndex":25,
     "createdIndex":25
  },
   "prevNode": {
     "key":"/name",
     "value":"神蛋使者",
     "modifiedIndex":24,
     "createdIndex":24
   }
  }

體驗流程大概就是這樣, 下面正式看源碼.

接口定義

type Watcher interface {
    // Watch watches on a key or prefix. The watched events will be returned
    // through the returned channel.
    // If the watch is slow or the required rev is compacted, the watch request
    // might be canceled from the server-side and the chan will be closed.
    // 'opts' can be: 'WithRev' and/or 'WithPrefix'.
    Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

    // Close closes the watcher and cancels all watch requests.
    Close() error
}

該接口定義了兩個方法, Watch 和 Close

Watch 方法返回一個WatchChan 類似的變量, WatchChan是一個channel, 定義如下:

type WatchChan <-chan WatchResponse

該通道傳遞WatchResponse類型

type WatchResponse struct {
    Header pb.ResponseHeader
    Events []*Event

    // CompactRevision is the minimum revision the watcher may receive.
    CompactRevision int64

    // Canceled is used to indicate watch failure.
    // If the watch failed and the stream was about to close, before the channel is closed,
    // the channel sends a final response that has Canceled set to true with a non-nil Err().
    Canceled bool

    // Created is used to indicate the creation of the watcher.
    Created bool

    closeErr error
}

其中Event類型是一個gRPC生成的消息對象

type Event struct {
    // type is the kind of event. If type is a PUT, it indicates
    // new data has been stored to the key. If type is a DELETE,
    // it indicates the key was deleted.
    Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"`
    // kv holds the KeyValue for the event.
    // A PUT event contains current kv pair.
    // A PUT event with kv.Version=1 indicates the creation of a key.
    // A DELETE/EXPIRE event contains the deleted key with
    // its modification revision set to the revision of deletion.
    Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
    // prev_kv holds the key-value pair before the event happens.
    PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}

接下來看實現(xiàn)了Watcher接口的watcher類型

// watcher implements the Watcher interface
type watcher struct {
    remote pb.WatchClient

    // mu protects the grpc streams map
    mu sync.RWMutex

    // streams holds all the active grpc streams keyed by ctx value.
    streams map[string]*watchGrpcStream
}

watcher結(jié)構(gòu)很簡單, 只有3個字段. remote抽象了發(fā)起watch請求的客戶端, streams是一個map, 這個map映射了交互的數(shù)據(jù)流.還有一個保護并發(fā)環(huán)境下數(shù)據(jù)流讀寫安全的讀寫鎖.

streams所屬的watchGrpcStream類型抽象了所有交互的數(shù)據(jù), 它的結(jié)構(gòu)定義如下:

type watchGrpcStream struct {
    owner  *watcher
    remote pb.WatchClient

    // ctx controls internal remote.Watch requests
    ctx context.Context
    // ctxKey is the key used when looking up this stream's context
    ctxKey string
    cancel context.CancelFunc

    // substreams holds all active watchers on this grpc stream
    substreams map[int64]*watcherStream
    // resuming holds all resuming watchers on this grpc stream
    resuming []*watcherStream

    // reqc sends a watch request from Watch() to the main goroutine
    reqc chan *watchRequest
    // respc receives data from the watch client
    respc chan *pb.WatchResponse
    // donec closes to broadcast shutdown
    donec chan struct{}
    // errc transmits errors from grpc Recv to the watch stream reconn logic
    errc chan error
    // closingc gets the watcherStream of closing watchers
    closingc chan *watcherStream
    // wg is Done when all substream goroutines have exited
    wg sync.WaitGroup

    // resumec closes to signal that all substreams should begin resuming
    resumec chan struct{}
    // closeErr is the error that closed the watch stream
    closeErr error
}

比較有意思的是, watchGrpcStream也包含了一個watcher類型的owner字段, watcher和watchGrpcStream可以互相引用到對方.同時又定義了watcher類型中已經(jīng)定義過的remote,而且還不是指針類型, 這點不大明白作用是啥.

還有幾個字段值得關(guān)注, 一個是substreams, 看下它的定義和注釋:

// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream

再看看watcherStream類型的定義:

// watcherStream represents a registered watcher
type watcherStream struct {
    // initReq is the request that initiated this request
    initReq watchRequest

    // outc publishes watch responses to subscriber
    outc chan WatchResponse
    // recvc buffers watch responses before publishing
    recvc chan *WatchResponse
    // donec closes when the watcherStream goroutine stops.
    donec chan struct{}
    // closing is set to true when stream should be scheduled to shutdown.
    closing bool
    // id is the registered watch id on the grpc stream
    id int64

    // buf holds all events received from etcd but not yet consumed by the client
    buf []*WatchResponse
}

畫個圖整理下他們之間的關(guān)系:

下載.png

接下來輪到watcher是如何watch方法的了:

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    // 應(yīng)用配置
    ow := opWatch(key, opts...)

    var filters []pb.WatchCreateRequest_FilterType
    if ow.filterPut {
        filters = append(filters, pb.WatchCreateRequest_NOPUT)
    }
    if ow.filterDelete {
        filters = append(filters, pb.WatchCreateRequest_NODELETE)
    }

    // 根據(jù)傳入的參數(shù)構(gòu)造watch請求
    wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
    }

    ok := false
    // 將請求上下文格式化為字符串
    ctxKey := fmt.Sprintf("%v", ctx)

    // find or allocate appropriate grpc watch stream
    // 接下來配置對應(yīng)的輸出流, 注意得加鎖
    w.mu.Lock()

    // 如果stream為空, 返回一個已經(jīng)關(guān)閉的channel.
    // 這種情況應(yīng)該是防止streams為空的情況
    if w.streams == nil {
        // closed
        w.mu.Unlock()
        ch := make(chan WatchResponse)
        close(ch)
        return ch
    }

    // 注意這里, 前面我們提到streams是一個map,該map的key是請求上下文
    // 如果該請求對應(yīng)的流為空,則新建
    wgs := w.streams[ctxKey]
    if wgs == nil {
        wgs = w.newWatcherGrpcStream(ctx)
        w.streams[ctxKey] = wgs
    }
    donec := wgs.donec
    reqc := wgs.reqc
    w.mu.Unlock()

    // couldn't create channel; return closed channel
        // couldn't create channel; return closed channel
    // 這里要設(shè)置為緩沖的原因可能與下面的兩個
    // closeCh <- WatchResponse{closeErr: wgs.closeErr}
    // 語句有關(guān),這里不理解
    closeCh := make(chan WatchResponse, 1)

    // submit request
    select {
    // 發(fā)送上面構(gòu)造好的watch請求給對應(yīng)的流
    case reqc <- wr:
        ok = true
    // 請求斷開(這里應(yīng)該囊括了客戶端請求斷開的所有情況)
    case <-wr.ctx.Done():
    // watch完成
    // 這里應(yīng)該是處理非正常完成的情況
    // 注意下面的重試邏輯
    case <-donec:
        if wgs.closeErr != nil {
            // 如果不是空上下文導致流被丟棄的情況
            // 則不應(yīng)該重試
            closeCh <- WatchResponse{closeErr: wgs.closeErr}
            break
        }
        // retry; may have dropped stream from no ctxs
        return w.Watch(ctx, key, opts...)
    }

    // receive channel
    // 如果是初始請求順利發(fā)送才會執(zhí)行這里
    if ok {
        select {
        case ret := <-wr.retc:
            return ret
        case <-ctx.Done():
        case <-donec:
            if wgs.closeErr != nil {
                closeCh <- WatchResponse{closeErr: wgs.closeErr}
                break
            }
            // retry; may have dropped stream from no ctxs
            return w.Watch(ctx, key, opts...)
        }
    }

    close(closeCh)
    return closeCh
}

還有Watcher接口的另一個方法Close:

func (w *watcher) Close() (err error) {
    // 在鎖內(nèi)先將streams字段置為空
    // 在鎖外再將一個個流都關(guān)閉
    // 這樣做的意義在于不管哪個流關(guān)閉失敗了
    // 都能先保證streams與這些流的關(guān)系被切斷
    w.mu.Lock()
    streams := w.streams
    w.streams = nil
    w.mu.Unlock()
    for _, wgs := range streams {
        if werr := wgs.Close(); werr != nil {
            err = werr
        }
    }
    // etcd竟然也只是返回一個error
    // 雖然上面的for循環(huán)可能產(chǎn)生多個error
    return err
}

這樣watcher就實現(xiàn)了Watcher接口.大致的實現(xiàn)思路本文就介紹到這里,剩下的代碼也都是對其他相關(guān)數(shù)據(jù)結(jié)構(gòu)的邏輯包裝操作.

簡單閱讀Etcd的這一小部分源碼下來, 我看到他們源碼中的兩個東西,算是Golang或者編程上面的一些最佳實踐:

  1. 對包外只暴露一個公共接口, 包內(nèi)的結(jié)構(gòu)體實現(xiàn)該接口即可.就像本文中的Watcher接口和watcher結(jié)構(gòu)體.這樣有兩個好處, 一個就是代碼能夠解耦,還有就是可以省去命名的苦惱(__)

  2. 另一個是注釋的書寫方式,我發(fā)現(xiàn)etcd源碼里的注釋很大一部分寫在變量的定義上面,而且變量的定義名都很清晰.

  3. 抽象得體.這個其實不只是Etcd, 其他任何優(yōu)秀的開源作品都把他們的代碼抽象得很到位.突然想起我寫的那些渣渣代碼%>_<%

最后, 總結(jié)下etcd的watch機制.其實歸根結(jié)底, 它的watch是通過gRPC的多路復(fù)用實現(xiàn)的,這是一個基于HTTP/2的特性.所以本文可能有些偏離了主題,探討Etcd的watch機制, 其實應(yīng)該研究HTTP/2才是.

算是給自己挖個坑.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末雷激,一起剝皮案震驚了整個濱河市屎暇,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌凶异,老刑警劉巖挤巡,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件玄柏,死亡現(xiàn)場離奇詭異,居然都是意外死亡瀑晒,警方通過查閱死者的電腦和手機徘意,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門椎咧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來勤讽,“玉大人,你說我怎么就攤上這事向臀≈钕粒” “怎么了君纫?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵蓄髓,是天一觀的道長舒帮。 經(jīng)常有香客問我,道長好乐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任临庇,我火速辦了婚禮昵慌,結(jié)果婚禮上斋攀,老公的妹妹穿的比我還像新娘。我一直安慰自己侧蘸,他們只是感情好鹉梨,可當我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布存皂。 她就那樣靜靜地躺著晌坤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪旦袋。 梳的紋絲不亂的頭發(fā)上骤菠,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天,我揣著相機與錄音疤孕,去河邊找鬼娩怎。 笑死,一個胖子當著我的面吹牛胰柑,可吹牛的內(nèi)容都是我干的截亦。 我是一名探鬼主播爬泥,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼崩瓤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起却桶,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤境输,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后颖系,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嗅剖,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年嘁扼,在試婚紗的時候發(fā)現(xiàn)自己被綠了信粮。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡趁啸,死狀恐怖强缘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情不傅,我是刑警寧澤旅掂,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站访娶,受9級特大地震影響商虐,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜崖疤,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一秘车、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧戳晌,春花似錦鲫尊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至豪嚎,卻和暖如春搔驼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背侈询。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工舌涨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人扔字。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓囊嘉,卻偏偏與公主長得像温技,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子扭粱,可洞房花燭夜當晚...
    茶點故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容