深入分析kube-batch(2)——cache

深入分析kube-batch(2)——cache

熟悉K8S的同學,一定對cache機制不陌生泵殴;在之前啟動過程一文中分析過媒惕,cache的作用,本文將詳細分析cache的實現(xiàn)务热。

struct

interface

kube-batch\pkg\scheduler\cache\interface.go

// Cache collects pods/nodes/queues information
// and provides information snapshot
type Cache interface {
    // Run start informer
    Run(stopCh <-chan struct{})

    // Snapshot deep copy overall cache information into snapshot
    Snapshot() *api.ClusterInfo

    // SchedulerConf return the property of scheduler configuration
    LoadSchedulerConf(path string) (map[string]string, error)

    // WaitForCacheSync waits for all cache synced
    WaitForCacheSync(stopCh <-chan struct{}) bool

    // Bind binds Task to the target host.
    // TODO(jinzhej): clean up expire Tasks.
    Bind(task *api.TaskInfo, hostname string) error

    // Evict evicts the task to release resources.
    Evict(task *api.TaskInfo, reason string) error

    // Backoff puts job in backlog for a while.
    Backoff(job *api.JobInfo, event arbcorev1.Event, reason string) error
}

type Binder interface {
    Bind(task *v1.Pod, hostname string) error
}

type Evictor interface {
    Evict(pod *v1.Pod) error
}

重點關(guān)注接口的Run/Snaoshot

implements

具體的實現(xiàn)在

kube-batch\pkg\scheduler\cache\cache.go

type SchedulerCache struct {
   sync.Mutex

   kubeclient *kubernetes.Clientset
   arbclient  *versioned.Clientset

   podInformer      infov1.PodInformer
   nodeInformer     infov1.NodeInformer
   pdbInformer      policyv1.PodDisruptionBudgetInformer
   nsInformer       infov1.NamespaceInformer
   podGroupInformer arbcoreinfo.PodGroupInformer
   queueInformer    arbcoreinfo.QueueInformer

   Binder  Binder
   Evictor Evictor

   recorder record.EventRecorder

   Jobs   map[arbapi.JobID]*arbapi.JobInfo
   Nodes  map[string]*arbapi.NodeInfo
   Queues map[arbapi.QueueID]*arbapi.QueueInfo

   errTasks    *cache.FIFO
   deletedJobs *cache.FIFO

   namespaceAsQueue bool
}

SchedulerCache主要由以下組件組成:

  • 鎖忆嗜,解決快照與內(nèi)存一致性問題
  • K8S clients,訪問apiserver
  • Informers崎岂,ListWatch REST
  • Jobs/Nodes/Queues捆毫,緩存REST

new

newSchedulerCache函數(shù)代碼比較多,就不都貼了冲甘。我們可以關(guān)注各個Informer的事件注冊绩卤,其中最重要的就是Pod/PodGroup相關(guān)的事件處理。

Pod

sc.podInformer.Informer().AddEventHandler(
   cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
         switch obj.(type) {
         case *v1.Pod:
            pod := obj.(*v1.Pod)
            if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
               return true
            }
            return pod.Status.Phase == v1.PodRunning
         default:
            return false
         }
      },
      Handler: cache.ResourceEventHandlerFuncs{
         AddFunc:    sc.AddPod,
         UpdateFunc: sc.UpdatePod,
         DeleteFunc: sc.DeletePod,
      },
   })
   

這里可以看到江醇,kube-batch只關(guān)心需要自己調(diào)度濒憋,并且Pending的Pod;以及Running的Pod陶夜。

kube-batch\pkg\scheduler\cache\event_handlers.go

func (sc *SchedulerCache) AddPod(obj interface{}) {
    sc.Mutex.Lock()
    defer sc.Mutex.Unlock()

    err := sc.addPod(pod)
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
    pi := arbapi.NewTaskInfo(pod)

    return sc.addTask(pi)
}

