Kuberneters源碼分析 - Ingress nginx 主流程

Ingress Nginx的系統(tǒng)架構(gòu)

nginx-ingress-controller.png

Ingress Nginx的主流程邏輯

  • 解析命令行參數(shù)
    一個(gè)常見的命令行如下所示
    /nginx-ingress-controller --default-backend-service=ingress-nginx/default-http-backend --configmap=ingress-nginx/nginx-configuration --tcp-services-configmap=ingress-nginx/tcp-services --udp-services-configmap=ingress-nginx/udp-services --publish-service=ingress-nginx/ingress-nginx --annotations-prefix=nginx.ingress.kubernetes.io
  • 顯示nginx的版本號
    實(shí)際調(diào)用命令為:nginx -v或者nginx -V
  • 創(chuàng)建 API Server客戶端
    有兩種模式來獲取APIServer的客戶端對象:
    第一種是指定配置APIServerHost、配置KubeConfigFie曲管;
    第二種是InCluster模式转培,這種一般是基于k8s調(diào)用Ingress Nginx運(yùn)行,作為容器啟動(dòng)怠硼,會(huì)從環(huán)境變量中讀取相關(guān)參數(shù):
    1)KUBERNETES_SERVICE_HOST 例如:KUBERNETES_PORT=tcp://10.96.0.1:443
    2)/var/run/secrets/kubernetes.io/serviceaccount 目錄下的token, ca.crt文件
  • 解析并驗(yàn)證是否存在指定的缺省后端服務(wù)的名字空間和Service名稱
    這個(gè)是ingress-nginx的默認(rèn)后端,用來將未知請求全部負(fù)載到這個(gè)默認(rèn)后端上移怯,這個(gè)默認(rèn)后端會(huì)返回404頁面香璃。
  • 創(chuàng)建偽SSL證書
    TODO:具體用處待分析
  • 普羅米修斯監(jiān)控初始化
  • 創(chuàng)建并啟動(dòng)NGINXController對象,后面
  • 啟動(dòng)HTTP服務(wù)舟误,主要用于健康檢查葡秒、指標(biāo)查看、Profile功能

創(chuàng)建NGINXController對象

這個(gè)功能在NewNGINXController方法中完成嵌溢,代碼的分布解析如下:

  • 創(chuàng)建并啟動(dòng)事件廣播
    eventBroadcaster := record.NewBroadcaster()  // 創(chuàng)建事件廣播對象
    eventBroadcaster.StartLogging(glog.Infof)         // 啟動(dòng)事件日志記錄功能
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{  // 啟動(dòng)日志sink功能眯牧,同步到API Server
        Interface: config.Client.CoreV1().Events(config.Namespace),
    })

上面的代碼創(chuàng)建并啟動(dòng)了事件廣播對象,事件產(chǎn)生器是在NginxController對象構(gòu)建中創(chuàng)建

    n := &NGINXController{
            ......
            recorder :=  eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
              Component: "nginx-ingress-controller",
            })
            ......
       }

       // 產(chǎn)生事件的代碼赖草,譬如創(chuàng)建了一個(gè)Ingress
       recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
  • 接下來會(huì)獲取系統(tǒng)名字服務(wù)器的IP信息学少,具體調(diào)用
    h, err := dns.GetSystemNameServers()

從 /etc/resolv.conf 文件中讀取dns的resolve ip信息

  • 創(chuàng)建NGINXController對象
    n := &NGINXController{
        isIPV6Enabled: ing_net.IsIPv6Enabled(),
        resolver:        h,
        cfg:             config,
        syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
        recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
            Component: "nginx-ingress-controller",
        }),
        stopCh:   make(chan struct{}),
        updateCh: channels.NewRingChannel(1024),
        stopLock: &sync.Mutex{},
        fileSystem: fs,
        runningConfig: new(ingress.Configuration), // 運(yùn)行時(shí)配置,剛開始為空秧骑,會(huì)在同步Ingress信息的時(shí)候版确,填充完整,見syncIngress方法
        Proxy: &TCPProxy{},
        metricCollector: mc,
    }

syncRateLimiter 成員是流控對象乎折,k8s流控依賴于golang.org/x/time/rate中的頻率限制模塊绒疗,流量控制的接口如下:

type RateLimiter interface {
    // TryAccept returns true if a token is taken immediately. Otherwise,
    // it returns false.
    TryAccept() bool
    // Accept returns once a token becomes available.
    Accept()
    // Stop stops the rate limiter, subsequent calls to CanAccept will return false
    Stop()
    // QPS returns QPS of this rate limiter
    QPS() float32
}

