[istio源碼分析][galley] galley之上游(source)

1. 前言

轉(zhuǎn)載請(qǐng)說(shuō)明原文出處, 尊重他人勞動(dòng)成果!

源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)

1. [istio源碼分析][galley] galley之上游(source)
2. [istio源碼分析][galley] galley之runtime
3. [istio源碼分析][galley] galley之下游(mcp)

[istio源碼分析][galley] galley之runtime 中分析了galley整個(gè)機(jī)制中一個(gè)承上啟下的組件, 在 [istio源碼分析][galley] galley之下游(mcp) 中分析了galley中負(fù)責(zé)下游處理的mcp, 本文將分析galley的上游, 也就是信息來(lái)源source.

2. source

cd $GOPATH/src/istio.io/istio/galley/pkg/source
> tree -L 1
├── fs
├── kube

可以看到當(dāng)前source支持兩個(gè)來(lái)源fs(文件) 和 kube(k8s集群).

3. fs

galley的來(lái)源是文件, 也就是增/刪/改文件來(lái)進(jìn)行對(duì)象的添加刪除修改操作. 另外galley可以感知道其操作.

3.1 例子

如果沒(méi)有k8s集群, 則可以使用文件來(lái)進(jìn)行測(cè)試, 這里寫(xiě)了個(gè)簡(jiǎn)單的例子https://github.com/nicktming/istio/tree/tming-1.3.6/galley/pkg/source/test來(lái)幫助理解.

// main.go
...
func main()  {
    dir := "./fs"
    shutdown := make(chan os.Signal, 1)
    // 監(jiān)控文件變化
    appsignals.FileTrigger(dir, syscall.SIGUSR1, shutdown)
    // 創(chuàng)建source
    s := newOrFail(dir)
    // 啟動(dòng)source
    ch := startOrFail(s)
    // 下游接收到的事件
    receive(ch)
}

例子中的文件夾fs里面保存了一些yaml文件, 文件里面有一些對(duì)象包括VirtualServiceService等. 運(yùn)行:

receive event:[Event](Added: [VKey](istio/networking/v1alpha3/virtualservices:route-for-myapp @v0)), entry:{{0001-01-01 00:00:00 +0000 UTC map[] map[]} [VKey](istio/networking/v1alpha3/virtualservices:route-for-myapp @v0) hosts:"some.example.com" gateways:"some-ingress" http:<route:<destination:<host:"some.example.internal" > > > }
receive event:[Event](Added: [VKey](k8s/core/v1/services:kube-system/kube-dns @v0)), entry:{{2018-02-12 23:48:44 +0800 CST map[lk1:lv1] map[ak1:av1]} [VKey](k8s/core/v1/services:kube-system/kube-dns @v0) &ServiceSpec{Ports:[{dns-tcp TCP 53 {0 53 } 0}],Selector:map[string]string{},ClusterIP:10.43.240.10,Type:ClusterIP,ExternalIPs:[],SessionAffinity:,LoadBalancerIP:,LoadBalancerSourceRanges:[],ExternalName:,ExternalTrafficPolicy:,HealthCheckNodePort:0,PublishNotReadyAddresses:false,SessionAffinityConfig:nil,}}
receive event:[Event](FullSync), entry:{{0001-01-01 00:00:00 +0000 UTC map[] map[]} [VKey](: @) <nil>}

可以看到有兩個(gè)Added事件和一個(gè)FullSync事件. 接著手動(dòng)修改文件中的某個(gè)值, 比如在名字為kube-dnsService中添加了一個(gè)labelenv: test,此時(shí)再次查看日志:

receive event:[Event](Updated: [VKey](k8s/core/v1/services:kube-system/kube-dns @v1)), entry:{{2018-02-12 23:48:44 +0800 CST map[env:test lk1:lv1] map[ak1:av1]} [VKey](k8s/core/v1/services:kube-system/kube-dns @v1) &ServiceSpec{Ports:[{dns-tcp TCP 53 {0 53 } 0}],Selector:map[string]string{},ClusterIP:10.43.240.10,Type:ClusterIP,ExternalIPs:[],SessionAffinity:,LoadBalancerIP:,LoadBalancerSourceRanges:[],ExternalName:,ExternalTrafficPolicy:,HealthCheckNodePort:0,PublishNotReadyAddresses:false,SessionAffinityConfig:nil,}}

添加了一個(gè)關(guān)于kube-system/kube-dns這個(gè)Service的更新事件(Updated).

3.2 分析

初始化一個(gè)文件類型的source.

