[istio源碼分析][galley] galley之runtime

1. 前言

轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!

源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)

1. [istio源碼分析][galley] galley之上游(source)
2. [istio源碼分析][galley] galley之runtime
3. [istio源碼分析][galley] galley之下游(mcp)

2. runtime

本文將著重分析galley中一個承上啟下的component.

[root@master pkg]# pwd
/root/go/src/istio.io/istio/galley/pkg
[root@master pkg]# tree -L 1
.
├── authplugin
├── authplugins
├── config
├── crd
├── meshconfig
├── metadata
├── runtime
├── server
├── source
├── testing
└── util

11 directories, 0 files
[root@master pkg]# 

source會產(chǎn)生事件的源頭, runtime負(fù)責(zé)接收source中的事件并交給下游處理. 本文的重點將放到runtime中.

3. source

// Source to be implemented by a source configuration provider.
type Source interface {
    // 開始方法 對k8s而言 就是開始監(jiān)控一些crd資源交由handler處理
    Start(handler resource.EventHandler) error
    // 停止監(jiān)控
    Stop()
}

4. processing

4.1 Dispatcher 和 handler

type Handler interface {
    Handle(e resource.Event)
}

type Dispatcher struct {
    handlers map[resource.Collection][]Handler
}
// Dispatcher是一個Handler的實現(xiàn)類
// 并且針對每一種collection 都有其對應(yīng)的一系列handler
func (d *Dispatcher) Handle(e resource.Event) {
    handlers, found := d.handlers[e.Entry.ID.Collection]
    if !found {
        scope.Warnf("Unhandled resource event: %v", e)
        return
    }

    for _, h := range handlers {
        h.Handle(e)
    }
}

可以看到Dispatcher是以collection為類, 每個collection都有其對應(yīng)的Handler數(shù)組. (collection就是一些crd的名字, 比如istio/networking/v1alpha3/virtualservices)

Dispatcher實現(xiàn)了Handler接口, 針對每一個event會找到其collection所有的handler一個個進(jìn)行處理.

4.2 Listener

// Listener gets notified when resource of a given collection has changed.
type Listener interface {
    CollectionChanged(c resource.Collection)
}

當(dāng)某一個collection發(fā)生變化時會觸發(fā)該方法

5. state

// 記錄著內(nèi)存狀態(tài)的galley
type State struct {
    listener processing.Listener
    config *Config
    // version counter is a nonce that generates unique ids for each updated view of State.
    versionCounter int64
    // entries for per-message-type State.
    entriesLock sync.Mutex
    entries     map[resource.Collection]*resourceTypeState

    // Virtual version numbers for Gateways & VirtualServices for Ingress projected ones
    ingressGWVersion   int64
    ingressVSVersion   int64
    lastIngressVersion int64
    // 等待被發(fā)布的事件個數(shù)
    pendingEvents int64
    // 上一次發(fā)布的時間
    lastSnapshotTime time.Time
}
type resourceTypeState struct {
    // 當(dāng)前狀態(tài)的version
    version  int64
    entries  map[resource.FullName]*mcp.Resource
    versions map[resource.FullName]resource.Version
}

5.1 Handler方法

func (s *State) Handle(event resource.Event) {
    pks, found := s.getResourceTypeState(event.Entry.ID.Collection)
    if !found {
        return
    }
    switch event.Kind {
    case resource.Added, resource.Updated:
        // Check to see if the version has changed.
        if curVersion := pks.versions[event.Entry.ID.FullName]; curVersion == event.Entry.ID.Version {
            log.Scope.Debugf("Received event for the current, known version: %v", event)
            return
        }
        // 將事件的entry轉(zhuǎn)成mcp.Resource類型
        entry, ok := s.toResource(event.Entry)
        if !ok {
            return
        }
        // 保存當(dāng)前對象內(nèi)存中的值以及版本
        pks.entries[event.Entry.ID.FullName] = entry
        pks.versions[event.Entry.ID.FullName] = event.Entry.ID.Version
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, true)

    case resource.Deleted:
        // 刪除當(dāng)前對象內(nèi)存中的值以及版本
        delete(pks.entries, event.Entry.ID.FullName)
        delete(pks.versions, event.Entry.ID.FullName)
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, false)

    default:
        log.Scope.Errorf("Unknown event kind: %v", event.Kind)
        return
    }
    // 更新version
    s.versionCounter++
    pks.version = s.versionCounter

    log.Scope.Debugf("In-memory State has changed:\n%v\n", s)
    s.pendingEvents++
    // 通知listener對該collection以已經(jīng)發(fā)生變化
    s.listener.CollectionChanged(event.Entry.ID.Collection)
}

func (s *State) getResourceTypeState(name resource.Collection) (*resourceTypeState, bool) {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()
    // 根據(jù)collection找到當(dāng)前內(nèi)存中存在的對象 
    // 比如collection是virtualservice 那就是得到內(nèi)存中所有virtualservice的對象
    pks, found := s.entries[name]
    return pks, found
}

