Kubernetes Source Code Analysis: kube-scheduler

1. The role of kube-scheduler in the cluster

kube-scheduler is a pluggable component, and precisely because it exists in plugin form it is extensible and customizable. It acts as the scheduling decision maker for the entire cluster: through two phases, predicates (filtering) and priorities (scoring), it decides the best node on which to place each pod.
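
To make the two phases concrete, here is a minimal, self-contained sketch with toy types (not the real scheduler API): the predicate filters out nodes that cannot run the pod at all, the priority scores the surviving nodes, and the highest-scoring node wins.

package main

import "fmt"

// Toy types for illustration only; the real scheduler works on *v1.Pod, NodeInfo, etc.
type pod struct{ cpuRequest int }
type node struct {
    name    string
    freeCPU int
}

// A "predicate": can this pod run on this node at all?
func fitsCPU(p pod, n node) bool { return n.freeCPU >= p.cpuRequest }

// A "priority": score a feasible node; here, more CPU left over after placement is better.
func leastRequestedScore(p pod, n node) int { return n.freeCPU - p.cpuRequest }

func schedule(p pod, nodes []node) (string, bool) {
    bestName, bestScore, found := "", -1, false
    for _, n := range nodes {
        if !fitsCPU(p, n) { // predicate (filtering) phase
            continue
        }
        if s := leastRequestedScore(p, n); s > bestScore { // priority (scoring) phase
            bestName, bestScore, found = n.name, s, true
        }
    }
    return bestName, found
}

func main() {
    nodes := []node{{"node-a", 2}, {"node-b", 8}, {"node-c", 4}}
    if host, ok := schedule(pod{cpuRequest: 3}, nodes); ok {
        fmt.Println("schedule to:", host) // prints: schedule to: node-b
    }
}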

3. Key call chain in the kube-scheduler source code

(Figure: kube-scheduler key call chain, kube-scheduler.png)

4. Detailed source code walkthrough

4.1. Component entry point

Location: k8s.io/kubernetes/cmd/kube-scheduler/scheduler.go

func main() {
    rand.Seed(time.Now().UTC().UnixNano())

    command := app.NewSchedulerCommand()

    // TODO: once we switch everything over to Cobra commands, we can go back to calling
    // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    // normalize func and add the go flag set by hand.
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

A point worth calling out here is how the scheduling policies get registered: the init function in defaults.go registers the default Predicate, AlgorithmProvider, and Priority policies, and designates DefaultProvider as the default scheduling algorithm provider (a sketch of registering a custom predicate through the same factory follows the code below).

Location: k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go

func init() {
    // Register functions that extract metadata used by predicates and priorities computations.
    factory.RegisterPredicateMetadataProducerFactory(
        func(args factory.PluginFactoryArgs) algorithm.PredicateMetadataProducer {
            return predicates.NewPredicateMetadataFactory(args.PodLister)
        })
    ···
    registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
    factory.RegisterFitPredicate("PodFitsPorts", predicates.PodFitsHostPorts)
    // Fit is defined based on the absence of port conflicts.
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)
    // Fit is determined by resource availability.
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
    // Fit is determined by the presence of the Host parameter and a string match
    // This predicate is actually a default predicate, because it is invoked from
    // predicates.GeneralPredicates()
    factory.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)
    // Fit is determined by node selector query.
    factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)
    ···
}
···
func registerAlgorithmProvider(predSet, priSet sets.String) {
    // DefaultProvider is designated here as the default scheduling algorithm provider
    // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
    // by specifying flag.
    factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
    // Cluster autoscaler friendly scheduling algorithm.
    factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
        copyAndReplace(priSet, "LeastRequestedPriority", "MostRequestedPriority"))
}
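
As a sketch of how this registry can be extended, the snippet below registers a hypothetical custom predicate through the same factory call. The predicate name, the label key, and the failure-reason type are made up for illustration; it assumes the package layout shown in this post and the FitPredicate signature (pod, metadata, nodeInfo) returning (fit, failure reasons, error) that podFitsOnNode invokes later.

package defaults

import (
    "fmt"

    "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/algorithm"
    "k8s.io/kubernetes/pkg/scheduler/factory"
    "k8s.io/kubernetes/pkg/scheduler/schedulercache"
)

// gpuLabelMissing is an illustrative failure reason; PredicateFailureReason is
// assumed here to require only GetReason() string.
type gpuLabelMissing struct{}

func (gpuLabelMissing) GetReason() string { return "node is missing the example.com/gpu label" }

// nodeHasGPULabel is a hypothetical hard constraint: only nodes carrying the
// example.com/gpu label are considered feasible for the pod.
func nodeHasGPULabel(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }
    if _, ok := node.Labels["example.com/gpu"]; ok {
        return true, nil, nil
    }
    return false, []algorithm.PredicateFailureReason{gpuLabelMissing{}}, nil
}

func init() {
    // Registering under a new key makes the predicate selectable from a policy
    // file or a custom algorithm provider, just like the defaults above.
    factory.RegisterFitPredicate("NodeHasGPULabel", nodeHasGPULabel)
}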

Now, back to the flow starting from the component entry point.

4.2. Reading the configuration and initializing defaults

Location: k8s.io/kubernetes/cmd/kube-scheduler/app/server.go

  • Create the SchedulerOptions object via NewOptions; this mainly holds the configuration parameters and initializes their defaults
  • Build the command object; the later Execute call is what actually runs the cobra.Command
  • Read the configuration flags and inject them into the objects that need them via opts.AddFlags, e.g. leaderelectionconfig, which drives leader election
func NewSchedulerCommand() *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        glog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes ······`,
        Run: func(cmd *cobra.Command, args []string) {
            verflag.PrintAndExitIfRequested()
            utilflag.PrintFlags(cmd.Flags())

            if len(args) != 0 {
                fmt.Fprint(os.Stderr, "arguments are not supported\n")
            }

            if errs := opts.Validate(); len(errs) > 0 {
                fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
                os.Exit(1)
            }

            if len(opts.WriteConfigTo) > 0 {
                if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
                    fmt.Fprintf(os.Stderr, "%v\n", err)
                    os.Exit(1)
                }
                glog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
                return
            }

            c, err := opts.Config()
            if err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }

            stopCh := make(chan struct{})
            if err := Run(c.Complete(), stopCh); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }

    opts.AddFlags(cmd.Flags())
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
}

4.3. Component startup and execution

Essentially this executes the &cobra.Command built in the previous step:

if err := command.Execute(); err != nil {

Location: k8s.io/kubernetes/cmd/kube-scheduler/app/server.go

  • Apply algorithm features according to the feature gates
  • Build the schedulerConfig
  • Create the Scheduler object
  • Serve health checks and metrics over insecure and/or secure endpoints, as configured
  • Start the resource-related informers
  • Run leader election if enabled, then start the scheduling work

Pay particular attention to NewSchedulerConfig and sched.Run().

func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    glog.Infof("Version: %+v", version.Get())

    // Apply algorithms based on feature gates.
    // TODO: make configurable?
    algorithmprovider.ApplyFeatureGates()

    // Configz registration.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(c.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    // Build a scheduler config from the provided algorithm source.
    schedulerConfig, err := NewSchedulerConfig(c)
    if err != nil {
        return err
    }

    // Create the scheduler.
    sched := scheduler.NewFromConfig(schedulerConfig)

    // Prepare the event broadcaster.
    if c.Broadcaster != nil && c.EventClient != nil {
        c.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.EventClient.Events("")})
    }

    // Start up the healthz server.
    if c.InsecureServing != nil {
        separateMetrics := c.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&c.ComponentConfig, separateMetrics), nil, nil)
        if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    if c.InsecureMetricsServing != nil {
        handler := buildHandlerChain(newMetricsHandler(&c.ComponentConfig), nil, nil)
        if err := c.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
            return fmt.Errorf("failed to start metrics server: %v", err)
        }
    }
    if c.SecureServing != nil {
        handler := buildHandlerChain(newHealthzHandler(&c.ComponentConfig, false), c.Authentication.Authenticator, c.Authorization.Authorizer)
        if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }

    // Start all informers.
    go c.PodInformer.Informer().Run(stopCh)
    c.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    c.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)

    // Prepare a reusable run function.
    run := func(stopCh <-chan struct{}) {
        sched.Run()
        <-stopCh
    }

    // If leader election is enabled, run via LeaderElector until done and exit.
    if c.LeaderElection != nil {
        c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                utilruntime.HandleError(fmt.Errorf("lost master"))
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*c.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run()

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so run inline until done.
    run(stopCh)
    return fmt.Errorf("finished without leader elect")
}

Moving on to NewSchedulerConfig
Location: k8s.io/kubernetes/cmd/kube-scheduler/app/server.go

  • Create the configurator (NewConfigFactory), which wires up informers for the various resource types
  • Based on the configured algorithm source, build a scheduler.Config through the configurator and load the corresponding policies (an example policy file is sketched after the code below)

Pay particular attention to NewConfigFactory, CreateFromProvider, and selectHost.

// NewSchedulerConfig creates the scheduler configuration. This is exposed for use by tests.
func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Config, error) {
    var storageClassInformer storageinformers.StorageClassInformer
    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        storageClassInformer = s.InformerFactory.Storage().V1().StorageClasses()
    }

    // Set up the configurator which can create schedulers from configs.
    configurator := factory.NewConfigFactory(
        s.ComponentConfig.SchedulerName,
        s.Client,
        s.InformerFactory.Core().V1().Nodes(),
        s.PodInformer,
        s.InformerFactory.Core().V1().PersistentVolumes(),
        s.InformerFactory.Core().V1().PersistentVolumeClaims(),
        s.InformerFactory.Core().V1().ReplicationControllers(),
        s.InformerFactory.Extensions().V1beta1().ReplicaSets(),
        s.InformerFactory.Apps().V1beta1().StatefulSets(),
        s.InformerFactory.Core().V1().Services(),
        s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        storageClassInformer,
        s.ComponentConfig.HardPodAffinitySymmetricWeight,
        utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
        s.ComponentConfig.DisablePreemption,
    )

    source := s.ComponentConfig.AlgorithmSource
    var config *scheduler.Config
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            // Use a policy serialized in a file.
            policyFile := source.Policy.File.Path
            _, err := os.Stat(policyFile)
            if err != nil {
                return nil, fmt.Errorf("missing policy config file %s", policyFile)
            }
            data, err := ioutil.ReadFile(policyFile)
            if err != nil {
                return nil, fmt.Errorf("couldn't read policy config: %v", err)
            }
            err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
            if err != nil {
                return nil, fmt.Errorf("invalid policy: %v", err)
            }
        case source.Policy.ConfigMap != nil:
            // Use a policy serialized in a config map value.
            policyRef := source.Policy.ConfigMap
            policyConfigMap, err := s.Client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{})
            if err != nil {
                return nil, fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
            }
            data, found := policyConfigMap.Data[componentconfig.SchedulerPolicyConfigMapKey]
            if !found {
                return nil, fmt.Errorf("missing policy config map value at key %q", componentconfig.SchedulerPolicyConfigMapKey)
            }
            err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
            if err != nil {
                return nil, fmt.Errorf("invalid policy: %v", err)
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = s.Recorder

    config.DisablePreemption = s.ComponentConfig.DisablePreemption
    return config, nil
}
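
For the source.Policy branch, the decoded object is a schedulerapi.Policy. Below is a rough sketch of the content that the File path would read and pass to runtime.DecodeInto, embedded here as a Go string constant for reference; the predicate and priority names are taken from those registered in defaults.go earlier, and the exact shape should be treated as illustrative rather than authoritative.

// Illustrative policy content; the file path is supplied through the scheduler's
// algorithm source (source.Policy.File above), or the same JSON can live in a
// ConfigMap value under SchedulerPolicyConfigMapKey.
const examplePolicyJSON = `{
    "kind": "Policy",
    "apiVersion": "v1",
    "predicates": [
        {"name": "PodFitsHostPorts"},
        {"name": "PodFitsResources"},
        {"name": "MatchNodeSelector"}
    ],
    "priorities": [
        {"name": "LeastRequestedPriority", "weight": 1}
    ]
}`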

Moving on to NewConfigFactory
Location: k8s.io/kubernetes/pkg/scheduler/factory/factory.go

  • Register event-handler callbacks on the informers; a sketch of the assignedNonTerminatedPod filter they rely on follows the code below
func NewConfigFactory(
    schedulerName string,
    client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer extensionsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    hardPodAffinitySymmetricWeight int32,
    enableEquivalenceClassCache bool,
    disablePreemption bool,
) scheduler.Configurator {
    stopEverything := make(chan struct{})
    schedulerCache := schedulercache.New(30*time.Second, stopEverything)

    // storageClassInformer is only enabled through VolumeScheduling feature gate
    var storageClassLister storagelisters.StorageClassLister
    if storageClassInformer != nil {
        storageClassLister = storageClassInformer.Lister()
    }

    c := &configFactory{
        client:                         client,
        podLister:                      schedulerCache,
        podQueue:                       core.NewSchedulingQueue(),
        pVLister:                       pvInformer.Lister(),
        pVCLister:                      pvcInformer.Lister(),
        serviceLister:                  serviceInformer.Lister(),
        controllerLister:               replicationControllerInformer.Lister(),
        replicaSetLister:               replicaSetInformer.Lister(),
        statefulSetLister:              statefulSetInformer.Lister(),
        pdbLister:                      pdbInformer.Lister(),
        storageClassLister:             storageClassLister,
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  schedulerName,
        hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
        enableEquivalenceClassCache:    enableEquivalenceClassCache,
        disablePreemption:              disablePreemption,
    }

    c.scheduledPodsHasSynced = podInformer.Informer().HasSynced
    // scheduled pod cache
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedNonTerminatedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedNonTerminatedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
  ···
}
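
The FilterFunc above leans on assignedNonTerminatedPod, which is elided here. A plausible sketch of what it checks, offered as an illustration rather than the exact upstream code: only pods that are already bound to a node and not yet terminated should land in the scheduled-pod cache.

// Sketch of the filter used by the scheduled-pod cache handler above; assumes the
// same k8s.io/api/core/v1 import as the surrounding factory code.
func assignedNonTerminatedPod(pod *v1.Pod) bool {
    if len(pod.Spec.NodeName) == 0 {
        return false // not assigned to a node yet, so it still needs scheduling
    }
    if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
        return false // already terminated, no longer occupies node resources
    }
    return true
}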

Moving on to CreateFromProvider
Location: k8s.io/kubernetes/pkg/scheduler/factory/factory.go

  • Look up the registered algorithm provider by name
func (c *configFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
    glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
    // GetAlgorithmProvider essentially returns algorithmProviderMap[name]
    provider, err := GetAlgorithmProvider(providerName)
    if err != nil {
        return nil, err
    }

    return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}

Moving on to CreateFromKeys
Location: k8s.io/kubernetes/pkg/scheduler/factory/factory.go

  • Instantiate the scheduler from the registered fit predicate and priority keys
//Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
    ···
    // the default generic scheduler
    algo := core.NewGenericScheduler(
        c.schedulerCache,
        c.equivalencePodCache,
        c.podQueue,
        predicateFuncs,
        predicateMetaProducer,
        priorityConfigs,
        priorityMetaProducer,
        extenders,
        c.volumeBinder,
        c.pVCLister,
        c.alwaysCheckAllPredicates,
        c.disablePreemption,
    )
    ···
}

Back to sched.Run(), where the scheduling work actually begins; a toy example of the wait.Until loop it relies on follows the code below.
Location: k8s.io/kubernetes/pkg/scheduler/scheduler.go

func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
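
The go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything) line is what drives the whole scheduling loop: wait.Until keeps invoking the function until the stop channel closes, and a period of 0 means the next pass starts as soon as the previous one returns. A self-contained toy of the same pattern (the work function is just a stand-in for scheduleOne):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    // Close the stop channel after one second, the same way StopEverything
    // eventually ends the scheduler's loop.
    go func() {
        time.Sleep(1 * time.Second)
        close(stopCh)
    }()

    i := 0
    // Period 0: run back-to-back iterations until stopCh is closed.
    wait.Until(func() {
        i++
        fmt.Println("scheduling pass", i)
        time.Sleep(200 * time.Millisecond) // stand-in for one scheduleOne pass
    }, 0, stopCh)
}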

Moving on to scheduleOne, which schedules pods one at a time
Location: k8s.io/kubernetes/pkg/scheduler/scheduler.go

  • Take the next pod from the queue and pick a suitable node for it
  • Assume (pre-bind) the pod's volumes in the scheduler cache
  • Assume (pre-bind) the pod onto the chosen host in the cache; the main reason is that the real binding goes through the apiserver, which is slower and would otherwise complete after the next scheduling pass
  • Perform the real binding through the apiserver client
// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    // take the next new or updated pod from the podQueue
    pod := sched.config.NextPod()
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    suggestedHost, err := sched.schedule(pod)
    if err != nil {
        // schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            sched.preempt(pod, fitError)
            metrics.PreemptionAttempts.Inc()
            metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
            metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
        }
        return
    }
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPod := pod.DeepCopy()
    ···
    err = sched.assumeAndBindVolumes(assumedPod, suggestedHost)
    if err != nil {
        return
    }

    // assume modifies `assumedPod` by setting NodeName=suggestedHost
    err = sched.assume(assumedPod, suggestedHost)
    if err != nil {
        return
    }
    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
        if err != nil {
            glog.Errorf("Internal error binding pod: (%v)", err)
        }
    }()
}

Moving on to Schedule
Location: k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go

  • Fetch the current node information and refresh the local cache
  • Find the nodes that satisfy the hard requirements: findNodesThatFit
  • Score the feasible nodes in parallel according to the priority policies and their weights: PrioritizeNodes
  • Pick the best placement by priority (score)

Pay particular attention to findNodesThatFit and PrioritizeNodes.

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return "", err
    }

    nodes, err := nodeLister.List()
    if err != nil {
        return "", err
    }
    if len(nodes) == 0 {
        return "", ErrNoNodesAvailable
    }

    // Used for all fit and priority funcs.
    // refresh the cached node info on every scheduling pass
    err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
    if err != nil {
        return "", err
    }

    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return "", err
    }

    if len(filteredNodes) == 0 {
        return "", &FitError{
            Pod:              pod,
            NumAllNodes:      len(nodes),
            FailedPredicates: failedPredicateMap,
        }
    }
    metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
    metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

    trace.Step("Prioritizing")
    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
        return filteredNodes[0].Name, nil
    }

    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return "", err
    }
    metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
    metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

    trace.Step("Selecting host")
    return g.selectHost(priorityList)
}

Moving on to findNodesThatFit
Location: k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go
This finds the nodes that satisfy the pod's hard constraints, such as resources, affinity, and anti-affinity; a small example of the Parallelize fan-out it uses follows the code below.

Pay particular attention to podFitsOnNode.

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
···
checkNode := func(i int) {
    nodeName := nodes[i].Name
    fits, failedPredicates, err := podFitsOnNode(
        pod,
        meta,
        g.cachedNodeInfoMap[nodeName],
        g.predicates,
        g.cache,
        g.equivalenceCache,
        g.schedulingQueue,
        g.alwaysCheckAllPredicates,
        equivCacheInfo,
    )
    if err != nil {
        predicateResultLock.Lock()
        errs[err.Error()]++
        predicateResultLock.Unlock()
        return
    }
    if fits {
        filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
    } else {
        predicateResultLock.Lock()
        failedPredicateMap[nodeName] = failedPredicates
        predicateResultLock.Unlock()
    }
  }

  workqueue.Parallelize(16, len(nodes), checkNode)
···
}
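
checkNode is fanned out with workqueue.Parallelize(16, len(nodes), checkNode): up to 16 workers each receive node indexes to evaluate. Below is a self-contained toy of the same call shape, using the client-go workqueue helper the way this code tree does; the fake fitness check is only for illustration.

package main

import (
    "fmt"
    "sync/atomic"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    nodes := []string{"node-a", "node-b", "node-c", "node-d"}
    var fitCount int32

    // Same shape as checkNode above: piece index i examines nodes[i].
    checkNode := func(i int) {
        if nodes[i] != "node-c" { // stand-in for podFitsOnNode
            atomic.AddInt32(&fitCount, 1)
        }
    }

    // Up to 16 goroutines working through len(nodes) pieces, as in findNodesThatFit.
    workqueue.Parallelize(16, len(nodes), checkNode)
    fmt.Println("feasible nodes:", fitCount)
}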

Moving on to podFitsOnNode
Location: k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go

In some cases the predicates are run twice over the same node. If the node has nominated pods of equal or higher priority, the first pass runs with those pods added to the predicate metadata and the node info; if every predicate passes, a second pass runs with the nominated pods not added. The second pass is needed because some predicates, such as inter-pod affinity, may only pass when the nominated pods are treated as not yet running. If the node has no nominated pods, or the first pass already fails, the second pass is skipped. Only pods of equal or higher priority are considered in the first pass, because the current pod must yield to them and must not take a spot opened up for them, whereas it is fine for the current pod to use resources freed up for lower-priority pods. Requiring the pod to be schedulable in both circumstances is a deliberately conservative decision: resource-related predicates and inter-pod anti-affinity are more likely to fail when the nominated pods are treated as running, while pod-affinity predicates are more likely to fail when they are treated as not running. We cannot simply assume the nominated pods are running: they are not running yet, and they may end up being scheduled onto different nodes.

func podFitsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    info *schedulercache.NodeInfo,
    predicateFuncs map[string]algorithm.FitPredicate,
    cache schedulercache.Cache,
    ecache *EquivalenceCache,
    queue SchedulingQueue,
    alwaysCheckAllPredicates bool,
    equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
    ···

    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        if i == 0 {
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            break
        }
        eCacheAvailable = equivCacheInfo != nil && !podsAdded
        for _, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []algorithm.PredicateFailureReason
                err     error
            )
            //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                if eCacheAvailable {
                    fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivCacheInfo, cache)
                } else {
                    fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                }
                if err != nil {
                    return false, []algorithm.PredicateFailureReason{}, err
                }

                if !fit {
                    // eCache is available and valid, and predicates result is unfit, record the fail reasons
                    failedPredicates = append(failedPredicates, reasons...)
                    // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
                    if !alwaysCheckAllPredicates {
                        glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate" +
                            "evaluation is short circuited and there are chances" +
                            "of other predicates failing as well.")
                        break
                    }
                }
            }
        }
    }

    return len(failedPredicates) == 0, failedPredicates, nil
}

Back to PrioritizeNodes
Location: k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go

  • If no priority configs (and no extenders) are registered, EqualPriorityMap is applied so that every node gets the same fixed score; this exists only to produce a priority list in the expected format
  • For each registered priority policy, compute a score for every node (a Map step, with an optional Reduce step)
  • For each node, sum the per-policy scores, multiplying each score by the policy's weight, then add any extender scores
  • The result is the total score list for all nodes; a small arithmetic sketch follows the code below
func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    // If no priority configs are provided, then the EqualPriority function is applied
    // This is required to generate the priority list in the required format
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }

    var (
        mu   = sync.Mutex{}
        wg   = sync.WaitGroup{}
        errs []error
    )
    appendError := func(err error) {
        mu.Lock()
        defer mu.Unlock()
        errs = append(errs, err)
    }

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    for i, priorityConfig := range priorityConfigs {
        if priorityConfig.Function != nil {
            // DEPRECATED
            wg.Add(1)
            go func(index int, config algorithm.PriorityConfig) {
                defer wg.Done()
                var err error
                results[index], err = config.Function(pod, nodeNameToInfo, nodes)
                if err != nil {
                    appendError(err)
                }
            }(i, priorityConfig)
        } else {
            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
        }
    }
    processNode := func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        var err error
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                return
            }
        }
    }
    // fan out to at most 16 workers to process the nodes
    workqueue.Parallelize(16, len(nodes), processNode)
    for i, priorityConfig := range priorityConfigs {
        if priorityConfig.Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int, config algorithm.PriorityConfig) {
            defer wg.Done()
            if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
            if glog.V(10) {
                for _, hostPriority := range results[index] {
                    glog.Infof("%v -> %v: %v, Score: (%d)", pod.Name, hostPriority.Host, config.Name, hostPriority.Score)
                }
            }
        }(i, priorityConfig)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for _, extender := range extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(ext algorithm.SchedulerExtender) {
                defer wg.Done()
                prioritizedList, weight, err := ext.Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(extender)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }

    if glog.V(10) {
        for i := range result {
            glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
        }
    }
    return result, nil
}
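
The summation step multiplies each per-policy score by that policy's weight before adding, exactly as in result[i].Score += results[j][i].Score * priorityConfigs[j].Weight. A tiny self-contained sketch of that arithmetic with toy numbers:

package main

import "fmt"

func main() {
    // results[j][i]: score of node i under priority config j (toy values on a 0-10 scale).
    results := [][]int{
        {10, 4}, // e.g. scores from one priority for node-a, node-b
        {3, 9},  // e.g. scores from a second priority for node-a, node-b
    }
    weights := []int{1, 2} // priorityConfigs[j].Weight
    nodes := []string{"node-a", "node-b"}

    for i, name := range nodes {
        total := 0
        for j := range results {
            total += results[j][i] * weights[j] // same formula as in PrioritizeNodes
        }
        fmt.Printf("%s => %d\n", name, total) // node-a => 16, node-b => 22
    }
}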

Back to selectHost
Location: k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go

  • Sort by score and return the best host, round-robining among the nodes tied at the top score; a worked example follows the code below
// selectHost takes a prioritized list of nodes and then picks one
// in a round-robin manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
    if len(priorityList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }

    sort.Sort(sort.Reverse(priorityList))
    // sort.Search returns the index of the first entry whose score is below the max,
    // i.e. the number of nodes tied at the top score (it equals len(priorityList) when every node shares the max score)
    maxScore := priorityList[0].Score
    firstAfterMaxScore := sort.Search(len(priorityList), func(i int) bool { return priorityList[i].Score < maxScore })

    g.lastNodeIndexLock.Lock()
    ix := int(g.lastNodeIndex % uint64(firstAfterMaxScore))
    g.lastNodeIndex++
    g.lastNodeIndexLock.Unlock()

    return priorityList[ix].Host, nil
}
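
A worked example of the tie-breaking above: with the list already sorted in descending order, sort.Search returns the index of the first score below the maximum, which is the count of nodes tied at the top, and the incrementing lastNodeIndex rotates among exactly those nodes so that repeated scheduling passes spread pods across equally good hosts.

package main

import (
    "fmt"
    "sort"
)

func main() {
    // Scores already sorted in descending order, as after sort.Sort(sort.Reverse(...)).
    scores := []int{10, 10, 7, 3}
    hosts := []string{"node-a", "node-b", "node-c", "node-d"}

    maxScore := scores[0]
    // First index whose score is below the max, i.e. the count of top-scoring nodes (2 here).
    firstAfterMaxScore := sort.Search(len(scores), func(i int) bool { return scores[i] < maxScore })

    // Successive calls alternate between the tied winners: node-a, node-b, node-a, node-b.
    for lastNodeIndex := uint64(0); lastNodeIndex < 4; lastNodeIndex++ {
        ix := int(lastNodeIndex % uint64(firstAfterMaxScore))
        fmt.Println("selected:", hosts[ix])
    }
}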

At this point, control returns to the scheduleOne flow we started from: with the best node selected, the pod-to-node relationship is bound through the apiserver.
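
Under the hood, sched.bind hands that v1.Binding to the apiserver. Below is a hedged sketch of what the final call boils down to, using client-go's pod Bind helper; the client variable, the error handling, and the surrounding context are illustrative only.

// Assumes a configured clientset (e.g. kubernetes.Interface) named client, plus the
// assumedPod and suggestedHost values from scheduleOne above.
binding := &v1.Binding{
    ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
    Target:     v1.ObjectReference{Kind: "Node", Name: suggestedHost},
}
// POSTs to /api/v1/namespaces/{namespace}/pods/{name}/binding, which sets
// spec.nodeName on the pod server-side.
if err := client.CoreV1().Pods(assumedPod.Namespace).Bind(binding); err != nil {
    glog.Errorf("binding failed: %v", err)
}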
