漏斗桶/令牌桶確實能夠保護(hù)系統(tǒng)不被拖垮, 但不管漏斗桶還是令牌桶, 其防護(hù)思路都是設(shè)定一個指標(biāo), 當(dāng)超過該指標(biāo)后就阻止或減少流量的繼續(xù)進(jìn)入媒吗,當(dāng)系統(tǒng)負(fù)載降低到某一水平后則恢復(fù)流量的進(jìn)入凿跳。但其通常都是被動的蔽介,其實際效果取決于限流閾值設(shè)置是否合理奋渔,但往往設(shè)置合理不是一件容易的事情.
項目日常維護(hù)中, 經(jīng)常能夠看到某某同學(xué)在群里說:xx系統(tǒng)429了, 然后經(jīng)過一番查找后發(fā)現(xiàn)是一波突然的活動流量, 只能申請再新增幾臺機(jī)器. 過了幾天 OP 發(fā)現(xiàn)該集群的流量達(dá)不到預(yù)期又下掉了幾臺機(jī)器, 然后又開始一輪新的循環(huán).
這里先不討論集群自動伸縮的問題. 這里提出一些問題
- 集群增加機(jī)器或者減少機(jī)器限流閾值是否要重新設(shè)置?
- 設(shè)置限流閾值的依據(jù)是什么?
- 人力運(yùn)維成本是否過高?
- 當(dāng)調(diào)用方反饋429時, 這個時候重新設(shè)置限流, 其實流量高峰已經(jīng)過了重新評估限流是否有意義?
這些其實都是采用漏斗桶/令牌桶的缺點, 總體來說就是太被動, 不能快速適應(yīng)流量變化
自適應(yīng)限流
對于自適應(yīng)限流來說, 一般都是結(jié)合系統(tǒng)的 Load施绎、CPU 使用率以及應(yīng)用的入口 QPS妒穴、平均響應(yīng)時間和并發(fā)量等幾個維度的監(jiān)控指標(biāo)览祖,通過自適應(yīng)的流控策略, 讓系統(tǒng)的入口流量和系統(tǒng)的負(fù)載達(dá)到一個平衡巡扇,讓系統(tǒng)盡可能跑在最大吞吐量的同時保證系統(tǒng)整體的穩(wěn)定性。
比較出名的自適應(yīng)限流的實現(xiàn)是 Alibaba Sentinel. 不過由于提前沒有發(fā)現(xiàn) Sentinel 有個 golang 版本的實現(xiàn), 本篇文章就以 Kratos 的 BBR 實現(xiàn)探討自適應(yīng)限流的原理.
Kratos 自適應(yīng)限流
借鑒了 Sentinel 項目的自適應(yīng)限流系統(tǒng), 通過綜合分析服務(wù)的 cpu 使用率垮衷、請求成功的 qps 和請求成功的 rt 來做自適應(yīng)限流保護(hù)厅翔。
- cpu: 最近 1s 的 CPU 使用率均值,使用滑動平均計算搀突,采樣周期是 250ms
- inflight: 當(dāng)前處理中正在處理的請求數(shù)量
- pass: 請求處理成功的量
- rt: 請求成功的響應(yīng)耗時
限流公式
cpu > 800 AND (Now - PrevDrop) < 1s AND (MaxPass * MinRt * windows / 1000) < InFlight
- MaxPass 表示最近 5s 內(nèi)刀闷,單個采樣窗口中最大的請求數(shù)
- MinRt 表示最近 5s 內(nèi),單個采樣窗口中最小的響應(yīng)時間
- windows 表示一秒內(nèi)采樣窗口的數(shù)量仰迁,默認(rèn)配置中是 5s 50 個采樣甸昏,那么 windows 的值為 10
kratos 中間件實現(xiàn)
func (b *RateLimiter) Limit() HandlerFunc {
return func(c *Context) {
uri := fmt.Sprintf("%s://%s%s", c.Request.URL.Scheme, c.Request.Host, c.Request.URL.Path)
limiter := b.group.Get(uri)
done, err := limiter.Allow(c)
if err != nil {
_metricServerBBR.Inc(uri, c.Request.Method)
c.JSON(nil, err)
c.Abort()
return
}
defer func() {
done(limit.DoneInfo{Op: limit.Success})
b.printStats(uri, limiter)
}()
c.Next()
}
}
使用方式
e := bm.DefaultServer(nil)
limiter := bm.NewRateLimiter(nil)
e.Use(limiter.Limit())
e.GET("/api", myHandler)
源碼實現(xiàn)
Allow
func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info limit.DoneInfo), error) {
allowOpts := limit.DefaultAllowOpts()
for _, opt := range opts {
opt.Apply(&allowOpts)
}
if l.shouldDrop() { // 判斷是否觸發(fā)限流
return nil, ecode.LimitExceed
}
atomic.AddInt64(&l.inFlight, 1) // 增加正在處理請求數(shù)
stime := time.Since(initTime) // 記錄請求到來的時間
return func(do limit.DoneInfo) {
rt := int64((time.Since(initTime) - stime) / time.Millisecond) // 請求處理成功的響應(yīng)時長
l.rtStat.Add(rt) // 增加rtStat響應(yīng)耗時的統(tǒng)計
atomic.AddInt64(&l.inFlight, -1) // 請求處理成功后, 減少正在處理的請求數(shù)
switch do.Op {
case limit.Success:
l.passStat.Add(1) // 處理成功后增加成功處理請求數(shù)的統(tǒng)計
return
default:
return
}
}, nil
}
shouldDrop
func (l *BBR) shouldDrop() bool {
// 判斷目前cpu的使用率是否達(dá)到設(shè)置的CPU的限制, 默認(rèn)值800
if l.cpu() < l.conf.CPUThreshold {
// 如果上一次舍棄請求的時間是0, 那么說明沒有限流的需求, 直接返回
prevDrop, _ := l.prevDrop.Load().(time.Duration)
if prevDrop == 0 {
return false
}
// 如果上一次請求的時間與當(dāng)前的請求時間小于1s, 那么說明有限流的需求
if time.Since(initTime)-prevDrop <= time.Second {
if atomic.LoadInt32(&l.prevDropHit) == 0 {
atomic.StoreInt32(&l.prevDropHit, 1)
}
// 增加正在處理的請求的數(shù)量
inFlight := atomic.LoadInt64(&l.inFlight)
// 判斷正在處理的請求數(shù)是否達(dá)到系統(tǒng)的最大的請求數(shù)量
return inFlight > 1 && inFlight > l.maxFlight()
}
// 清空當(dāng)前的prevDrop
l.prevDrop.Store(time.Duration(0))
return false
}
// 增加正在處理的請求的數(shù)量
inFlight := atomic.LoadInt64(&l.inFlight)
// 判斷正在處理的請求數(shù)是否達(dá)到系統(tǒng)的最大的請求數(shù)量
drop := inFlight > 1 && inFlight > l.maxFlight()
if drop {
prevDrop, _ := l.prevDrop.Load().(time.Duration)
// 如果判斷達(dá)到了最大請求數(shù)量, 并且當(dāng)前有限流需求
if prevDrop != 0 {
return drop
}
l.prevDrop.Store(time.Since(initTime))
}
return drop
}
maxFlight
該函數(shù)是核心函數(shù). 其計算公式: MaxPass * MinRt * windows / 1000. maxPASS/minRT都是基于metric.RollingCounter
來實現(xiàn)的, 限于篇幅原因這里就不再具體看其實現(xiàn)(想看的可以去看rolling_counter_test.go還是蠻容易理解的)
func (l *BBR) maxFlight() int64 {
return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.winBucketPerSec)/1000.0 + 0.5))
}
- winBucketPerSec: 每秒內(nèi)的采樣數(shù)量,其計算方式:int64(time.Second)/(int64(conf.Window)/int64(conf.WinBucket)), conf.Window默認(rèn)值10s, conf.WinBucket默認(rèn)值100. 簡化下公式: 1/(10/100) = 10, 所以每秒內(nèi)的采樣數(shù)就是10
// 單個采樣窗口在一個采樣周期中的最大的請求數(shù), 默認(rèn)的采樣窗口是10s, 采樣bucket數(shù)量100
func (l *BBR) maxPASS() int64 {
rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS)
if rawMaxPass > 0 && l.passStat.Timespan() < 1 {
return rawMaxPass
}
// 遍歷100個采樣bucket, 找到采樣bucket中最大的請求數(shù)
rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
var result = 1.0
for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket()
count := 0.0
for _, p := range bucket.Points {
count += p
}
result = math.Max(result, count)
}
return result
}))
if rawMaxPass == 0 {
rawMaxPass = 1
}
atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass)
return rawMaxPass
}
// 單個采樣窗口中最小的響應(yīng)時間
func (l *BBR) minRT() int64 {
rawMinRT := atomic.LoadInt64(&l.rawMinRt)
if rawMinRT > 0 && l.rtStat.Timespan() < 1 {
return rawMinRT
}
// 遍歷100個采樣bucket, 找到采樣bucket中最小的響應(yīng)時間
rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
var result = math.MaxFloat64
for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
bucket := iterator.Bucket()
if len(bucket.Points) == 0 {
continue
}
total := 0.0
for _, p := range bucket.Points {
total += p
}
avg := total / float64(bucket.Count)
result = math.Min(result, avg)
}
return result
})))
if rawMinRT <= 0 {
rawMinRT = 1
}
atomic.StoreInt64(&l.rawMinRt, rawMinRT)
return rawMinRT
}