[istio源碼分析][pilot] pilot之a(chǎn)ds

1. 前言

轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!

源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)

在前一篇文章 [istio源碼分析][pilot] pilot之DiscoveryServer 中已經(jīng)分析了DiscoveryServer拿到galleyk8s的數(shù)據(jù)后向每一個連接的client端(其實是envoy) 發(fā)送了一個XdsEventclient.pushChannel.

本文將分析從clientdiscoveryServer端之間的聯(lián)系. 主要是分析整體的流程, 關于細節(jié)方面的東西可以在具體問題中進行調(diào)試即可.

2. adsc

adscistio模擬的一個client端, 真正的客戶端是sidecar中的envoy. 這里為了方便, 因此用adsc進行分析.

// pkg/adsc/adsc.go
func (a *ADSC) Run() error {
    var err error
    if len(a.certDir) > 0 {
        ...
        a.conn, err = grpc.Dial(a.url, opts...)
        ...
    } else {
        a.conn, err = grpc.Dial(a.url, grpc.WithInsecure())
        ...
    }
    // 建立連接
    xds := ads.NewAggregatedDiscoveryServiceClient(a.conn)
    edsstr, err := xds.StreamAggregatedResources(context.Background())
    if err != nil {
        return err
    }
    a.stream = edsstr
    // 從discovery server接收信息
    go a.handleRecv()
    return nil
}
func (a *ADSC) handleRecv() {
    for {
        // 從server端獲得信息
        msg, err := a.stream.Recv()
        ...
        listeners := []*xdsapi.Listener{}
        clusters := []*xdsapi.Cluster{}
        routes := []*xdsapi.RouteConfiguration{}
        eds := []*xdsapi.ClusterLoadAssignment{}
        // 為獲得的數(shù)據(jù)分類
        for _, rsc := range msg.Resources { // Any
            a.VersionInfo[rsc.TypeUrl] = msg.VersionInfo
            valBytes := rsc.Value
            if rsc.TypeUrl == listenerType {
                ll := &xdsapi.Listener{}
                _ = proto.Unmarshal(valBytes, ll)
                listeners = append(listeners, ll)
            } ...
        }
        ...
        // 向server端發(fā)送ACK
        a.ack(msg)
        ...
        // 客戶端處理listener, cluster, endpoint, route
        ...
    }
}
func (a *ADSC) ack(msg *xdsapi.DiscoveryResponse) {
    _ = a.stream.Send(&xdsapi.DiscoveryRequest{
        ResponseNonce: msg.Nonce,
        TypeUrl:       msg.TypeUrl,
        Node:          a.node(),
        VersionInfo:   msg.VersionInfo,
    })
}

1.discoveryServer建立連接.
2. 通過handleRecv方法與server端交流.
3. 將獲得的數(shù)據(jù)分類(endpoint, listener, cluster, route)
4.server端發(fā)送ack.
5. 客戶端自己處理數(shù)據(jù).

envoy就是數(shù)據(jù)生成的配置文件了, xds協(xié)議.

3. server端

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    peerInfo, ok := peer.FromContext(stream.Context())
    peerAddr := "0.0.0.0"
    if ok {
        peerAddr = peerInfo.Addr.String()
    }
    t0 := time.Now()
    err := s.globalPushContext().InitContext(s.Env)
    ...
    // 創(chuàng)建一個XdsConnection
    con := newXdsConnection(peerAddr, stream)
    var receiveError error
    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    // 接收con對應的client的請求
    go receiveThread(con, reqChannel, &receiveError)

    node := &core.Node{}
    for {
        // Block until either a request is received or a push is triggered.
        select {
        case discReq, ok := <-reqChannel:
            if !ok {
                // Remote side closed connection.
                return receiveError
            }
            // This should be only set for the first request. Guard with ID check regardless.
            if discReq.Node != nil && discReq.Node.Id != "" {
                node = discReq.Node
                err = s.initConnectionNode(discReq.Node, con)
                if err != nil {
                    return err
                }
            }
            switch discReq.TypeUrl {
            case ClusterType:
                ...
            case ListenerType:
                ...
            case RouteType:
                ...
            case EndpointType:
                ...
            default:
                adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
            }

            con.mu.Lock()
            if !con.added {
                con.added = true
                con.mu.Unlock()
                // 添加到xdsclient中
                s.addCon(con.ConID, con)
                defer s.removeCon(con.ConID, con)
            } else {
                con.mu.Unlock()
            }
        case pushEv := <-con.pushChannel:
            err := s.pushConnection(con, pushEv)
            pushEv.done()
            if err != nil {
                return nil
            }
        }
    }
}

可以看到select中有兩個case.
1. discReq, ok := <-reqChannel:是從client來的, receiveThread方法會看到.
2. pushEv := <-con.pushChannel: 是從k8sgalley中來的, 具體可以參考上文 [istio源碼分析][pilot] pilot之DiscoveryServer .

3.1 第一個分支

這里先看一下第一個分支的操作.

receiveThread

func receiveThread(con *XdsConnection, reqChannel chan *xdsapi.DiscoveryRequest, errP *error) {
    defer close(reqChannel) // indicates close of the remote side.
    for {
        // 從client端接收信息
        req, err := con.stream.Recv()
        ...
        select {
        // 將req轉(zhuǎn)到reqChannel中
        case reqChannel <- req:
        case <-con.stream.Context().Done():
            adsLog.Errorf("ADS: %q %s terminated with stream closed", con.PeerAddr, con.ConID)
            return
        }
    }
}