全局一把鎖凛驮,以后會是性能瓶頸。這里我們看到kube-batch會將Pod轉(zhuǎn)換成TaskInfo緩存起來条辟。

kube-batch\pkg\scheduler\api\job_info.go

func NewTaskInfo(pod *v1.Pod) *TaskInfo {
   req := EmptyResource()

   // TODO(k82cn): also includes initContainers' resource.
   for _, c := range pod.Spec.Containers {
      req.Add(NewResource(c.Resources.Requests))
   }

   ti := &TaskInfo{
      UID:       TaskID(pod.UID),
      Job:       getJobID(pod),
      Name:      pod.Name,
      Namespace: pod.Namespace,
      NodeName:  pod.Spec.NodeName,
      Status:    getTaskStatus(pod),
      Priority:  1,

      Pod:    pod,
      Resreq: req,
   }

   if pod.Spec.Priority != nil {
      ti.Priority = *pod.Spec.Priority
   }

   return ti
}

轉(zhuǎn)換過程比較簡單黔夭,注意兩點:

  • 需要統(tǒng)計資源請求量
  • JobID通過pod.Annotations[arbcorev1.GroupNameAnnotationKey]或者所屬的controller

kube-batch\pkg\scheduler\cache\event_handlers.go

func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error {
   if len(pi.Job) != 0 {
      if _, found := sc.Jobs[pi.Job]; !found {
         sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job)
      }

      sc.Jobs[pi.Job].AddTaskInfo(pi)
   }
}

kube-batch\pkg\scheduler\api\job_info.go

func NewJobInfo(uid JobID) *JobInfo {
   return &JobInfo{
      UID: uid,

      MinAvailable: 0,
      NodeSelector: make(map[string]string),

      Allocated:    EmptyResource(),
      TotalRequest: EmptyResource(),

      TaskStatusIndex: map[TaskStatus]tasksMap{},
      Tasks:           tasksMap{},
   }
}

func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
    ji.Tasks[ti.UID] = ti
    ji.addTaskIndex(ti)

    ji.TotalRequest.Add(ti.Resreq)
}

func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
    if _, found := ji.TaskStatusIndex[ti.Status]; !found {
        ji.TaskStatusIndex[ti.Status] = tasksMap{}
    }

    ji.TaskStatusIndex[ti.Status][ti.UID] = ti
}

最終task會歸于一個job,job主要保存tasks羽嫡,資源請求總量等信息本姥。

PodGroup

sc.podGroupInformer = arbinformer.Scheduling().V1alpha1().PodGroups()
sc.podGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc:    sc.AddPodGroup,
   UpdateFunc: sc.UpdatePodGroup,
   DeleteFunc: sc.DeletePodGroup,
})

kube-batch\pkg\scheduler\cache\event_handlers.go

func (sc *SchedulerCache) AddPodGroup(obj interface{}) {
   sc.Mutex.Lock()
   defer sc.Mutex.Unlock()

   err := sc.setPodGroup(ss)
}

func (sc *SchedulerCache) setPodGroup(ss *arbv1.PodGroup) error {
    job := getJobID(ss)

    if _, found := sc.Jobs[job]; !found {
        sc.Jobs[job] = arbapi.NewJobInfo(job)
    }

    sc.Jobs[job].SetPodGroup(ss)

    return nil
}

func getJobID(pg *arbv1.PodGroup) arbapi.JobID {
    return arbapi.JobID(fmt.Sprintf("%s/%s", pg.Namespace, pg.Name))
}

這里我們可以看到Job就是PodGroup

kube-batch\pkg\scheduler\api\job_info.go

func (ji *JobInfo) SetPodGroup(pg *arbcorev1.PodGroup) {
   ji.Name = pg.Name
   ji.Namespace = pg.Namespace
   ji.MinAvailable = pg.Spec.MinMember

   if len(pg.Spec.Queue) == 0 {
      ji.Queue = QueueID(pg.Namespace)
   } else {
      ji.Queue = QueueID(pg.Spec.Queue)
   }

   ji.PodGroup = pg
}

