1. Preface
If you repost this article, please credit the original source and respect the author's work!
Source code: https://github.com/nicktming/istio
Branch: tming-v1.3.6 (based on version 1.3.6)
1. [istio source code analysis][galley] galley upstream (source)
2. [istio source code analysis][galley] galley runtime
3. [istio source code analysis][galley] galley downstream (mcp)
2. runtime
This article focuses on runtime, the component in galley that bridges the upstream and the downstream.
[root@master pkg]# pwd
/root/go/src/istio.io/istio/galley/pkg
[root@master pkg]# tree -L 1
.
├── authplugin
├── authplugins
├── config
├── crd
├── meshconfig
├── metadata
├── runtime
├── server
├── source
├── testing
└── util
11 directories, 0 files
[root@master pkg]#
source is where events originate; runtime receives the events from source and hands them to the downstream for processing. The focus of this article is runtime.
3. source
// Source to be implemented by a source configuration provider.
type Source interface {
    // Start begins watching; for k8s this means watching the CRD resources and handing events to the handler
    Start(handler resource.EventHandler) error
    // Stop stops watching
    Stop()
}
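For illustration only, here is a minimal, self-contained sketch of the Start/Stop contract. The Event and EventHandler types below are simplified stand-ins (not the real resource.Event / resource.EventHandler), and fakeSource is a hypothetical source that just replays a fixed list of events; the real k8s source instead watches CRD resources and translates watch notifications into events.
package main

import "fmt"

// Event and EventHandler are simplified stand-ins for resource.Event / resource.EventHandler.
type Event struct{ Name string }

type EventHandler func(e Event)

type Source interface {
    // Start begins producing events and hands each one to the handler.
    Start(handler EventHandler) error
    // Stop stops producing events.
    Stop()
}

// fakeSource replays a fixed list of events.
type fakeSource struct {
    events []Event
}

func (s *fakeSource) Start(handler EventHandler) error {
    // A real source would deliver events asynchronously from a watch;
    // this fake delivers them synchronously for simplicity.
    for _, e := range s.events {
        handler(e)
    }
    return nil
}

func (s *fakeSource) Stop() {
    // Nothing to tear down here; a real source would stop its watches.
}

func main() {
    src := &fakeSource{events: []Event{{Name: "vs-1"}, {Name: "gw-1"}}}
    _ = src.Start(func(e Event) { fmt.Println("received:", e.Name) })
    src.Stop()
}
The processor below plugs into Start in exactly this way: its handler simply pushes each incoming event into a channel.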
4. processing
4.1 Dispatcher and Handler
type Handler interface {
    Handle(e resource.Event)
}

// Dispatcher is an implementation of Handler;
// for every collection it keeps a corresponding list of handlers
type Dispatcher struct {
    handlers map[resource.Collection][]Handler
}

func (d *Dispatcher) Handle(e resource.Event) {
    handlers, found := d.handlers[e.Entry.ID.Collection]
    if !found {
        scope.Warnf("Unhandled resource event: %v", e)
        return
    }
    for _, h := range handlers {
        h.Handle(e)
    }
}
As the code shows, Dispatcher groups handlers by collection: each collection has its own slice of Handlers. (A collection is simply the name of a CRD type, e.g. istio/networking/v1alpha3/virtualservices.) Dispatcher itself implements the Handler interface: for each event it looks up all handlers registered for that event's collection and invokes them one by one.
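To make this fan-out concrete, here is a self-contained sketch of dispatch-by-collection using simplified stand-in types (the real Dispatcher is assembled through processing.NewDispatcherBuilder, as the buildDispatcher function in section 6 shows):
package main

import "fmt"

// Collection, Event, and Handler are simplified stand-ins for the resource/processing types.
type Collection string

type Event struct {
    Collection Collection
    Name       string
}

type Handler interface {
    Handle(e Event)
}

// HandlerFunc lets a plain function act as a Handler.
type HandlerFunc func(e Event)

func (f HandlerFunc) Handle(e Event) { f(e) }

// Dispatcher fans an event out to every handler registered for its collection.
type Dispatcher struct {
    handlers map[Collection][]Handler
}

func (d *Dispatcher) Handle(e Event) {
    hs, found := d.handlers[e.Collection]
    if !found {
        fmt.Printf("unhandled event: %+v\n", e)
        return
    }
    for _, h := range hs {
        h.Handle(e)
    }
}

func main() {
    vs := Collection("istio/networking/v1alpha3/virtualservices")
    d := &Dispatcher{handlers: map[Collection][]Handler{
        vs: {
            HandlerFunc(func(e Event) { fmt.Println("state handler got", e.Name) }),
            HandlerFunc(func(e Event) { fmt.Println("metrics handler got", e.Name) }),
        },
    }}
    d.Handle(Event{Collection: vs, Name: "vs-1"}) // both handlers run
}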
4.2 Listener
// Listener gets notified when resource of a given collection has changed.
type Listener interface {
    CollectionChanged(c resource.Collection)
}
CollectionChanged is invoked whenever resources of a given collection change.
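The processor later builds such listeners with processing.ListenerFromFn. As a rough illustration of that function-adapter idea, here is a simplified, hypothetical version (Collection stands in for resource.Collection; this is not the actual istio code):
package main

import "fmt"

type Collection string

type Listener interface {
    CollectionChanged(c Collection)
}

// listenerFromFn adapts a plain function into a Listener.
type listenerFromFn struct {
    fn func(c Collection)
}

func (l listenerFromFn) CollectionChanged(c Collection) { l.fn(c) }

func ListenerFromFn(fn func(c Collection)) Listener {
    return listenerFromFn{fn: fn}
}

func main() {
    l := ListenerFromFn(func(c Collection) {
        fmt.Println("collection changed:", c)
    })
    l.CollectionChanged("istio/networking/v1alpha3/virtualservices")
}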
5. state
// State is the in-memory state held by galley
type State struct {
    listener processing.Listener
    config   *Config

    // version counter is a nonce that generates unique ids for each updated view of State.
    versionCounter int64

    // entries for per-message-type State.
    entriesLock sync.Mutex
    entries     map[resource.Collection]*resourceTypeState

    // Virtual version numbers for Gateways & VirtualServices for Ingress projected ones
    ingressGWVersion   int64
    ingressVSVersion   int64
    lastIngressVersion int64

    // number of events waiting to be published
    pendingEvents int64

    // time of the last published snapshot
    lastSnapshotTime time.Time
}

type resourceTypeState struct {
    // version of the current state
    version  int64
    entries  map[resource.FullName]*mcp.Resource
    versions map[resource.FullName]resource.Version
}
5.1 The Handle method
func (s *State) Handle(event resource.Event) {
    pks, found := s.getResourceTypeState(event.Entry.ID.Collection)
    if !found {
        return
    }

    switch event.Kind {
    case resource.Added, resource.Updated:
        // Check to see if the version has changed.
        if curVersion := pks.versions[event.Entry.ID.FullName]; curVersion == event.Entry.ID.Version {
            log.Scope.Debugf("Received event for the current, known version: %v", event)
            return
        }
        // convert the event's entry into an mcp.Resource
        entry, ok := s.toResource(event.Entry)
        if !ok {
            return
        }
        // store the object's value and version in memory
        pks.entries[event.Entry.ID.FullName] = entry
        pks.versions[event.Entry.ID.FullName] = event.Entry.ID.Version
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, true)

    case resource.Deleted:
        // remove the object's value and version from memory
        delete(pks.entries, event.Entry.ID.FullName)
        delete(pks.versions, event.Entry.ID.FullName)
        monitoring.RecordStateTypeCount(event.Entry.ID.Collection.String(), len(pks.entries))
        monitorEntry(event.Entry.ID, false)

    default:
        log.Scope.Errorf("Unknown event kind: %v", event.Kind)
        return
    }

    // bump the version
    s.versionCounter++
    pks.version = s.versionCounter

    log.Scope.Debugf("In-memory State has changed:\n%v\n", s)
    s.pendingEvents++
    // notify the listener that this collection has changed
    s.listener.CollectionChanged(event.Entry.ID.Collection)
}

func (s *State) getResourceTypeState(name resource.Collection) (*resourceTypeState, bool) {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()

    // look up the in-memory state for this collection,
    // e.g. for the virtualservices collection, all VirtualService objects currently in memory
    pks, found := s.entries[name]
    return pks, found
}
The Handle method's main job is to convert the event's entry into the mcp.Resource type and keep it in memory. What is that in-memory copy used for? It is acted upon through s.listener.CollectionChanged(event.Entry.ID.Collection), which will become clear in the processor section below.
6. processor
func NewProcessor(src Source, distributor Distributor, cfg *Config) *Processor {
    stateStrategy := publish.NewStrategyWithDefaults()
    return newProcessor(src, cfg, stateStrategy, distributor, nil)
}

func newProcessor(
    src Source,
    cfg *Config,
    stateStrategy *publish.Strategy,
    distributor Distributor,
    postProcessHook postProcessHookFn) *Processor {
    now := time.Now()

    p := &Processor{
        stateStrategy:   stateStrategy,
        distributor:     distributor,
        source:          src,
        eventCh:         make(chan resource.Event, 1024),
        postProcessHook: postProcessHook,
        worker:          util.NewWorker("runtime processor", scope),
        lastEventTime:   now,
        fullSyncCond:    sync.NewCond(&sync.Mutex{}),
    }

    stateListener := processing.ListenerFromFn(func(c resource.Collection) {
        if p.distribute {
            stateStrategy.OnChange()
        }
    })
    p.state = newState(cfg, stateListener)

    // this can be skipped for now; it matters later when analyzing serviceentry
    p.serviceEntryHandler = serviceentry.NewHandler(cfg.DomainSuffix, processing.ListenerFromFn(func(_ resource.Collection) {
        scope.Debug("Processor.process: publish serviceEntry")
        s := p.serviceEntryHandler.BuildSnapshot()
        p.distributor.SetSnapshot(groups.SyntheticServiceEntry, s)
    }))

    p.handler = buildDispatcher(p.state, p.serviceEntryHandler)

    p.seedMesh()

    return p
}
1. Initialize p.state, passing in the listener.
2. Initialize p.handler via buildDispatcher, passing in p.state and p.serviceEntryHandler:
func buildDispatcher(state *State, serviceEntryHandler processing.Handler) *processing.Dispatcher {
    b := processing.NewDispatcherBuilder()
    // all registered CRDs
    stateSchema := resource.NewSchemaBuilder().RegisterSchema(state.config.Schema).Build()
    for _, spec := range stateSchema.All() {
        b.Add(spec.Collection, state)
    }
    if state.config.SynthesizeServiceEntries {
        for _, spec := range serviceentry.Schema.All() {
            b.Add(spec.Collection, serviceEntryHandler)
        }
    }
    return b.Build()
}
As shown, every collection gets a basic handler, namely the p.state that was passed in.
6.1 The Start method
func (p *Processor) Start() error {
    // setup function
    setupFn := func() error {
        err := p.source.Start(func(e resource.Event) {
            // push the event e into the channel p.eventCh
            p.eventCh <- e
        })
        if err != nil {
            return fmt.Errorf("runtime unable to Start source: %v", err)
        }
        return nil
    }

    // run function
    runFn := func(ctx context.Context) {
        scope.Info("Starting processor...")

        defer func() {
            scope.Debugf("Process.process: Exiting worker thread")
            p.source.Stop()
            close(p.eventCh)
            p.stateStrategy.Close()
        }()

        scope.Debug("Starting process loop")

        for {
            select {
            case <-ctx.Done():
                // Graceful termination.
                scope.Debug("Processor.process: done")
                return
            case e := <-p.eventCh:
                // take the next event from the channel p.eventCh and process it
                p.processEvent(e)
            case <-p.stateStrategy.Publish:
                scope.Debug("Processor.process: publish")
                // build a snapshot of the objects currently held in the state's memory
                s := p.state.buildSnapshot()
                // the snapshot is handed to the distributor
                p.distributor.SetSnapshot(groups.Default, s)
            }

            if p.postProcessHook != nil {
                p.postProcessHook()
            }
        }
    }

    // run both functions through the worker utility
    return p.worker.Start(setupFn, runFn)
}
Now look at the processEvent method:
func (p *Processor) processEvent(e resource.Event) {
    if scope.DebugEnabled() {
        scope.Debugf("Incoming source event: %v", e)
    }
    p.recordEvent()

    if e.Kind == resource.FullSync {
        scope.Infof("Synchronization is complete, starting distribution.")
        p.fullSyncCond.L.Lock()
        // set distribute to true
        p.distribute = true
        p.fullSyncCond.Broadcast()
        p.fullSyncCond.L.Unlock()

        // this triggers <-p.stateStrategy.Publish in runFn
        p.stateStrategy.OnChange()
        return
    }

    // hand the event to the dispatcher;
    // for now this can be read as p.state handling it, because p.handler is a dispatcher
    // that registers a p.state handler for every collection
    p.handler.Handle(e)
}
1. If the event is FullSync, i.e. the initial synchronization has just completed, two things happen:
1.1 p.distribute is set to true. Look back at the newProcessor method:
// processor.go
...
stateListener := processing.ListenerFromFn(func(c resource.Collection) {
    // When the state indicates a change occurred, update the publishing strategy
    if p.distribute {
        stateStrategy.OnChange()
    }
})
...

// the Handle method in state.go
func (s *State) Handle(event resource.Event) {
    ...
    // notify the listener that this collection has changed
    s.listener.CollectionChanged(event.Entry.ID.Collection)
    ...
}
So once p.distribute is true, any state change calls stateStrategy.OnChange(), which eventually fires <-p.stateStrategy.Publish: in the Processor's Start() method; that in turn calls p.state.buildSnapshot() to build a snapshot of the current in-memory state and hands it to p.distributor. This part comes up again when analyzing mcp.
1.2 p.stateStrategy.OnChange() is called directly, which triggers <-p.stateStrategy.Publish. (A simplified sketch of this OnChange/Publish mechanism follows after this list.)
2. Otherwise p.handler.Handle(e) is called, which for now can be understood as calling p.state.Handle(e), because p.handler is the dispatcher and p.state is registered as a handler for every collection.
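How OnChange() feeds the Publish channel can be pictured with the sketch below. This is not the real publish.Strategy (which layers timers and debounce/quiescence logic on top so that bursts of changes are coalesced before a publish); it only shows the basic signal-and-coalesce idea that the run loop relies on:
package main

import "fmt"

// Strategy is a simplified stand-in for publish.Strategy: OnChange signals that a
// snapshot should be published; repeated signals are coalesced while one is pending.
type Strategy struct {
    Publish chan struct{}
}

func NewStrategy() *Strategy {
    return &Strategy{Publish: make(chan struct{}, 1)}
}

// OnChange is non-blocking: if a publish signal is already pending,
// the extra one is simply dropped (coalesced into the pending one).
func (s *Strategy) OnChange() {
    select {
    case s.Publish <- struct{}{}:
    default:
    }
}

func main() {
    strategy := NewStrategy()

    // Several state changes in a row leave a single pending publish signal.
    strategy.OnChange()
    strategy.OnChange()
    strategy.OnChange()

    <-strategy.Publish
    fmt.Println("publish: build a snapshot and hand it to the distributor")
}
The Processor's run loop plays the consumer side of this: it selects on p.stateStrategy.Publish, calls p.state.buildSnapshot(), and hands the snapshot to p.distributor.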
6.2 buildSnapshot
Builds an in-memory snapshot from the contents of state.entries.
// buildSnapshot returns a snapshot.Snapshot
func (s *State) buildSnapshot() snapshot.Snapshot {
    s.entriesLock.Lock()
    defer s.entriesLock.Unlock()

    now := time.Now()
    monitoring.RecordProcessorSnapshotPublished(s.pendingEvents, now.Sub(s.lastSnapshotTime))
    s.lastSnapshotTime = now

    // create the snapshot builder
    b := snapshot.NewInMemoryBuilder()

    for collection, state := range s.entries {
        entries := make([]*mcp.Resource, 0, len(state.entries))
        for _, entry := range state.entries {
            entries = append(entries, entry)
        }
        version := fmt.Sprintf("%d", state.version)
        b.Set(collection.String(), version, entries)
    }

    // Build entities that are derived from existing ones.
    s.buildProjections(b)

    sn := b.Build()
    // reset pendingEvents
    s.pendingEvents = 0
    return sn
}

func (s *State) buildProjections(b *snapshot.InMemoryBuilder) {
    s.buildIngressProjectionResources(b)
}

func (s *State) buildIngressProjectionResources(b *snapshot.InMemoryBuilder) {
    ingressByHost := make(map[string]resource.Entry)

    // Build ingress projections
    state := s.entries[metadata.K8sExtensionsV1beta1Ingresses.Collection]
    if state == nil || len(state.entries) == 0 {
        return
    }
    ...
}
7. Summary
Upstream: source reads information from k8s or the filesystem (fs) and turns it into events.
Processing: events from source are put into p.eventCh; processEvent reads from p.eventCh, stores the information in memory, and a snapshot is then built from that in-memory state.
Downstream: the generated snapshot is handed to p.distributor for processing.
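As a rough end-to-end picture of this pipeline, the following self-contained sketch uses simplified, hypothetical types (none of them are the real istio types): a source pushes events into a channel, the events are folded into an in-memory state, and a snapshot of that state is what the distributor would receive.
package main

import "fmt"

// Event is a simplified stand-in for resource.Event.
type Event struct {
    Collection string
    Name       string
    Deleted    bool
}

// state maps collection -> object name -> object (just the name here).
type state map[string]map[string]string

func (s state) handle(e Event) {
    if s[e.Collection] == nil {
        s[e.Collection] = map[string]string{}
    }
    if e.Deleted {
        delete(s[e.Collection], e.Name)
        return
    }
    s[e.Collection][e.Name] = e.Name
}

// buildSnapshot copies the current state so the "distributor" gets a stable view.
func (s state) buildSnapshot() map[string][]string {
    snap := map[string][]string{}
    for c, objs := range s {
        for name := range objs {
            snap[c] = append(snap[c], name)
        }
    }
    return snap
}

func main() {
    eventCh := make(chan Event, 16)

    // "upstream" (source): emit a few events, then close the channel.
    go func() {
        eventCh <- Event{Collection: "virtualservices", Name: "vs-1"}
        eventCh <- Event{Collection: "gateways", Name: "gw-1"}
        close(eventCh)
    }()

    // "processing": fold events into the in-memory state.
    st := state{}
    for e := range eventCh {
        st.handle(e)
    }

    // "downstream": the snapshot is what would be handed to the distributor.
    fmt.Printf("snapshot: %v\n", st.buildSnapshot())
}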
8. References
1. istio 1.3.6 source code
2. https://cloud.tencent.com/developer/article/1409159