1. Introduction
Please credit the original source when reposting; respect the author's work!
This post analyzes cmd/kube-scheduler, pkg/scheduler/scheduler.go, pkg/scheduler/factory/factory.go, and related directories and files. The two most important types are configFactory (factory.go) and Scheduler (scheduler.go).
Source code: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)
2. Flow Chart
(Figure: run_1.png, the kube-scheduler startup flow.)
3. Code Walkthrough
Let's look at how kube-scheduler starts, from the code's point of view. To keep things short, code that is irrelevant or not needed for understanding is omitted from the listings.
3.1 cmd/kube-scheduler
// cmd/kube-scheduler/scheduler.go
func main() {
...
command := app.NewSchedulerCommand()
...
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
The NewSchedulerCommand() method takes us into cmd/kube-scheduler/app/server.go.
// cmd/kube-scheduler/app/server.go
// NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
Use: "kube-scheduler",
...
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
...
return cmd
}
cobra is a command-line library; see its documentation for details, we won't cover it further here.
The call worth attention is opts, err := options.NewOptions(), which fills in a number of default properties. Two of them matter most:
DefaultProvider, the name of the default scheduler's algorithm provider.
LeaderElection, whose LeaderElect property is set to true, meaning kube-scheduler enables high availability by default; a dedicated post will cover this.
Also, if the kube-scheduler command sets --config to configure a custom scheduler, the flag is parsed through Flags in cmd/kube-scheduler/app/options/options.go.
// cmd/kube-scheduler/app/options/options.go
// Flags returns flags for a specific scheduler by section name
func (o *Options) Flags() (nfs apiserverflag.NamedFlagSets) {
fs := nfs.FlagSet("misc")
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
o.Authentication.AddFlags(nfs.FlagSet("authentication"))
o.Authorization.AddFlags(nfs.FlagSet("authorization"))
o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
utilfeature.DefaultFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
return nfs
}
Now back to NewSchedulerCommand above: with opts ready, the command's Run function calls runCommand.
// cmd/kube-scheduler/app/server.go
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
...
// validate the options
if errs := opts.Validate(); len(errs) > 0 {
fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
os.Exit(1)
}
// if requested, persist opts.ComponentConfig to a file
if len(opts.WriteConfigTo) > 0 {
if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
}
// build a scheduler Config object from opts
c, err := opts.Config()
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
stopCh := make(chan struct{})
// Get the completed config
// turn the scheduler config into a completed config
cc := c.Complete()
// apply feature gates (adjusts the registered algorithm providers)
algorithmprovider.ApplyFeatureGates()
// register the component configuration with configz
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
// everything above exists to produce a completed config for Run
return Run(cc, stopCh)
}
This whole sequence exists to produce a completed config and hand it to Run. One call worth a closer look is opts.Config().
// cmd/kube-scheduler/app/options/options.go
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
// if --config was not given, fall back to the defaults in o.ComponentConfig
if len(o.ConfigFile) == 0 {
c.ComponentConfig = o.ComponentConfig
// only apply deprecated flags if no config file is loaded (this is the old behaviour).
if err := o.Deprecated.ApplyTo(&c.ComponentConfig); err != nil {
return err
}
if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
return err
}
} else {
// if --config was given, load the configuration from the file
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
// use the loaded config file only, with the exception of --address and --port. This means that
// none of the deprectated flags in o.Deprecated are taken into consideration. This is the old
// behaviour of the flags we have to keep.
c.ComponentConfig = *cfg
if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
return err
}
}
...
return nil
}
func (o *Options) Config() (*schedulerappconfig.Config, error) {
if o.SecureServing != nil {
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
}
c := &schedulerappconfig.Config{}
if err := o.ApplyTo(c); err != nil {
return nil, err
}
// Prepare kube clients.
// create the clients used to talk to the api-server
client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
if err != nil {
return nil, err
}
...
// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
// LeaderElect defaults to true; unless the user sets it to false this branch runs,
// in other words kube-scheduler supports high availability out of the box
if c.ComponentConfig.LeaderElection.LeaderElect {
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
if err != nil {
return nil, err
}
}
c.Client = client
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
c.PodInformer = factory.NewPodInformer(client, 0)
c.EventClient = eventClient
c.Recorder = recorder
c.Broadcaster = eventBroadcaster
c.LeaderElection = leaderElectionConfig
return c, nil
}
ApplyTo: mainly decides whether a config file was given; if so, the configuration is read from that file.
Config: mainly builds the client used to talk to the api-server, plus the leaderElectionConfig that backs kube-scheduler high availability.
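The createClients call above boils down to standard client-go plumbing. Below is a minimal, hedged sketch of building a clientset that can talk to the api-server; it is not the exact createClients code, and the helper name newClient is made up for illustration.
import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// newClient (hypothetical helper): build a rest.Config from an optional
// master URL plus kubeconfig path, then create a clientset for the api-server.
func newClient(master, kubeconfig string) (kubernetes.Interface, error) {
	cfg, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(cfg)
}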
Now back to runCommand in cmd/kube-scheduler/app/server.go, which then calls Run(cc, stopCh). Since Run is the real core method, we analyze it piece by piece, starting with how the Scheduler object from pkg/scheduler/scheduler.go is created.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
...
// Create the scheduler.
// build the Scheduler object from pkg/scheduler/scheduler.go
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
...
}
As you can see, the whole sequence of steps so far was about assembling the configuration Scheduler needs, including cc.Client, cc.ComponentConfig.AlgorithmSource, and so on.
3.2 pkg/scheduler/scheduler.go
The entire Scheduler struct has exactly one field: the Config struct from pkg/scheduler/factory/factory.go.
type Scheduler struct {
config *factory.Config
}
Let's look at the New function:
func New(client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
/**
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
*/
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
// Set up the configurator which can create schedulers from configs.
// build the configFactory from pkg/scheduler/factory
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: options.schedulerName,
Client: client,
NodeInformer: nodeInformer,
PodInformer: podInformer,
PvInformer: pvInformer,
PvcInformer: pvcInformer,
ReplicationControllerInformer: replicationControllerInformer,
ReplicaSetInformer: replicaSetInformer,
StatefulSetInformer: statefulSetInformer,
ServiceInformer: serviceInformer,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: options.enableEquivalenceClassCache,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
})
var config *factory.Config
source := schedulerAlgorithmSource
switch {
case source.Provider != nil:
// the default scheduler takes this branch, with *source.Provider == "DefaultProvider"
// Create the config from a named algorithm provider.
sc, err := configurator.CreateFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
config = sc
case source.Policy != nil:
// a custom (policy-based) scheduler takes this branch
// Create the config from a user specified policy source.
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
return nil, err
}
case source.Policy.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
return nil, err
}
}
sc, err := configurator.CreateFromConfig(*policy)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
}
config = sc
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
config.StopEverything = stopCh
// Create the scheduler.
sched := NewFromConfig(config)
return sched, nil
}
1. Build options from the opts functions passed in. The defaults are the fields below; anything that needs to differ is changed through the opts functions (a sketch of the pattern follows the defaults).
var defaultSchedulerOptions = schedulerOptions{
schedulerName: v1.DefaultSchedulerName,
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: false,
disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds,
}
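Each of the scheduler.With* helpers seen earlier is a functional option: it returns a closure that mutates schedulerOptions. New applies them in order over defaultSchedulerOptions, so explicitly passed options win. A sketch of the pattern, paraphrased from WithName in pkg/scheduler/scheduler.go:
// Paraphrased sketch of a functional option from pkg/scheduler/scheduler.go:
// WithName returns a closure that overrides the default scheduler name.
func WithName(schedulerName string) func(o *schedulerOptions) {
	return func(o *schedulerOptions) {
		o.schedulerName = schedulerName
	}
}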
2. Build the factory's configFactory object, named configurator, from the arguments; this is examined in detail in a later section.
3. Depending on source, choose how to build the Config for the Scheduler object. Here we only discuss the default scheduler (custom schedulers get a dedicated post), so execution enters sc, err := configurator.CreateFromProvider(*source.Provider) to produce the required Config.
// pkg/scheduler/factory/factory.go
// Creates a scheduler from the name of a registered algorithm provider.
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
return nil, err
}
return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}
GetAlgorithmProvider(providerName), with providerName = "DefaultProvider", was already covered in [k8s源碼分析][kube-scheduler]scheduler/algorithmprovider之注冊default-scheduler: it returns the keys of all the default scheduler's predicate and priority functions.
Then configFactory's CreateFromKeys takes all those predicate and priority functions, plus the extender methods (empty here), and builds the Config that Scheduler needs. CreateFromKeys itself is covered below together with configFactory.
4. Build the Scheduler object sched from the Config.
// pkg/scheduler/scheduler.go
// NewFromConfig returns a new scheduler using the provided Config.
func NewFromConfig(config *factory.Config) *Scheduler {
metrics.Register()
return &Scheduler{
config: config,
}
}
3.3 pkg/scheduler/factory/factory.go
This section analyzes the configurator := factory.NewConfigFactory call mentioned in 3.2 pkg/scheduler/scheduler.go.
3.3.1 configFactory
Here is the definition of the configFactory struct:
type configFactory struct {
// client used to talk to the api-server
client clientset.Interface
// queue for pods that need scheduling
// holds the pods waiting to be scheduled
podQueue internalqueue.SchedulingQueue
// a means to list all known scheduled pods.
// lists all pods that have already been scheduled
scheduledPodLister corelisters.PodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
// lists all scheduled pods plus the assumed pods
podLister algorithm.PodLister
// a means to list all nodes
nodeLister corelisters.NodeLister
// a means to list all PersistentVolumes
pVLister corelisters.PersistentVolumeLister
// a means to list all PersistentVolumeClaims
pVCLister corelisters.PersistentVolumeClaimLister
// a means to list all services
serviceLister corelisters.ServiceLister
// a means to list all controllers
controllerLister corelisters.ReplicationControllerLister
// a means to list all replicasets
replicaSetLister appslisters.ReplicaSetLister
// a means to list all statefulsets
statefulSetLister appslisters.StatefulSetLister
// a means to list all PodDisruptionBudgets
pdbLister policylisters.PodDisruptionBudgetLister
// a means to list all StorageClasses
storageClassLister storagelisters.StorageClassLister
// Close this to stop all reflectors
StopEverything <-chan struct{}
scheduledPodsHasSynced cache.InformerSynced
schedulerCache schedulerinternalcache.Cache
// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on pods's "spec.schedulerName".
// scheduler name, default-scheduler by default
schedulerName string
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
// corresponding to every RequiredDuringScheduling affinity rule.
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
hardPodAffinitySymmetricWeight int32
// Equivalence class cache
// equivalence class cache that speeds up the predicate phase
equivalencePodCache *equivalence.Cache
// Enable equivalence class cache
enableEquivalenceClassCache bool
// Handles volume binding decisions
volumeBinder *volumebinder.VolumeBinder
// Always check all predicates even if the middle of one predicate fails.
alwaysCheckAllPredicates bool
// Disable pod preemption or not.
// whether pod preemption is disabled
disablePreemption bool
// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
percentageOfNodesToScore int32
}
The NewConfigFactory function:
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := args.StopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
if args.StorageClassInformer != nil {
storageClassLister = args.StorageClassInformer.Lister()
}
c := &configFactory{
client: args.Client,
podLister: schedulerCache,
podQueue: internalqueue.NewSchedulingQueue(stopEverything),
nodeLister: args.NodeInformer.Lister(),
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(),
controllerLister: args.ReplicationControllerInformer.Lister(),
replicaSetLister: args.ReplicaSetInformer.Lister(),
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
storageClassLister: storageClassLister,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: args.SchedulerName,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: args.EnableEquivalenceClassCache,
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
}
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// scheduled pod cache
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return assignedPod(pod)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToCache,
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
},
)
...
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
...
ch := make(chan os.Signal, 1)
signal.Notify(ch, compareSignal)
go func() {
for {
select {
case <-c.StopEverything:
c.podQueue.Close()
return
case <-ch:
debugger.Comparer.Compare()
debugger.Dumper.DumpAll()
}
}
}()
return c
}
The main points to note here:
1. schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything) instantiates a schedulerCache. Its implementation and structure were analyzed in [k8s源碼分析][kube-scheduler]scheduler/internal/cache之node_tree和cache; here we focus on where it gets used.
2. configFactory's podLister and schedulerCache fields share the same schedulerCache object. Since podLister is defined as listing all scheduled pods plus the assumed pods, backing it with schedulerCache is natural.
3. configFactory's scheduledPodLister is defined as listing all pods that have already been scheduled. args.PodInformer.Lister() returns all pods (informers get dedicated posts in the client-go series, so no details here); assignedPodLister simply wraps args.PodInformer.Lister() with a filter that keeps only pods already assigned to a node.
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
type assignedPodLister struct {
corelisters.PodLister
}
// List lists all Pods in the indexer for a given namespace.
func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
list, err := l.PodLister.List(selector)
if err != nil {
return nil, err
}
filtered := make([]*v1.Pod, 0, len(list))
for _, pod := range list {
// keep only pods that have already been assigned to a node
if len(pod.Spec.NodeName) > 0 {
filtered = append(filtered, pod)
}
}
return filtered, nil
}
// List lists all Pods in the indexer for a given namespace.
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
}
4. podQueue is defined as holding the pods that need scheduling, hence internalqueue.NewSchedulingQueue(stopEverything). The scheduling queue was analyzed in detail in [k8s源碼分析][kube-scheduler]scheduler/internal/queue之優(yōu)先隊列scheduling_queue(1) and [k8s源碼分析][kube-scheduler]scheduler/internal/queue之優(yōu)先隊列scheduling_queue(2).
5. EventHandler processing logic is attached to the various informers, including podInformer, serviceInformer, NodeInformer, PvInformer, PvcInformer, StorageClassInformer, and so on; that part is analyzed in the next topic.
3.3.2 Config
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
// the schedulerCache, i.e. configFactory's schedulerCache
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
// configFactory's equivalencePodCache
Ecache *equivalence.Cache
// lister for all Nodes
NodeLister algorithm.NodeLister
// the algorithm used for scheduling
Algorithm algorithm.ScheduleAlgorithm
// returns the Binder used to bind a pod
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
// the preemptor
PodPreemptor PodPreemptor
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
// fetches the next pod to schedule;
// blocks until one is available
NextPod func() *v1.Pod
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
VolumeBinder *volumebinder.VolumeBinder
DisablePreemption bool
// caches the pods waiting to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
}
Note here that NextPod is a function: every pod that needs scheduling is queued behind it and comes out one at a time for scheduling.
Next, configFactory's CreateFromKeys mentioned above: given the predicate, priority, and extender methods, it builds a Config object (from factory.go) against the current configFactory.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
}
// resolve the predicate keys into the actual predicate functions
predicateFuncs, err := c.GetPredicates(predicateKeys)
if err != nil {
return nil, err
}
// resolve the priority keys into the actual priority configs
priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}
// priorityMetaProducer is used when scoring nodes
priorityMetaProducer, err := c.GetPriorityMetadataProducer()
if err != nil {
return nil, err
}
// predicateMetaProducer is used during the actual predicate phase
predicateMetaProducer, err := c.GetPredicateMetadataProducer()
if err != nil {
return nil, err
}
// if enabled, set up the equivalence class cache that speeds up predicates
// Init equivalence class cache
if c.enableEquivalenceClassCache {
c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
klog.Info("Created equivalence class cache")
}
// build the algorithm.ScheduleAlgorithm that does the real scheduling work
algo := core.NewGenericScheduler(
c.schedulerCache,
c.equivalencePodCache,
c.podQueue,
predicateFuncs,
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
extenders,
c.volumeBinder,
c.pVCLister,
c.pdbLister,
c.alwaysCheckAllPredicates,
c.disablePreemption,
c.percentageOfNodesToScore,
)
podBackoff := util.CreateDefaultPodBackoff()
return &Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: &nodeLister{c.nodeLister},
Algorithm: algo,
GetBinder: c.getBinderFunc(extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
PodPreemptor: &podPreemptor{c.client},
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
},
NextPod: func() *v1.Pod {
return c.getNextPod()
},
Error: c.MakeDefaultErrorFunc(podBackoff, c.podQueue),
StopEverything: c.StopEverything,
VolumeBinder: c.volumeBinder,
SchedulingQueue: c.podQueue,
}, nil
}
Points to note here:
1. The predicate and priority keys are resolved into their corresponding predicate and priority functions, and the registered priorityMetaProducer and predicateMetaProducer are obtained.
2. The algorithm.ScheduleAlgorithm interface that performs the real scheduling computation is instantiated; the implementation returned is a genericScheduler object (pkg/scheduler/core/generic_scheduler.go). A sketch of how it is driven follows the two listings below.
// pkg/scheduler/algorithm/scheduler_interface.go
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Predicates() returns a pointer to a map of predicate functions. This is
// exposed for testing.
Predicates() map[string]FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for
// testing.
Prioritizers() []PriorityConfig
}
// pkg/scheduler/core/generic_scheduler.go
func NewGenericScheduler(
cache schedulerinternalcache.Cache,
eCache *equivalence.Cache,
podQueue internalqueue.SchedulingQueue,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.PredicateMetadataProducer,
prioritizers []algorithm.PriorityConfig,
priorityMetaProducer algorithm.PriorityMetadataProducer,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
pdbLister algorithm.PDBLister,
alwaysCheckAllPredicates bool,
disablePreemption bool,
percentageOfNodesToScore int32,
) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
equivalenceCache: eCache,
schedulingQueue: podQueue,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,
alwaysCheckAllPredicates: alwaysCheckAllPredicates,
disablePreemption: disablePreemption,
percentageOfNodesToScore: percentageOfNodesToScore,
}
}
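To see how the genericScheduler built here is driven (point 2 above), Scheduler calls Algorithm.Schedule through a small wrapper during each scheduling cycle. A hedged sketch, paraphrasing sched.schedule from pkg/scheduler/scheduler.go with event recording and condition updates trimmed:
// Paraphrased from pkg/scheduler/scheduler.go (simplified):
// schedule runs predicates and priorities via ScheduleAlgorithm and
// returns the name of the selected node.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
	host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
	if err != nil {
		// on failure the pod goes to the error handler, which requeues it;
		// preemption may then be attempted via Algorithm.Preempt
		sched.config.Error(pod, err)
		return "", err
	}
	return host, nil
}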
3. GetBinder is produced: getBinderFunc returns either an extender binder that handles the given pod, or the default binder (a paraphrased sketch of the default binder follows the listing).
// pkg/scheduler/factory/factory.go
func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
var extenderBinder algorithm.SchedulerExtender
for i := range extenders {
if extenders[i].IsBinder() {
extenderBinder = extenders[i]
break
}
}
defaultBinder := &binder{c.client}
return func(pod *v1.Pod) Binder {
if extenderBinder != nil && extenderBinder.IsInterested(pod) {
return extenderBinder
}
return defaultBinder
}
}
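The default binder itself is tiny: it just sends a Binding object to the api-server's pods/binding subresource. Paraphrased from pkg/scheduler/factory/factory.go:
// Paraphrased from pkg/scheduler/factory/factory.go: the default binder.
type binder struct {
	Client clientset.Interface
}

// Bind sends the pod -> node assignment to the api-server.
func (b *binder) Bind(binding *v1.Binding) error {
	klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
	return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
}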
4. PodConditionUpdater and PodPreemptor are produced; both are thin wrappers around the client that talks to the api-server.
5. NextPod, the most central function here: every pod that needs scheduling comes out of it.
NextPod: func() *v1.Pod {
return c.getNextPod()
}
func (c *configFactory) getNextPod() *v1.Pod {
pod, err := c.podQueue.Pop()
if err == nil {
klog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
return pod
}
klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil
}
As you can see, every pod comes out of podQueue, so how pods get into podQueue becomes the important question; that involves the various informers mentioned above, and is left for the next post.
3.4 Back to Run
Sections 3.2 and 3.3 analyzed how the Run method in cmd/kube-scheduler/app/server.go (from 3.1) creates the Scheduler object in pkg/scheduler/scheduler.go. So what happens once that object has been created? We need to return to the Run method in cmd/kube-scheduler/app/server.go.
// cmd/kube-scheduler/app/server.go
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
...
// Create the scheduler.
// build the Scheduler object from pkg/scheduler/scheduler.go
sched, err := scheduler.New
...
// Start all informers.
go cc.PodInformer.Informer().Run(stopCh)
cc.InformerFactory.Start(stopCh)
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)
// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
// If leader election is enabled, runCommand via LeaderElector until done and exit.
// run with leader election (high availability)
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
// invoke the run function once leadership is acquired
OnStartedLeading: run,
OnStoppedLeading: func() {
utilruntime.HandleError(fmt.Errorf("lost master"))
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}
1. All the informers are started.
2. Since high availability is enabled by default, sched.Run() is launched through the leader-election path.
Next, the sched.Run method:
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
...
}
scheduleOne runs with a period of 0 seconds, i.e. back-to-back; inside, scheduleOne calls sched.config.NextPod(), which pops a pod off the podQueue for scheduling.
In plain terms, it keeps pulling one pod at a time out of podQueue to schedule; if podQueue is empty, it blocks right there.
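The period of 0 relies on wait.Until's semantics: it re-invokes the function as soon as it returns, until the stop channel is closed. A self-contained toy illustration (not scheduler code) using k8s.io/apimachinery/pkg/util/wait:
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	// with period 0, f runs back-to-back; each iteration stands in for one
	// scheduleOne call, which itself blocks inside NextPod when the queue
	// is empty
	go wait.Until(func() { fmt.Println("scheduleOne would run here") }, 0, stop)
	time.Sleep(time.Millisecond)
	close(stop) // closing the stop channel ends the loop
}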
4. Summary
Looking back over the whole process:
1. Parse the config file, or use the defaults, to produce a completed config.
2. Start the pod-related informers, which watch changes in the cluster and, according to the relevant rules, feed pods into a scheduling_queue, i.e. the podQueue.
3. Start an unbounded loop that keeps reading from podQueue to schedule pods.