實(shí)現(xiàn)流量控制,一般我們會(huì)設(shè)置流控的QPS骂澄,在執(zhí)行操作執(zhí)行吓蘑,我們先調(diào)用一下流控的Accept函數(shù),等待令牌滿足坟冲,才接著往下執(zhí)行磨镶,這樣就達(dá)到了流控的效果。

updateCh成員是RingChannel實(shí)例對象樱衷,它實(shí)現(xiàn)了一個(gè)永遠(yuǎn)不會(huì)阻塞寫操作的Channel接口棋嘲。比如:如果當(dāng)我們寫入RingChannel時(shí),RingChannel緩存滿了矩桂,那么Buffer中最老的數(shù)據(jù)就會(huì)被丟棄沸移。

store.Storer

Storer 是一個(gè)接口痪伦,它封裝了一個(gè)獲取ingress、service雹锣、secrets和ingress annotations的方法网沾。

type Storer interface {
    // GetBackendConfiguration returns the nginx configuration stored in a configmap
    GetBackendConfiguration() ngx_config.Configuration

    // GetConfigMap returns the ConfigMap matching key.
    GetConfigMap(key string) (*corev1.ConfigMap, error)

    // GetSecret returns the Secret matching key.
    GetSecret(key string) (*corev1.Secret, error)

    // GetService returns the Service matching key.
    GetService(key string) (*corev1.Service, error)

    // GetServiceEndpoints returns the Endpoints of a Service matching key.
    GetServiceEndpoints(key string) (*corev1.Endpoints, error)

    // GetIngress returns the Ingress matching key.
    GetIngress(key string) (*extensions.Ingress, error)

    // ListIngresses returns a list of all Ingresses in the store.
    ListIngresses() []*extensions.Ingress

    // GetIngressAnnotations returns the parsed annotations of an Ingress matching key.
    GetIngressAnnotations(key string) (*annotations.Ingress, error)

    // GetLocalSSLCert returns the local copy of a SSLCert
    GetLocalSSLCert(name string) (*ingress.SSLCert, error)

    // ListLocalSSLCerts returns the list of local SSLCerts
    ListLocalSSLCerts() []*ingress.SSLCert

    // GetAuthCertificate resolves a given secret name into an SSL certificate.
    // The secret must contain 3 keys named:
    //   ca.crt: contains the certificate chain used for authentication
    GetAuthCertificate(string) (*resolver.AuthSSLCert, error)

    // GetDefaultBackend returns the default backend configuration
    GetDefaultBackend() defaults.Backend

    // Run initiates the synchronization of the controllers
    Run(stopCh chan struct{})
}

Storer對象是與API Server溝通的入口,所以這塊是系統(tǒng)的關(guān)鍵蕊爵,所有的數(shù)據(jù)變更都是從API Server過來辉哥,所以,Storer是驅(qū)動(dòng)系統(tǒng)運(yùn)行的關(guān)鍵模塊攒射。Storer的具體的實(shí)現(xiàn)類為k8sStore醋旦,它的定義如下:

type k8sStore struct {
    isOCSPCheckEnabled bool

    // backendConfig contains the running configuration from the configmap
    // this is required because this rarely changes but is a very expensive
    // operation to execute in each OnUpdate invocation
    backendConfig ngx_config.Configuration

    // informer contains the cache Informers
    informers *Informer  // 封裝了所有關(guān)心的組件的通知機(jī)制

    // listers contains the cache.Store interfaces used in the ingress controller
    listers *Lister           // 從通知中獲取的對應(yīng)的只讀存儲(chǔ)信息

    // sslStore local store of SSL certificates (certificates used in ingress)
    // this is required because the certificates must be present in the
    // container filesystem
    sslStore *SSLCertTracker

    annotations annotations.Extractor // 提供了Ingress Annotations的名字與提取方法對

    // secretIngressMap contains information about which ingress references a
    // secret in the annotations.
    secretIngressMap ObjectRefMap  // 保存了每個(gè)Ingress引用了哪些secrets的信息

    filesystem file.Filesystem

    // updateCh
    updateCh *channels.RingChannel

    // mu protects against simultaneous invocations of syncSecret
    mu *sync.Mutex

    defaultSSLCertificate string
}

在k8sStore中封裝了幾個(gè)關(guān)鍵的對象Informers、Listener会放、updateCh饲齐, 其中Informers是關(guān)鍵驅(qū)動(dòng)邏輯,它實(shí)時(shí)從API Server獲取各種資源的變化信息咧最,針對不同類型的資源變化進(jìn)行相應(yīng)的回調(diào)處理捂人,最終回調(diào)函數(shù)都會(huì)形成事件放入updateCh隊(duì)列中去處理。

Informer

