- 1. startNodeLifecycleController
- 2. NewNodeLifecycleController
- 3. NodeLifecycleController.Run
- 4. Summary
Kubernetes version: 1.17.4
1. startNodeLifecycleController
As you can see, startNodeLifecycleController does just two things:
- NewNodeLifecycleController
- NodeLifecycleController.Run
func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
ctx.InformerFactory.Coordination().V1().Leases(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.InformerFactory.Apps().V1().DaemonSets(),
// node lifecycle controller uses existing cluster role from node-controller
ctx.ClientBuilder.ClientOrDie("node-controller"),
// the node-monitor-period flag
ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
// the node-startup-grace-period flag
ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
// the node-monitor-grace-period flag
ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
// the pod-eviction-timeout flag
ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
// the node-eviction-rate flag
ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
// the secondary-node-eviction-rate flag
ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
// the large-cluster-size-threshold flag
ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
// the unhealthy-zone-threshold flag
ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
// the enable-taint-manager flag (enabled by default)
ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager,
// whether --feature-gates=TaintBasedEvictions=true is set (enabled by default)
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
)
if err != nil {
return nil, true, err
}
go lifecycleController.Run(ctx.Stop)
return nil, true, nil
}
Flag descriptions:
- enable-taint-manager: defaults to true. Enables the taint manager, which evicts pods from nodes carrying NoExecute taints.
- large-cluster-size-threshold: defaults to 50. Threshold for deciding whether the cluster counts as large. When the cluster size is less than or equal to this value, --secondary-node-eviction-rate is forced to 0.
- secondary-node-eviction-rate: defaults to 0.01. When a zone is unhealthy, how many nodes per second have their pods evicted. This secondary rate kicks in when too many nodes in the cluster are down, so eviction slows down accordingly.
- node-eviction-rate: float32, defaults to 0.1. The eviction rate, i.e. how fast nodes are drained, implemented with a token-bucket rate limiter. 0.1 means 0.1 nodes per second; note this is the rate of draining nodes, not pods, so roughly one node is drained every 10s.
- node-monitor-grace-period: duration, defaults to 40s. How long a node may go without reporting before it is considered unhealthy.
- node-startup-grace-period: duration, defaults to 1 minute. How long a newly started node may go without reporting before it is considered unhealthy.
- pod-eviction-timeout: duration, defaults to 5min. How long after a node becomes unhealthy its pods are deleted (only takes effect when the taint manager is not enabled).
- unhealthy-zone-threshold: float32, defaults to 55%. The fraction of unhealthy nodes at which a zone is considered unhealthy.
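To make the interaction between node-eviction-rate, secondary-node-eviction-rate and large-cluster-size-threshold concrete, here is a minimal Go sketch. The reducedQPS helper and hard-coded constants are illustrative stand-ins for the controller's ReducedQPSFunc covered later, not the actual kube-controller-manager code.

```go
// Sketch of the eviction-rate selection driven by the three flags above.
package main

import "fmt"

const (
	evictionLimiterQPS          float32 = 0.1  // node-eviction-rate
	secondaryEvictionLimiterQPS float32 = 0.01 // secondary-node-eviction-rate
	largeClusterThreshold               = 50   // large-cluster-size-threshold
)

// reducedQPS returns the per-zone rate used when the zone is partially
// disrupted: large clusters fall back to the secondary rate, small
// clusters stop evicting entirely.
func reducedQPS(nodeNum int) float32 {
	if nodeNum > largeClusterThreshold {
		return secondaryEvictionLimiterQPS
	}
	return 0
}

func main() {
	fmt.Println(reducedQPS(100)) // 0.01
	fmt.Println(reducedQPS(30))  // 0
}
```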
2. NewNodeLifecycleController
2.1 The NodeLifecycleController struct
// Controller is the controller that manages node's life cycle.
type Controller struct {
// taintManager watches node taint / pod toleration changes and evicts pods that do not tolerate a node's NoExecute taints
taintManager *scheduler.NoExecuteTaintManager
// pod lister/informer
podLister corelisters.PodLister
podInformerSynced cache.InformerSynced
kubeClient clientset.Interface
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
// to avoid the problem with time skew across the cluster.
now func() metav1.Time
// returns the secondary-node-eviction-rate value: secondary-node-eviction-rate if the cluster is large, otherwise 0
enterPartialDisruptionFunc func(nodeNum int) float32
// returns evictionLimiterQPS (the node-eviction-rate flag)
enterFullDisruptionFunc func(nodeNum int) float32
// returns how many nodes in the zone are notReady, plus a ZoneState indicating whether the zone is healthy (based on the unhealthyZoneThreshold parameter)
computeZoneStateFunc func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
// node map
knownNodeSet map[string]*v1.Node
// per-node health map
// per Node map storing last observed health together with a local time when it was observed.
nodeHealthMap *nodeHealthMap
// evictorLock protects zonePodEvictor and zoneNoExecuteTainter.
// TODO(#83954): API calls shouldn't be executed under the lock.
evictorLock sync.Mutex
// records whether a node's pods have been evicted; the node eviction state read from here is either evicted or toBeEvicted
nodeEvictionMap *nodeEvictionMap
// workers that evicts pods from unresponsive nodes.
// per-zone list of nodes whose pods need to be evicted
zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
// per-zone token-bucket queue of unready nodes whose taints need updating
// workers that are responsible for tainting nodes.
zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
// retry list
nodesToRetry sync.Map
// health state of each zone: stateFullDisruption, statePartialDisruption, stateNormal or stateInitial
zoneStates map[string]ZoneState
// DaemonSet lister/informer
daemonSetStore appsv1listers.DaemonSetLister
daemonSetInformerSynced cache.InformerSynced
// lease and node listers/informers
leaseLister coordlisters.LeaseLister
leaseInformerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
recorder record.EventRecorder
// the flags passed in earlier
// Value controlling Controller monitoring period, i.e. how often does Controller
// check node health signal posted from kubelet. This value should be lower than
// nodeMonitorGracePeriod.
// TODO: Change node health monitor to watch based.
nodeMonitorPeriod time.Duration
// When node is just created, e.g. cluster bootstrap or node creation, we give
// a longer grace period.
nodeStartupGracePeriod time.Duration
// Controller will not proactively sync node health, but will monitor node
// health signal updated from kubelet. There are 2 kinds of node healthiness
// signals: NodeStatus and NodeLease. NodeLease signal is generated only when
// NodeLease feature is enabled. If it doesn't receive update for this amount
// of time, it will start posting "NodeReady==ConditionUnknown". The amount of
// time before which Controller start evicting pods is controlled via flag
// 'pod-eviction-timeout'.
// Note: be cautious when changing the constant, it must work with
// nodeStatusUpdateFrequency in kubelet and renewInterval in NodeLease
// controller. The node health signal update frequency is the minimal of the
// two.
// There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than the node health signal
// update frequency, where N means number of retries allowed for kubelet to
// post node status/lease. It is pointless to make nodeMonitorGracePeriod
// be less than the node health signal update frequency, since there will
// only be fresh values from Kubelet at an interval of node health signal
// update frequency. The constant must be less than podEvictionTimeout.
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger
// value takes longer for user to see up-to-date node health.
nodeMonitorGracePeriod time.Duration
podEvictionTimeout time.Duration
evictionLimiterQPS float32
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
unhealthyZoneThreshold float32
// if set to true Controller will start TaintManager that will evict Pods from
// tainted nodes, if they're not tolerated.
runTaintManager bool
// if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
// taints instead of evicting Pods itself.
useTaintBasedEvictions bool
// node and pod work queues
nodeUpdateQueue workqueue.Interface
podUpdateQueue workqueue.RateLimitingInterface
}
2.2 NewNodeLifecycleController
The core logic is:
(1) Initialize the Controller from the parameters.
(2) Register the pod event handlers. Every pod event goes through nc.podUpdated; if enable-taint-manager=true it also goes through nc.taintManager.PodUpdated.
(3) Build the function that returns all pods on a given node.
(4) If enable-taint-manager=true, every node change also goes through nc.taintManager.NodeUpdated.
(5) Register the node event handlers; these are registered whether or not the taint manager is enabled.
(6) Wire up the node, DaemonSet and lease listers used to fetch objects.
// NewNodeLifecycleController returns a new taint controller.
func NewNodeLifecycleController(
leaseInformer coordinformers.LeaseInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
daemonSetInformer appsv1informers.DaemonSetInformer,
kubeClient clientset.Interface,
nodeMonitorPeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorGracePeriod time.Duration,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
secondaryEvictionLimiterQPS float32,
largeClusterThreshold int32,
unhealthyZoneThreshold float32,
runTaintManager bool,
useTaintBasedEvictions bool,
) (*Controller, error) {
// 1. Initialize the Controller from the parameters
nc := &Controller{
// ... fields omitted ...
}
if useTaintBasedEvictions {
klog.Infof("Controller is using taint based evictions.")
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
nc.computeZoneStateFunc = nc.ComputeZoneState
// 2. Register the pod event handlers. Every pod event goes through nc.podUpdated; if enable-taint-manager=true it also goes through nc.taintManager.PodUpdated
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
赖钞。腰素。。
省略代碼
})
// 3. Build the function that returns all pods on a given node
nc.podInformerSynced = podInformer.Informer().HasSynced
podInformer.Informer().AddIndexers(cache.Indexers{
nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
},
})
podIndexer := podInformer.Informer().GetIndexer()
nc.getPodsAssignedToNode = func(nodeName string) ([]*v1.Pod, error) {
objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
if err != nil {
return nil, err
}
pods := make([]*v1.Pod, 0, len(objs))
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
pods = append(pods, pod)
}
return pods, nil
}
nc.podLister = podInformer.Lister()
// 4. If enable-taint-manager=true, every node change also goes through nc.taintManager.NodeUpdated
if nc.runTaintManager {
podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) }
nodeLister := nodeInformer.Lister()
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
})
}
// 5. Register the node event handlers; these are registered whether or not the taint manager is enabled
klog.Infof("Controller will reconcile labels.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.nodeUpdateQueue.Add(node.Name)
nc.nodeEvictionMap.registerNode(node.Name)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
nc.nodeUpdateQueue.Add(newNode.Name)
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.nodesToRetry.Delete(node.Name)
nc.nodeEvictionMap.unregisterNode(node.Name)
return nil
}),
})
// 6. Wire up the node, DaemonSet and lease listers used to fetch objects
nc.leaseLister = leaseInformer.Lister()
nc.leaseInformerSynced = leaseInformer.Informer().HasSynced
nc.nodeLister = nodeInformer.Lister()
nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
nc.daemonSetStore = daemonSetInformer.Lister()
nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
return nc, nil
}
3. NodeLifecycleController.Run
The logic is:
(1) Wait for the lease, node, pod and DaemonSet informer caches to sync.
(2) If enable-taint-manager=true, start nc.taintManager.Run.
(3) Start doNodeProcessingPassWorker, which processes nodes from the nodeUpdateQueue.
(4) Start doPodProcessingWorker, which processes pods from the podUpdateQueue.
(5) If feature-gates=TaintBasedEvictions=true, run doNoExecuteTaintingPass; otherwise run doEvictionPass.
(6) Keep monitoring node health.
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")
// 1. Wait for the lease, node, pod and DaemonSet informer caches to sync
if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
// 2. If enable-taint-manager=true, start nc.taintManager.Run
if nc.runTaintManager {
go nc.taintManager.Run(stopCh)
}
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
defer nc.podUpdateQueue.ShutDown()
// 3. Start doNodeProcessingPassWorker, which processes nodes from the nodeUpdateQueue
// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
// no event missed.
go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
}
// 4. Start doPodProcessingWorker, which processes pods from the podUpdateQueue
for i := 0; i < podUpdateWorkerSize; i++ {
go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh)
}
// 5. If feature-gates=TaintBasedEvictions=true, run doNoExecuteTaintingPass; otherwise run doEvictionPass
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
}
// 6. Keep monitoring node health
// Incorporate the results of node health signal pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeHealth(); err != nil {
klog.Errorf("Error monitoring node health: %v", err)
}
}, nc.nodeMonitorPeriod, stopCh)
<-stopCh
}
3.1 nc.taintManager.Run
NewNoExecuteTaintManager is already called in NewNodeLifecycleController.
The taint manager is driven by pod and node events: whenever a node (or the node a pod is bound to) carries a NoExecute taint, it deletes all pods on that node (or that single pod) unless they tolerate the taint.
Concretely, if the taint manager is enabled, NewNoExecuteTaintManager initializes it. As shown below, this sets up the nodeUpdateQueue and podUpdateQueue work queues plus event recording.
Core data structures:
- nodeUpdateQueue: defined when the NodeLifecycleController is created; node changes are pushed onto this queue.
- podUpdateQueue: defined when the NodeLifecycleController is created; pod changes are pushed onto this queue.
- taintedNodes: stores every node's NoExecute taints; handlePodUpdate looks up a node's NoExecute taints here.
- taintEvictionQueue: a TimedWorkerQueue, i.e. a queue whose items fire automatically at a scheduled time. Some pods set a toleration time, so a timed queue is needed to delete them later.
// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
eventBroadcaster.StartLogging(klog.Infof)
if c != nil {
klog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
} else {
klog.Fatalf("kubeClient is nil when starting NodeController")
}
tm := &NoExecuteTaintManager{
client: c,
recorder: recorder,
getPod: getPod,
getNode: getNode,
getPodsAssignedToNode: getPodsAssignedToNode,
taintedNodes: make(map[string][]v1.Taint),
nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"),
podUpdateQueue: workqueue.NewNamed("noexec_taint_pod"),
}
tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))
return tm
}
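The taintEvictionQueue created above is a TimedWorkerQueue: each work item fires automatically at a scheduled time. A minimal sketch of that idea, assuming nothing beyond the standard library; the timedQueue type and its methods are illustrative, not the k8s implementation.

```go
// Sketch of a timed work queue: AddWork schedules fn(key) at a given time,
// CancelWork drops it (e.g. when the node's taints are removed).
package main

import (
	"fmt"
	"sync"
	"time"
)

type timedQueue struct {
	mu      sync.Mutex
	workers map[string]*time.Timer
	fn      func(key string)
}

func newTimedQueue(fn func(key string)) *timedQueue {
	return &timedQueue{workers: map[string]*time.Timer{}, fn: fn}
}

// AddWork schedules fn(key) to run at fireAt (immediately if in the past).
func (q *timedQueue) AddWork(key string, fireAt time.Time) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.workers[key]; ok {
		return // already scheduled; existing work is not re-added
	}
	d := time.Until(fireAt)
	if d < 0 {
		d = 0
	}
	q.workers[key] = time.AfterFunc(d, func() { q.fn(key) })
}

// CancelWork drops a scheduled eviction before it fires.
func (q *timedQueue) CancelWork(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if t, ok := q.workers[key]; ok {
		t.Stop()
		delete(q.workers, key)
	}
}

func main() {
	q := newTimedQueue(func(key string) { fmt.Println("evict", key) })
	q.AddWork("default/pod-a", time.Now().Add(100*time.Millisecond))
	time.Sleep(200 * time.Millisecond)
}
```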
The Run function works as follows:
The core loop simply takes an item off nodeUpdateQueue/podUpdateQueue and hands it to a worker, the same pattern as any other controller.
Note the load-balancing trick here: there are UpdateWorkerSize workers, so UpdateWorkerSize channels are created and UpdateWorkerSize goroutines are started, one per channel. Items are assigned to channels by hashing the node name modulo the worker count, which spreads items roughly evenly across the channels and keeps all updates for the same node on the same worker.
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
klog.V(0).Infof("Starting NoExecuteTaintManager")
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(nodeUpdateItem)
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(item)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
}
}
}(stopCh)
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
// The fact that pods are processed by the same worker as nodes is used to avoid races
// between node worker setting tc.taintedNodes and pod worker reading this to decide
// whether to delete pod.
// It's possible that even without this assumption this code is still correct.
podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(item)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
}
}
}(stopCh)
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(i, wg.Done, stopCh)
}
wg.Wait()
}
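To illustrate the hash-and-mod sharding used above to keep every update for the same node on the same worker, here is a minimal sketch; the FNV-based shard helper is an illustrative stand-in for the hash(nodeName, UpdateWorkerSize) call in the real code.

```go
// Sketch of sharding node names onto a fixed number of worker channels.
package main

import (
	"fmt"
	"hash/fnv"
)

const updateWorkerSize = 8

// shard maps a node name to a worker index in [0, buckets).
func shard(nodeName string, buckets int) int {
	h := fnv.New32a()
	h.Write([]byte(nodeName))
	return int(h.Sum32()) % buckets
}

func main() {
	for _, n := range []string{"node-1", "node-2", "node-3"} {
		fmt.Printf("%s -> worker %d\n", n, shard(n, updateWorkerSize))
	}
}
```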
3.1.1 The worker loop
The worker logic is simple: each worker goroutine takes one nodeUpdate/podUpdate event off its channel and processes it, via handleNodeUpdate and handlePodUpdate respectively.
Note, however, that a worker always drains nodeUpdate events first. (This is easy to understand: handling a node event may evict every pod on the node, which can include the pod from a pending pod event.)
func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
defer done()
// When processing events we want to prioritize Node updates over Pod updates,
// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
// we don't want user (or system) to wait until PodUpdate queue is drained before it can
// start evicting Pods from tainted Nodes.
for {
select {
case <-stopCh:
return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate)
tc.podUpdateQueue.Done(podUpdate)
}
}
}
3.1.2 handleNodeUpdate
Core logic:
(1) Get all NoExecute taints on the node.
(2) Get all pods on the node.
(3) Loop over the pods and call processPodOnNode on each one.
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
node, err := tc.getNode(nodeUpdate.nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}
// 1. Get all NoExecute taints on the node
// Create or Update
klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
// 2. Get all pods on the node
// This is critical that we update tc.taintedNodes before we call getPodsAssignedToNode:
// getPodsAssignedToNode can be delayed as long as all future updates to pods will call
// tc.PodUpdated which will use tc.taintedNodes to potentially delete delayed pods.
pods, err := tc.getPodsAssignedToNode(node.Name)
if err != nil {
klog.Errorf(err.Error())
return
}
if len(pods) == 0 {
return
}
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
for i := range pods {
tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
// 3. Loop over the pods and call processPodOnNode on each one
now := time.Now()
for _, pod := range pods {
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
3.1.2.1 processPodOnNode
Core logic:
(1) If the node no longer has any taints, cancel any pending work for the pod (it may be waiting in the timed queue).
(2) Compare the pod's tolerations with the node's taints to see whether the pod tolerates all of them.
(3) If not all taints are tolerated, first cancel any pending work for the pod (a pod already in the queue cannot be re-added), then re-add it with AddWork. Note that both timestamps are time.Now(), meaning delete immediately.
(4) If all taints are tolerated, find the shortest toleration time (see getMinTolerationTime): no toleration time or a negative one is treated as 0 (delete now); math.MaxInt64 means tolerate forever (never delete); otherwise take the smallest configured toleration time.
(5) Based on that minimum time, schedule when to delete the pod, but first compare against any existing scheduled eviction:
- If a deletion is already scheduled and this update's trigger time would land before it, skip this update. For example, if podA is scheduled for deletion at 11:00 and this update computes 10:50, the update is ignored and the earlier schedule stands.
- Otherwise cancel the old work and schedule the new deletion time.
func (tc *NoExecuteTaintManager) processPodOnNode(
podNamespacedName types.NamespacedName,
nodeName string,
tolerations []v1.Toleration,
taints []v1.Taint,
now time.Time,
) {
// 1. If the node no longer has any taints, cancel any pending work for the pod (it may be waiting in the timed queue)
if len(taints) == 0 {
tc.cancelWorkWithEvent(podNamespacedName)
}
// 2. Compare the pod's tolerations with the node's taints to see whether the pod tolerates all of them
allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
// 3. If not all taints are tolerated, cancel any pending work for the pod first (a pod already in the queue cannot be re-added), then re-add it with AddWork; both timestamps are time.Now(), i.e. delete immediately
if !allTolerated {
klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
tc.cancelWorkWithEvent(podNamespacedName)
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
return
}
// 4. If all taints are tolerated, find the shortest toleration time: none or negative means 0 (delete now), math.MaxInt64 means tolerate forever (never delete), otherwise take the smallest configured toleration time
minTolerationTime := getMinTolerationTime(usedTolerations)
// getMinTolerationTime returns negative value to denote infinite toleration.
if minTolerationTime < 0 {
klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
return
}
// 5. Schedule the pod deletion based on the minimum toleration time
startTime := now
triggerTime := startTime.Add(minTolerationTime)
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
if scheduledEviction != nil {
startTime = scheduledEviction.CreatedAt
// 5.1 If a deletion is already scheduled and this update's trigger time would land before it, skip this update; e.g. podA was scheduled for deletion at 11:00 and this update computes 10:50, so the update is ignored and the earlier schedule stands
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
}
// 5.2 Otherwise cancel the old work and schedule the new deletion time
tc.cancelWorkWithEvent(podNamespacedName)
}
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
3.1.3 handlePodUpdate
handlePodUpdate is a subset of handleNodeUpdate; its core is processPodOnNode, which was analyzed above, so it is not repeated here.
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}
// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
// Create or Update
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// It's possible that Node was deleted, or Taints were removed before, which triggered
// eviction cancelling if it was needed.
if !ok {
return
}
tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
3.1.4 Summary of nc.taintManager.Run
The taint manager reacts to NoExecute taints immediately: as soon as a node carries one, eviction starts, and a pod can only delay its own eviction by setting a toleration time.
(1) It watches pod and node add/update events.
(2) Events are hashed onto multiple channels, so the n workers share the load evenly.
(3) Node events are handled with priority, but node handling ultimately does the same thing as pod handling: it walks the pods on the node and decides, one by one, whether to evict. The core eviction decision is:
- If the node no longer has any taints, cancel any pending work for the pod (it may be waiting in the timed queue).
- Compare the pod's tolerations with the node's taints to see whether the pod tolerates all of them.
- If not all taints are tolerated, cancel any pending work for the pod first (a pod already in the queue cannot be re-added), then re-add it with AddWork. Both timestamps are time.Now(), meaning delete immediately.
- If all taints are tolerated, find the shortest toleration time: none or negative means 0 (delete now); math.MaxInt64 means tolerate forever (never delete); otherwise take the smallest configured toleration time.
- Based on that minimum time, schedule when to delete the pod, but compare against any existing scheduled eviction first:
  - If a deletion is already scheduled and this update's trigger time would land before it, skip this update; e.g. podA was scheduled for deletion at 11:00 and this update computes 10:50, so the update is ignored and the earlier schedule stands.
  - Otherwise cancel the old work and schedule the new deletion time.
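A minimal sketch of this eviction decision, with simplified types (key-only matching instead of the full taint/toleration matching done by the k8s helpers); the evictionDelay function and its types are illustrative only.

```go
// Sketch: untolerated NoExecute taint -> evict now; otherwise the smallest
// TolerationSeconds decides the delay; a missing TolerationSeconds means
// "tolerate forever".
package main

import (
	"fmt"
	"time"
)

type taint struct{ Key string }

type toleration struct {
	Key               string
	TolerationSeconds *int64 // nil = tolerate forever
}

// evictionDelay returns (delay, evict): evict=false means never evict.
func evictionDelay(taints []taint, tols []toleration) (time.Duration, bool) {
	minSeconds := int64(-1) // -1 = tolerated forever so far
	for _, t := range taints {
		var matched *toleration
		for i := range tols {
			if tols[i].Key == t.Key {
				matched = &tols[i]
				break
			}
		}
		if matched == nil {
			return 0, true // untolerated taint: evict immediately
		}
		if matched.TolerationSeconds == nil {
			continue // this taint is tolerated forever
		}
		s := *matched.TolerationSeconds
		if s < 0 {
			s = 0 // negative toleration means delete now
		}
		if minSeconds < 0 || s < minSeconds {
			minSeconds = s
		}
	}
	if minSeconds < 0 {
		return 0, false // all taints tolerated forever
	}
	return time.Duration(minSeconds) * time.Second, true
}

func main() {
	ts := int64(300)
	d, evict := evictionDelay(
		[]taint{{Key: "node.kubernetes.io/not-ready"}},
		[]toleration{{Key: "node.kubernetes.io/not-ready", TolerationSeconds: &ts}},
	)
	fmt.Println(d, evict) // 5m0s true
}
```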
3.2 doNodeProcessingPassWorker
doNodeProcessingPassWorker does two things per node:
(1) add NoSchedule taints to the node;
(2) reconcile the node's labels.
func (nc *Controller) doNodeProcessingPassWorker() {
for {
obj, shutdown := nc.nodeUpdateQueue.Get()
// "nodeUpdateQueue" will be shutdown when "stopCh" closed;
// we do not need to re-check "stopCh" again.
if shutdown {
return
}
nodeName := obj.(string)
if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
// TODO(k82cn): Add nodeName back to the queue
}
// TODO: re-evaluate whether there are any labels that need to be
// reconcile in 1.19. Remove this function if it's no longer necessary.
if err := nc.reconcileNodeLabels(nodeName); err != nil {
klog.Errorf("Failed to reconcile labels for node <%s>, requeue it: %v", nodeName, err)
// TODO(yujuhong): Add nodeName back to the queue
}
nc.nodeUpdateQueue.Done(nodeName)
}
}
3.2.1 doNoScheduleTaintingPass
The core job is to check whether the node needs the corresponding NoSchedule taints.
The logic is:
- 1. Get the node object from the nodeLister.
- 2. Check whether the node has any of the following conditions: (1) a NodeReady condition that is False or Unknown; (2) MemoryPressureCondition; (3) DiskPressureCondition; (4) NetworkUnavailableCondition; (5) PIDPressureCondition. For each one present, add the corresponding NoSchedule taint.
- 3. Check whether the node is Unschedulable; if so, also add the corresponding NoSchedule taint.
- 4. Compare the node's existing taints with the taints it should have and, taking the desired taints as the source of truth, call nodeutil.SwapNodeControllerTaint to add the missing taints and remove the unneeded ones.
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
// Map node's condition to Taints.
var taints []v1.Taint
for _, condition := range node.Status.Conditions {
if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
if taintKey, found := taintMap[condition.Status]; found {
taints = append(taints, v1.Taint{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
if node.Spec.Unschedulable {
// If unschedulable, append related taint.
taints = append(taints, v1.Taint{
Key: v1.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
})
}
// Get exist taints of node.
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
// only NoSchedule taints are candidates to be compared with "taints" later
if t.Effect != v1.TaintEffectNoSchedule {
return false
}
// Find unschedulable taint of node.
if t.Key == v1.TaintNodeUnschedulable {
return true
}
// Find node condition taints of node.
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
// If nothing to add not delete, return true directly.
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
v1.NodeReady: {
v1.ConditionFalse: v1.TaintNodeNotReady,
v1.ConditionUnknown: v1.TaintNodeUnreachable,
},
v1.NodeMemoryPressure: {
v1.ConditionTrue: v1.TaintNodeMemoryPressure,
},
v1.NodeDiskPressure: {
v1.ConditionTrue: v1.TaintNodeDiskPressure,
},
v1.NodeNetworkUnavailable: {
v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
},
v1.NodePIDPressure: {
v1.ConditionTrue: v1.TaintNodePIDPressure,
},
}
3.2.2 reconcileNodeLabels
reconcileNodeLabels keeps the following node labels up to date:
beta.kubernetes.io/arch: amd64
beta.kubernetes.io/os: linux
kubernetes.io/arch: amd64
kubernetes.io/os: linux
// reconcileNodeLabels reconciles node labels.
func (nc *Controller) reconcileNodeLabels(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
if node.Labels == nil {
// Nothing to reconcile.
return nil
}
labelsToUpdate := map[string]string{}
for _, r := range labelReconcileInfo {
primaryValue, primaryExists := node.Labels[r.primaryKey]
secondaryValue, secondaryExists := node.Labels[r.secondaryKey]
if !primaryExists {
// The primary label key does not exist. This should not happen
// within our supported version skew range, when no external
// components/factors modifying the node object. Ignore this case.
continue
}
if secondaryExists && primaryValue != secondaryValue {
// Secondary label exists, but not consistent with the primary
// label. Need to reconcile.
labelsToUpdate[r.secondaryKey] = primaryValue
} else if !secondaryExists && r.ensureSecondaryExists {
// Apply secondary label based on primary label.
labelsToUpdate[r.secondaryKey] = primaryValue
}
}
if len(labelsToUpdate) == 0 {
return nil
}
if !nodeutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
return fmt.Errorf("failed update labels for node %+v", node)
}
return nil
}
3.3 doPodProcessingWorker
doPodProcessingWorker takes a pod off podUpdateQueue and runs processPod. (Note that this podUpdateQueue is not the same queue as the taint manager's podUpdateQueue; they merely share the name.)
The core logic of processPod is:
(1) Check whether the node's ReadyCondition is notReady.
(2) If feature-gates=TaintBasedEvictions=false, run processNoTaintBaseEviction.
(3) In either case, if the node's ReadyCondition is not true, run MarkPodsNotReady: if the pod's Ready condition is not already false, set it to false and update LastTransitionTimestamp; otherwise leave the pod unchanged.
func (nc *Controller) doPodProcessingWorker() {
for {
obj, shutdown := nc.podUpdateQueue.Get()
// "podUpdateQueue" will be shutdown when "stopCh" closed;
// we do not need to re-check "stopCh" again.
if shutdown {
return
}
podItem := obj.(podUpdateItem)
nc.processPod(podItem)
}
}
// processPod is processing events of assigning pods to nodes. In particular:
// 1. for NodeReady=true node, taint eviction for this pod will be cancelled
// 2. for NodeReady=false or unknown node, taint eviction of pod will happen and pod will be marked as not ready
// 3. if node doesn't exist in cache, it will be skipped and handled later by doEvictionPass
func (nc *Controller) processPod(podItem podUpdateItem) {
defer nc.podUpdateQueue.Done(podItem)
pod, err := nc.podLister.Pods(podItem.namespace).Get(podItem.name)
if err != nil {
if apierrors.IsNotFound(err) {
// If the pod was deleted, there is no need to requeue.
return
}
klog.Warningf("Failed to read pod %v/%v: %v.", podItem.namespace, podItem.name, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
nodeName := pod.Spec.NodeName
nodeHealth := nc.nodeHealthMap.getDeepCopy(nodeName)
if nodeHealth == nil {
// Node data is not gathered yet or node has beed removed in the meantime.
// Pod will be handled by doEvictionPass method.
return
}
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
klog.Warningf("Failed to read node %v: %v.", nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
// 1. Check whether the node's ReadyCondition is notReady
_, currentReadyCondition := nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
if currentReadyCondition == nil {
// Lack of NodeReady condition may only happen after node addition (or if it will be maliciously deleted).
// In both cases, the pod will be handled correctly (evicted if needed) during processing
// of the next node update event.
return
}
// 2. If feature-gates=TaintBasedEvictions=false, run processNoTaintBaseEviction
pods := []*v1.Pod{pod}
// In taint-based eviction mode, only node updates are processed by NodeLifecycleController.
// Pods are processed by TaintManager.
if !nc.useTaintBasedEvictions {
if err := nc.processNoTaintBaseEviction(node, currentReadyCondition, nc.nodeMonitorGracePeriod, pods); err != nil {
klog.Warningf("Unable to process pod %+v eviction from node %v: %v.", podItem, nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
return
}
}
// 3. Either way, if the node's ReadyCondition is not true, run MarkPodsNotReady: if the pod's Ready condition is not already false, set it to false and update LastTransitionTimestamp; otherwise leave the pod unchanged
if currentReadyCondition.Status != v1.ConditionTrue {
if err := nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil {
klog.Warningf("Unable to mark pod %+v NotReady on node %v: %v.", podItem, nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
}
}
}
3.3.1 processNoTaintBaseEviction
Core logic:
(1) If the node's last observed ReadyCondition is false and readyTransitionTimestamp in nodeHealthMap plus podEvictionTimeout is already in the past (i.e. the condition has been false for at least podEvictionTimeout), call evictPods.
(2) If the last observed ReadyCondition is unknown and probeTimestamp in nodeHealthMap plus podEvictionTimeout is already in the past (the condition has been unknown for at least podEvictionTimeout), call evictPods.
(3) If the last observed ReadyCondition is true, call cancelPodEviction: set the node's status in nodeEvictionMap to unmarked and remove the node from the zonePodEvictor queue.
evictPods does not evict immediately; it first checks whether the node is already in the evicted state.
evictPods reads the node's eviction state from nodeEvictionMap: if it is evicted, the node has already been drained, so the given pods are deleted directly. Otherwise the state is set to toBeEvicted and the node is added to the zonePodEvictor queue to wait for eviction.
func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration, pods []*v1.Pod) error {
decisionTimestamp := nc.now()
nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
if nodeHealthData == nil {
return fmt.Errorf("health data doesn't exist for node %q", node.Name)
}
// Check eviction timeout against decisionTimestamp
switch observedReadyCondition.Status {
case v1.ConditionFalse:
if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
enqueued, err := nc.evictPods(node, pods)
if err != nil {
return err
}
if enqueued {
klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nodeHealthData.readyTransitionTimestamp,
nc.podEvictionTimeout,
)
}
}
case v1.ConditionUnknown:
if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) {
enqueued, err := nc.evictPods(node, pods)
if err != nil {
return err
}
if enqueued {
klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
node.Name,
decisionTimestamp,
nodeHealthData.readyTransitionTimestamp,
nc.podEvictionTimeout-gracePeriod,
)
}
}
case v1.ConditionTrue:
if nc.cancelPodEviction(node) {
klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
return nil
}
// evictPods:
// - adds node to evictor queue if the node is not marked as evicted.
// Returns false if the node name was already enqueued.
// - deletes pods immediately if node is already marked as evicted.
// Returns false, because the node wasn't added to the queue.
func (nc *Controller) evictPods(node *v1.Node, pods []*v1.Pod) (bool, error) {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
status, ok := nc.nodeEvictionMap.getStatus(node.Name)
if ok && status == evicted {
// Node eviction already happened for this node.
// Handling immediate pod deletion.
_, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, node.Name, string(node.UID), nc.daemonSetStore)
if err != nil {
return false, fmt.Errorf("unable to delete pods from node %q: %v", node.Name, err)
}
return false, nil
}
if !nc.nodeEvictionMap.setStatus(node.Name, toBeEvicted) {
klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", node.Name)
}
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)), nil
}
3.4 doEvictionPass(if useTaintBasedEvictions==false)
doEvictionPass drains a token-bucket rate-limited queue (governed by evictionLimiterQPS, default 0.1, i.e. one node every 10s). Every node in this queue has been unready for longer than podEvictionTimeout (processNoTaintBaseEviction is what enqueues them).
- Iterate over zonePodEvictor; for each zone's node queue, take one node and run the steps below.
- Get the node's UID and fetch all pods on the node from the cache.
- Run DeletePods, which deletes every pod except DaemonSet pods:
  - For each pod, check that it is actually bound to the given node; skip it otherwise.
  - Run SetPodTerminationReason: set pod Status.Reason to NodeLost and Status.Message to "Node %v which was running pod %v is unresponsive", then update the pod.
  - If the pod has DeletionGracePeriodSeconds set, it is already being deleted, so skip it.
  - If the pod belongs to a DaemonSet, skip it.
  - Otherwise delete the pod.
- Mark the node's status as evicted in nodeEvictionMap.
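A minimal sketch of the rate-limited drain idea behind doEvictionPass, using golang.org/x/time/rate as an illustrative stand-in for the RateLimitedTimedQueue in the real code; the node names and the loop body are placeholders.

```go
// Sketch: nodes queued for eviction are drained at evictionLimiterQPS
// (0.1/s by default, i.e. roughly one node every 10s).
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	const evictionLimiterQPS = 0.1
	limiter := rate.NewLimiter(rate.Limit(evictionLimiterQPS), 1)

	queued := []string{"node-a", "node-b"} // nodes whose pods must be evicted
	for _, node := range queued {
		// Block until the token bucket allows the next node to be processed
		// (the second iteration waits ~10s at the default rate).
		if err := limiter.Wait(context.Background()); err != nil {
			return
		}
		fmt.Println("evicting non-DaemonSet pods on", node)
		// the real controller would call DeletePods(...) here and then mark
		// the node as evicted in nodeEvictionMap
	}
}
```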
3.5 doNoExecuteTaintingPass(if useTaintBasedEvictions==true)
When the taint manager is enabled, doNoExecuteTaintingPass only adds NoExecute taints; it performs no eviction itself, since eviction is handled separately by the taint manager.
doNoExecuteTaintingPass also drains a token-bucket rate-limited queue (again governed by evictionLimiterQPS, default 0.1, i.e. one node every 10s):
- Iterate over zoneNoExecuteTainter; for each zone's node queue, take one node and run the steps below.
- Get the node from the cache.
- If the node's Ready condition is false, remove the "node.kubernetes.io/unreachable" taint and add the "node.kubernetes.io/not-ready" taint with effect NoExecute.
- If the node's Ready condition is unknown, remove the "node.kubernetes.io/not-ready" taint and add the "node.kubernetes.io/unreachable" taint with effect NoExecute.
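A minimal sketch of the taint choice described above; desiredNoExecuteTaint and its string-based condition status are simplifications, and the real controller applies the change through SwapNodeControllerTaint.

```go
// Sketch: pick which NoExecute taint a node should carry based on its
// Ready condition status.
package main

import "fmt"

const (
	taintNotReady    = "node.kubernetes.io/not-ready"
	taintUnreachable = "node.kubernetes.io/unreachable"
)

// desiredNoExecuteTaint returns the taint key the node should carry, given
// its Ready condition status ("False", "Unknown" or "True").
func desiredNoExecuteTaint(readyStatus string) (key string, want bool) {
	switch readyStatus {
	case "False":
		return taintNotReady, true
	case "Unknown":
		return taintUnreachable, true
	default: // "True": neither NoExecute taint should remain
		return "", false
	}
}

func main() {
	for _, s := range []string{"False", "Unknown", "True"} {
		k, want := desiredNoExecuteTaint(s)
		fmt.Printf("Ready=%s -> taint=%q (apply=%v)\n", s, k, want)
	}
}
```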
3.6 monitorNodeHealth
(Section 3.6 is adapted from https://midbai.com/post/node-lifecycle-controller-manager/)
Whether or not the TaintBasedEvictions feature is enabled, nodes that need to be tainted or have their pods evicted end up in the zoneNoExecuteTainter or zonePodEvictor queue, and nc.monitorNodeHealth is the producer for both queues. Its main job is to continuously monitor node status: when a node is unhealthy it updates the node's taints and the status of the node's pods, or evicts the pods directly. It also partitions all nodes into zoneStates and sets an eviction rate for each zone.
Every nodeMonitorPeriod, monitorNodeHealth runs once: it maintains node and zone state, marks unresponsive nodes (setting their status to unknown), and sets each zone's rate according to the overall cluster state.
3.6.1 Node classification and initialization
Fetch all nodes from the cache and, with the help of knownNodeSet (the set of nodes already seen) and zoneStates (the recorded state of each zone: Initial, Normal, FullDisruption, PartialDisruption), classify nodes into added, deleted, and nodes in newly seen zones (newZoneRepresentatives).
For each newly discovered zone, initialize it: with the taint manager enabled, set the rate of the zoneNoExecuteTainter queue (unready nodes waiting to be tainted) to evictionLimiterQPS; without the taint manager, set the rate of the zonePodEvictor queue (nodes in the zone waiting for pod eviction) to evictionLimiterQPS. In both cases record the zone in zoneStates as stateInitial.
For each newly discovered node, add it to knownNodeSet, record its zone as stateInitial in zoneStates, and initialize the zone if it has not been initialized yet. With the taint manager enabled, mark the node as healthy: remove the unreachable and not-ready taints if present and remove the node from zoneNoExecuteTainter if it is queued. Without the taint manager, initialize nodeEvictionMap (which tracks the node's pod-eviction progress) with status unmarked and remove the node from zonePodEvictor if queued.
For each deleted node, emit a RemovingNode event and remove it from knownNodeSet.
3.6.2 Processing node status
Timeout
If the node's current Ready condition is empty, the node has just registered, so its timeout is nodeStartupGracePeriod; otherwise the timeout is nodeMonitorGracePeriod.
Heartbeat timestamps
The last heartbeat timestamps (probeTimestamp and readyTransitionTimestamp) are determined by the following rules, applied top to bottom.
If the node has just registered, both probeTimestamp and readyTransitionTimestamp in nodeHealthMap are set to the node's creation time.
If nodeHealthMap has no record for the node, both are set to now.
If nodeHealthMap has no Ready condition recorded but the node now has one, both are set to now and status is set to the current status.
If nodeHealthMap has a Ready condition recorded but the node no longer has one (an unexpected situation; this is purely defensive code), both are set to now and status is set to the current status.
If both the recorded and current Ready conditions exist and the saved LastHeartbeatTime differs from the current one, probeTimestamp is set to now and status to the current status. If the saved LastTransitionTime also differs, the node's state changed, so readyTransitionTimestamp in nodeHealthMap is set to now.
If a lease exists now and its RenewTime is later than the one saved in nodeHealthMap (or there is no saved lease), probeTimestamp is set to now and the current lease is saved in nodeHealthMap.
Trying to update node status
If probeTimestamp plus the timeout is already in the past, i.e. the status update has timed out, the node is updated.
The Ready, MemoryPressure, DiskPressure and PIDPressure conditions are updated as follows.
If the corresponding condition does not exist:
v1.NodeCondition{
Type: nodeConditionType, // one of the four condition types above
Status: v1.ConditionUnknown, // unknown
Reason: "NodeStatusNeverUpdated",
Message: "Kubelet never posted node status.",
LastHeartbeatTime: node.CreationTimestamp, // node creation time
LastTransitionTime: nowTimestamp, // now
}
If the corresponding condition exists:
currentCondition.Status = v1.ConditionUnknown
currentCondition.Reason = "NodeStatusUnknown"
currentCondition.Message = "Kubelet stopped posting node status."
currentCondition.LastTransitionTime = nowTimestamp
If the resulting node differs from the previous node object (it was updated), the node is updated via the API.
If the update succeeds, nodeHealthMap is updated as well: readyTransitionTimestamp is set to now and status to the current node.Status.
Handling unready nodes: evicting pods
The node's current ReadyCondition is the ReadyCondition after the status-update attempt above; the last observed ReadyCondition is the one from before that attempt.
If the current ReadyCondition is not empty, do the following:
- Fetch the node's pod list from the cache.
- With the taint manager enabled, run processTaintBaseEviction and act on the last observed ReadyCondition:
  - If it is false: if the node already has the "node.kubernetes.io/unreachable" taint, remove it and add the "node.kubernetes.io/not-ready" taint; otherwise add the node to the zoneNoExecuteTainter queue to be tainted later.
  - If it is unknown: if the node already has the "node.kubernetes.io/not-ready" taint, remove it and add the "node.kubernetes.io/unreachable" taint; otherwise add the node to the zoneNoExecuteTainter queue to be tainted later.
  - If it is true: remove both the "node.kubernetes.io/not-ready" and "node.kubernetes.io/unreachable" taints if present, and remove the node from the zoneNoExecuteTainter queue.
- Without the taint manager, run processNoTaintBaseEviction:
  - If the last observed ReadyCondition is false and readyTransitionTimestamp in nodeHealthMap plus podEvictionTimeout is in the past (the condition has been false for at least podEvictionTimeout), call evictPods.
  - If the last observed ReadyCondition is unknown and probeTimestamp in nodeHealthMap plus podEvictionTimeout is in the past, call evictPods.
  - If the last observed ReadyCondition is true, call cancelPodEviction: set the node's status to unmarked in nodeEvictionMap and remove the node from the zonePodEvictor queue.
  - evictPods first reads the node's eviction state from nodeEvictionMap: if it is evicted, the node has already been drained, so all its pods are deleted; otherwise the state is set to toBeEvicted and the node joins the zonePodEvictor queue to wait for eviction.
One open question:
Why use observedReadyCondition rather than currentReadyCondition? The two are not necessarily the same.
For example, if a node just died, currentReadyCondition becomes unknown while observedReadyCondition is still ready.
That looks problematic: no eviction or tainting happens in this cycle; only in the next cycle, when both are unknown, will pods be evicted or the taint added.
Presumably this is acceptable because nodeMonitorPeriod is short, so not acting immediately is not a big problem.
3.6.3 Cluster health state handling
Each zone has four possible states: stateInitial (just added), stateFullDisruption (all nodes down), statePartialDisruption (the fraction of down nodes exceeds unhealthyZoneThreshold) and stateNormal (everything else).
allAreFullyDisrupted means every zone is currently in stateFullDisruption.
allWasFullyDisrupted being true means every zone was previously in stateFullDisruption.
The cluster can therefore be in one of four states:
- allAreFullyDisrupted true, allWasFullyDisrupted true
- allAreFullyDisrupted true, allWasFullyDisrupted false
- allAreFullyDisrupted false, allWasFullyDisrupted true
- allAreFullyDisrupted false, allWasFullyDisrupted false
Computing the current cluster state
Iterate over all current zones; for each zone, walk the Ready conditions of its nodes and compute the zone's state.
Set allAreFullyDisrupted according to the zone states.
If a zone is not yet in zoneStates, add it with state stateInitial.
Computing the previous cluster state
Read the saved zone list from zoneStates; zones no longer present in the current zone list are removed from zoneStates.
Set allWasFullyDisrupted according to the zone states saved in zoneStates.
Setting how many nodes per second each zone taints or evicts
When allAreFullyDisrupted is true and allWasFullyDisrupted is false (the zones were not all down before, but now every zone is fully down):
- Walk all nodes and mark them as healthy:
  - With the taint manager enabled, run markNodeAsReachable: remove the "node.kubernetes.io/not-ready" and "node.kubernetes.io/unreachable" taints if present and remove the node from the zoneNoExecuteTainter queue.
  - Without the taint manager, run cancelPodEviction: set the node's status to unmarked in nodeEvictionMap and remove it from the zonePodEvictor queue.
- Read the saved zones from zoneStates and set each zone's per-second rate:
  - With the taint manager enabled, set the zoneNoExecuteTainter rate to 0.
  - Without the taint manager, set the zonePodEvictor rate to 0.
- Set every zone in zoneStates to stateFullDisruption.
When allAreFullyDisrupted is false and allWasFullyDisrupted is true (all zones were down before, but now they are no longer all down):
- Walk all nodes and update their probeTimestamp and readyTransitionTimestamp in nodeHealthMap to the current time.
- Walk zoneStates and re-evaluate each zone's rate:
  - If the zone is stateNormal: with the taint manager enabled, set the zoneNoExecuteTainter rate to evictionLimiterQPS; otherwise set the zonePodEvictor rate to evictionLimiterQPS.
  - If the zone is statePartialDisruption: with the taint manager enabled, look at the zone's node count; if it exceeds largeClusterThreshold, set the zoneNoExecuteTainter rate to secondaryEvictionLimiterQPS, otherwise set it to 0. Without the taint manager, do the same for zonePodEvictor: secondaryEvictionLimiterQPS above largeClusterThreshold, 0 otherwise.
  - If the zone is stateFullDisruption: with the taint manager enabled, set the zoneNoExecuteTainter rate to evictionLimiterQPS; otherwise set the zonePodEvictor rate to evictionLimiterQPS.
  - stateInitial zones are not handled here, because in the next cycle they become non-stateInitial and are handled by the case below.
Besides the two cases above, one more case needs handling: allAreFullyDisrupted false and allWasFullyDisrupted false, i.e. the cluster never had all zones down. Zones may still have changed state, so their rates must be re-evaluated:
- Walk zoneStates; whenever the saved state differs from the new state (the zone's state changed), re-evaluate the zone's rate:
  - If the zone is stateNormal: with the taint manager enabled, set the zoneNoExecuteTainter rate to evictionLimiterQPS; otherwise set the zonePodEvictor rate to evictionLimiterQPS.
  - If the zone is statePartialDisruption: with the taint manager enabled, set the zoneNoExecuteTainter rate to secondaryEvictionLimiterQPS if the zone has more than largeClusterThreshold nodes, else 0; without the taint manager, set the zonePodEvictor rate the same way.
  - If the zone is stateFullDisruption: with the taint manager enabled, set the zoneNoExecuteTainter rate to evictionLimiterQPS; otherwise set the zonePodEvictor rate to evictionLimiterQPS.
  - Update zoneStates with the new state.
When allAreFullyDisrupted is true and allWasFullyDisrupted is true, the whole cluster has been down the entire time and nothing needs to be done, since no zone changed state.
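A minimal sketch of the per-zone rate selection summarized above; the zoneState type, constants and zoneQPS function are simplified and illustrative, not the controller's actual code.

```go
// Sketch: the queue for a zone is drained at a QPS that depends on the
// zone state and the zone's node count.
package main

import "fmt"

type zoneState string

const (
	stateNormal            zoneState = "Normal"
	statePartialDisruption zoneState = "PartialDisruption"
	stateFullDisruption    zoneState = "FullDisruption"
)

const (
	evictionLimiterQPS          float32 = 0.1
	secondaryEvictionLimiterQPS float32 = 0.01
	largeClusterThreshold               = 50
)

func zoneQPS(state zoneState, zoneSize int) float32 {
	switch state {
	case statePartialDisruption:
		if zoneSize > largeClusterThreshold {
			return secondaryEvictionLimiterQPS
		}
		return 0 // small clusters stop tainting/evicting entirely
	case stateNormal, stateFullDisruption:
		return evictionLimiterQPS
	default:
		// stateInitial: the real controller defers this to the next pass;
		// a default is shown only so the sketch handles any input.
		return evictionLimiterQPS
	}
}

func main() {
	fmt.Println(zoneQPS(statePartialDisruption, 100)) // 0.01
	fmt.Println(zoneQPS(statePartialDisruption, 30))  // 0
	fmt.Println(zoneQPS(stateFullDisruption, 100))    // 0.1
}
```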
4. Summary
The core logic of the NodeLifecycleController:
It starts the following goroutines:
(1) monitorNodeHealth updates node status and, depending on whether taint-based eviction is enabled, pushes the nodes that need handling into the zoneNoExecuteTainter or zonePodEvictor queue, so that eviction happens at a controlled rate.
(2) doNodeProcessingPassWorker watches nodes and sets NoSchedule taints according to node state (this affects scheduling and has nothing to do with eviction).
(3) If taint-based eviction is enabled, doNoExecuteTaintingPass takes nodes from zoneNoExecuteTainter and applies NoExecute taints (this is where the tainting rate is controlled); in that case the taint manager also runs and performs the actual pod eviction.
(4) If taint-based eviction is not enabled, doEvictionPass takes nodes from zonePodEvictor and evicts their pods.
(5) doPodProcessingWorker watches pods and updates pod status; if taint-based eviction is not enabled it also evicts pods.
In practice, kube-controller-manager runs with one of two combinations of settings:
- pod-eviction-timeout: defaults to 5 minutes
- enable-taint-manager and TaintBasedEvictions: default to true
(1) Taint-based eviction enabled (the defaults):
--pod-eviction-timeout=5m --enable-taint-manager=true --feature-gates=TaintBasedEvictions=true
In this mode pod-eviction-timeout has no effect: as soon as a node carries the taint, pods are evicted right away. (Beware of this pitfall when doing kubelet maintenance.)
(2) Taint-based eviction disabled:
--pod-eviction-timeout=5m --enable-taint-manager=false --feature-gates=TaintBasedEvictions=false
In this mode pod-eviction-timeout does take effect: pods are evicted 5 minutes after the node becomes NotReady.