func New(root string, schema *schema.Instance, config *converter.Config) (runtime.Source, error) {
    fs := &source{
        config:  config,
        root:    root,
        kinds:   map[string]bool{},
        shas:    map[fileResourceKey][sha1.Size]byte{},
        worker:  util.NewWorker("fs source", log.Scope),
        version: 0,
    }
    // 支持的schema 比如VirtualService, Service, Pod等
    for _, spec := range schema.All() {
        fs.kinds[spec.Kind] = true
    }
    return fs, nil
}

1. root為根文件夾path.
2. schema為該source支持的類型, 比如VirtualService, ServicePod等.
3. config在將yaml文件轉(zhuǎn)成對(duì)象時(shí)會(huì)用到.
4. shas在內(nèi)存中保存著所有yaml文件的內(nèi)容.

3.2.2 Start
func (s *source) Start(handler resource.EventHandler) error {
    return s.worker.Start(nil, func(ctx context.Context) {
        // 初始化s.handler處理event
        s.handler = handler
        // 初始加載所有文件
        s.initialCheck()
        // 注冊(cè)一個(gè)signal 可以通過(guò)FileTrigger來(lái)監(jiān)控文件 這樣文件變化就發(fā)送signal到此channel c
        c := make(chan appsignals.Signal, 1)
        appsignals.Watch(c)

        for {
            select {
            case <-ctx.Done():
                return
            case trigger := <-c:
                if trigger.Signal == syscall.SIGUSR1 {
                    log.Scope.Infof("Triggering reload in response to: %v", trigger.Source)
                    s.reload()
                }
            }
        }
    })
}

1. 初始化s.handler
2. 初始加載所有文件, 并生成事件發(fā)送出去.
3. 注冊(cè)一個(gè)signal可以通過(guò)FileTrigger來(lái)監(jiān)控文件 這樣文件變化就發(fā)送signal到此c(channel)
4. 如果c(channel)中接收一個(gè)syscall.SIGUSR1信號(hào), 就是表明監(jiān)控的文件夾中的文件有變化, 所以調(diào)用s.reload()發(fā)送新事件.

3.2.2 initialCheck

func (s *source) initialCheck() {
    // 得到該文件夾下所有文件轉(zhuǎn)化成的對(duì)象 以map形式存儲(chǔ)
    newData := s.readFiles(s.root)
    s.mu.Lock()
    defer s.mu.Unlock()
    for k, r := range newData {
        s.process(resource.Added, k, r)
        s.shas[k] = r.sha
    }
    s.handler(resource.FullSyncEvent)
}

readFiles方法可以將某個(gè)目錄下面的所有yaml文件轉(zhuǎn)化成當(dāng)前source支持的schema, 以map形式保存在newData中, 關(guān)于readFiles的實(shí)現(xiàn)這里就不分析了.

3.2.3 process

func (s *source) process(eventKind resource.EventKind, key fileResourceKey, r *fileResource) {
    version := resource.Version(fmt.Sprintf("v%d", s.version))

    var event resource.Event
    switch eventKind {
    case resource.Added, resource.Updated:
        event = resource.Event{
            Kind: eventKind,
            Entry: resource.Entry{
                ID: resource.VersionedKey{
                    Key: resource.Key{
                        Collection: r.spec.Target.Collection,
                        FullName:   key.fullName,
                    },
                    // 當(dāng)前版本
                    Version: version,
                },
                Item:     r.entry.Resource,
                Metadata: r.entry.Metadata,
            },
        }
    case resource.Deleted:
        spec := kubeMeta.Types.Get(key.kind)
        event = resource.Event{
            Kind: eventKind,
            Entry: resource.Entry{
                ID: resource.VersionedKey{
                    Key: resource.Key{
                        Collection: spec.Target.Collection,
                        FullName:   key.fullName,
                    },
                    Version: version,
                },
            },
        }
    }

    log.Scope.Debugf("Dispatching source event: %v", event)
    s.handler(event)
}

這里的處理方式也很簡(jiǎn)單, 組裝成resource.Event交由s.handler發(fā)到runtime中.
這里需要注意一下Version, 這個(gè)版本號(hào)在什么時(shí)候會(huì)變化? 再次回到Start方法, 當(dāng)文件發(fā)生變化時(shí)會(huì)觸發(fā)reload方法, 接下來(lái)看一下reload方法.

3.2.4 reload