重點關(guān)注ji.MinAvailable = pg.Spec.MinMember

run

func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
   go sc.pdbInformer.Informer().Run(stopCh)
   go sc.podInformer.Informer().Run(stopCh)
   go sc.nodeInformer.Informer().Run(stopCh)
   go sc.podGroupInformer.Informer().Run(stopCh)

   if sc.namespaceAsQueue {
      go sc.nsInformer.Informer().Run(stopCh)
   } else {
      go sc.queueInformer.Informer().Run(stopCh)
   }

   // Re-sync error tasks.
   go sc.resync()

   // Cleanup jobs.
   go sc.cleanupJobs()
}

run方法比較簡單,主要負責:

  • 開始各個REST的ListWatch
  • 根據(jù)errTasks隊列杭棵,重新同步Pod狀態(tài)
  • 根據(jù)deletedJobs隊列婚惫,清理緩存

Snapshot

func (sc *SchedulerCache) Snapshot() *arbapi.ClusterInfo {
   sc.Mutex.Lock()
   defer sc.Mutex.Unlock()

   snapshot := &arbapi.ClusterInfo{
      Nodes:  make([]*arbapi.NodeInfo, 0, len(sc.Nodes)),
      Jobs:   make([]*arbapi.JobInfo, 0, len(sc.Jobs)),
      Queues: make([]*arbapi.QueueInfo, 0, len(sc.Queues)),
      Others: make([]*arbapi.TaskInfo, 0, 10),
   }

   for _, value := range sc.Nodes {
      snapshot.Nodes = append(snapshot.Nodes, value.Clone())
   }

   for _, value := range sc.Queues {
      snapshot.Queues = append(snapshot.Queues, value.Clone())
   }

   for _, value := range sc.Jobs {
      // If no scheduling spec, does not handle it.
      if value.PodGroup == nil && value.PDB == nil {
         continue
      }

      snapshot.Jobs = append(snapshot.Jobs, value.Clone())
   }

   return snapshot
}

利用Deep Clone dump cache ,唯一需要注意的是必須要創(chuàng)建PodGroup,才能繼續(xù)調(diào)度先舷。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末艰管,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子密浑,更是在濱河造成了極大的恐慌蛙婴,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尔破,死亡現(xiàn)場離奇詭異,居然都是意外死亡浇衬,警方通過查閱死者的電腦和手機懒构,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來耘擂,“玉大人胆剧,你說我怎么就攤上這事∽碓” “怎么了秩霍?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蚁阳。 經(jīng)常有香客問我铃绒,道長,這世上最難降的妖魔是什么螺捐? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任颠悬,我火速辦了婚禮,結(jié)果婚禮上定血,老公的妹妹穿的比我還像新娘赔癌。我一直安慰自己,他們只是感情好澜沟,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布灾票。 她就那樣靜靜地躺著,像睡著了一般茫虽。 火紅的嫁衣襯著肌膚如雪刊苍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天席噩,我揣著相機與錄音班缰,去河邊找鬼。 笑死悼枢,一個胖子當著我的面吹牛埠忘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼莹妒,長吁一口氣:“原來是場噩夢啊……” “哼名船!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起旨怠,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤渠驼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后鉴腻,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體迷扇,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年爽哎,在試婚紗的時候發(fā)現(xiàn)自己被綠了蜓席。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡课锌,死狀恐怖厨内,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情渺贤,我是刑警寧澤雏胃,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站志鞍,受9級特大地震影響瞭亮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜述雾,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一街州、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧玻孟,春花似錦唆缴、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至匣掸,卻和暖如春趟紊,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背碰酝。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工霎匈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人送爸。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓铛嘱,卻偏偏與公主長得像暖释,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子墨吓,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348