1.client端接收的req直接放入到reqChannel中, 所以會進入到StreamAggregatedResources中的第一個分支.
2.ClusterType為例:

             case ClusterType:
                if con.CDSWatch {
                    // Already received a cluster watch request, this is an ACK
                    if discReq.ErrorDetail != nil {
                        adsLog.Warnf("ADS:CDS: ACK ERROR %v %s (%s) %v", peerAddr, con.ConID, con.modelNode.ID, discReq.String())
                        errCode := codes.Code(discReq.ErrorDetail.Code)
                        incrementXDSRejects(cdsReject, node.Id, errCode.String())
                    } else if discReq.ResponseNonce != "" {
                        con.ClusterNonceAcked = discReq.ResponseNonce
                    }
                    adsLog.Debugf("ADS:CDS: ACK %s %s (%s) %s %s", peerAddr, con.ConID, con.modelNode.ID, discReq.VersionInfo, discReq.ResponseNonce)
                    continue
                }
                // CDS REQ is the first request an envoy makes. This shows up
                // immediately after connect. It is followed by EDS REQ as
                // soon as the CDS push is returned.
                adsLog.Infof("ADS:CDS: REQ %v %s %v version:%s", peerAddr, con.ConID, time.Since(t0), discReq.VersionInfo)
                con.CDSWatch = true
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
                if err != nil {
                    return err
                }

1. 可以看到除了是第一次會調(diào)用pushCds方法, 后面都是打印ACK/NACK信息. 通過一個變量con.CDSWatch進行控制.
2. 另外通過s.addCon(con.ConID, con)將此連接加入到adsClients, 這個在startPush方法需要分發(fā)給所有的client端的時候用到的. 具體可以參考上文 [istio源碼分析][pilot] pilot之DiscoveryServer .

pushCds
// pilot/pkg/proxy/envoy/v2/cds.go
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
    pushStart := time.Now()
    // 發(fā)送給該envoy (con.modelNode)
    // 內(nèi)容在push中
    // 根據(jù)client信息和內(nèi)容生成要發(fā)送的clusters集合
    rawClusters := s.generateRawClusters(con.modelNode, push)
    ...
    // 構(gòu)造response并發(fā)送給該client(envoy)
    response := con.clusters(rawClusters)
    err := con.send(response)
    ...
    return nil
}
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) []*xdsapi.Cluster {
    rawClusters := s.ConfigGenerator.BuildClusters(s.Env, node, push)
    ...
    return rawClusters
}

1. client端(envoy)信息是con.modelNode, 內(nèi)容為push, 根據(jù)此兩個信息傳入到generateRawClusters中生成cluster集合. 實際上是通過s.ConfigGenerator.BuildClusters方法,

type ConfigGenerator interface {
    BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) []*v2.Listener
    BuildClusters(env *model.Environment, node *model.Proxy, push *model.PushContext) []*v2.Cluster
    BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext, routeNames []string) []*v2.RouteConfiguration
}

這個部分的內(nèi)容就是根據(jù)當前的內(nèi)容生成envoy所接受的cluster, endpointroute.

2.client端(envoy)發(fā)送數(shù)據(jù).

3.2 第二個分支

這里分析pushEv := <-con.pushChannel:, 這個分支是從configControllerServiceController 過來的, 具體可以參考上文 [istio源碼分析][pilot] pilot之DiscoveryServer .

func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) error {
    ...
    if con.CDSWatch {
        err := s.pushCds(con, pushEv.push, currentVersion)
        ...
    }
    if len(con.Clusters) > 0 {
        err := s.pushEds(pushEv.push, con, currentVersion, nil)
        ...
    }
    if con.LDSWatch {
        err := s.pushLds(con, pushEv.push, currentVersion)
        ...
    }
    if len(con.Routes) > 0 {
        err := s.pushRoute(con, pushEv.push, currentVersion)
        ...
    }
    ...
    return nil
}

很多細節(jié)部分省略了很多, 可以看到最終是往此client端發(fā)送cluster, endpoint, routelistener.

4. 總結(jié)

ads.png

5. 參考

1. istio 1.3.6源碼

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末傻谁,一起剝皮案震驚了整個濱河市魁索,隨后出現(xiàn)的幾起案子管怠,更是在濱河造成了極大的恐慌寸士,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瓶蝴,死亡現(xiàn)場離奇詭異脓规,居然都是意外死亡,警方通過查閱死者的電腦和手機翩伪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雀鹃,“玉大人幻工,你說我怎么就攤上這事±杈ィ” “怎么了囊颅?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長傅瞻。 經(jīng)常有香客問我踢代,道長,這世上最難降的妖魔是什么嗅骄? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任胳挎,我火速辦了婚禮,結(jié)果婚禮上溺森,老公的妹妹穿的比我還像新娘慕爬。我一直安慰自己,他們只是感情好屏积,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布医窿。 她就那樣靜靜地躺著,像睡著了一般炊林。 火紅的嫁衣襯著肌膚如雪姥卢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天渣聚,我揣著相機與錄音独榴,去河邊找鬼。 笑死奕枝,一個胖子當著我的面吹牛棺榔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播隘道,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼掷豺,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了薄声?” 一聲冷哼從身側(cè)響起当船,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎默辨,沒想到半個月后德频,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡缩幸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年壹置,在試婚紗的時候發(fā)現(xiàn)自己被綠了竞思。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡钞护,死狀恐怖盖喷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情难咕,我是刑警寧澤课梳,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站余佃,受9級特大地震影響暮刃,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜爆土,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一椭懊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧步势,春花似錦氧猬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至桑腮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蛉幸,已是汗流浹背破讨。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留奕纫,地道東北人提陶。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像匹层,于是被迫代替她去往敵國和親隙笆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容