Ingress Nginx的系統(tǒng)架構(gòu)
Ingress Nginx的主流程邏輯
- 解析命令行參數(shù)
一個(gè)常見的命令行如下所示
/nginx-ingress-controller --default-backend-service=ingress-nginx/default-http-backend --configmap=ingress-nginx/nginx-configuration --tcp-services-configmap=ingress-nginx/tcp-services --udp-services-configmap=ingress-nginx/udp-services --publish-service=ingress-nginx/ingress-nginx --annotations-prefix=nginx.ingress.kubernetes.io - 顯示nginx的版本號
實(shí)際調(diào)用命令為:nginx -v或者nginx -V - 創(chuàng)建 API Server客戶端
有兩種模式來獲取APIServer的客戶端對象:
第一種是指定配置APIServerHost、配置KubeConfigFie曲管;
第二種是InCluster模式转培,這種一般是基于k8s調(diào)用Ingress Nginx運(yùn)行,作為容器啟動(dòng)怠硼,會(huì)從環(huán)境變量中讀取相關(guān)參數(shù):
1)KUBERNETES_SERVICE_HOST 例如:KUBERNETES_PORT=tcp://10.96.0.1:443
2)/var/run/secrets/kubernetes.io/serviceaccount 目錄下的token, ca.crt文件 - 解析并驗(yàn)證是否存在指定的缺省后端服務(wù)的名字空間和Service名稱
這個(gè)是ingress-nginx的默認(rèn)后端,用來將未知請求全部負(fù)載到這個(gè)默認(rèn)后端上移怯,這個(gè)默認(rèn)后端會(huì)返回404頁面香璃。 - 創(chuàng)建偽SSL證書
TODO:具體用處待分析 - 普羅米修斯監(jiān)控初始化
- 創(chuàng)建并啟動(dòng)NGINXController對象,后面
- 啟動(dòng)HTTP服務(wù)舟误,主要用于健康檢查葡秒、指標(biāo)查看、Profile功能
創(chuàng)建NGINXController對象
這個(gè)功能在NewNGINXController方法中完成嵌溢,代碼的分布解析如下:
- 創(chuàng)建并啟動(dòng)事件廣播
eventBroadcaster := record.NewBroadcaster() // 創(chuàng)建事件廣播對象
eventBroadcaster.StartLogging(glog.Infof) // 啟動(dòng)事件日志記錄功能
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ // 啟動(dòng)日志sink功能眯牧,同步到API Server
Interface: config.Client.CoreV1().Events(config.Namespace),
})
上面的代碼創(chuàng)建并啟動(dòng)了事件廣播對象,事件產(chǎn)生器是在NginxController對象構(gòu)建中創(chuàng)建
n := &NGINXController{
......
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
})
......
}
// 產(chǎn)生事件的代碼赖草,譬如創(chuàng)建了一個(gè)Ingress
recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
- 接下來會(huì)獲取系統(tǒng)名字服務(wù)器的IP信息学少,具體調(diào)用
h, err := dns.GetSystemNameServers()
從 /etc/resolv.conf 文件中讀取dns的resolve ip信息
- 創(chuàng)建NGINXController對象
n := &NGINXController{
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
cfg: config,
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
}),
stopCh: make(chan struct{}),
updateCh: channels.NewRingChannel(1024),
stopLock: &sync.Mutex{},
fileSystem: fs,
runningConfig: new(ingress.Configuration), // 運(yùn)行時(shí)配置,剛開始為空秧骑,會(huì)在同步Ingress信息的時(shí)候版确,填充完整,見syncIngress方法
Proxy: &TCPProxy{},
metricCollector: mc,
}
syncRateLimiter 成員是流控對象乎折,k8s流控依賴于golang.org/x/time/rate中的頻率限制模塊绒疗,流量控制的接口如下:
type RateLimiter interface {
// TryAccept returns true if a token is taken immediately. Otherwise,
// it returns false.
TryAccept() bool
// Accept returns once a token becomes available.
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// QPS returns QPS of this rate limiter
QPS() float32
}
實(shí)現(xiàn)流量控制,一般我們會(huì)設(shè)置流控的QPS骂澄,在執(zhí)行操作執(zhí)行吓蘑,我們先調(diào)用一下流控的Accept函數(shù),等待令牌滿足坟冲,才接著往下執(zhí)行磨镶,這樣就達(dá)到了流控的效果。
updateCh成員是RingChannel實(shí)例對象樱衷,它實(shí)現(xiàn)了一個(gè)永遠(yuǎn)不會(huì)阻塞寫操作的Channel接口棋嘲。比如:如果當(dāng)我們寫入RingChannel時(shí),RingChannel緩存滿了矩桂,那么Buffer中最老的數(shù)據(jù)就會(huì)被丟棄沸移。
store.Storer
Storer 是一個(gè)接口痪伦,它封裝了一個(gè)獲取ingress、service雹锣、secrets和ingress annotations的方法网沾。
type Storer interface {
// GetBackendConfiguration returns the nginx configuration stored in a configmap
GetBackendConfiguration() ngx_config.Configuration
// GetConfigMap returns the ConfigMap matching key.
GetConfigMap(key string) (*corev1.ConfigMap, error)
// GetSecret returns the Secret matching key.
GetSecret(key string) (*corev1.Secret, error)
// GetService returns the Service matching key.
GetService(key string) (*corev1.Service, error)
// GetServiceEndpoints returns the Endpoints of a Service matching key.
GetServiceEndpoints(key string) (*corev1.Endpoints, error)
// GetIngress returns the Ingress matching key.
GetIngress(key string) (*extensions.Ingress, error)
// ListIngresses returns a list of all Ingresses in the store.
ListIngresses() []*extensions.Ingress
// GetIngressAnnotations returns the parsed annotations of an Ingress matching key.
GetIngressAnnotations(key string) (*annotations.Ingress, error)
// GetLocalSSLCert returns the local copy of a SSLCert
GetLocalSSLCert(name string) (*ingress.SSLCert, error)
// ListLocalSSLCerts returns the list of local SSLCerts
ListLocalSSLCerts() []*ingress.SSLCert
// GetAuthCertificate resolves a given secret name into an SSL certificate.
// The secret must contain 3 keys named:
// ca.crt: contains the certificate chain used for authentication
GetAuthCertificate(string) (*resolver.AuthSSLCert, error)
// GetDefaultBackend returns the default backend configuration
GetDefaultBackend() defaults.Backend
// Run initiates the synchronization of the controllers
Run(stopCh chan struct{})
}
Storer對象是與API Server溝通的入口,所以這塊是系統(tǒng)的關(guān)鍵蕊爵,所有的數(shù)據(jù)變更都是從API Server過來辉哥,所以,Storer是驅(qū)動(dòng)系統(tǒng)運(yùn)行的關(guān)鍵模塊攒射。Storer的具體的實(shí)現(xiàn)類為k8sStore醋旦,它的定義如下:
type k8sStore struct {
isOCSPCheckEnabled bool
// backendConfig contains the running configuration from the configmap
// this is required because this rarely changes but is a very expensive
// operation to execute in each OnUpdate invocation
backendConfig ngx_config.Configuration
// informer contains the cache Informers
informers *Informer // 封裝了所有關(guān)心的組件的通知機(jī)制
// listers contains the cache.Store interfaces used in the ingress controller
listers *Lister // 從通知中獲取的對應(yīng)的只讀存儲(chǔ)信息
// sslStore local store of SSL certificates (certificates used in ingress)
// this is required because the certificates must be present in the
// container filesystem
sslStore *SSLCertTracker
annotations annotations.Extractor // 提供了Ingress Annotations的名字與提取方法對
// secretIngressMap contains information about which ingress references a
// secret in the annotations.
secretIngressMap ObjectRefMap // 保存了每個(gè)Ingress引用了哪些secrets的信息
filesystem file.Filesystem
// updateCh
updateCh *channels.RingChannel
// mu protects against simultaneous invocations of syncSecret
mu *sync.Mutex
defaultSSLCertificate string
}
在k8sStore中封裝了幾個(gè)關(guān)鍵的對象Informers、Listener会放、updateCh饲齐, 其中Informers是關(guān)鍵驅(qū)動(dòng)邏輯,它實(shí)時(shí)從API Server獲取各種資源的變化信息咧最,針對不同類型的資源變化進(jìn)行相應(yīng)的回調(diào)處理捂人,最終回調(diào)函數(shù)都會(huì)形成事件放入updateCh隊(duì)列中去處理。
Informer
Informer封裝了ingress需要的SharedIndexInformers矢沿,用于與API Server交互滥搭,它是在Storer構(gòu)建時(shí)創(chuàng)建的,由于篇幅比較中捣鲸,所以這里單獨(dú)拉出來探討瑟匆。
SharedIndexInformer是基于一種共享的數(shù)據(jù)通知機(jī)制,共享數(shù)據(jù)通知對象的構(gòu)建是基于一個(gè)Factory來創(chuàng)建和返回摄狱。Factory會(huì)緩存創(chuàng)建過的對象脓诡,下次再次獲取同樣的對象時(shí)无午,會(huì)從緩存中換回媒役。SharedIndexInformer基于兩項(xiàng):底層數(shù)據(jù)(一般是API Server)和緩存數(shù)據(jù),當(dāng)數(shù)據(jù)發(fā)生變更時(shí)宪迟,在更新緩存的同時(shí)酣衷,可以同時(shí)向多個(gè)偵聽器發(fā)送通知回調(diào)處理。
type Informer struct {
Ingress cache.SharedIndexInformer
Endpoint cache.SharedIndexInformer
Service cache.SharedIndexInformer
Secret cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
}
從Informer的定義我們可以看出次泽,系統(tǒng)關(guān)心的資源有:Ingress穿仪、EndPoint、Service意荤、Secret啊片、ConfigMap。
SharedIndexInformer的創(chuàng)建
所有的SharedIndexInformer都是基于factory創(chuàng)建玖像,如下所示:
// create informers factory, enable and assign required informers
infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})
store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()
store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()
store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()
store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
每個(gè)資源都添加了一個(gè)事件處理器紫谷,負(fù)責(zé)處理資源變更事件,這里限于篇幅,只舉例Ingress的資源進(jìn)行說明:
ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ing := obj.(*extensions.Ingress)
if !class.IsValid(ing) {
a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) // 產(chǎn)生一個(gè)創(chuàng)建Ingress的事件通知
store.extractAnnotations(ing) // 提取Annotation信息
store.updateSecretIngressMap(ing) // 注意這里笤昨,保存的是該Ingress引用的Secrets信息
store.syncSecrets(ing) // 把Ingress相應(yīng)的Secrets信息同步到對應(yīng)的文件系統(tǒng)中祖驱,主要是TLS Secrets(包括證書和Key)
updateCh.In() <- Event{ // 放入隊(duì)列,用于更新配置文件
Type: CreateEvent,
Obj: obj,
}
},
DeleteFunc: func(obj interface{}) {
ing, ok := obj.(*extensions.Ingress)
if !ok {
// If we reached here it means the ingress was deleted but its final state is unrecorded.
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %#v", obj)
return
}
ing, ok = tombstone.Obj.(*extensions.Ingress)
if !ok {
glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
return
}
}
if !class.IsValid(ing) {
glog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
return
}
recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
store.listers.IngressAnnotation.Delete(ing)
key := k8s.MetaNamespaceKey(ing)
store.secretIngressMap.Delete(key)
updateCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
}
},
UpdateFunc: func(old, cur interface{}) {
oldIng := old.(*extensions.Ingress)
curIng := cur.(*extensions.Ingress)
validOld := class.IsValid(oldIng)
validCur := class.IsValid(curIng)
if !validOld && validCur {
glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validOld && !validCur {
glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
} else if validCur && !reflect.DeepEqual(old, cur) {
recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
}
store.extractAnnotations(curIng)
store.updateSecretIngressMap(curIng)
store.syncSecrets(curIng)
updateCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
}
},
}
// 添加事件處理器
store.informers.Ingress.AddEventHandler(ingEventHandler)
資源事件變化處理是驅(qū)動(dòng)Ingress Nginx運(yùn)行的關(guān)鍵瞒窒,這里先不打算在本篇文章中詳細(xì)描述了捺僻,如果有必要,將會(huì)另外寫一篇文章去講述崇裁。
啟動(dòng)Informer
Informer的啟動(dòng)在k8sStore的Run方法中驅(qū)動(dòng)匕坯,啟動(dòng)。
// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s k8sStore) Run(stopCh chan struct{}) {
// start informers
s.informers.Run(stopCh)
if s.isOCSPCheckEnabled {
go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
}
}
// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
go i.Endpoint.Run(stopCh)
go i.Service.Run(stopCh)
go i.Secret.Run(stopCh)
go i.ConfigMap.Run(stopCh)
// wait for all involved caches to be synced before processing items
// from the queue
if !cache.WaitForCacheSync(stopCh, // 需要等待除了Ingress之外的各種資源同步完成采取處理里面的
i.Endpoint.HasSynced,
i.Service.HasSynced,
i.Secret.HasSynced,
i.ConfigMap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
// in big clusters, deltas can keep arriving even after HasSynced
// functions have returned 'true'
time.Sleep(1 * time.Second)
// we can start syncing ingress objects only after other caches are
// ready, because ingress rules require content from other listers, and
// 'add' events get triggered in the handlers during caches population.
go i.Ingress.Run(stopCh) // 啟動(dòng)Ingress Informer
if !cache.WaitForCacheSync(stopCh,
i.Ingress.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
}
Informers的啟動(dòng)代碼比較清晰拔稳,首先啟動(dòng)除了Ingress Informer之外的其他資源的Informer醒颖,并且等待cache同步完成后,采取啟動(dòng)Ingress Informer壳炎,因?yàn)镮ngress的規(guī)則需要其他資源的內(nèi)容泞歉。