Handler的主要工作是將當(dāng)前事件的類型轉(zhuǎn)化成mcp.Resource類型并將其保存到內(nèi)存中. 那保留在內(nèi)存中干什么呢? 在s.listener.CollectionChanged(event.Entry.ID.Collection)中會進(jìn)行處理, 在下面processor中會明白.

6. processor

func NewProcessor(src Source, distributor Distributor, cfg *Config) *Processor {
    stateStrategy := publish.NewStrategyWithDefaults()
    return newProcessor(src, cfg, stateStrategy, distributor, nil)
}
func newProcessor(
    src Source,
    cfg *Config,
    stateStrategy *publish.Strategy,
    distributor Distributor,
    postProcessHook postProcessHookFn) *Processor {
    now := time.Now()
    p := &Processor{
        stateStrategy:   stateStrategy,
        distributor:     distributor,
        source:          src,
        eventCh:         make(chan resource.Event, 1024),
        postProcessHook: postProcessHook,
        worker:          util.NewWorker("runtime processor", scope),
        lastEventTime:   now,
        fullSyncCond:    sync.NewCond(&sync.Mutex{}),
    }
    stateListener := processing.ListenerFromFn(func(c resource.Collection) {
        if p.distribute {
            stateStrategy.OnChange()
        }
    })
    p.state = newState(cfg, stateListener)
    // 這個暫時可以先不用看 以后分析serviceentry的時候需要
    p.serviceEntryHandler = serviceentry.NewHandler(cfg.DomainSuffix, processing.ListenerFromFn(func(_ resource.Collection) {
        scope.Debug("Processor.process: publish serviceEntry")
        s := p.serviceEntryHandler.BuildSnapshot()
        p.distributor.SetSnapshot(groups.SyntheticServiceEntry, s)
    }))
    p.handler = buildDispatcher(p.state, p.serviceEntryHandler)
    p.seedMesh()
    return p
}

1. 初始化p.state, 并且傳入了listener.
2. 初始化p.handler, 傳入p.state, p.serviceEntryHandler.

func buildDispatcher(state *State, serviceEntryHandler processing.Handler) *processing.Dispatcher {
    b := processing.NewDispatcherBuilder()
    // 所有注冊的crds
    stateSchema := resource.NewSchemaBuilder().RegisterSchema(state.config.Schema).Build()
    for _, spec := range stateSchema.All() {
        b.Add(spec.Collection, state)
    }
    if state.config.SynthesizeServiceEntries {
        for _, spec := range serviceentry.Schema.All() {
            b.Add(spec.Collection, serviceEntryHandler)
        }
    }
    return b.Build()
}

可以看到每個collection都有一個基本的handler, 就是傳進(jìn)來的p.state.

6.1 Start方法

func (p *Processor) Start() error {
    // 啟動方法
    setupFn := func() error {
        err := p.source.Start(func(e resource.Event) {
            // 將事件e傳給管道p.eventCh
            p.eventCh <- e
        })
        if err != nil {
            return fmt.Errorf("runtime unable to Start source: %v", err)
        }
        return nil
    }
    // 運行方法
    runFn := func(ctx context.Context) {
        scope.Info("Starting processor...")
        defer func() {
            scope.Debugf("Process.process: Exiting worker thread")
            p.source.Stop()
            close(p.eventCh)
            p.stateStrategy.Close()
        }()

        scope.Debug("Starting process loop")

        for {
            select {
            case <-ctx.Done():
                // Graceful termination.
                scope.Debug("Processor.process: done")
                return
            case e := <-p.eventCh:
                // 從管道p.eventCh中取出要處理的事件
                p.processEvent(e)
            case <-p.stateStrategy.Publish:
                scope.Debug("Processor.process: publish")
                // 將當(dāng)前state對象內(nèi)存中保存的對象建立一個快照
                s := p.state.buildSnapshot()
                // 該快照將交由distributor處理
                p.distributor.SetSnapshot(groups.Default, s)
            }

            if p.postProcessHook != nil {
                p.postProcessHook()
            }
        }
    }
    // 通過工具類來運行這兩個方法
    return p.worker.Start(setupFn, runFn)
}

再看processEvent方法

func (p *Processor) processEvent(e resource.Event) {
    if scope.DebugEnabled() {
        scope.Debugf("Incoming source event: %v", e)
    }
    p.recordEvent()

    if e.Kind == resource.FullSync {
        scope.Infof("Synchronization is complete, starting distribution.")

        p.fullSyncCond.L.Lock()
        // 把distribute設(shè)置為true
        p.distribute = true
        p.fullSyncCond.Broadcast()
        p.fullSyncCond.L.Unlock()
        // 這個將會觸發(fā)runFn中的<-p.stateStrategy.Publish
        p.stateStrategy.OnChange()
        return
    }
    // 將該event交由dispatcher處理
    // 現(xiàn)在可以理解為就是p.state來處理, 原因p.handler就是一個dispatcher
    // dispatcher里面每一個collection都注冊了一個p.state這樣的handler
    p.handler.Handle(e)
}