func (s *source) reload() {
    // 再次讀取所有文件
    newData := s.readFiles(s.root)
    s.mu.Lock()
    defer s.mu.Unlock()
    newShas := map[fileResourceKey][sha1.Size]byte{}
    // Compute the deltas using sha comparisons
    nextVersion := s.version + 1
    // sha 為上一個(gè)版本的數(shù)據(jù)內(nèi)容
    // newData為當(dāng)前版本的數(shù)據(jù)內(nèi)容
    // 用sha和newData對(duì)比就可以得到所有事件內(nèi)容
    // 最后更新sha為當(dāng)前版本的數(shù)據(jù)內(nèi)容
    for k, r := range newData {
        newShas[k] = r.sha
        sha, exists := s.shas[k]
        if exists && sha != r.sha {
            if s.version != nextVersion {
                s.version = nextVersion
            }
            s.process(resource.Updated, k, r)
        } else if !exists {
            if s.version != nextVersion {
                s.version = nextVersion
            }
            s.process(resource.Added, k, r)
        }
    }
    for k := range s.shas {
        if _, exists := newShas[k]; !exists {
            s.process(resource.Deleted, k, nil)
        }
    }
    s.shas = newShas
}

主要內(nèi)容如下:
1. sha 為上一個(gè)版本的數(shù)據(jù)內(nèi)容
2. newData為當(dāng)前版本的數(shù)據(jù)內(nèi)容
3.shanewData對(duì)比就可以得到所有事件內(nèi)容并通過(guò)process方法發(fā)送給runtime.
4. 最后更新sha為當(dāng)前版本的數(shù)據(jù)內(nèi)容.

3.2.5 總結(jié)

source.png

1. 初始的時(shí)候通過(guò)initalCheck -> readFile -> process -> handler -> runtime.
2. 初始完的時(shí)候會(huì)發(fā)送一個(gè)FullSync事件表明第一次初始化結(jié)束.
3. 通過(guò)FileTrigger監(jiān)控文件變化, 如有變化通過(guò)reload方法重新加載文件并更新內(nèi)容對(duì)象, 并且根據(jù)當(dāng)前內(nèi)容和上一個(gè)版本內(nèi)容對(duì)比發(fā)送對(duì)應(yīng)事件給runtime.

3. k8s

有了針對(duì)文件作為source的理解, 對(duì)于k8s的理解就會(huì)更簡(jiǎn)單了, 從原理上講, fs監(jiān)控文件的變化, k8s使用informer機(jī)制監(jiān)控k8s中原生資源和crd資源的變化即可.

3.1 source

// galley/pkg/source/kube/source.go
func New(interfaces client.Interfaces, resyncPeriod time.Duration, schema *schema.Instance,
    cfg *kubeConverter.Config) (runtime.Source, error) {

    var err error
    var cl kubernetes.Interface
    var dynClient kubeDynamic.Interface
    var sharedInformers informers.SharedInformerFactory

    log.Scope.Info("creating sources for kubernetes resources")
    sources := make([]runtime.Source, 0)
    for i, spec := range schema.All() {
        log.Scope.Infof("[%d]", i)
        log.Scope.Infof("  Source:      %s", spec.CanonicalResourceName())
        log.Scope.Infof("  Collection:  %s", spec.Target.Collection)

        // If it's a known type, use a custom (optimized) source.
        if builtin.IsBuiltIn(spec.Kind) {
            // Lazy create the kube client.
            if cl, err = getKubeClient(cl, interfaces); err != nil {
                return nil, err
            }
            sharedInformers = getSharedInformers(sharedInformers, cl, resyncPeriod)
            // 創(chuàng)建k8s原生資源的source 比如pod, service
            source, err := builtin.New(sharedInformers, spec)
            if err != nil {
                return nil, err
            }
            sources = append(sources, source)
        } else {
            // Lazy-create the dynamic client
            if dynClient, err = getDynamicClient(dynClient, interfaces); err != nil {
                return nil, err
            }
            // Unknown types use the dynamic source.
            // 創(chuàng)建crd資源的source 比如virtualService, gateway等等
            source, err := dynamic.New(dynClient, resyncPeriod, spec, cfg)
            if err != nil {
                return nil, err
            }
            sources = append(sources, source)
        }
    }
    return &aggregate{
        sources: sources,
    }, nil
}

可以看到k8ssource使用了aggregate結(jié)構(gòu)體來(lái)收集到所有的source. 主要來(lái)自兩大類:
1. k8s原生資源比如pod, Service, 使用原生client-go api即可. 每個(gè)資源都是一個(gè)source.
2. crd資源比如VirtualService, gateway, 使用dynamic client. 每個(gè)資源都是一個(gè)source.

3.2 Start

