1. Preface
Please credit the original source when reposting; respect others' work!
This article analyzes custom schedulers for kube-scheduler, focusing on a custom scheduler that uses extenders (extension methods).
Source code: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)
2. Example
2.1 Preparation
2.1.1 Start kube-scheduler with a configuration file
Run kube-scheduler with the configuration below on one machine (worker, 172.21.0.12).
The schedulerConfig.yaml file:
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: policy.yaml
leaderElection:
  leaderElect: true
  lockObjectName: my-scheduler
  lockObjectNamespace: kube-system
The policy.yaml file:
{
  "kind" : "Policy",
  "apiVersion" : "v1",
  "predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "PodFitsResources"},
    {"name" : "NoDiskConflict"},
    {"name" : "MatchNodeSelector"},
    {"name" : "HostName"}
  ],
  "priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "BalancedResourceAllocation", "weight" : 1},
    {"name" : "ServiceSpreadingPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
  ],
  "extenders" : [{
    "urlPrefix": "http://localhost/scheduler",
    "filterVerb": "predicates/always_true",
    "prioritizeVerb": "priorities/zero_score",
    "preemptVerb": "preemption",
    "bindVerb": "",
    "weight": 1,
    "enableHttps": false,
    "nodeCacheCapable": false
  }],
  "hardPodAffinitySymmetricWeight" : 10
}
Run the command:
./kube-scheduler --master=http://172.21.0.16:8080 --config=schedulerConfig.yaml
Note that urlPrefix is the address your custom kube-scheduler uses to reach your extender service. Since both services are deployed on the same machine here, "urlPrefix": "http://localhost/scheduler" is used.
2.1.2 Start the extender service
Download and run k8s-scheduler-extender-example on that same machine (worker, 172.21.0.12):
[root@worker nicktming]# pwd
/root/go/src/github.com/nicktming
[root@worker nicktming]# git clone https://github.com/nicktming/k8s-scheduler-extender-example.git
[root@worker nicktming]# cd k8s-scheduler-extender-example
[root@worker k8s-scheduler-extender-example]# go build .
// If there is no Go environment, you can use the pre-built binary k8s-scheduler-extender-example included in the repo instead.
[root@worker k8s-scheduler-extender-example]# ./k8s-scheduler-extender-example
關(guān)于這個
extender
服務(wù), 就是一個API
服務(wù),kube-scheduelr
會根據(jù)配置文件來向這個extender
發(fā)送請求.
Take the predicate (filter) phase as an example.
Request: kube-scheduler sends an object of type schedulerapi.ExtenderArgs to the extender; it contains the candidate nodes and the pod being scheduled.
type ExtenderArgs struct {
    // Pod being scheduled
    Pod *v1.Pod
    // Candidate nodes (full objects), used when nodeCacheCapable == false
    Nodes *v1.NodeList
    // Candidate node names, used when nodeCacheCapable == true
    NodeNames *[]string
}
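On the wire this is just JSON in the POST body. A rough, abbreviated illustration of its shape (the top-level field names follow the Go struct fields, since these internal types carry no JSON tags, which matches the capitalized keys visible in the extender log in section 2.2.1):
{
  "Pod": { "metadata": { "name": "test-schduler", ... }, "spec": { ... } },
  "Nodes": { "metadata": {}, "items": [ { "metadata": { "name": "172.21.0.12", ... }, ... } ] },
  "NodeNames": null
}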
Response: after processing, the extender returns a schedulerapi.ExtenderFilterResult object that tells kube-scheduler which nodes the pod can run on and which it cannot.
type ExtenderFilterResult struct {
    // Filtered set of nodes where the pod can be scheduled; to be populated
    // only if ExtenderConfig.NodeCacheCapable == false
    Nodes *v1.NodeList
    // Filtered set of nodes where the pod can be scheduled; to be populated
    // only if ExtenderConfig.NodeCacheCapable == true
    NodeNames *[]string
    // Filtered out nodes where the pod can't be scheduled and the failure messages
    FailedNodes FailedNodesMap
    // Error message
    Error string
}
The Handler in the extender service accepts a schedulerapi.ExtenderArgs object and, when done, returns a schedulerapi.ExtenderFilterResult object to kube-scheduler.
// main.go
TruePredicate = Predicate{
    Name: "always_true",
    Func: func(pod v1.Pod, node v1.Node) (bool, error) {
        if node.Name == "172.21.0.16" {
            return false, nil
        }
        return true, nil
    },
}

// predicate.go
type Predicate struct {
    Name string
    Func func(pod v1.Pod, node v1.Node) (bool, error)
}

func (p Predicate) Handler(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
    pod := args.Pod
    canSchedule := make([]v1.Node, 0, len(args.Nodes.Items))
    canNotSchedule := make(map[string]string)
    for _, node := range args.Nodes.Items {
        // Call the custom predicate to decide whether the pod can run on this node.
        result, err := p.Func(*pod, node)
        fmt.Printf("===>extender node:%v, result:%v\n", node.Name, result)
        if err != nil {
            canNotSchedule[node.Name] = err.Error()
        } else {
            if result {
                canSchedule = append(canSchedule, node)
            }
        }
    }
    // Assemble the result.
    result := schedulerapi.ExtenderFilterResult{
        Nodes: &v1.NodeList{
            Items: canSchedule,
        },
        FailedNodes: canNotSchedule,
        Error:       "",
    }
    return &result
}
kube-scheduler calls urlPrefix+filterVerb from policy.yaml (shown earlier), i.e. http://localhost/scheduler/predicates/always_true, which lands in the Handler method above and runs the always_true filter function.
The rest of the extender service's request-handling logic is straightforward, so I won't go through it here.
2.2 Verification
2.2.1 Create a pod with schedulerName: my-scheduler
[root@master kubectl]# cat pod-scheduler.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-schduler
spec:
  schedulerName: my-scheduler
  containers:
  - name: podtest-scheduler
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod-scheduler.yaml
pod/test-schduler created
[root@master kubectl]#
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
test-schduler 1/1 Running 0 17s
[root@master kubectl]# ./kubectl get pods test-schduler -o yaml | grep -i nodeName
nodeName: 172.21.0.12
[root@master kubectl]# ./kubectl get pods test-schduler -o yaml | grep -i schedulerName
{"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"test-schduler","namespace":"default"},"spec":{"containers":[{"image":"nginx","name":"podtest-scheduler","ports":[{"containerPort":80}]}],"schedulerName":"my-scheduler"}}
schedulerName: my-scheduler
[root@master kubectl]#
Meanwhile, check the extender service's log:
[root@worker k8s-scheduler-extender-example]# ./k8s-scheduler-extender-example
[ warn ] 2019/10/16 16:16:50 main.go:87: LOG_LEVEL="" is empty or invalid, fallling back to "INFO".
[ info ] 2019/10/16 16:16:50 main.go:101: Log level was set to INFO
[ info ] 2019/10/16 16:16:50 main.go:119: server starting on the port :80
[ info ] 2019/10/16 16:19:51 routes.go:29: always_true ExtenderArgs =
===>extender node:172.21.0.16, result:false
===>extender node:172.21.0.12, result:true
[ info ] 2019/10/16 16:19:51 routes.go:49: always_true extenderFilterResult = {"Nodes":{"metadata":{},"items":[{"metadata":{"name":"172.21.0.12","selfLink":"/api/v1/nodes/172.21.0.12","uid":"ec8685f1-ef5f-11e9-8482-525400d54f7e","resourceVersion":"23957","creationTimestamp":"2019-10-15T15:24:48Z","labels":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/hostname":"172.21.0.12"},"annotations":{"node.alpha.kubernetes.io/ttl":"0","volumes.kubernetes.io/controller-managed-attach-detach":"true"}},"spec":{},"status":{"capacity":{"cpu":"2","ephemeral-storage":"51473888Ki","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"3880944Ki","pods":"110"},"allocatable":{"cpu":"2","ephemeral-storage":"47438335103","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"3778544Ki","pods":"110"},"conditions":[{"type":"MemoryPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasSufficientMemory","message":"kubelet has sufficient memory available"},{"type":"DiskPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasNoDiskPressure","message":"kubelet has no disk pressure"},{"type":"PIDPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasSufficientPID","message":"kubelet has sufficient PID available"},{"type":"Ready","status":"True","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletReady","message":"kubelet is posting ready status"}],"addresses":[{"type":"InternalIP","address":"172.21.0.12"},{"type":"Hostname","address":"172.21.0.12"}],"daemonEndpoints":{"kubeletEndpoint":{"Port":10250}},"nodeInfo":{"machineID":"c28d40cbc8e3adcb4e32d9779a77b39e","systemUUID":"2C6B0169-85AC-48F3-9377-35EFC668E23C","bootID":"f5081260-8e17-446c-9b2c-8c2766e49e0e","kernelVersion":"3.10.0-862.el7.x86_64","osImage":"CentOS Linux 7 (Core)","containerRuntimeVersion":"docker://17.9.1","kubeletVersion":"v0.0.0-master+$Format:%h$","kubeProxyVersion":"v0.0.0-master+$Format:%h$","operatingSystem":"linux","architecture":"amd64"},"images":[{"names":["nginx@sha256:aeded0f2a861747f43a01cf1018cf9efe2bdd02afd57d2b11fcc7fcadc16ccd1","nginx:latest"],"sizeBytes":125952483},{"names":["mirrorgooglecontainers/pause@sha256:59eec8837a4d942cc19a52b8c09ea75121acc38114a2c68b98983ce9356b8610","mirrorgooglecontainers/pause:3.1"],"sizeBytes":742472},{"names":["hello-world@sha256:c3b4ada4687bbaa170745b3e4dd8ac3f194ca95b2d0518b417fb47e5879d9b5f","hello-world:latest"],"sizeBytes":1840}]}}]},"NodeNames":null,"FailedNodes":{},"Error":""}
The log shows that kube-scheduler called the extender service and did not place the test-schduler pod on node 172.21.0.16.
2.2.2 Create a pod without schedulerName
This pod is scheduled by the default scheduler and is also created successfully.
[root@master kubectl]# cat pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - name: podtest
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod.yaml
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
test 1/1 Running 0 2m39s
test-schduler 1/1 Running 0 24m
[root@master kubectl]# ./kubectl get pod test -o yaml | grep -i nodeName
nodeName: 172.21.0.16
[root@master kubectl]# ./kubectl get pod test -o yaml | grep -i schedulerName
schedulerName: default-scheduler
[root@master kubectl]#
3. Analysis of the relevant source code
The earlier post [k8s源碼分析][kube-scheduler]scheduler之自定義調度器(1) already analyzed how kube-scheduler --config loads the configuration file, so here we focus on the parts that interact with the extender service.
In CreateFromConfig, extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii]) builds an extender from the ii-th entry of extenders in policy.yaml.
// pkg/scheduler/factory/factory.go
// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
    ...
    // Build the extenders
    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
            if err != nil {
                return nil, err
            }
            extenders = append(extenders, extender)
            for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources.Insert(string(r.Name))
                }
            }
        }
        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
    }
    ...
}
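The ManagedResources loop above is driven by optional fields of the extender entry in the policy that this example does not set. For illustration only (field names per the v1.13 ExtenderConfig type; the resource name here is made up), such an entry could look like:
"extenders" : [{
  "urlPrefix": "http://localhost/scheduler",
  "filterVerb": "predicates/always_true",
  "ignorable": true,
  "managedResources": [
    {"name": "example.com/foo", "ignoredByScheduler": true}
  ]
}]
With managedResources set, IsInterested selects only pods that request the managed resource, and ignoredByScheduler makes the built-in resource predicates skip that resource so only the extender accounts for it.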
3.1 The extender interface
type SchedulerExtender interface {
    // Name returns a unique name that identifies the extender.
    // The name of this extender: config.URLPrefix
    Name() string
    // Filter is the filtering method, i.e. the extender's predicate phase.
    Filter(pod *v1.Pod,
        nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo,
    ) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)
    // Prioritize is the scoring method.
    Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error)
    // Bind delegates the action of binding a pod to a node to the extender.
    Bind(binding *v1.Binding) error
    // IsBinder returns whether this extender is configured for the Bind method.
    IsBinder() bool
    // IsInterested returns true if at least one extended resource requested by
    // this pod is managed by this extender.
    IsInterested(pod *v1.Pod) bool
    // ProcessPreemption returns nodes with their victim pods processed by extender based on
    // given:
    //   1. Pod to schedule
    //   2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process.
    //   3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled.
    // The possible changes made by extender may include:
    //   1. Subset of given candidate nodes after preemption phase of extender.
    //   2. A different set of victim pod for every given candidate node after preemption phase of extender.
    ProcessPreemption(
        pod *v1.Pod,
        nodeToVictims map[*v1.Node]*schedulerapi.Victims,
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
    ) (map[*v1.Node]*schedulerapi.Victims, error)
    // SupportsPreemption returns if the scheduler extender support preemption or not.
    SupportsPreemption() bool
    // IsIgnorable reports whether errors from this extender can be tolerated.
    // If true, an error from this extender is skipped and scheduling continues;
    // if false, the scheduler aborts and returns the error.
    IsIgnorable() bool
}
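This post focuses on Filter, but it is worth noting how the weight from the extender config interacts with Prioritize: in PrioritizeNodes the scores an extender returns are multiplied by that extender's weight and added into each node's total score, and an error from an extender's Prioritize is simply ignored. Conceptually (a simplified sketch, not the verbatim v1.13 source):
// Simplified sketch of how extender scores are folded into the final priorities.
for _, extender := range extenders {
    if !extender.IsInterested(pod) {
        continue
    }
    prioritizedList, weight, err := extender.Prioritize(pod, nodes)
    if err != nil {
        // Prioritization errors from an extender are ignored; other scores still count.
        continue
    }
    for _, hostPriority := range *prioritizedList {
        combinedScores[hostPriority.Host] += hostPriority.Score * weight
    }
}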
3.2 HTTPExtender, the implementation of the extender interface
An HTTPExtender is built from a schedulerapi.ExtenderConfig; this includes creating the HTTP client used to talk to the extender service.
// HTTPExtender implements the algorithm.SchedulerExtender interface.
type HTTPExtender struct {
    extenderURL      string
    preemptVerb      string
    filterVerb       string
    prioritizeVerb   string
    bindVerb         string
    weight           int
    client           *http.Client
    nodeCacheCapable bool
    managedResources sets.String
    ignorable        bool
}
func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerExtender, error) {
    if config.HTTPTimeout.Nanoseconds() == 0 {
        config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
    }
    transport, err := makeTransport(config)
    if err != nil {
        return nil, err
    }
    client := &http.Client{
        Transport: transport,
        Timeout:   config.HTTPTimeout,
    }
    managedResources := sets.NewString()
    for _, r := range config.ManagedResources {
        managedResources.Insert(string(r.Name))
    }
    return &HTTPExtender{
        extenderURL:      config.URLPrefix,
        preemptVerb:      config.PreemptVerb,
        filterVerb:       config.FilterVerb,
        prioritizeVerb:   config.PrioritizeVerb,
        bindVerb:         config.BindVerb,
        weight:           config.Weight,
        client:           client,
        nodeCacheCapable: config.NodeCacheCapable,
        managedResources: managedResources,
        ignorable:        config.Ignorable,
    }, nil
}
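The managedResources set built here is what IsInterested checks later: with an empty set the extender is called for every pod, otherwise only for pods whose containers request or limit one of the managed resources. Roughly (a paraphrased sketch of that method, not the verbatim source):
func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
    // No managed resources configured: interested in every pod.
    if h.managedResources.Len() == 0 {
        return true
    }
    // Otherwise only pods that request/limit one of the managed resources matter.
    hasManaged := func(containers []v1.Container) bool {
        for i := range containers {
            for name := range containers[i].Resources.Requests {
                if h.managedResources.Has(string(name)) {
                    return true
                }
            }
            for name := range containers[i].Resources.Limits {
                if h.managedResources.Has(string(name)) {
                    return true
                }
            }
        }
        return false
    }
    return hasManaged(pod.Spec.Containers) || hasManaged(pod.Spec.InitContainers)
}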
3.3 Interacting with the extender
The verbs all work in much the same way, so let's look at the predicate (filter) phase as an example.
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    ...
    // Run every extender, one by one, over the nodes that passed the built-in predicates.
    if len(filtered) > 0 && len(g.extenders) != 0 {
        for _, extender := range g.extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
            // On error: skip this extender if it is ignorable, otherwise abort.
            if err != nil {
                if extender.IsIgnorable() {
                    klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                        extender, err)
                    continue
                } else {
                    return []*v1.Node{}, FailedPredicateMap{}, err
                }
            }
            for failedNodeName, failedMsg := range failedMap {
                // Add the node to failedPredicateMap if it is not there yet.
                if _, found := failedPredicateMap[failedNodeName]; !found {
                    failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
                }
                failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
            }
            filtered = filteredList
            if len(filtered) == 0 {
                break
            }
        }
    }
    // filtered: the nodes that fit
    // failedPredicateMap: the nodes that failed, with reasons
    return filtered, failedPredicateMap, nil
}
After the built-in predicates have produced a set of candidate nodes, each extender in extenders filters them again by calling its own Filter method.
func (h *HTTPExtender) Filter(
    pod *v1.Pod,
    nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo,
) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
    var (
        result     schedulerapi.ExtenderFilterResult
        nodeList   *v1.NodeList
        nodeNames  *[]string
        nodeResult []*v1.Node
        args       *schedulerapi.ExtenderArgs
    )
    if h.filterVerb == "" {
        return nodes, schedulerapi.FailedNodesMap{}, nil
    }
    // If nodeCacheCapable is true, send only node names;
    // otherwise send the full node objects.
    if h.nodeCacheCapable {
        nodeNameSlice := make([]string, 0, len(nodes))
        for _, node := range nodes {
            nodeNameSlice = append(nodeNameSlice, node.Name)
        }
        nodeNames = &nodeNameSlice
    } else {
        nodeList = &v1.NodeList{}
        for _, node := range nodes {
            nodeList.Items = append(nodeList.Items, *node)
        }
    }
    // Assemble the ExtenderArgs sent to the extender service.
    args = &schedulerapi.ExtenderArgs{
        Pod:       pod,
        Nodes:     nodeList,
        NodeNames: nodeNames,
    }
    // POST to the extender's filter endpoint.
    if err := h.send(h.filterVerb, args, &result); err != nil {
        return nil, nil, err
    }
    if result.Error != "" {
        return nil, nil, fmt.Errorf(result.Error)
    }
    // Extract the result.
    if h.nodeCacheCapable && result.NodeNames != nil {
        nodeResult = make([]*v1.Node, 0, len(*result.NodeNames))
        for i := range *result.NodeNames {
            nodeResult = append(nodeResult, nodeNameToInfo[(*result.NodeNames)[i]].Node())
        }
    } else if result.Nodes != nil {
        nodeResult = make([]*v1.Node, 0, len(result.Nodes.Items))
        for i := range result.Nodes.Items {
            nodeResult = append(nodeResult, &result.Nodes.Items[i])
        }
    }
    return nodeResult, result.FailedNodes, nil
}
The Filter method sends the request to this extender's filterVerb path (predicates/always_true) appended to its extenderURL (http://localhost/scheduler), i.e. the full URL http://localhost/scheduler/predicates/always_true. That is exactly the request handled by the extender service's Handler method analyzed earlier; the response is then decoded into result for the Filter method to use.
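h.send itself is not shown above; roughly speaking it JSON-encodes the args, POSTs them to extenderURL joined with the verb, and decodes the JSON response into result. A simplified sketch of that behavior (details and error messages differ from the real implementation; it assumes the usual bytes, encoding/json, fmt, net/http and strings imports):
// Simplified sketch of what HTTPExtender.send does, not the verbatim v1.13 source.
func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
    body, err := json.Marshal(args)
    if err != nil {
        return err
    }
    url := strings.TrimRight(h.extenderURL, "/") + "/" + action
    req, err := http.NewRequest("POST", url, bytes.NewReader(body))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")
    resp, err := h.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
    }
    return json.NewDecoder(resp.Body).Decode(result)
}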
4. Summary
The k8s-scheduler-extender-example repo also shows how to deploy such a scheduler with a ConfigMap and a Deployment, whose pod contains two containers.
The first container uses the image gcr.io/google_containers/hyperkube:v1.11.1, which bundles the compiled Kubernetes components, so the container my-scheduler-ctr runs the kube-scheduler component with its configuration file mounted from a ConfigMap; from there everything runs as analyzed in [k8s源碼分析][kube-scheduler]scheduler之自定義調度器(1).
The second container, my-scheduler-extender-ctr, is the extender service itself, just packaged as an image.
Since the two containers share one pod, they can reach each other directly via localhost.
containers:
- name: my-scheduler-ctr
  image: gcr.io/google_containers/hyperkube:v1.11.1
  imagePullPolicy: IfNotPresent
  args:
  - kube-scheduler
  - --config=/my-scheduler/config.yaml
  - -v=4
  volumeMounts:
  - name: my-scheduler-config
    mountPath: /my-scheduler
- name: my-scheduler-extender-ctr
  image: a/b:c
  imagePullPolicy: IfNotPresent
In this way the custom scheduler is deployed onto the k8s cluster as a plug-in.