1. 如果是FullSync, 也就是第一次做同步, 有兩個動作:

1.1 將p.distribute設(shè)置為true. 現(xiàn)在回頭來看一下newProcessor方法.

// processor.go
...
stateListener := processing.ListenerFromFn(func(c resource.Collection) {
        // When the state indicates a change occurred, update the publishing strategy
        if p.distribute {
            stateStrategy.OnChange()
        }
    })
...
// state.go中的Handler方法
func (s *State) Handle(event resource.Event) {
...
    // 通知listener對該collection以已經(jīng)發(fā)生變化
    s.listener.CollectionChanged(event.Entry.ID.Collection)
...
}

所以當(dāng)p.distribute=true時將調(diào)用stateStrategy.OnChange()這個時候就會觸發(fā)到Processor的Start()方法中的<-p.stateStrategy.Publish:進(jìn)而調(diào)用p.state.buildSnapshot()生成當(dāng)前內(nèi)存快照交由p.distributor處理. 這部分在分析mcp的時候會涉及到.

圖片.png

1.2 通過p.stateStrategy.OnChange()觸發(fā)<-p.stateStrategy.Publish.

  1. 調(diào)用p.handler.Handle(e)方法, 目前可以理解為調(diào)用p.state.Handle(e), 因為p.handlerp.dispatcher并且為每個collection注冊了p.statehandler方法.

6.2 buildSnapshot

按照state.entries中的內(nèi)容創(chuàng)建一個內(nèi)存快照.

// 返回snapshot.Snapshot
func (s *State) buildSnapshot() snapshot.Snapshot {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()

    now := time.Now()
    monitoring.RecordProcessorSnapshotPublished(s.pendingEvents, now.Sub(s.lastSnapshotTime))
    s.lastSnapshotTime = now
    // 創(chuàng)建快照
    b := snapshot.NewInMemoryBuilder()

    for collection, state := range s.entries {
        entries := make([]*mcp.Resource, 0, len(state.entries))
        for _, entry := range state.entries {
            entries = append(entries, entry)
        }
        version := fmt.Sprintf("%d", state.version)
        b.Set(collection.String(), version, entries)
    }

    // Build entities that are derived from existing ones.
    s.buildProjections(b)
    // 將pendingEvents清空
    sn := b.Build()
    s.pendingEvents = 0
    return sn
}

func (s *State) buildProjections(b *snapshot.InMemoryBuilder) {
    s.buildIngressProjectionResources(b)
}

func (s *State) buildIngressProjectionResources(b *snapshot.InMemoryBuilder) {
    ingressByHost := make(map[string]resource.Entry)
    // Build ingress projections
    state := s.entries[metadata.K8sExtensionsV1beta1Ingresses.Collection]
    if state == nil || len(state.entries) == 0 {
        return
    }
    ...
}

7. 總結(jié)

上流: sourcek8s或者fs中讀取信息并整理成event.
處理: 將source中的事件event放入p.eventch, 并且processEventp.eventCh中讀取, 將信息保存在內(nèi)存中, 然后生成快照.
下流: 將生成的快照交由p.distributor處理.

圖片.png

8. 參考

1. istio 1.3.6源碼
2. https://cloud.tencent.com/developer/article/1409159

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蛙酪,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子睦袖,更是在濱河造成了極大的恐慌,老刑警劉巖兵罢,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件孤里,死亡現(xiàn)場離奇詭異妙蔗,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)宿刮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來私蕾,“玉大人僵缺,你說我怎么就攤上這事〔劝龋” “怎么了磕潮?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長懊纳。 經(jīng)常有香客問我揉抵,道長,這世上最難降的妖魔是什么嗤疯? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任冤今,我火速辦了婚禮,結(jié)果婚禮上茂缚,老公的妹妹穿的比我還像新娘戏罢。我一直安慰自己,他們只是感情好脚囊,可當(dāng)我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布龟糕。 她就那樣靜靜地躺著,像睡著了一般悔耘。 火紅的嫁衣襯著肌膚如雪讲岁。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天衬以,我揣著相機(jī)與錄音缓艳,去河邊找鬼。 笑死看峻,一個胖子當(dāng)著我的面吹牛阶淘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播互妓,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼溪窒,長吁一口氣:“原來是場噩夢啊……” “哼坤塞!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起澈蚌,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤摹芙,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后宛瞄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瘫辩,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年坛悉,在試婚紗的時候發(fā)現(xiàn)自己被綠了伐厌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡裸影,死狀恐怖挣轨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情轩猩,我是刑警寧澤卷扮,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站均践,受9級特大地震影響晤锹,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜彤委,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一鞭铆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧焦影,春花似錦车遂、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至彬呻,卻和暖如春衣陶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背闸氮。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工剪况, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人湖苞。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓拯欧,卻偏偏與公主長得像详囤,于是被迫代替她去往敵國和親财骨。 傳聞我的和親對象是個殘疾皇子镐作,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容