Informer封裝了ingress需要的SharedIndexInformers矢沿,用于與API Server交互滥搭,它是在Storer構(gòu)建時(shí)創(chuàng)建的,由于篇幅比較中捣鲸,所以這里單獨(dú)拉出來探討瑟匆。

SharedIndexInformer是基于一種共享的數(shù)據(jù)通知機(jī)制,共享數(shù)據(jù)通知對象的構(gòu)建是基于一個(gè)Factory來創(chuàng)建和返回摄狱。Factory會(huì)緩存創(chuàng)建過的對象脓诡,下次再次獲取同樣的對象時(shí)无午,會(huì)從緩存中換回媒役。SharedIndexInformer基于兩項(xiàng):底層數(shù)據(jù)(一般是API Server)和緩存數(shù)據(jù),當(dāng)數(shù)據(jù)發(fā)生變更時(shí)宪迟,在更新緩存的同時(shí)酣衷,可以同時(shí)向多個(gè)偵聽器發(fā)送通知回調(diào)處理。

type Informer struct {
    Ingress   cache.SharedIndexInformer
    Endpoint  cache.SharedIndexInformer
    Service   cache.SharedIndexInformer
    Secret    cache.SharedIndexInformer
    ConfigMap cache.SharedIndexInformer
}

從Informer的定義我們可以看出次泽,系統(tǒng)關(guān)心的資源有:Ingress穿仪、EndPoint、Service意荤、Secret啊片、ConfigMap。

SharedIndexInformer的創(chuàng)建

所有的SharedIndexInformer都是基于factory創(chuàng)建玖像,如下所示:

    // create informers factory, enable and assign required informers
    infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})

    store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
    store.listers.Ingress.Store = store.informers.Ingress.GetStore()

    store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
    store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

    store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
    store.listers.Secret.Store = store.informers.Secret.GetStore()

    store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
    store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

    store.informers.Service = infFactory.Core().V1().Services().Informer()
    store.listers.Service.Store = store.informers.Service.GetStore()

每個(gè)資源都添加了一個(gè)事件處理器紫谷,負(fù)責(zé)處理資源變更事件,這里限于篇幅,只舉例Ingress的資源進(jìn)行說明:

          ingEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ing := obj.(*extensions.Ingress)
            if !class.IsValid(ing) {
                a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
                glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
                return
            }
            recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))  // 產(chǎn)生一個(gè)創(chuàng)建Ingress的事件通知

            store.extractAnnotations(ing)  // 提取Annotation信息
            store.updateSecretIngressMap(ing)  // 注意這里笤昨,保存的是該Ingress引用的Secrets信息
            store.syncSecrets(ing)  // 把Ingress相應(yīng)的Secrets信息同步到對應(yīng)的文件系統(tǒng)中祖驱,主要是TLS Secrets(包括證書和Key)
            updateCh.In() <- Event{ // 放入隊(duì)列,用于更新配置文件
                Type: CreateEvent,
                Obj:  obj,
            }
        },
        DeleteFunc: func(obj interface{}) {
            ing, ok := obj.(*extensions.Ingress)
            if !ok {
                // If we reached here it means the ingress was deleted but its final state is unrecorded.
                tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                    glog.Errorf("couldn't get object from tombstone %#v", obj)
                    return
                }
                ing, ok = tombstone.Obj.(*extensions.Ingress)
                if !ok {
                    glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
                    return
                }
            }
            if !class.IsValid(ing) {
                glog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
                return
            }
            recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))

            store.listers.IngressAnnotation.Delete(ing)

            key := k8s.MetaNamespaceKey(ing)
            store.secretIngressMap.Delete(key)

            updateCh.In() <- Event{
                Type: DeleteEvent,
                Obj:  obj,
            }
        },
        UpdateFunc: func(old, cur interface{}) {
            oldIng := old.(*extensions.Ingress)
            curIng := cur.(*extensions.Ingress)
            validOld := class.IsValid(oldIng)
            validCur := class.IsValid(curIng)
            if !validOld && validCur {
                glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
                recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else if validOld && !validCur {
                glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
                recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else if validCur && !reflect.DeepEqual(old, cur) {
                recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            }

            store.extractAnnotations(curIng)
            store.updateSecretIngressMap(curIng)
            store.syncSecrets(curIng)

            updateCh.In() <- Event{
                Type: UpdateEvent,
                Obj:  cur,
            }
        },
    }

      // 添加事件處理器
      store.informers.Ingress.AddEventHandler(ingEventHandler)

資源事件變化處理是驅(qū)動(dòng)Ingress Nginx運(yùn)行的關(guān)鍵瞒窒,這里先不打算在本篇文章中詳細(xì)描述了捺僻,如果有必要,將會(huì)另外寫一篇文章去講述崇裁。

啟動(dòng)Informer

Informer的啟動(dòng)在k8sStore的Run方法中驅(qū)動(dòng)匕坯,啟動(dòng)。

// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s k8sStore) Run(stopCh chan struct{}) {
    // start informers
    s.informers.Run(stopCh)

    if s.isOCSPCheckEnabled {
        go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
    }
}

// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
    go i.Endpoint.Run(stopCh)
    go i.Service.Run(stopCh)
    go i.Secret.Run(stopCh)
    go i.ConfigMap.Run(stopCh)

    // wait for all involved caches to be synced before processing items
    // from the queue
    if !cache.WaitForCacheSync(stopCh,  // 需要等待除了Ingress之外的各種資源同步完成采取處理里面的
        i.Endpoint.HasSynced,
        i.Service.HasSynced,
        i.Secret.HasSynced,
        i.ConfigMap.HasSynced,
    ) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    }

    // in big clusters, deltas can keep arriving even after HasSynced
    // functions have returned 'true'
    time.Sleep(1 * time.Second)

    // we can start syncing ingress objects only after other caches are
    // ready, because ingress rules require content from other listers, and
    // 'add' events get triggered in the handlers during caches population.
    go i.Ingress.Run(stopCh) // 啟動(dòng)Ingress Informer
    if !cache.WaitForCacheSync(stopCh,  
        i.Ingress.HasSynced,
    ) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    }
}

Informers的啟動(dòng)代碼比較清晰拔稳,首先啟動(dòng)除了Ingress Informer之外的其他資源的Informer醒颖,并且等待cache同步完成后,采取啟動(dòng)Ingress Informer壳炎,因?yàn)镮ngress的規(guī)則需要其他資源的內(nèi)容泞歉。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市匿辩,隨后出現(xiàn)的幾起案子腰耙,更是在濱河造成了極大的恐慌,老刑警劉巖铲球,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挺庞,死亡現(xiàn)場離奇詭異,居然都是意外死亡稼病,警方通過查閱死者的電腦和手機(jī)选侨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來然走,“玉大人援制,你說我怎么就攤上這事∩秩穑” “怎么了晨仑?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長拆檬。 經(jīng)常有香客問我洪己,道長,這世上最難降的妖魔是什么竟贯? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任答捕,我火速辦了婚禮,結(jié)果婚禮上屑那,老公的妹妹穿的比我還像新娘拱镐。我一直安慰自己晌缘,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布痢站。 她就那樣靜靜地躺著磷箕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪阵难。 梳的紋絲不亂的頭發(fā)上岳枷,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天,我揣著相機(jī)與錄音呜叫,去河邊找鬼空繁。 笑死,一個(gè)胖子當(dāng)著我的面吹牛朱庆,可吹牛的內(nèi)容都是我干的盛泡。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼娱颊,長吁一口氣:“原來是場噩夢啊……” “哼傲诵!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起箱硕,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤拴竹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后剧罩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體栓拜,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年惠昔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了幕与。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡镇防,死狀恐怖啦鸣,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情营罢,我是刑警寧澤赏陵,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布饼齿,位于F島的核電站饲漾,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏缕溉。R本人自食惡果不足惜考传,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望证鸥。 院中可真熱鬧僚楞,春花似錦勤晚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至膜赃,卻和暖如春挺邀,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背跳座。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工端铛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人疲眷。 一個(gè)月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓禾蚕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親狂丝。 傳聞我的和親對象是個(gè)殘疾皇子换淆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)几颜,斷路器产舞,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • 一、 K8s 是什么菠剩? Kubernetes(k8s)是自動(dòng)化容器操作的開源平臺易猫,這些操作包括部署,調(diào)度和節(jié)點(diǎn)集群...
    loveroot閱讀 6,638評論 1 21
  • 1具壮、基礎(chǔ)架構(gòu) 1.1 Master Master節(jié)點(diǎn)上面主要由四個(gè)模塊組成:APIServer准颓、scheduler...
    阿斯蒂芬2閱讀 10,852評論 0 44
  • 簡介 kubernetes 使用service和ingress共同構(gòu)建了,外部訪問k8s內(nèi)部容器的通道棺妓。 Serv...
    梅_梅閱讀 986評論 0 2
  • 文似看山不喜平攘已,出自清代袁枚《隨園詩話》,意思是寫文章好比觀賞山峰那樣怜跑,喜歡奇勢迭出样勃,最忌平坦。 凡是好文章性芬,總是...
    云在風(fēng)中誰在聽閱讀 376評論 0 0