[k8s source code analysis][kube-scheduler] Scheduler startup: Run (1)

1. Introduction

Please credit the original source when reposting; respect the author's work!

This article analyzes the cmd/kube-scheduler directory together with the files pkg/scheduler/scheduler.go and pkg/scheduler/factory/factory.go. The two most important types are configFactory (factory.go) and Scheduler (scheduler.go).
Source: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)

2. Flow Diagram

(Figure run_1.png: overall startup flow of kube-scheduler)

3. Code Walkthrough

Now let's look at how kube-scheduler starts from the code's point of view. To keep things short, code that is irrelevant or does not affect understanding is omitted from the listings.

3.1 cmd/kube-scheduler

// cmd/kube-scheduler/scheduler.go

func main() {
    ...
    command := app.NewSchedulerCommand()
    ...
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

NewSchedulerCommand() leads into cmd/kube-scheduler/app/server.go.

// cmd/kube-scheduler/app/server.go

// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        ...
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    ...
    return cmd
}

cobra is a command-line framework; see its official documentation for details, as it is not covered here.
The call to pay attention to is opts, err := options.NewOptions(), which fills in a number of defaults. Two of them matter most:
DefaultProvider, the name of the default scheduler's algorithm provider.
LeaderElection is set to true, i.e. kube-scheduler starts in high-availability mode; a separate post will cover this.

Additionally, if the kube-scheduler command sets --config to supply a custom scheduler configuration file, it is parsed through the Flags method in cmd/kube-scheduler/app/options/options.go, listed below (a hedged sketch of the flag wiring follows the listing).

// cmd/kube-scheduler/app/options/options.go

// Flags returns flags for a specific scheduler by section name
func (o *Options) Flags() (nfs apiserverflag.NamedFlagSets) {
    fs := nfs.FlagSet("misc")
    fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
    fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
    fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")

    o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
    o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
    o.Authentication.AddFlags(nfs.FlagSet("authentication"))
    o.Authorization.AddFlags(nfs.FlagSet("authorization"))
    o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)

    leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
    utilfeature.DefaultFeatureGate.AddFlag(nfs.FlagSet("feature gate"))

    return nfs
}
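
As an aside, here is a minimal, hypothetical sketch of how a --config flag gets bound to a cobra command, mirroring the fs.StringVar(&o.ConfigFile, "config", ...) line above; the package layout and printout are illustrative only, not the real kube-scheduler wiring (which goes through NamedFlagSets).

// sketch: a standalone cobra command with a --config flag
package main

import (
    "fmt"

    "github.com/spf13/cobra"
)

func main() {
    var configFile string
    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Run: func(cmd *cobra.Command, args []string) {
            if configFile != "" {
                fmt.Println("loading scheduler configuration from", configFile)
            }
        },
    }
    // bind --config the same way Options.Flags() binds o.ConfigFile
    cmd.Flags().StringVar(&configFile, "config", "", "The path to the configuration file.")
    if err := cmd.Execute(); err != nil {
        fmt.Println(err)
    }
}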

Back in NewSchedulerCommand above: opts is ready, so runCommand is called.

// cmd/kube-scheduler/app/server.go

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
    ...
    // validate the options
    if errs := opts.Validate(); len(errs) > 0 {
        fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
        os.Exit(1)
    }
    // if requested, write opts.ComponentConfig out to a file
    if len(opts.WriteConfigTo) > 0 {
        if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
        klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
    }
    // build a scheduler Config object from opts
    c, err := opts.Config()
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
    stopCh := make(chan struct{})
    // Get the completed config
    // derive a completed config from the scheduler config
    cc := c.Complete()
    // apply feature gates to the algorithm providers
    algorithmprovider.ApplyFeatureGates()
    // register the component config with configz
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    // everything above served to obtain a completed config
    return Run(cc, stopCh)
}

This whole sequence exists to produce a completed config, which is then handed to Run. The call worth a closer look is opts.Config(); but first, a condensed sketch of the Options/Config/CompletedConfig pattern these types follow.
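
This is a hedged sketch only: the type names mirror the real ones, while the bodies are simplified stand-ins rather than the actual implementation.

// sketch: the Options -> Config -> CompletedConfig pattern
package app

// Options holds everything parsed from flags and config files.
type Options struct{ ConfigFile string }

// Config is the runtime configuration assembled from Options.
type Config struct{ SchedulerName string }

// CompletedConfig marks a Config whose defaults have all been filled in.
type CompletedConfig struct{ *Config }

// Config assembles a Config; the real method also creates clients,
// the event recorder, and the leader-election config.
func (o *Options) Config() (*Config, error) {
    return &Config{SchedulerName: "default-scheduler"}, nil
}

// Complete fills in any remaining defaults before Run consumes the config.
func (c *Config) Complete() CompletedConfig {
    return CompletedConfig{c}
}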

// cmd/kube-scheduler/app/options/options.go

func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
    // if kube-scheduler was started without --config, use the defaults (o.ComponentConfig)
    if len(o.ConfigFile) == 0 {
        c.ComponentConfig = o.ComponentConfig

        // only apply deprecated flags if no config file is loaded (this is the old behaviour).
        if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
            return err
        }
        if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
            return err
        }
    } else {
        // if --config was given, read the configuration from that file
        cfg, err := loadConfigFromFile(o.ConfigFile)
        if err != nil {
            return err
        }

        // use the loaded config file only, with the exception of --address and --port. This means that
        // none of the deprecated flags in o.Deprecated are taken into consideration. This is the old
        // behaviour of the flags we have to keep.
        c.ComponentConfig = *cfg

        if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
            return err
        }
    }
    ...
    return nil
}

func (o *Options) Config() (*schedulerappconfig.Config, error) {
    if o.SecureServing != nil {
        if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
            return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
        }
    }

    c := &schedulerappconfig.Config{}
    if err := o.ApplyTo(c); err != nil {
        return nil, err
    }

    // Prepare kube clients.
    // create clients that talk to the api-server
    client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
    if err != nil {
        return nil, err
    }
    ...
    // Set up leader election if enabled.
    var leaderElectionConfig *leaderelection.LeaderElectionConfig
    // LeaderElect defaults to true, so this branch runs unless the user disables it;
    // in other words, kube-scheduler supports high availability out of the box
    if c.ComponentConfig.LeaderElection.LeaderElect {
        leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
        if err != nil {
            return nil, err
        }
    }

    c.Client = client
    c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
    c.PodInformer = factory.NewPodInformer(client, 0)
    c.EventClient = eventClient
    c.Recorder = recorder
    c.Broadcaster = eventBroadcaster
    c.LeaderElection = leaderElectionConfig

    return c, nil
}

ApplyTo: decides between file-based and default configuration; when a config file is given, the settings are read from it.
Config: mainly builds the clients used to talk to the api-server, plus the leaderElectionConfig that underpins kube-scheduler high availability. A hedged sketch of the client construction follows.
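
createClients itself boils down to standard client-go usage; here is a hedged approximation (the function name and parameters are simplified, and the real helper additionally builds separate leader-election and event clients).

// sketch: building an api-server client, in the spirit of createClients
package app

import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func newClient(master, kubeconfig string) (kubernetes.Interface, error) {
    // master overrides any server address found in the kubeconfig file
    cfg, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
    if err != nil {
        return nil, err
    }
    return kubernetes.NewForConfig(cfg)
}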

Back in runCommand in cmd/kube-scheduler/app/server.go, the next step is Run(cc, stopCh). Run is the real core method, so it is analyzed piece by piece; first, how the Scheduler object from pkg/scheduler/scheduler.go is created.

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    // Create the scheduler.
    // create the Scheduler object from pkg/scheduler/scheduler.go
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        cc.PodInformer,
        cc.InformerFactory.Core().V1().PersistentVolumes(),
        cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
        cc.InformerFactory.Core().V1().ReplicationControllers(),
        cc.InformerFactory.Apps().V1().ReplicaSets(),
        cc.InformerFactory.Apps().V1().StatefulSets(),
        cc.InformerFactory.Core().V1().Services(),
        cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        storageClassInformer,
        cc.Recorder,
        cc.ComponentConfig.AlgorithmSource,
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
...
}

As you can see, the whole earlier sequence exists to produce the configuration Scheduler needs, including cc.Client, cc.ComponentConfig.AlgorithmSource, and so on.

3.2 pkg/scheduler/scheduler.go

The entire Scheduler struct holds a single field: the Config struct from pkg/scheduler/factory/factory.go.

type Scheduler struct {
    config *factory.Config
}

Now look at the New method.

func New(client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    recorder record.EventRecorder,
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {

    /**
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
     */
    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

    // Set up the configurator which can create schedulers from configs.
    // create the configFactory from pkg/scheduler/factory
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName:                  options.schedulerName,
        Client:                         client,
        NodeInformer:                   nodeInformer,
        PodInformer:                    podInformer,
        PvInformer:                     pvInformer,
        PvcInformer:                    pvcInformer,
        ReplicationControllerInformer:  replicationControllerInformer,
        ReplicaSetInformer:             replicaSetInformer,
        StatefulSetInformer:            statefulSetInformer,
        ServiceInformer:                serviceInformer,
        PdbInformer:                    pdbInformer,
        StorageClassInformer:           storageClassInformer,
        HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
        EnableEquivalenceClassCache:    options.enableEquivalenceClassCache,
        DisablePreemption:              options.disablePreemption,
        PercentageOfNodesToScore:       options.percentageOfNodesToScore,
        BindTimeoutSeconds:             options.bindTimeoutSeconds,
    })
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // the default scheduler enters here: *source.Provider = DefaultProvider
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // a custom scheduler (policy source) enters here
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = recorder
    config.DisablePreemption = options.disablePreemption
    config.StopEverything = stopCh
    // Create the scheduler.
    sched := NewFromConfig(config)
    return sched, nil
}

1. Build options from the functional options passed in. The defaults are listed below; anything else has to be changed through these With* functions (a self-contained sketch of the pattern follows the defaults).

var defaultSchedulerOptions = schedulerOptions{
    schedulerName:                  v1.DefaultSchedulerName,
    hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
    enableEquivalenceClassCache:    false,
    disablePreemption:              false,
    percentageOfNodesToScore:       schedulerapi.DefaultPercentageOfNodesToScore,
    bindTimeoutSeconds:             BindTimeoutSeconds,
}
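
These With* helpers are the classic functional-options pattern. A self-contained sketch (WithName mirrors the real helper; the named Option type and the trimmed fields are simplifications, since the real code passes bare func(o *schedulerOptions) values):

// sketch: the functional-options pattern used by scheduler.New
package scheduler

// schedulerOptions mirrors the real private options struct (trimmed down).
type schedulerOptions struct {
    schedulerName     string
    disablePreemption bool
}

// Option mutates one field of schedulerOptions.
type Option func(o *schedulerOptions)

func WithName(name string) Option {
    return func(o *schedulerOptions) { o.schedulerName = name }
}

func WithPreemptionDisabled(disabled bool) Option {
    return func(o *schedulerOptions) { o.disablePreemption = disabled }
}

// applyOptions starts from the defaults and applies each override in order,
// exactly as New does with defaultSchedulerOptions.
func applyOptions(opts ...Option) schedulerOptions {
    options := schedulerOptions{schedulerName: "default-scheduler"}
    for _, opt := range opts {
        opt(&options)
    }
    return options
}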

2. Build factory's configFactory object from the arguments, named configurator; it is examined in detail later.
3. Depending on source, decide how to build the Scheduler's Config. Only the default scheduler is discussed here (custom schedulers get a dedicated post), so execution enters sc, err := configurator.CreateFromProvider(*source.Provider) to produce the required Config.

// pkg/scheduler/factory/factory.go

// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
    klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
    provider, err := GetAlgorithmProvider(providerName)
    if err != nil {
        return nil, err
    }
    return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}

GetAlgorithmProvider(providerName), with providerName = "DefaultProvider", was introduced in [k8s源碼分析][kube-scheduler]scheduler/algorithmprovider之注冊default-scheduler: it returns the keys of all the default scheduler's predicate and priority functions. The hypothetical mini-registry below illustrates the shape of what it returns.
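
This is illustrative only: the struct fields match the real AlgorithmProviderConfig, but the registration mechanism and the listed keys are a small made-up subset, not the full default set.

// sketch: a named registry of predicate/priority keys
package factory

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/sets"
)

// AlgorithmProviderConfig bundles the registered predicate and priority keys.
type AlgorithmProviderConfig struct {
    FitPredicateKeys     sets.String
    PriorityFunctionKeys sets.String
}

// hypothetical registry content; the real one is filled by
// pkg/scheduler/algorithmprovider init() calls at startup
var providers = map[string]AlgorithmProviderConfig{
    "DefaultProvider": {
        FitPredicateKeys:     sets.NewString("PodFitsResources", "PodFitsHostPorts"),
        PriorityFunctionKeys: sets.NewString("LeastRequestedPriority", "BalancedResourceAllocation"),
    },
}

// GetAlgorithmProvider returns the key bundle registered under name.
func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) {
    provider, ok := providers[name]
    if !ok {
        return nil, fmt.Errorf("provider %q has not been registered", name)
    }
    return &provider, nil
}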

configFactory's CreateFromKeys then generates the Config that Scheduler needs from all the predicate functions, priority functions, and extenders (empty here). CreateFromKeys is covered later together with configFactory.

4. Build the Scheduler object sched from the Config.

// pkg/scheduler/scheduler.go

// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *factory.Config) *Scheduler {
    metrics.Register()
    return &Scheduler{
        config: config,
    }
}

3.3 pkg/scheduler/factory/factory.go

This section analyzes the configurator := factory.NewConfigFactory call mentioned in 3.2 pkg/scheduler/scheduler.go.

3.3.1 configFactory

Below is the definition of the configFactory struct.

type configFactory struct {
    // client used to talk to the api-server
    client clientset.Interface
    // queue for pods that need scheduling
    // holds the pods that still need to be scheduled
    podQueue internalqueue.SchedulingQueue
    // a means to list all known scheduled pods.
    // lists all pods that have already been scheduled
    scheduledPodLister corelisters.PodLister
    // a means to list all known scheduled pods and pods assumed to have been scheduled.
    // lists all scheduled pods plus the assumed pods
    podLister algorithm.PodLister
    // a means to list all nodes
    nodeLister corelisters.NodeLister
    // a means to list all PersistentVolumes
    pVLister corelisters.PersistentVolumeLister
    // a means to list all PersistentVolumeClaims
    pVCLister corelisters.PersistentVolumeClaimLister
    // a means to list all services
    serviceLister corelisters.ServiceLister
    // a means to list all controllers
    controllerLister corelisters.ReplicationControllerLister
    // a means to list all replicasets
    replicaSetLister appslisters.ReplicaSetLister
    // a means to list all statefulsets
    statefulSetLister appslisters.StatefulSetLister
    // a means to list all PodDisruptionBudgets
    pdbLister policylisters.PodDisruptionBudgetLister
    // a means to list all StorageClasses
    storageClassLister storagelisters.StorageClassLister
    // Close this to stop all reflectors
    StopEverything <-chan struct{}
    scheduledPodsHasSynced cache.InformerSynced
    schedulerCache schedulerinternalcache.Cache
    // SchedulerName of a scheduler is used to select which pods will be
    // processed by this scheduler, based on pods's "spec.schedulerName".
    // this scheduler's name; default-scheduler by default
    schedulerName string
    // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
    // corresponding to every RequiredDuringScheduling affinity rule.
    // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
    hardPodAffinitySymmetricWeight int32
    // Equivalence class cache
    // equivalence class cache that speeds up the predicate phase
    equivalencePodCache *equivalence.Cache
    // Enable equivalence class cache
    enableEquivalenceClassCache bool
    // Handles volume binding decisions
    volumeBinder *volumebinder.VolumeBinder
    // Always check all predicates even if the middle of one predicate fails.
    alwaysCheckAllPredicates bool
    // Disable pod preemption or not.
    // whether pod preemption is disabled
    disablePreemption bool
    // percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
    percentageOfNodesToScore int32
}

The NewConfigFactory method:

func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
    stopEverything := args.StopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }
    schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

    // storageClassInformer is only enabled through VolumeScheduling feature gate
    var storageClassLister storagelisters.StorageClassLister
    if args.StorageClassInformer != nil {
        storageClassLister = args.StorageClassInformer.Lister()
    }
    c := &configFactory{
        client:                         args.Client,
        podLister:                      schedulerCache,
        podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
        nodeLister:                     args.NodeInformer.Lister(),
        pVLister:                       args.PvInformer.Lister(),
        pVCLister:                      args.PvcInformer.Lister(),
        serviceLister:                  args.ServiceInformer.Lister(),
        controllerLister:               args.ReplicationControllerInformer.Lister(),
        replicaSetLister:               args.ReplicaSetInformer.Lister(),
        statefulSetLister:              args.StatefulSetInformer.Lister(),
        pdbLister:                      args.PdbInformer.Lister(),
        storageClassLister:             storageClassLister,
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  args.SchedulerName,
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        enableEquivalenceClassCache:    args.EnableEquivalenceClassCache,
        disablePreemption:              args.DisablePreemption,
        percentageOfNodesToScore:       args.PercentageOfNodesToScore,
    }

    c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
    // scheduled pod cache
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
    ...
    c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
    ...
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, compareSignal)

    go func() {
        for {
            select {
            case <-c.StopEverything:
                c.podQueue.Close()
                return
            case <-ch:
                debugger.Comparer.Compare()
                debugger.Dumper.DumpAll()
            }
        }
    }()

    return c
}

The main points to note here:
1. schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything) instantiates a schedulerCache; its structure and implementation were analyzed in [k8s源碼分析][kube-scheduler]scheduler/internal/cache之node_tree和cache, so here we only care about where it is used.
2. configFactory's podLister and schedulerCache fields point at the same schedulerCache object. podLister is defined as listing all scheduled pods plus the assumed pods, which is exactly what schedulerCache holds.
3. configFactory's scheduledPodLister lists all pods that have already been scheduled. args.PodInformer.Lister() returns all pods (informers are covered in a dedicated client-go series, not here); as the code below shows, assignedPodLister simply wraps args.PodInformer.Lister() with a filter that keeps only pods already assigned to a node.

c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}

type assignedPodLister struct {
    corelisters.PodLister
}

// List lists all assigned pods that match the selector.
func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
    list, err := l.PodLister.List(selector)
    if err != nil {
        return nil, err
    }
    filtered := make([]*v1.Pod, 0, len(list))
    for _, pod := range list {
        // keep only the pods that have already been scheduled
        if len(pod.Spec.NodeName) > 0 {
            filtered = append(filtered, pod)
        }
    }
    return filtered, nil
}

// Pods returns a namespace-scoped lister over assigned pods.
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
    return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
}

4. podQueue holds the pods that need scheduling, hence internalqueue.NewSchedulingQueue(stopEverything). scheduling_queue was analyzed in detail in [k8s源碼分析][kube-scheduler]scheduler/internal/queue之優先隊列scheduling_queue(1) and [k8s源碼分析][kube-scheduler]scheduler/internal/queue之優先隊列scheduling_queue(2).
5. The various informers (podInformer, serviceInformer, NodeInformer, PvInformer, PvcInformer, StorageClassInformer, and so on) each get EventHandler logic attached; that part is the topic of the next post. The assignedPod filter used in the FilterFunc above is worth a quick look, sketched right after this list.
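
The assignedPod filter reduces to a nodeName check; a hedged sketch (as far as I can tell this matches the v1.13 helper in factory.go):

// sketch: the assignedPod predicate
package factory

import v1 "k8s.io/api/core/v1"

// assignedPod reports whether the pod has already been bound to a node,
// i.e. spec.nodeName is set.
func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}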

3.3.2 Config

type Config struct {
    // It is expected that changes made via SchedulerCache will be observed
    // by NodeLister and Algorithm.
    // the schedulerCache; the very same object as configFactory's schedulerCache
    SchedulerCache schedulerinternalcache.Cache
    // Ecache is used to optimistically invalidate affected cache items after
    // successfully binding a pod
    // this is configFactory's equivalencePodCache
    Ecache     *equivalence.Cache
    // lister for all Nodes
    NodeLister algorithm.NodeLister
    // the algorithm used for scheduling
    Algorithm  algorithm.ScheduleAlgorithm
    // returns the Binder to use for a given pod
    GetBinder  func(pod *v1.Pod) Binder
    // PodConditionUpdater is used only in case of scheduling errors. If we succeed
    // with scheduling, PodScheduled condition will be updated in apiserver in /bind
    // handler so that binding and setting PodCondition it is atomic.
    PodConditionUpdater PodConditionUpdater
    // PodPreemptor is used to evict pods and update pod annotations.
    // the preemptor
    PodPreemptor PodPreemptor
    // NextPod should be a function that blocks until the next pod
    // is available. We don't use a channel for this, because scheduling
    // a pod may take some amount of time and we don't want pods to get
    // stale while they sit in a channel.
    // fetches the next pod that needs scheduling;
    // blocks until one is available
    NextPod func() *v1.Pod
    // WaitForCacheSync waits for scheduler cache to populate.
    // It returns true if it was successful, false if the controller should shutdown.
    WaitForCacheSync func() bool
    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*v1.Pod, error)
    // Recorder is the EventRecorder to use
    Recorder record.EventRecorder
    // Close this to shut down the scheduler.
    StopEverything <-chan struct{}
    VolumeBinder *volumebinder.VolumeBinder
    DisablePreemption bool
    // caches the pods waiting to be scheduled
    SchedulingQueue internalqueue.SchedulingQueue
}

Worth noting here:
NextPod: a function; every pod that needs scheduling comes out of it, one at a time.

Next, look at configFactory's CreateFromKeys mentioned above. Given the predicate, priority, and extender functions, it builds a Config (factory.go) object from the current configFactory.

func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
    klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

    if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
        return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
    }

    // resolve all predicate functions from the predicate keys
    predicateFuncs, err := c.GetPredicates(predicateKeys)
    if err != nil {
        return nil, err
    }

    // resolve all priority functions from the priority keys
    priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
    if err != nil {
        return nil, err
    }

    // the priorityMetaProducer is used when scoring nodes
    priorityMetaProducer, err := c.GetPriorityMetadataProducer()
    if err != nil {
        return nil, err
    }
    // the predicateMetaProducer is used during the actual predicate phase
    predicateMetaProducer, err := c.GetPredicateMetadataProducer()
    if err != nil {
        return nil, err
    }

    // init the equivalence class cache, if enabled, to speed up the predicate phase
    if c.enableEquivalenceClassCache {
        c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
        klog.Info("Created equivalence class cache")
    }

    // build the Algorithm (algorithm.ScheduleAlgorithm) that performs the real scheduling computation
    algo := core.NewGenericScheduler(
        c.schedulerCache,
        c.equivalencePodCache,
        c.podQueue,
        predicateFuncs,
        predicateMetaProducer,
        priorityConfigs,
        priorityMetaProducer,
        extenders,
        c.volumeBinder,
        c.pVCLister,
        c.pdbLister,
        c.alwaysCheckAllPredicates,
        c.disablePreemption,
        c.percentageOfNodesToScore,
    )

    podBackoff := util.CreateDefaultPodBackoff()
    return &Config{
        SchedulerCache: c.schedulerCache,
        Ecache:         c.equivalencePodCache,
        // The scheduler only needs to consider schedulable nodes.
        NodeLister:          &nodeLister{c.nodeLister},
        Algorithm:           algo,
        GetBinder:           c.getBinderFunc(extenders),
        PodConditionUpdater: &podConditionUpdater{c.client},
        PodPreemptor:        &podPreemptor{c.client},
        WaitForCacheSync: func() bool {
            return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
        },
        NextPod: func() *v1.Pod {
            return c.getNextPod()
        },
        Error:           c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
        StopEverything:  c.StopEverything,
        VolumeBinder:    c.volumeBinder,
        SchedulingQueue: c.podQueue,
    }, nil
}

Points to note:
1. The predicate and priority keys are resolved into their corresponding functions, and the registered priorityMetaProducer and predicateMetaProducer are obtained.
2. The algorithm.ScheduleAlgorithm interface instance that performs the real scheduling computation is created; the implementation returned is a genericScheduler (pkg/scheduler/core/generic_scheduler.go) object.

// pkg/scheduler/algorithm/scheduler_interface.go

type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
    // Preempt receives scheduling errors for a pod and tries to create room for
    // the pod by preempting lower priority pods if possible.
    // It returns the node where preemption happened, a list of preempted pods, a
    // list of pods whose nominated node name should be removed, and error if any.
    Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
    // Predicates() returns a pointer to a map of predicate functions. This is
    // exposed for testing.
    Predicates() map[string]FitPredicate
    // Prioritizers returns a slice of priority config. This is exposed for
    // testing.
    Prioritizers() []PriorityConfig
}

// pkg/scheduler/core/generic_scheduler.go 

func NewGenericScheduler(
    cache schedulerinternalcache.Cache,
    eCache *equivalence.Cache,
    podQueue internalqueue.SchedulingQueue,
    predicates map[string]algorithm.FitPredicate,
    predicateMetaProducer algorithm.PredicateMetadataProducer,
    prioritizers []algorithm.PriorityConfig,
    priorityMetaProducer algorithm.PriorityMetadataProducer,
    extenders []algorithm.SchedulerExtender,
    volumeBinder *volumebinder.VolumeBinder,
    pvcLister corelisters.PersistentVolumeClaimLister,
    pdbLister algorithm.PDBLister,
    alwaysCheckAllPredicates bool,
    disablePreemption bool,
    percentageOfNodesToScore int32,
) algorithm.ScheduleAlgorithm {
    return &genericScheduler{
        cache:                    cache,
        equivalenceCache:         eCache,
        schedulingQueue:          podQueue,
        predicates:               predicates,
        predicateMetaProducer:    predicateMetaProducer,
        prioritizers:             prioritizers,
        priorityMetaProducer:     priorityMetaProducer,
        extenders:                extenders,
        cachedNodeInfoMap:        make(map[string]*schedulercache.NodeInfo),
        volumeBinder:             volumeBinder,
        pvcLister:                pvcLister,
        pdbLister:                pdbLister,
        alwaysCheckAllPredicates: alwaysCheckAllPredicates,
        disablePreemption:        disablePreemption,
        percentageOfNodesToScore: percentageOfNodesToScore,
    }
}

3. GetBinder is populated: getBinderFunc returns a function that, per pod, picks either an extender binder interested in that pod or the default binder.

// pkg/scheduler/factory/factory.go 

func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
    var extenderBinder algorithm.SchedulerExtender
    for i := range extenders {
        if extenders[i].IsBinder() {
            extenderBinder = extenders[i]
            break
        }
    }
    defaultBinder := &binder{c.client}
    return func(pod *v1.Pod) Binder {
        if extenderBinder != nil && extenderBinder.IsInterested(pod) {
            return extenderBinder
        }
        return defaultBinder
    }
}

4. PodConditionUpdater and PodPreemptor are created; both are thin wrappers around the api-server client.
5. NextPod is the most important function here, because every pod that gets scheduled comes out of it.

NextPod: func() *v1.Pod {
    return c.getNextPod()
}

func (c *configFactory) getNextPod() *v1.Pod {
    pod, err := c.podQueue.Pop()
    if err == nil {
        klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
        return pod
    }
    klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
    return nil
}

As you can see, every pod comes out of the podQueue, so where pods enter the podQueue becomes the interesting question; that involves the informers mentioned above and is left to the next post. The sketch below illustrates the blocking Pop contract that getNextPod relies on.
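
This is a self-contained illustration only: a plain FIFO protected by a condition variable, whereas the real SchedulingQueue is a priority queue with backoff and unschedulable sub-queues, and its Pop also reports errors on close.

// sketch: a Pop that blocks until a pod is available
package main

import (
    "fmt"
    "sync"
)

type fifoQueue struct {
    lock  sync.Mutex
    cond  *sync.Cond
    items []string
}

func newFifoQueue() *fifoQueue {
    q := &fifoQueue{}
    q.cond = sync.NewCond(&q.lock)
    return q
}

// Add enqueues a pod and wakes up one blocked Pop.
func (q *fifoQueue) Add(pod string) {
    q.lock.Lock()
    defer q.lock.Unlock()
    q.items = append(q.items, pod)
    q.cond.Signal()
}

// Pop blocks until an item is available, like podQueue.Pop() in getNextPod.
func (q *fifoQueue) Pop() string {
    q.lock.Lock()
    defer q.lock.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait()
    }
    pod := q.items[0]
    q.items = q.items[1:]
    return pod
}

func main() {
    q := newFifoQueue()
    go q.Add("default/nginx")
    fmt.Println("scheduling", q.Pop()) // blocks until Add runs
}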

3.4 Back to Run

Section 3.3 analyzed how the Run method in cmd/kube-scheduler/app/server.go (entered in 3.1) creates the Scheduler object from pkg/scheduler/scheduler.go. What happens once that object exists? Return to Run in cmd/kube-scheduler/app/server.go.

// cmd/kube-scheduler/app/server.go

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    ...
    // Create the scheduler.
    // create the Scheduler object from pkg/scheduler/scheduler.go
    sched, err := scheduler.New
    ...
    // Start all informers.
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    // start via leader election (high availability)
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            // invoke the run function defined above
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                utilruntime.HandleError(fmt.Errorf("lost master"))
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

1. All informers are started.
2. Since high availability is on by default, sched.Run() is launched through the leader elector. A hedged sketch of the client-go leader-election API follows.
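
This sketch uses today's Lease-based lock for brevity, which is an assumption on my part (the v1.13 code assembles its lock inside makeLeaderElectionConfig and defaults to a different lock resource); the namespace, name, and timings are illustrative.

// sketch: running a function under client-go leader election
package app

import (
    "context"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runWithLeaderElection(ctx context.Context, client kubernetes.Interface, id string, run func(context.Context)) {
    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Namespace: "kube-system", Name: "kube-scheduler"},
        Client:     client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: id},
    }
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: 15 * time.Second,
        RenewDeadline: 10 * time.Second,
        RetryPeriod:   2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            // became the leader: start scheduling
            OnStartedLeading: run,
            // lost the lease: kube-scheduler treats this as fatal
            OnStoppedLeading: func() {},
        },
    })
}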

Next, look at sched.Run.

func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    ...
}

wait.Until runs scheduleOne with a period of 0, that is, one invocation right after the previous one returns; each scheduleOne call invokes sched.config.NextPod() to pop a pod off the podQueue and schedule it.

Put plainly: the loop keeps popping one pod at a time from the podQueue and scheduling it, blocking whenever the podQueue is empty. A runnable sketch of this loop shape follows.
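
In this sketch the buffered channel stands in for sched.config.NextPod(); the names and timings are illustrative.

// sketch: wait.Until with period 0 drives a blocking NextPod-style loop
package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    pods := make(chan string, 2)
    pods <- "default/nginx"
    pods <- "default/redis"

    scheduleOne := func() {
        pod := <-pods // stand-in for NextPod(): blocks while the queue is empty
        fmt.Println("scheduling", pod)
    }

    // period 0: run scheduleOne again as soon as the previous call returns
    go wait.Until(scheduleOne, 0, stopCh)

    time.Sleep(100 * time.Millisecond)
    close(stopCh)
}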

4. Summary

Reviewing the whole flow:
1. Parse the config file, or fall back to the defaults, to produce a completed config.
2. Start the pod-related informers; they watch changes in the cluster and, according to the relevant rules, feed pods into a scheduling_queue, i.e. the podQueue.
3. Run an endless loop that keeps reading from the podQueue and scheduling.
