為什么需要降載
微服務集群中阵具,調(diào)用鏈路錯綜復雜,作為服務提供者需要有一種保護自己的機制定铜,防止調(diào)用方無腦調(diào)用壓垮自己阳液,保證自身服務的高可用。
最常見的保護機制莫過于限流機制揣炕,使用限流器的前提是必須知道自身的能夠處理的最大并發(fā)數(shù)帘皿,一般在上線前通過壓測來得到最大并發(fā)數(shù),而且日常請求過程中每個接口的限流參數(shù)都不一樣畸陡,同時系統(tǒng)一直在不斷的迭代其處理能力往往也會隨之變化鹰溜,每次上線前都需要進行壓測然后調(diào)整限流參數(shù)變得非常繁瑣。
那么有沒有一種更加簡潔的限流機制能實現(xiàn)最大限度的自我保護呢丁恭?
什么是自適應降載
自適應降載能非常智能的保護服務自身曹动,根據(jù)服務自身的系統(tǒng)負載動態(tài)判斷是否需要降載。
設(shè)計目標:
- 保證系統(tǒng)不被拖垮涩惑。
- 在系統(tǒng)穩(wěn)定的前提下,保持系統(tǒng)的吞吐量桑驱。
那么關(guān)鍵就在于如何衡量服務自身的負載呢竭恬?
判斷高負載主要取決于兩個指標:
- cpu 是否過載。
- 最大并發(fā)數(shù)是否過載熬的。
以上兩點同時滿足時則說明服務處于高負載狀態(tài)痊硕,則進行自適應降載。
同時也應該注意高并發(fā)場景 cpu 負載押框、并發(fā)數(shù)往往波動比較大岔绸,從數(shù)據(jù)上我們稱這種現(xiàn)象為毛刺,毛刺現(xiàn)象可能會導致系統(tǒng)一直在頻繁的進行自動降載操作,所以我們一般獲取一段時間內(nèi)的指標均值來使指標更加平滑盒揉。實現(xiàn)上可以采用準確的記錄一段時間內(nèi)的指標然后直接計算平均值晋被,但是需要占用一定的系統(tǒng)資源。
統(tǒng)計學上有一種算法:滑動平均(exponential moving average)刚盈,可以用來估算變量的局部均值羡洛,使得變量的更新與歷史一段時間的歷史取值有關(guān),無需記錄所有的歷史局部變量就可以實現(xiàn)平均值估算藕漱,非常節(jié)省寶貴的服務器資源欲侮。
滑動平均算法原理 參考這篇文章講的非常清楚。
變量 V 在 t 時刻記為 Vt肋联,θt 為變量 V 在 t 時刻的取值威蕉,即在不使用滑動平均模型時 Vt=θt,在使用滑動平均模型后橄仍,Vt 的更新公式如下:
Vt=β?Vt?1+(1?β)?θt
- β = 0 時 Vt = θt
- β = 0.9 時,大致相當于過去 10 個 θt 值的平均
- β = 0.99 時,大致相當于過去 100 個 θt 值的平均
代碼實現(xiàn)
接下來我們來看下 go-zero 自適應降載的代碼實現(xiàn)韧涨。
core/load/adaptiveshedder.go
自適應降載接口定義:
// 回調(diào)函數(shù)
Promise interface {
// 請求成功時回調(diào)此函數(shù)
Pass()
// 請求失敗時回調(diào)此函數(shù)
Fail()
}
// 降載接口定義
Shedder interface {
// 降載檢查
// 1. 允許調(diào)用,需手動執(zhí)行 Promise.accept()/reject()上報實際執(zhí)行任務結(jié)構(gòu)
// 2. 拒絕調(diào)用沙兰,將會直接返回err:服務過載錯誤 ErrServiceOverloaded
Allow() (Promise, error)
}
接口定義非常精簡意味使用起來其實非常簡單氓奈,對外暴露一個`Allow()(Promise,error)。
go-zero 使用示例:
業(yè)務中只需調(diào)該方法判斷是否降載舀奶,如果被降載則直接結(jié)束流程,否則執(zhí)行業(yè)務最后使用返回值 Promise 根據(jù)執(zhí)行結(jié)果回調(diào)結(jié)果即可斋射。
func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {
ensureSheddingStat()
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (val interface{}, err error) {
sheddingStat.IncrementTotal()
var promise load.Promise
// 檢查是否被降載
promise, err = shedder.Allow()
// 降載,記錄相關(guān)日志與指標
if err != nil {
metrics.AddDrop()
sheddingStat.IncrementDrop()
return
}
// 最后回調(diào)執(zhí)行結(jié)果
defer func() {
// 執(zhí)行失敗
if err == context.DeadlineExceeded {
promise.Fail()
// 執(zhí)行成功
} else {
sheddingStat.IncrementPass()
promise.Pass()
}
}()
// 執(zhí)行業(yè)務方法
return handler(ctx, req)
}
}
接口實現(xiàn)類定義 :
主要包含三類屬性
- cpu 負載閾值:超過此值意味著 cpu 處于高負載狀態(tài)南蓬。
- 冷卻期:假如服務之前被降載過赘方,那么將進入冷卻期窄陡,目的在于防止降載過程中負載還未降下來立馬加壓導致來回抖動涂圆。因為降低負載需要一定的時間卡辰,處于冷卻期內(nèi)應該繼續(xù)檢查并發(fā)數(shù)是否超過限制,超過限制則繼續(xù)丟棄請求。
- 并發(fā)數(shù):當前正在處理的并發(fā)數(shù)翠霍,當前正在處理的并發(fā)平均數(shù)祸憋,以及最近一段內(nèi)的請求數(shù)與響應時間,目的是為了計算當前正在處理的并發(fā)數(shù)是否大于系統(tǒng)可承載的最大并發(fā)數(shù)拦赠。
// option參數(shù)模式
ShedderOption func(opts *shedderOptions)
// 可選配置參數(shù)
shedderOptions struct {
// 滑動時間窗口大小
window time.Duration
// 滑動時間窗口數(shù)量
buckets int
// cpu負載臨界值
cpuThreshold int64
}
// 自適應降載結(jié)構(gòu)體牌里,需實現(xiàn) Shedder 接口
adaptiveShedder struct {
// cpu負載臨界值
// 高于臨界值代表高負載需要降載保證服務
cpuThreshold int64
// 1s內(nèi)有多少個桶
windows int64
// 并發(fā)數(shù)
flying int64
// 滑動平滑并發(fā)數(shù)
avgFlying float64
// 自旋鎖喳篇,一個服務共用一個降載
// 統(tǒng)計當前正在處理的請求數(shù)時必須加鎖
// 無損并發(fā)挺尿,提高性能
avgFlyingLock syncx.SpinLock
// 最后一次拒絕時間
dropTime *syncx.AtomicDuration
// 最近是否被拒絕過
droppedRecently *syncx.AtomicBool
// 請求數(shù)統(tǒng)計馁害,通過滑動時間窗口記錄最近一段時間內(nèi)指標
passCounter *collection.RollingWindow
// 響應時間統(tǒng)計碘菜,通過滑動時間窗口記錄最近一段時間內(nèi)指標
rtCounter *collection.RollingWindow
}
自適應降載構(gòu)造器:
func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
// 為了保證代碼統(tǒng)一
// 當開發(fā)者關(guān)閉時返回默認的空實現(xiàn)凹蜈,實現(xiàn)代碼統(tǒng)一
// go-zero很多地方都采用了這種設(shè)計,比如Breaker忍啸,日志組件
if !enabled.True() {
return newNopShedder()
}
// options模式設(shè)置可選配置參數(shù)
options := shedderOptions{
// 默認統(tǒng)計最近5s內(nèi)數(shù)據(jù)
window: defaultWindow,
// 默認桶數(shù)量50個
buckets: defaultBuckets,
// cpu負載
cpuThreshold: defaultCpuThreshold,
}
for _, opt := range opts {
opt(&options)
}
// 計算每個窗口間隔時間仰坦,默認為100ms
bucketDuration := options.window / time.Duration(options.buckets)
return &adaptiveShedder{
// cpu負載
cpuThreshold: options.cpuThreshold,
// 1s的時間內(nèi)包含多少個滑動窗口單元
windows: int64(time.Second / bucketDuration),
// 最近一次拒絕時間
dropTime: syncx.NewAtomicDuration(),
// 最近是否被拒絕過
droppedRecently: syncx.NewAtomicBool(),
// qps統(tǒng)計,滑動時間窗口
// 忽略當前正在寫入窗口(桶)计雌,時間周期不完整可能導致數(shù)據(jù)異常
passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
collection.IgnoreCurrentBucket()),
// 響應時間統(tǒng)計悄晃,滑動時間窗口
// 忽略當前正在寫入窗口(桶),時間周期不完整可能導致數(shù)據(jù)異常
rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
collection.IgnoreCurrentBucket()),
}
}
降載檢查 Allow()
:
檢查當前請求是否應該被丟棄白粉,被丟棄業(yè)務側(cè)需要直接中斷請求保護服務传泊,也意味著降載生效同時進入冷卻期。如果放行則返回 promise鸭巴,等待業(yè)務側(cè)執(zhí)行回調(diào)函數(shù)執(zhí)行指標統(tǒng)計眷细。
// 降載檢查
func (as *adaptiveShedder) Allow() (Promise, error) {
// 檢查請求是否被丟棄
if as.shouldDrop() {
// 設(shè)置drop時間
as.dropTime.Set(timex.Now())
// 最近已被drop
as.droppedRecently.Set(true)
// 返回過載
return nil, ErrServiceOverloaded
}
// 正在處理請求數(shù)加1
as.addFlying(1)
// 這里每個允許的請求都會返回一個新的promise對象
// promise內(nèi)部持有了降載指針對象
return &promise{
start: timex.Now(),
shedder: as,
}, nil
}
檢查是否應該被丟棄shouldDrop()
:
// 請求是否應該被丟棄
func (as *adaptiveShedder) shouldDrop() bool {
// 當前cpu負載超過閾值
// 服務處于冷卻期內(nèi)應該繼續(xù)檢查負載并嘗試丟棄請求
if as.systemOverloaded() || as.stillHot() {
// 檢查正在處理的并發(fā)是否超出當前可承載的最大并發(fā)數(shù)
// 超出則丟棄請求
if as.highThru() {
flying := atomic.LoadInt64(&as.flying)
as.avgFlyingLock.Lock()
avgFlying := as.avgFlying
as.avgFlyingLock.Unlock()
msg := fmt.Sprintf(
"dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
logx.Error(msg)
stat.Report(msg)
return true
}
}
return false
}
cpu 閾值檢查 systemOverloaded()
:
cpu 負載值計算算法采用的滑動平均算法,防止毛刺現(xiàn)象鹃祖。每隔 250ms 采樣一次 β 為 0.95溪椎,大概相當于歷史 20 次 cpu 負載的平均值,時間周期約為 5s恬口。
// cpu 是否過載
func (as *adaptiveShedder) systemOverloaded() bool {
return systemOverloadChecker(as.cpuThreshold)
}
// cpu 檢查函數(shù)
systemOverloadChecker = func(cpuThreshold int64) bool {
return stat.CpuUsage() >= cpuThreshold
}
// cpu滑動平均值
curUsage := internal.RefreshCpu()
prevUsage := atomic.LoadInt64(&cpuUsage)
// cpu = cpu??1 * beta + cpu? * (1 - beta)
// 滑動平均算法
usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))
atomic.StoreInt64(&cpuUsage, usage)
檢查是否處于冷卻期 stillHot
:
判斷當前系統(tǒng)是否處于冷卻期,如果處于冷卻期內(nèi)校读,應該繼續(xù)嘗試檢查是否丟棄請求。主要是防止系統(tǒng)在過載恢復過程中負載還未降下來祖能,立馬又增加壓力導致來回抖動歉秫,此時應該嘗試繼續(xù)丟棄請求。
func (as *adaptiveShedder) stillHot() bool {
// 最近沒有丟棄請求
// 說明服務正常
if !as.droppedRecently.True() {
return false
}
// 不在冷卻期
dropTime := as.dropTime.Load()
if dropTime == 0 {
return false
}
// 冷卻時間默認為1s
hot := timex.Since(dropTime) < coolOffDuration
// 不在冷卻期养铸,正常處理請求中
if !hot {
// 重置drop記錄
as.droppedRecently.Set(false)
}
return hot
}
檢查當前正在處理的并發(fā)數(shù)highThru()
:
一旦 當前處理的并發(fā)數(shù) > 并發(fā)數(shù)承載上限 則進入降載狀態(tài)雁芙。
這里為什么要加鎖呢轧膘?因為自適應降載時全局在使用的,為了保證并發(fā)數(shù)平均值正確性兔甘。
為什么這里要加自旋鎖呢谎碍?因為并發(fā)處理過程中,可以不阻塞其他的 goroutine 執(zhí)行任務洞焙,采用無鎖并發(fā)提高性能蟆淀。
func (as *adaptiveShedder) highThru() bool {
// 加鎖
as.avgFlyingLock.Lock()
// 獲取滑動平均值
// 每次請求結(jié)束后更新
avgFlying := as.avgFlying
// 解鎖
as.avgFlyingLock.Unlock()
// 系統(tǒng)此時最大并發(fā)數(shù)
maxFlight := as.maxFlight()
// 正在處理的并發(fā)數(shù)和平均并發(fā)數(shù)是否大于系統(tǒng)的最大并發(fā)數(shù)
return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
}
如何得到正在處理的并發(fā)數(shù)與平均并發(fā)數(shù)呢?
當前正在的處理并發(fā)數(shù)統(tǒng)計其實非常簡單澡匪,每次允許請求時并發(fā)數(shù) +1熔任,請求完成后 通過 promise 對象回調(diào)-1 即可,并利用滑動平均算法求解平均并發(fā)數(shù)即可唁情。
type promise struct {
// 請求開始時間
// 統(tǒng)計請求處理耗時
start time.Duration
shedder *adaptiveShedder
}
func (p *promise) Fail() {
// 請求結(jié)束笋敞,當前正在處理請求數(shù)-1
p.shedder.addFlying(-1)
}
func (p *promise) Pass() {
// 響應時間,單位毫秒
rt := float64(timex.Since(p.start)) / float64(time.Millisecond)
// 請求結(jié)束荠瘪,當前正在處理請求數(shù)-1
p.shedder.addFlying(-1)
p.shedder.rtCounter.Add(math.Ceil(rt))
p.shedder.passCounter.Add(1)
}
func (as *adaptiveShedder) addFlying(delta int64) {
flying := atomic.AddInt64(&as.flying, delta)
// 請求結(jié)束后夯巷,統(tǒng)計當前正在處理的請求并發(fā)
if delta < 0 {
as.avgFlyingLock.Lock()
// 估算當前服務近一段時間內(nèi)的平均請求數(shù)
as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)
as.avgFlyingLock.Unlock()
}
}
得到了當前的系統(tǒng)數(shù)還不夠 ,我們還需要知道當前系統(tǒng)能夠處理并發(fā)數(shù)的上限哀墓,即最大并發(fā)數(shù)趁餐。
請求通過數(shù)與響應時間都是通過滑動窗口來實現(xiàn)的,關(guān)于滑動窗口的實現(xiàn)可以參考 自適應熔斷器
那篇文章篮绰。
當前系統(tǒng)的最大并發(fā)數(shù) = 窗口單位時間內(nèi)的最大通過數(shù)量 * 窗口單位時間內(nèi)的最小響應時間后雷。
// 計算每秒系統(tǒng)的最大并發(fā)數(shù)
// 最大并發(fā)數(shù) = 最大請求數(shù)(qps)* 最小響應時間(rt)
func (as *adaptiveShedder) maxFlight() int64 {
// windows = buckets per second
// maxQPS = maxPASS * windows
// minRT = min average response time in milliseconds
// maxQPS * minRT / milliseconds_per_second
// as.maxPass()*as.windows - 每個桶最大的qps * 1s內(nèi)包含桶的數(shù)量
// as.minRt()/1e3 - 窗口所有桶中最小的平均響應時間 / 1000ms這里是為了轉(zhuǎn)換成秒
return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
}
// 滑動時間窗口內(nèi)有多個桶
// 找到請求數(shù)最多的那個
// 每個桶占用的時間為 internal ms
// qps指的是1s內(nèi)的請求數(shù),qps: maxPass * time.Second/internal
func (as *adaptiveShedder) maxPass() int64 {
var result float64 = 1
// 當前時間窗口內(nèi)請求數(shù)最多的桶
as.passCounter.Reduce(func(b *collection.Bucket) {
if b.Sum > result {
result = b.Sum
}
})
return int64(result)
}
// 滑動時間窗口內(nèi)有多個桶
// 計算最小的平均響應時間
// 因為需要計算近一段時間內(nèi)系統(tǒng)能夠處理的最大并發(fā)數(shù)
func (as *adaptiveShedder) minRt() float64 {
// 默認為1000ms
result := defaultMinRt
as.rtCounter.Reduce(func(b *collection.Bucket) {
if b.Count <= 0 {
return
}
// 請求平均響應時間
avg := math.Round(b.Sum / float64(b.Count))
if avg < result {
result = avg
}
})
return result
}
參考資料
項目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero
并 star 支持我們吠各!
微信交流群
關(guān)注『微服務實踐』公眾號并點擊 交流群 獲取社區(qū)群二維碼臀突。