k8s 中有許多優(yōu)秀的包都可以在平時(shí)的開發(fā)中借鑒與使用于个,比如,任務(wù)的定時(shí)輪詢暮顺、高可用的實(shí)現(xiàn)厅篓、日志處理、緩存使用等都是獨(dú)立的包捶码,可以直接引用羽氮。本篇文章會(huì)介紹 k8s 中定時(shí)任務(wù)的實(shí)現(xiàn),k8s 中定時(shí)任務(wù)都是通過 wait 包實(shí)現(xiàn)的惫恼,wait 包在 k8s 的多個(gè)組件中都有用到档押,以下是 wait 包在 kubelet 中的幾處使用:
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
...
// kubelet 每5分鐘一次從 apiserver 獲取證書
closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
if err != nil {
return err
}
closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)
if err != nil {
return err
}
...
}
...
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// 持續(xù)監(jiān)聽 pod 的變化
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
...
}
golang 中可以通過 time.Ticker 實(shí)現(xiàn)定時(shí)任務(wù)的執(zhí)行,但在 k8s 中用了更原生的方式祈纯,使用 time.Timer 實(shí)現(xiàn)的令宿。time.Ticker 和 time.Timer 的使用區(qū)別如下:
- ticker 只要定義完成,從此刻開始計(jì)時(shí)腕窥,不需要任何其他的操作粒没,每隔固定時(shí)間都會(huì)自動(dòng)觸發(fā)。
- timer 定時(shí)器是到了固定時(shí)間后會(huì)執(zhí)行一次簇爆,僅執(zhí)行一次
- 如果 timer 定時(shí)器要每隔間隔的時(shí)間執(zhí)行癞松,實(shí)現(xiàn) ticker 的效果,使用
func (t *Timer) Reset(d Duration) bool
一個(gè)示例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
timer1 := time.NewTimer(2 * time.Second)
ticker1 := time.NewTicker(2 * time.Second)
wg.Add(1)
go func(t *time.Ticker) {
defer wg.Done()
for {
<-t.C
fmt.Println("exec ticker", time.Now().Format("2006-01-02 15:04:05"))
}
}(ticker1)
wg.Add(1)
go func(t *time.Timer) {
defer wg.Done()
for {
<-t.C
fmt.Println("exec timer", time.Now().Format("2006-01-02 15:04:05"))
t.Reset(2 * time.Second)
}
}(timer1)
wg.Wait()
}
一入蛆、wait 包中的核心代碼
核心代碼(k8s.io/apimachinery/pkg/util/wait/wait.go):
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
var t *time.Timer
var sawTimeout bool
for {
select {
case <-stopCh:
return
default:
}
jitteredPeriod := period
if jitterFactor > 0.0 {
jitteredPeriod = Jitter(period, jitterFactor)
}
if !sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
}
func() {
defer runtime.HandleCrash()
f()
}()
if sliding {
t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)
}
select {
case <-stopCh:
return
case <-t.C:
sawTimeout = true
}
}
}
...
func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {
if t == nil {
return time.NewTimer(d)
}
if !t.Stop() && !sawTimeout {
<-t.C
}
t.Reset(d)
return t
}
幾個(gè)關(guān)鍵點(diǎn)的說明:
- 1拦惋、如果 sliding 為 true,則在 f() 運(yùn)行之后計(jì)算周期安寺。如果為 false,那么 period 包含 f() 的執(zhí)行時(shí)間首尼。
- 2挑庶、在 golang 中 select 沒有優(yōu)先級選擇,為了避免額外執(zhí)行 f(),在每次循環(huán)開始后會(huì)先判斷 stopCh chan软能。
k8s 中 wait 包其實(shí)是對 time.Timer 做了一層封裝實(shí)現(xiàn)迎捺。
二、wait 包常用的方法
1查排、定期執(zhí)行一個(gè)函數(shù)凳枝,永不停止,可以使用 Forever 方法:
func Forever(f func(), period time.Duration)
2、在需要的時(shí)候停止循環(huán)岖瑰,那么可以使用下面的方法叛买,增加一個(gè)用于停止的 chan 即可,方法定義如下:
func Until(f func(), period time.Duration, stopCh <-chan struct{})
上面的第三個(gè)參數(shù) stopCh 就是用于退出無限循環(huán)的標(biāo)志蹋订,停止的時(shí)候我們 close 掉這個(gè) chan 就可以了率挣。
3、有時(shí)候露戒,我們還會(huì)需要在運(yùn)行前去檢查先決條件椒功,在條件滿足的時(shí)候才去運(yùn)行某一任務(wù),這時(shí)候可以使用 Poll 方法:
func Poll(interval, timeout time.Duration, condition ConditionFunc)
這個(gè)函數(shù)會(huì)以 interval 為間隔智什,不斷去檢查 condition 條件是否為真动漾,如果為真則可以繼續(xù)后續(xù)處理;如果指定了 timeout 參數(shù)荠锭,則該函數(shù)也可以只常識指定的時(shí)間旱眯。
4、PollUntil 方法和上面的類似节沦,但是沒有 timeout 參數(shù)键思,多了一個(gè) stopCh 參數(shù),如下所示:
PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
此外還有 PollImmediate 甫贯、 PollInfinite 和 PollImmediateInfinite 方法吼鳞。
三、總結(jié)
本篇文章主要講了 k8s 中定時(shí)任務(wù)的實(shí)現(xiàn)與對應(yīng)包(wait)中方法的使用叫搁。通過閱讀 k8s 的源代碼赔桌,可以發(fā)現(xiàn) k8s 中許多功能的實(shí)現(xiàn)也都是我們需要在平時(shí)工作中用的,其大部分包的性能都是經(jīng)過大規(guī)目事撸考驗(yàn)的疾党,通過使用其相關(guān)的工具包不僅能學(xué)到大量的編程技巧也能避免自己造輪子。