In-Depth Analysis of kube-batch (2): Cache
If you are familiar with Kubernetes, the cache mechanism will be nothing new. The previous article on the startup process already touched on what the cache is for; this article analyzes how the cache is implemented in detail.
struct
interface
kube-batch\pkg\scheduler\cache\interface.go
// Cache collects pods/nodes/queues information
// and provides information snapshot
type Cache interface {
	// Run start informer
	Run(stopCh <-chan struct{})

	// Snapshot deep copy overall cache information into snapshot
	Snapshot() *api.ClusterInfo

	// SchedulerConf return the property of scheduler configuration
	LoadSchedulerConf(path string) (map[string]string, error)

	// WaitForCacheSync waits for all cache synced
	WaitForCacheSync(stopCh <-chan struct{}) bool

	// Bind binds Task to the target host.
	// TODO(jinzhej): clean up expire Tasks.
	Bind(task *api.TaskInfo, hostname string) error

	// Evict evicts the task to release resources.
	Evict(task *api.TaskInfo, reason string) error

	// Backoff puts job in backlog for a while.
	Backoff(job *api.JobInfo, event arbcorev1.Event, reason string) error
}
type Binder interface {
	Bind(task *v1.Pod, hostname string) error
}

type Evictor interface {
	Evict(pod *v1.Pod) error
}
Focus on the Run and Snapshot methods of the interface. The sketch below shows roughly how a consumer drives them.
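Before digging into the implementation, here is a rough consumer-side sketch of how a scheduler loop might use the Cache interface. This is not the repo's actual scheduler code (which wires things up through sessions and actions); schedule is a hypothetical function, and wait/time imports are assumed.
// Rough sketch: start the informers, wait for the initial sync, then take a
// fresh snapshot at the beginning of every scheduling cycle.
func runScheduler(c cache.Cache, stopCh <-chan struct{}) {
	c.Run(stopCh)
	c.WaitForCacheSync(stopCh)

	wait.Until(func() {
		snapshot := c.Snapshot() // consistent, deep-copied view of the cluster
		schedule(snapshot)       // hypothetical: run actions/plugins over the snapshot
	}, time.Second, stopCh)
}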
implements
The concrete implementation is in
kube-batch\pkg\scheduler\cache\cache.go
type SchedulerCache struct {
	sync.Mutex

	kubeclient *kubernetes.Clientset
	arbclient  *versioned.Clientset

	podInformer      infov1.PodInformer
	nodeInformer     infov1.NodeInformer
	pdbInformer      policyv1.PodDisruptionBudgetInformer
	nsInformer       infov1.NamespaceInformer
	podGroupInformer arbcoreinfo.PodGroupInformer
	queueInformer    arbcoreinfo.QueueInformer

	Binder  Binder
	Evictor Evictor

	recorder record.EventRecorder

	Jobs   map[arbapi.JobID]*arbapi.JobInfo
	Nodes  map[string]*arbapi.NodeInfo
	Queues map[arbapi.QueueID]*arbapi.QueueInfo

	errTasks    *cache.FIFO
	deletedJobs *cache.FIFO

	namespaceAsQueue bool
}
SchedulerCache consists mainly of the following components:
- a mutex, which keeps snapshots consistent with the in-memory state (illustrated right after this list)
- Kubernetes clients, used to talk to the apiserver
- informers, which list/watch the REST resources
- the Jobs/Nodes/Queues maps, which cache those resources
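The mutex deserves a concrete illustration: informer callbacks mutate Jobs/Nodes/Queues on client-go's goroutines while Snapshot reads them from the scheduling loop, and the embedded sync.Mutex is what keeps the two sides consistent. A hypothetical, condensed handler (the real ones are AddPod, AddNode, and friends) showing the locking discipline:
// Hypothetical handler running on an informer goroutine. Every handler takes
// sc.Mutex before touching the maps, and Snapshot does the same, so readers
// never observe a half-updated cache.
func (sc *SchedulerCache) onNodeAdd(node *v1.Node) {
	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()
	sc.Nodes[node.Name] = arbapi.NewNodeInfo(node) // assumed constructor from the api package
}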
new
The newSchedulerCache function is fairly long, so it is not quoted in full here. What matters is the event-handler registration on each informer, the most important being the handlers for Pod and PodGroup events.
Pod
sc.podInformer.Informer().AddEventHandler(
	cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			switch obj.(type) {
			case *v1.Pod:
				pod := obj.(*v1.Pod)
				if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
					return true
				}
				return pod.Status.Phase == v1.PodRunning
			default:
				return false
			}
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    sc.AddPod,
			UpdateFunc: sc.UpdatePod,
			DeleteFunc: sc.DeletePod,
		},
	})
As the filter shows, kube-batch only cares about Pending Pods that name it as their scheduler, plus all Running Pods.
kube-batch\pkg\scheduler\cache\event_handlers.go
func (sc *SchedulerCache) AddPod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return // not a *v1.Pod; error logging elided
	}
	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()
	err := sc.addPod(pod)
	_ = err // error logging elided
}
// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
	pi := arbapi.NewTaskInfo(pod)
	return sc.addTask(pi)
}
Note that a single global lock is used; it may become a performance bottleneck later. Here we can see that kube-batch converts the Pod into a TaskInfo before caching it.
kube-batch\pkg\scheduler\api\job_info.go
func NewTaskInfo(pod *v1.Pod) *TaskInfo {
	req := EmptyResource()

	// TODO(k82cn): also includes initContainers' resource.
	for _, c := range pod.Spec.Containers {
		req.Add(NewResource(c.Resources.Requests))
	}

	ti := &TaskInfo{
		UID:       TaskID(pod.UID),
		Job:       getJobID(pod),
		Name:      pod.Name,
		Namespace: pod.Namespace,
		NodeName:  pod.Spec.NodeName,
		Status:    getTaskStatus(pod),
		Priority:  1,

		Pod:    pod,
		Resreq: req,
	}

	if pod.Spec.Priority != nil {
		ti.Priority = *pod.Spec.Priority
	}

	return ti
}
The conversion is straightforward; two points are worth noting:
- the resource requests of all containers are summed up
- the JobID comes from pod.Annotations[arbcorev1.GroupNameAnnotationKey], or falls back to the Pod's owning controller (see the sketch after this list)
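The JobID lookup itself is not quoted above. Roughly, it works like the simplified sketch below; the real code resolves the owning controller through a helper, and error handling is omitted here.
// Simplified sketch of mapping a Pod to a JobID: prefer the PodGroup
// annotation, otherwise fall back to the Pod's controller owner reference.
func getJobID(pod *v1.Pod) JobID {
	if gn, found := pod.Annotations[arbcorev1.GroupNameAnnotationKey]; found && len(gn) != 0 {
		// Pod and PodGroup live in the same namespace, so "namespace/name" is unique.
		return JobID(fmt.Sprintf("%s/%s", pod.Namespace, gn))
	}
	for _, ref := range pod.OwnerReferences {
		if ref.Controller != nil && *ref.Controller {
			return JobID(ref.UID) // e.g. the owning Job/ReplicaSet UID
		}
	}
	return ""
}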
kube-batch\pkg\scheduler\cache\event_handlers.go
func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error {
	if len(pi.Job) != 0 {
		if _, found := sc.Jobs[pi.Job]; !found {
			sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job)
		}

		sc.Jobs[pi.Job].AddTaskInfo(pi)
	}

	// per-node bookkeeping elided
	return nil
}
kube-batch\pkg\scheduler\api\job_info.go
func NewJobInfo(uid JobID) *JobInfo {
	return &JobInfo{
		UID: uid,

		MinAvailable: 0,
		NodeSelector: make(map[string]string),

		Allocated:    EmptyResource(),
		TotalRequest: EmptyResource(),

		TaskStatusIndex: map[TaskStatus]tasksMap{},
		Tasks:           tasksMap{},
	}
}

func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
	ji.Tasks[ti.UID] = ti
	ji.addTaskIndex(ti)

	ji.TotalRequest.Add(ti.Resreq)
}

func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
	if _, found := ji.TaskStatusIndex[ti.Status]; !found {
		ji.TaskStatusIndex[ti.Status] = tasksMap{}
	}

	ji.TaskStatusIndex[ti.Status][ti.UID] = ti
}
Every task ultimately belongs to a job; a job mainly stores its tasks, the total resource request, and similar bookkeeping. The sketch below shows what the status index is for.
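TaskStatusIndex may look redundant next to Tasks, but it lets later phases answer "how many tasks of this job are in status X" with a single map lookup instead of a scan. A hypothetical helper (not part of the repo) to make that concrete:
// Count the tasks of a job that are currently in the given status, e.g. to
// compare the number of Running tasks against MinAvailable.
func countTasks(ji *JobInfo, status TaskStatus) int {
	return len(ji.TaskStatusIndex[status])
}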
PodGroup
sc.podGroupInformer = arbinformer.Scheduling().V1alpha1().PodGroups()
sc.podGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddPodGroup,
	UpdateFunc: sc.UpdatePodGroup,
	DeleteFunc: sc.DeletePodGroup,
})
kube-batch\pkg\scheduler\cache\event_handlers.go
func (sc *SchedulerCache) AddPodGroup(obj interface{}) {
	ss, ok := obj.(*arbv1.PodGroup)
	if !ok {
		return // not a *arbv1.PodGroup; error logging elided
	}
	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()
	err := sc.setPodGroup(ss)
	_ = err // error logging elided
}

func (sc *SchedulerCache) setPodGroup(ss *arbv1.PodGroup) error {
	job := getJobID(ss)

	if _, found := sc.Jobs[job]; !found {
		sc.Jobs[job] = arbapi.NewJobInfo(job)
	}

	sc.Jobs[job].SetPodGroup(ss)
	return nil
}

func getJobID(pg *arbv1.PodGroup) arbapi.JobID {
	return arbapi.JobID(fmt.Sprintf("%s/%s", pg.Namespace, pg.Name))
}
Here we can see that a Job is essentially a PodGroup: its ID is simply the PodGroup's namespace/name.
kube-batch\pkg\scheduler\api\job_info.go
func (ji *JobInfo) SetPodGroup(pg *arbcorev1.PodGroup) {
	ji.Name = pg.Name
	ji.Namespace = pg.Namespace
	ji.MinAvailable = pg.Spec.MinMember

	if len(pg.Spec.Queue) == 0 {
		ji.Queue = QueueID(pg.Namespace)
	} else {
		ji.Queue = QueueID(pg.Spec.Queue)
	}

	ji.PodGroup = pg
}
Pay particular attention to ji.MinAvailable = pg.Spec.MinMember: MinMember is the minimum number of tasks that must be placeable before the job is dispatched at all, which is what enables gang scheduling (see the sketch below).
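A minimal sketch, not kube-batch's actual allocation code, of why this field matters: a gang-scheduled job should only be dispatched once at least MinAvailable of its tasks can be placed.
// Hypothetical gang-readiness check: dispatch the job only when the number of
// tasks that already have a node assignment reaches MinAvailable.
func readyToDispatch(ji *JobInfo) bool {
	assigned := 0
	for _, task := range ji.Tasks {
		if len(task.NodeName) != 0 {
			assigned++
		}
	}
	return int32(assigned) >= ji.MinAvailable
}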
run
func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
	go sc.pdbInformer.Informer().Run(stopCh)
	go sc.podInformer.Informer().Run(stopCh)
	go sc.nodeInformer.Informer().Run(stopCh)
	go sc.podGroupInformer.Informer().Run(stopCh)

	if sc.namespaceAsQueue {
		go sc.nsInformer.Informer().Run(stopCh)
	} else {
		go sc.queueInformer.Informer().Run(stopCh)
	}

	// Re-sync error tasks.
	go sc.resync()

	// Cleanup jobs.
	go sc.cleanupJobs()
}
The Run method is fairly simple. It is responsible for:
- starting the list/watch loop of every informer
- re-syncing Pod state for tasks queued in errTasks (see the sketch after this list)
- cleaning up cached jobs queued in deletedJobs
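Both background loops follow the same pattern: block on a client-go cache.FIFO, process one entry, and re-queue it if it still fails. Below is a simplified sketch of the resync side, assuming a hypothetical syncTask helper that re-reads the Pod from the apiserver and updates the cache; the real code differs in logging and shutdown handling.
// Drain one failed task from errTasks; put it back if re-syncing still fails
// so it is retried on a later iteration.
func (sc *SchedulerCache) processResyncTask() {
	sc.errTasks.Pop(func(obj interface{}) error { // FIFO.Pop blocks until an item is available
		task, ok := obj.(*arbapi.TaskInfo)
		if !ok {
			return nil // ignore unexpected objects
		}
		if err := sc.syncTask(task); err != nil { // syncTask: assumed helper
			return sc.errTasks.AddIfNotPresent(task) // re-queue for the next round
		}
		return nil
	})
}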
Snapshot
func (sc *SchedulerCache) Snapshot() *arbapi.ClusterInfo {
	sc.Mutex.Lock()
	defer sc.Mutex.Unlock()

	snapshot := &arbapi.ClusterInfo{
		Nodes:  make([]*arbapi.NodeInfo, 0, len(sc.Nodes)),
		Jobs:   make([]*arbapi.JobInfo, 0, len(sc.Jobs)),
		Queues: make([]*arbapi.QueueInfo, 0, len(sc.Queues)),
		Others: make([]*arbapi.TaskInfo, 0, 10),
	}

	for _, value := range sc.Nodes {
		snapshot.Nodes = append(snapshot.Nodes, value.Clone())
	}

	for _, value := range sc.Queues {
		snapshot.Queues = append(snapshot.Queues, value.Clone())
	}

	for _, value := range sc.Jobs {
		// If no scheduling spec, does not handle it.
		if value.PodGroup == nil && value.PDB == nil {
			continue
		}

		snapshot.Jobs = append(snapshot.Jobs, value.Clone())
	}

	return snapshot
}
Snapshot simply deep-clones the cache into a ClusterInfo. The one thing to note is the filter at the end: a job only makes it into the snapshot, and therefore only gets scheduled, after its PodGroup (or a PDB) has been created.