1. 前言
轉(zhuǎn)載請(qǐng)說(shuō)明原文出處, 尊重他人勞動(dòng)成果!
源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)
1. [istio源碼分析][galley] galley之上游(source)
2. [istio源碼分析][galley] galley之runtime
3. [istio源碼分析][galley] galley之下游(mcp)
在 [istio源碼分析][galley] galley之runtime 中分析了
galley
整個(gè)機(jī)制中一個(gè)承上啟下的組件, 在 [istio源碼分析][galley] galley之下游(mcp) 中分析了galley
中負(fù)責(zé)下游處理的mcp
, 本文將分析galley
的上游, 也就是信息來(lái)源source
.
2. source
cd $GOPATH/src/istio.io/istio/galley/pkg/source
> tree -L 1
├── fs
├── kube
可以看到當(dāng)前
source
支持兩個(gè)來(lái)源fs
(文件) 和kube
(k8s
集群).
3. fs
galley
的來(lái)源是文件, 也就是增/刪/改文件來(lái)進(jìn)行對(duì)象的添加刪除修改操作. 另外galley
可以感知道其操作.
3.1 例子
如果沒(méi)有
k8s
集群, 則可以使用文件來(lái)進(jìn)行測(cè)試, 這里寫(xiě)了個(gè)簡(jiǎn)單的例子https://github.com/nicktming/istio/tree/tming-1.3.6/galley/pkg/source/test
來(lái)幫助理解.
// main.go
...
func main() {
dir := "./fs"
shutdown := make(chan os.Signal, 1)
// 監(jiān)控文件變化
appsignals.FileTrigger(dir, syscall.SIGUSR1, shutdown)
// 創(chuàng)建source
s := newOrFail(dir)
// 啟動(dòng)source
ch := startOrFail(s)
// 下游接收到的事件
receive(ch)
}
例子中的文件夾
fs
里面保存了一些yaml
文件, 文件里面有一些對(duì)象包括VirtualService
和Service
等. 運(yùn)行:
receive event:[Event](Added: [VKey](istio/networking/v1alpha3/virtualservices:route-for-myapp @v0)), entry:{{0001-01-01 00:00:00 +0000 UTC map[] map[]} [VKey](istio/networking/v1alpha3/virtualservices:route-for-myapp @v0) hosts:"some.example.com" gateways:"some-ingress" http:<route:<destination:<host:"some.example.internal" > > > }
receive event:[Event](Added: [VKey](k8s/core/v1/services:kube-system/kube-dns @v0)), entry:{{2018-02-12 23:48:44 +0800 CST map[lk1:lv1] map[ak1:av1]} [VKey](k8s/core/v1/services:kube-system/kube-dns @v0) &ServiceSpec{Ports:[{dns-tcp TCP 53 {0 53 } 0}],Selector:map[string]string{},ClusterIP:10.43.240.10,Type:ClusterIP,ExternalIPs:[],SessionAffinity:,LoadBalancerIP:,LoadBalancerSourceRanges:[],ExternalName:,ExternalTrafficPolicy:,HealthCheckNodePort:0,PublishNotReadyAddresses:false,SessionAffinityConfig:nil,}}
receive event:[Event](FullSync), entry:{{0001-01-01 00:00:00 +0000 UTC map[] map[]} [VKey](: @) <nil>}
可以看到有兩個(gè)
Added
事件和一個(gè)FullSync
事件. 接著手動(dòng)修改文件中的某個(gè)值, 比如在名字為kube-dns
的Service
中添加了一個(gè)label
為env: test
,此時(shí)再次查看日志:
receive event:[Event](Updated: [VKey](k8s/core/v1/services:kube-system/kube-dns @v1)), entry:{{2018-02-12 23:48:44 +0800 CST map[env:test lk1:lv1] map[ak1:av1]} [VKey](k8s/core/v1/services:kube-system/kube-dns @v1) &ServiceSpec{Ports:[{dns-tcp TCP 53 {0 53 } 0}],Selector:map[string]string{},ClusterIP:10.43.240.10,Type:ClusterIP,ExternalIPs:[],SessionAffinity:,LoadBalancerIP:,LoadBalancerSourceRanges:[],ExternalName:,ExternalTrafficPolicy:,HealthCheckNodePort:0,PublishNotReadyAddresses:false,SessionAffinityConfig:nil,}}
添加了一個(gè)關(guān)于
kube-system/kube-dns
這個(gè)Service
的更新事件(Updated
).
3.2 分析
初始化一個(gè)文件類型的
source
.
func New(root string, schema *schema.Instance, config *converter.Config) (runtime.Source, error) {
fs := &source{
config: config,
root: root,
kinds: map[string]bool{},
shas: map[fileResourceKey][sha1.Size]byte{},
worker: util.NewWorker("fs source", log.Scope),
version: 0,
}
// 支持的schema 比如VirtualService, Service, Pod等
for _, spec := range schema.All() {
fs.kinds[spec.Kind] = true
}
return fs, nil
}
1.
root
為根文件夾path
.
2.schema
為該source
支持的類型, 比如VirtualService
,Service
和Pod
等.
3.config
在將yaml
文件轉(zhuǎn)成對(duì)象時(shí)會(huì)用到.
4.shas
在內(nèi)存中保存著所有yaml
文件的內(nèi)容.
3.2.2 Start
func (s *source) Start(handler resource.EventHandler) error {
return s.worker.Start(nil, func(ctx context.Context) {
// 初始化s.handler處理event
s.handler = handler
// 初始加載所有文件
s.initialCheck()
// 注冊(cè)一個(gè)signal 可以通過(guò)FileTrigger來(lái)監(jiān)控文件 這樣文件變化就發(fā)送signal到此channel c
c := make(chan appsignals.Signal, 1)
appsignals.Watch(c)
for {
select {
case <-ctx.Done():
return
case trigger := <-c:
if trigger.Signal == syscall.SIGUSR1 {
log.Scope.Infof("Triggering reload in response to: %v", trigger.Source)
s.reload()
}
}
}
})
}
1. 初始化
s.handler
2. 初始加載所有文件, 并生成事件發(fā)送出去.
3. 注冊(cè)一個(gè)signal
可以通過(guò)FileTrigger
來(lái)監(jiān)控文件 這樣文件變化就發(fā)送signal
到此c(channel)
4. 如果c(channel)
中接收一個(gè)syscall.SIGUSR1
信號(hào), 就是表明監(jiān)控的文件夾中的文件有變化, 所以調(diào)用s.reload()
發(fā)送新事件.
3.2.2 initialCheck
func (s *source) initialCheck() {
// 得到該文件夾下所有文件轉(zhuǎn)化成的對(duì)象 以map形式存儲(chǔ)
newData := s.readFiles(s.root)
s.mu.Lock()
defer s.mu.Unlock()
for k, r := range newData {
s.process(resource.Added, k, r)
s.shas[k] = r.sha
}
s.handler(resource.FullSyncEvent)
}
readFiles
方法可以將某個(gè)目錄下面的所有yaml
文件轉(zhuǎn)化成當(dāng)前source
支持的schema
, 以map
形式保存在newData
中, 關(guān)于readFiles
的實(shí)現(xiàn)這里就不分析了.
3.2.3 process
func (s *source) process(eventKind resource.EventKind, key fileResourceKey, r *fileResource) {
version := resource.Version(fmt.Sprintf("v%d", s.version))
var event resource.Event
switch eventKind {
case resource.Added, resource.Updated:
event = resource.Event{
Kind: eventKind,
Entry: resource.Entry{
ID: resource.VersionedKey{
Key: resource.Key{
Collection: r.spec.Target.Collection,
FullName: key.fullName,
},
// 當(dāng)前版本
Version: version,
},
Item: r.entry.Resource,
Metadata: r.entry.Metadata,
},
}
case resource.Deleted:
spec := kubeMeta.Types.Get(key.kind)
event = resource.Event{
Kind: eventKind,
Entry: resource.Entry{
ID: resource.VersionedKey{
Key: resource.Key{
Collection: spec.Target.Collection,
FullName: key.fullName,
},
Version: version,
},
},
}
}
log.Scope.Debugf("Dispatching source event: %v", event)
s.handler(event)
}
這里的處理方式也很簡(jiǎn)單, 組裝成
resource.Event
交由s.handler
發(fā)到runtime
中.
這里需要注意一下Version
, 這個(gè)版本號(hào)在什么時(shí)候會(huì)變化? 再次回到Start
方法, 當(dāng)文件發(fā)生變化時(shí)會(huì)觸發(fā)reload
方法, 接下來(lái)看一下reload
方法.
3.2.4 reload
func (s *source) reload() {
// 再次讀取所有文件
newData := s.readFiles(s.root)
s.mu.Lock()
defer s.mu.Unlock()
newShas := map[fileResourceKey][sha1.Size]byte{}
// Compute the deltas using sha comparisons
nextVersion := s.version + 1
// sha 為上一個(gè)版本的數(shù)據(jù)內(nèi)容
// newData為當(dāng)前版本的數(shù)據(jù)內(nèi)容
// 用sha和newData對(duì)比就可以得到所有事件內(nèi)容
// 最后更新sha為當(dāng)前版本的數(shù)據(jù)內(nèi)容
for k, r := range newData {
newShas[k] = r.sha
sha, exists := s.shas[k]
if exists && sha != r.sha {
if s.version != nextVersion {
s.version = nextVersion
}
s.process(resource.Updated, k, r)
} else if !exists {
if s.version != nextVersion {
s.version = nextVersion
}
s.process(resource.Added, k, r)
}
}
for k := range s.shas {
if _, exists := newShas[k]; !exists {
s.process(resource.Deleted, k, nil)
}
}
s.shas = newShas
}
主要內(nèi)容如下:
1.sha
為上一個(gè)版本的數(shù)據(jù)內(nèi)容
2.newData
為當(dāng)前版本的數(shù)據(jù)內(nèi)容
3. 用sha
和newData
對(duì)比就可以得到所有事件內(nèi)容并通過(guò)process
方法發(fā)送給runtime
.
4. 最后更新sha
為當(dāng)前版本的數(shù)據(jù)內(nèi)容.
3.2.5 總結(jié)
1. 初始的時(shí)候通過(guò)
initalCheck
->readFile
->process
->handler
->runtime
.
2. 初始完的時(shí)候會(huì)發(fā)送一個(gè)FullSync
事件表明第一次初始化結(jié)束.
3. 通過(guò)FileTrigger
監(jiān)控文件變化, 如有變化通過(guò)reload
方法重新加載文件并更新內(nèi)容對(duì)象, 并且根據(jù)當(dāng)前內(nèi)容和上一個(gè)版本內(nèi)容對(duì)比發(fā)送對(duì)應(yīng)事件給runtime
.
3. k8s
有了針對(duì)文件作為
source
的理解, 對(duì)于k8s
的理解就會(huì)更簡(jiǎn)單了, 從原理上講,fs
監(jiān)控文件的變化,k8s
使用informer
機(jī)制監(jiān)控k8s
中原生資源和crd
資源的變化即可.
3.1 source
// galley/pkg/source/kube/source.go
func New(interfaces client.Interfaces, resyncPeriod time.Duration, schema *schema.Instance,
cfg *kubeConverter.Config) (runtime.Source, error) {
var err error
var cl kubernetes.Interface
var dynClient kubeDynamic.Interface
var sharedInformers informers.SharedInformerFactory
log.Scope.Info("creating sources for kubernetes resources")
sources := make([]runtime.Source, 0)
for i, spec := range schema.All() {
log.Scope.Infof("[%d]", i)
log.Scope.Infof(" Source: %s", spec.CanonicalResourceName())
log.Scope.Infof(" Collection: %s", spec.Target.Collection)
// If it's a known type, use a custom (optimized) source.
if builtin.IsBuiltIn(spec.Kind) {
// Lazy create the kube client.
if cl, err = getKubeClient(cl, interfaces); err != nil {
return nil, err
}
sharedInformers = getSharedInformers(sharedInformers, cl, resyncPeriod)
// 創(chuàng)建k8s原生資源的source 比如pod, service
source, err := builtin.New(sharedInformers, spec)
if err != nil {
return nil, err
}
sources = append(sources, source)
} else {
// Lazy-create the dynamic client
if dynClient, err = getDynamicClient(dynClient, interfaces); err != nil {
return nil, err
}
// Unknown types use the dynamic source.
// 創(chuàng)建crd資源的source 比如virtualService, gateway等等
source, err := dynamic.New(dynClient, resyncPeriod, spec, cfg)
if err != nil {
return nil, err
}
sources = append(sources, source)
}
}
return &aggregate{
sources: sources,
}, nil
}
可以看到
k8s
的source
使用了aggregate
結(jié)構(gòu)體來(lái)收集到所有的source
. 主要來(lái)自兩大類:
1.k8s
原生資源比如pod
,Service
, 使用原生client-go api
即可. 每個(gè)資源都是一個(gè)source
.
2.crd
資源比如VirtualService
,gateway
, 使用dynamic client
. 每個(gè)資源都是一個(gè)source
.
3.2 Start
func (s *aggregate) Start(handler resource.EventHandler) error {
s.mu.Lock()
defer s.mu.Unlock()
syncGroup := sync.WaitGroup{}
syncGroup.Add(len(s.sources))
syncHandler := func(e resource.Event) {
if e.Kind == resource.FullSync {
// 如果某一個(gè)資源同步完了 就減少一個(gè)
syncGroup.Done()
} else {
// Not a sync event, just pass on to the real handler.
// 不是同步操作 調(diào)用傳入的handler
handler(e)
}
}
for _, source := range s.sources {
// 為每個(gè)資源調(diào)用各自的Start方法
if err := source.Start(syncHandler); err != nil {
return err
}
}
go func() {
// 等待所有資源同步完
syncGroup.Wait()
// 發(fā)送一個(gè)FullSync給runtime 表明所有資源以及同步完成
handler(resource.FullSyncEvent)
}()
return nil
}
1. 可以看到
Start
方法是將所有的source
全部啟動(dòng), 也就是list and watch
所有的資源.
2. 等到所有的資源都已經(jīng)同步完了, 也就是通過(guò)handler(e)
發(fā)送給runtime
了, 該Start
才會(huì)發(fā)送一個(gè)FullSync
事件給runtime
.
3. 關(guān)于某個(gè)資源的Start
就不多說(shuō)了, 了解informer
機(jī)制原理就明白了.
4. server
這里結(jié)合一個(gè)
galley
的server
端啟動(dòng)程序看看是如何調(diào)用的.
// galley/pkg/server/components/processing.go
func (p *Processing) Start() (err error) {
var mesh meshconfig.Cache
var src runtime.Source
if mesh, err = newMeshConfigCache(p.args.MeshConfigFile); err != nil {
return
}
if src, err = p.createSource(mesh); err != nil {
return
}
...
p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)
...
}
func (p *Processing) createSource(mesh meshconfig.Cache) (src runtime.Source, err error) {
...
sourceSchema := p.getSourceSchema()
if p.args.ConfigPath != "" {
if src, err = fsNew(p.args.ConfigPath, sourceSchema, converterCfg); err != nil {
return
}
} else {
var k client.Interfaces
if k, err = newKubeFromConfigFile(p.args.KubeConfig); err != nil {
return
}
var found []schema.ResourceSpec
...
sourceSchema = schema.New(found...)
if src, err = newSource(k, p.args.ResyncPeriod, sourceSchema, converterCfg); err != nil {
return
}
}
return
}
可以看到創(chuàng)建的
source
將為參數(shù)傳入到NewProcessor
中, 這個(gè)在 [istio源碼分析][galley] galley之runtime 中已經(jīng)分析過(guò)了, 所以現(xiàn)在已經(jīng)和runtime
對(duì)接上了.
5.參考
1.
istio 1.3.6源碼
2. https://cloud.tencent.com/developer/article/1409159