1. The role of controller-manager in the cluster
As the management and control center of the cluster, it maintains all of the controllers in the cluster and plays a key role in keeping the cluster stable and self-healing and in providing high availability, replica control, and so on.
2. Internal structure diagram of controller-manager
3. Key call chain in the controller-manager source code
4. Detailed source code analysis
4.1. Entry point of the component
Location: k8s.io/kubernetes/cmd/kube-controller-manager/controller-manager.go
func main() {
    rand.Seed(time.Now().UTC().UnixNano())

    command := app.NewControllerManagerCommand()

    // TODO: once we switch everything over to Cobra commands, we can go back to calling
    // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    // normalize func and add the go flag set by hand.
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}
4.2. Reading the configuration and initializing the defaults
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go -> NewControllerManagerCommand
- Initialize the controller-manager options structure: NewKubeControllerManagerOptions()
- Build the command to execute, including Use, Long, and Run: cmd := &cobra.Command{
- Parse the configuration flags: s.AddFlags
1. KnownControllers() collects all controllers
2. Options from the configuration are injected into the config object
3. The flags each controller needs are registered as well.
func NewControllerManagerCommand() *cobra.Command {
    s, err := options.NewKubeControllerManagerOptions()
    if err != nil {
        glog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
the system. In Kubernetes, a controller is a control loop that watches the shared
state of the cluster through the apiserver and makes changes attempting to move the
current state towards the desired state. Examples of controllers that ship with
Kubernetes today are the replication controller, endpoints controller, namespace
controller, and serviceaccounts controller.`,
        Run: func(cmd *cobra.Command, args []string) {
            verflag.PrintAndExitIfRequested()
            utilflag.PrintFlags(cmd.Flags())

            c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
            if err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }

            if err := Run(c.Complete()); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    s.AddFlags(cmd.Flags(), KnownControllers(), ControllersDisabledByDefault.List())

    return cmd
}
4.3. Component startup and execution
From command.Execute() in main() we reach the Run callback constructed in 4.2, as the short Cobra sketch below illustrates.
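For readers not familiar with Cobra: Execute() parses the command line and then invokes the Run callback registered on the command. A minimal, self-contained sketch of that dispatch (all names here are illustrative, not taken from the controller-manager source):

package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
)

func main() {
    cmd := &cobra.Command{
        Use: "demo",
        // Execute() below ends up calling this Run callback after flag parsing,
        // which mirrors how command.Execute() in controller-manager.go reaches
        // the Run closure built in NewControllerManagerCommand.
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Println("control loops would be started here")
        },
    }
    if err := cmd.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}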
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
// Load all controllers and inject the corresponding parameters into them
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
NewControllerInitializers, called from KnownControllers(), registers all of the controllers:
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefulset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["csrsigning"] = startCSRSigningController
    controllers["csrapproving"] = startCSRApprovingController
    controllers["csrcleaner"] = startCSRCleanerController
    controllers["ttl"] = startTTLController
    controllers["bootstrapsigner"] = startBootstrapSignerController
    controllers["tokencleaner"] = startTokenCleanerController
    controllers["nodeipam"] = startNodeIpamController
    if loopMode == IncludeCloudLoops {
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        // TODO: volume controller into the IncludeCloudLoops only set.
        // TODO: Separate cluster in cloud check from node lifecycle controller.
    }
    controllers["nodelifecycle"] = startNodeLifecycleController
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    return controllers
}
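Every value in this map is an InitFunc. Judging from the start functions shown later (startReplicaSetController, for example, returns (bool, error)), its signature in this version of the source is presumably:

// InitFunc starts one controller. The bool reports whether the controller was
// actually enabled and started (signature inferred from the start functions
// shown in this walkthrough, not copied verbatim).
type InitFunc func(ctx ControllerContext) (bool, error)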
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
This is where the real work starts:
- Start the controller-manager HTTP servers and their handlers, both secure and insecure: BuildHandlerChain
- Build the run closure
- If leader election is enabled, run is executed after the election is won; otherwise run is executed directly, followed by panic("unreachable")
// Run runs the KubeControllerManagerOptions. This should never exit.
func Run(c *config.CompletedConfig) error {
    // To help debugging, immediately log version
    glog.Infof("Version: %+v", version.Get())

    if cfgz, err := configz.New("componentconfig"); err == nil {
        cfgz.Set(c.ComponentConfig)
    } else {
        glog.Errorf("unable to register configz: %s", err)
    }

    // Start the controller manager HTTP server
    stopCh := make(chan struct{})
    if c.SecureServing != nil {
        handler := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
        handler = genericcontrollermanager.BuildHandlerChain(handler, &c.Authorization, &c.Authentication)
        if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
            return err
        }
    }
    if c.InsecureServing != nil {
        handler := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
        handler = genericcontrollermanager.BuildHandlerChain(handler, &c.Authorization, &c.Authentication)
        if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
            return err
        }
    }

    run := func(stop <-chan struct{}) {
        rootClientBuilder := controller.SimpleControllerClientBuilder{
            ClientConfig: c.Kubeconfig,
        }
        var clientBuilder controller.ControllerClientBuilder
        if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
            if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
                // It's possible another controller process is creating the tokens for us.
                // If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
                glog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
            }
            clientBuilder = controller.SAControllerClientBuilder{
                ClientConfig:         restclient.AnonymousClientConfig(c.Kubeconfig),
                CoreClient:           c.Client.CoreV1(),
                AuthenticationClient: c.Client.AuthenticationV1(),
                Namespace:            "kube-system",
            }
        } else {
            clientBuilder = rootClientBuilder
        }
        ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
        if err != nil {
            glog.Fatalf("error building controller context: %v", err)
        }
        saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

        // start the controllers
        if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
            glog.Fatalf("error starting controllers: %v", err)
        }

        ctx.InformerFactory.Start(ctx.Stop)
        close(ctx.InformersStarted)

        select {}
    }

    // note: if leader election is disabled (single node), run is invoked directly and
    // never returns (it blocks on the select {} above), so the panic below is unreachable
    if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
        run(wait.NeverStop)
        panic("unreachable")
    }

    id, err := os.Hostname()
    if err != nil {
        return err
    }

    // add a uniquifier so that two processes on the same host don't accidentally both become active
    // i.e. generate a unique ID that acts as this process's lock identity
    id = id + "_" + string(uuid.NewUUID())
    rl, err := resourcelock.New(c.ComponentConfig.GenericComponent.LeaderElection.ResourceLock,
        "kube-system",
        "kube-controller-manager",
        c.LeaderElectionClient.CoreV1(),
        resourcelock.ResourceLockConfig{
            Identity:      id,
            EventRecorder: c.EventRecorder,
        })
    if err != nil {
        glog.Fatalf("error creating lock: %v", err)
    }

    // run leader election, and execute run once this instance is elected leader
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   c.ComponentConfig.GenericComponent.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
            // executed once leadership is acquired
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                glog.Fatalf("leaderelection lost")
            },
        },
    })
    panic("unreachable")
}
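When several replicas of kube-controller-manager run at once, the leader-election path above is what keeps only one of them active. It is driven by command-line flags; an illustrative invocation (flag names from this era of the code base, values shown are the usual defaults) looks roughly like:

kube-controller-manager \
    --leader-elect=true \
    --leader-elect-lease-duration=15s \
    --leader-elect-renew-deadline=10s \
    --leader-elect-retry-period=2s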
Inside run, the three core steps are CreateControllerContext, StartControllers, and ctx.InformerFactory.Start.
CreateControllerContext
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
- Obtain client handles for operating on the resources in kube-apiserver
- Wait for kube-apiserver to become healthy (up to 10s) before using those connections
- Build the controller context
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    // get a client handle for operating on apiserver resources
    versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
    sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

    // If apiserver is not running we should wait for some time and fail only then. This is particularly
    // important when we start apiserver and controller manager at the same time.
    // note: wait up to 10s for the apiserver to become available
    if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
        return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
    }

    // Use a discovery client capable of being refreshed.
    discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery")
    // note: DiscoveryClient = discoveryClient.Discovery()
    cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
    restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
    go wait.Until(func() {
        restMapper.Reset()
    }, 30*time.Second, stop)

    availableResources, err := GetAvailableResources(rootClientBuilder)
    if err != nil {
        return ControllerContext{}, err
    }

    cloud, loopMode, err := createCloudProvider(s.ComponentConfig.CloudProvider.Name, s.ComponentConfig.ExternalCloudVolumePlugin,
        s.ComponentConfig.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
    if err != nil {
        return ControllerContext{}, err
    }

    ctx := ControllerContext{
        ClientBuilder:      clientBuilder,
        InformerFactory:    sharedInformers,
        ComponentConfig:    s.ComponentConfig,
        RESTMapper:         restMapper,
        AvailableResources: availableResources,
        Cloud:              cloud,
        LoopMode:           loopMode,
        Stop:               stop,
        InformersStarted:   make(chan struct{}),
        ResyncPeriod:       ResyncPeriod(s),
    }
    return ctx, nil
}
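Note that the shared informer factory is created with ResyncPeriod(s)(). That helper presumably jitters the configured min-resync-period so that the controllers' informers do not all relist from the apiserver in lock-step; a rough, self-contained sketch of the idea (names are illustrative, not the real implementation):

package example

import (
    "math/rand"
    "time"
)

// jitteredResyncPeriod returns a function that yields a duration in [min, 2*min),
// so repeated callers (one per controller) end up with different resync periods.
func jitteredResyncPeriod(min time.Duration) func() time.Duration {
    return func() time.Duration {
        factor := rand.Float64() + 1 // in [1, 2)
        return time.Duration(float64(min.Nanoseconds()) * factor)
    }
}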
StartControllers
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
Start all of the controllers registered during initialization:
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
    ···
    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            glog.Warningf("%q is disabled", controllerName)
            continue
        }

        time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))

        glog.V(1).Infof("Starting %q", controllerName)
        // note: initFn is the init function registered for this controller in NewControllerInitializers
        started, err := initFn(ctx)
        ···
    }
    return nil
}
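The ctx.IsControllerEnabled check above is what honours the --controllers flag, so individual loops can be switched on or off without rebuilding the binary. For example (illustrative; '*' keeps the default set, '-foo' disables the controller named foo):

kube-controller-manager --controllers=*,-ttl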
ctx.InformerFactory.Start
At this point the informers in controller-manager start watching for resource events and put them onto their own (rate-limited) work queues; worker goroutines then take items off the queue and process them.
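A minimal sketch of this informer-to-workqueue wiring using client-go (the clientset is assumed to be built elsewhere; names are illustrative and simplified compared to the real ReplicaSetController):

package example

import (
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// watchReplicaSets registers an event handler that reduces watched objects to
// namespace/name keys and pushes them onto a rate-limited work queue, then starts
// the informers; this is the same pattern as enqueueReplicaSet / rsc.queue below.
func watchReplicaSets(client kubernetes.Interface, stopCh <-chan struct{}) workqueue.RateLimitingInterface {
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")

    factory := informers.NewSharedInformerFactory(client, 0)
    rsInformer := factory.Apps().V1().ReplicaSets().Informer()
    rsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        },
    })

    // Corresponds to ctx.InformerFactory.Start(ctx.Stop) in Run above.
    factory.Start(stopCh)
    return queue
}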
Putting a newly created ReplicaSet onto the queue:
// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    rsc.queue.Add(key)
}
Taking an item off the queue and processing it (the details follow below):
func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}
4.4. startReplicaSetController as an example of how a controller starts and runs
In StartControllers, each initFn is the function that was registered for the controller in NewControllerInitializers; below we focus on startReplicaSetController.
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go
NewReplicaSetController mainly initializes the ReplicaSetController structure, including the apiserver client, the informer callbacks, and so on. NewReplicaSetController -> NewBaseController
func startReplicaSetController(ctx ControllerContext) (bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
        return false, nil
    }
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return true, nil
}
The key function is Run: k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go
Run launches the rsc.worker goroutines.
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    glog.Infof("Starting %v controller", controllerName)
    defer glog.Infof("Shutting down %v controller", controllerName)

    if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}
rsc.worker simply calls rsc.syncHandler, and syncHandler is set to rsc.syncReplicaSet when the controller is created (see NewBaseController), as sketched below.
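The wiring, abbreviated and paraphrased from NewBaseController and worker in replica_set.go (elisions marked with ···; see the file for the full constructor):

func NewBaseController( /* ... */ ) *ReplicaSetController {
    ···
    // the handler that processNextWorkItem invokes for every queued key
    rsc.syncHandler = rsc.syncReplicaSet
    return rsc
}

// worker keeps draining the queue until it is shut down.
func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}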
So let's turn to syncReplicaSet.
Location: k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go
updateReplicaSetStatus: updates the status whenever pods come up or die.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()

    // split the key into namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    // look up the ReplicaSet by name
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        glog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }

    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
        return nil
    }

    // list all pods to include the pods that don't match the rs`s selector
    // anymore but has the stale controller ref.
    // TODO: Do the List and Filter in a single pass, or use an index.
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }
    // Ignore inactive pods.
    var filteredPods []*v1.Pod
    for _, pod := range allPods {
        if controller.IsPodActive(pod) {
            filteredPods = append(filteredPods, pod)
        }
    }

    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    if err != nil {
        return err
    }

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // Always updates status as pods come up or die.
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    ···
    return manageReplicasErr
}
Turning to updateReplicaSetStatus: k8s.io/kubernetes/pkg/controller/replicaset/replica_set_utils.go
It calls UpdateStatus, which performs the update through the apiserver.
func updateReplicaSetStatus(c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
    ···
    updatedRS, updateErr = c.UpdateStatus(rs)
    ···
    return nil, updateErr
}

func (c *replicaSets) UpdateStatus(replicaSet *v1.ReplicaSet) (result *v1.ReplicaSet, err error) {
    result = &v1.ReplicaSet{}
    err = c.client.Put().
        Namespace(c.ns).
        Resource("replicasets").
        Name(replicaSet.Name).
        SubResource("status").
        Body(replicaSet).
        Do().
        Into(result)
    return
}
5. A note on the criteria the PodGCController in controller-manager uses for cleanup
1. GC terminated pods once they exceed the configured threshold, deleting the oldest first (sorted by creation timestamp); see the configuration note after these snippets.
func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
    terminatedPods := []*v1.Pod{}
    for _, pod := range pods {
        if isPodTerminated(pod) {
            terminatedPods = append(terminatedPods, pod)
        }
    }

    terminatedPodCount := len(terminatedPods)
    sort.Sort(byCreationTimestamp(terminatedPods))

    deleteCount := terminatedPodCount - gcc.terminatedPodThreshold

    if deleteCount > terminatedPodCount {
        deleteCount = terminatedPodCount
    }
    if deleteCount > 0 {
        glog.Infof("garbage collecting %v pods", deleteCount)
    }

    var wait sync.WaitGroup
    for i := 0; i < deleteCount; i++ {
        wait.Add(1)
        go func(namespace string, name string) {
            defer wait.Done()
            if err := gcc.deletePod(namespace, name); err != nil {
                // ignore not founds
                defer utilruntime.HandleError(err)
            }
        }(terminatedPods[i].Namespace, terminatedPods[i].Name)
    }
    wait.Wait()
}
2. GC orphaned pods: pods whose node is no longer in the cluster's node list, i.e. pods not bound to any valid node.
func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) {
    glog.V(4).Infof("GC'ing orphaned")
    // We want to get list of Nodes from the etcd, to make sure that it's as fresh as possible.
    nodes, err := gcc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
    if err != nil {
        return
    }
    nodeNames := sets.NewString()
    for i := range nodes.Items {
        nodeNames.Insert(nodes.Items[i].Name)
    }

    for _, pod := range pods {
        if pod.Spec.NodeName == "" {
            continue
        }
        if nodeNames.Has(pod.Spec.NodeName) {
            continue
        }
        glog.V(2).Infof("Found orphaned Pod %v assigned to the Node %v. Deleting.", pod.Name, pod.Spec.NodeName)
        if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
            utilruntime.HandleError(err)
        } else {
            glog.V(0).Infof("Forced deletion of orphaned Pod %s succeeded", pod.Name)
        }
    }
}
3. GC terminating pods that were never scheduled: their NodeName is empty, usually because resource or other constraints could not be satisfied.
func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) {
    glog.V(4).Infof("GC'ing unscheduled pods which are terminating.")

    for _, pod := range pods {
        if pod.DeletionTimestamp == nil || len(pod.Spec.NodeName) > 0 {
            continue
        }
        glog.V(2).Infof("Found unscheduled terminating Pod %v not assigned to any Node. Deleting.", pod.Name)
        if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
            utilruntime.HandleError(err)
        } else {
            glog.V(0).Infof("Forced deletion of unscheduled terminating Pod %s succeeded", pod.Name)
        }
    }
}
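Configuration note: the threshold used by gcTerminated above comes from the kube-controller-manager flag --terminated-pod-gc-threshold, and the terminated-pod GC is presumably skipped entirely when that value is not positive. For example, to keep at most 100 terminated pods around:

kube-controller-manager --terminated-pod-gc-threshold=100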