func (s *aggregate) Start(handler resource.EventHandler) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    syncGroup := sync.WaitGroup{}
    syncGroup.Add(len(s.sources))
    syncHandler := func(e resource.Event) {
        if e.Kind == resource.FullSync {
            // 如果某一個(gè)資源同步完了 就減少一個(gè)
            syncGroup.Done()
        } else {
            // Not a sync event, just pass on to the real handler.
            // 不是同步操作 調(diào)用傳入的handler
            handler(e)
        }
    }
    for _, source := range s.sources {
        // 為每個(gè)資源調(diào)用各自的Start方法
        if err := source.Start(syncHandler); err != nil {
            return err
        }
    }
    go func() {
        // 等待所有資源同步完
        syncGroup.Wait()
        // 發(fā)送一個(gè)FullSync給runtime 表明所有資源以及同步完成
        handler(resource.FullSyncEvent)
    }()
    return nil
}

1. 可以看到Start方法是將所有的source全部啟動(dòng), 也就是list and watch所有的資源.
2. 等到所有的資源都已經(jīng)同步完了, 也就是通過(guò)handler(e)發(fā)送給runtime了, 該Start才會(huì)發(fā)送一個(gè)FullSync事件給runtime.
3. 關(guān)于某個(gè)資源的Start就不多說(shuō)了, 了解informer機(jī)制原理就明白了.

4. server

這里結(jié)合一個(gè)galleyserver端啟動(dòng)程序看看是如何調(diào)用的.

// galley/pkg/server/components/processing.go
func (p *Processing) Start() (err error) {
      var mesh meshconfig.Cache
    var src runtime.Source

    if mesh, err = newMeshConfigCache(p.args.MeshConfigFile); err != nil {
        return
    }
    if src, err = p.createSource(mesh); err != nil {
        return
    }
    ...
    p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)
    ...
}
func (p *Processing) createSource(mesh meshconfig.Cache) (src runtime.Source, err error) {
    ...
    sourceSchema := p.getSourceSchema()
    if p.args.ConfigPath != "" {
        if src, err = fsNew(p.args.ConfigPath, sourceSchema, converterCfg); err != nil {
            return
        }
    } else {
        var k client.Interfaces
        if k, err = newKubeFromConfigFile(p.args.KubeConfig); err != nil {
            return
        }
        var found []schema.ResourceSpec
        ...
        sourceSchema = schema.New(found...)
        if src, err = newSource(k, p.args.ResyncPeriod, sourceSchema, converterCfg); err != nil {
            return
        }
    }
    return
}

可以看到創(chuàng)建的source將為參數(shù)傳入到NewProcessor中, 這個(gè)在 [istio源碼分析][galley] galley之runtime 中已經(jīng)分析過(guò)了, 所以現(xiàn)在已經(jīng)和runtime對(duì)接上了.

full_source.png

5.參考

1. istio 1.3.6源碼
2. https://cloud.tencent.com/developer/article/1409159

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市翁潘,隨后出現(xiàn)的幾起案子蒿柳,更是在濱河造成了極大的恐慌酵紫,老刑警劉巖袜茧,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秘蛇,死亡現(xiàn)場(chǎng)離奇詭異耳璧,居然都是意外死亡盗忱,警方通過(guò)查閱死者的電腦和手機(jī)治筒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)屉栓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人耸袜,你說(shuō)我怎么就攤上這事友多。” “怎么了堤框?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵域滥,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我蜈抓,道長(zhǎng)启绰,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任沟使,我火速辦了婚禮委可,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘腊嗡。我一直安慰自己着倾,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布燕少。 她就那樣靜靜地躺著卡者,像睡著了一般。 火紅的嫁衣襯著肌膚如雪客们。 梳的紋絲不亂的頭發(fā)上虎眨,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天蟋软,我揣著相機(jī)與錄音,去河邊找鬼嗽桩。 笑死岳守,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的碌冶。 我是一名探鬼主播湿痢,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼扑庞!你這毒婦竟也來(lái)了譬重?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤罐氨,失蹤者是張志新(化名)和其女友劉穎臀规,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體栅隐,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡塔嬉,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了租悄。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谨究。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖泣棋,靈堂內(nèi)的尸體忽然破棺而出胶哲,到底是詐尸還是另有隱情,我是刑警寧澤潭辈,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布鸯屿,位于F島的核電站,受9級(jí)特大地震影響把敢,放射性物質(zhì)發(fā)生泄漏寄摆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一技竟、第九天 我趴在偏房一處隱蔽的房頂上張望冰肴。 院中可真熱鬧,春花似錦榔组、人聲如沸熙尉。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)检痰。三九已至,卻和暖如春锨推,著一層夾襖步出監(jiān)牢的瞬間铅歼,已是汗流浹背公壤。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留椎椰,地道東北人厦幅。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像慨飘,于是被迫代替她去往敵國(guó)和親确憨。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容