上篇文章提到固定時間窗口限流無法處理突然請求洪峰情況,本文講述的令牌桶線路算法則可以比較好的處理此場景睦番。
工作原理
- 單位時間按照一定速率勻速的生產(chǎn) token 放入桶內(nèi)类茂,直到達到桶容量上限。
- 處理請求托嚣,每次嘗試獲取一個或多個令牌巩检,如果拿到則處理請求,失敗則拒絕請求示启。
優(yōu)缺點
優(yōu)點
可以有效處理瞬間的突發(fā)流量兢哭,桶內(nèi)存量 token 即可作為流量緩沖區(qū)平滑處理突發(fā)流量。
缺點
實現(xiàn)較為復(fù)雜夫嗓。
代碼實現(xiàn)
core/limit/tokenlimit.go
分布式環(huán)境下考慮使用 redis 作為桶和令牌的存儲容器迟螺,采用 lua 腳本實現(xiàn)整個算法流程冲秽。
redis lua 腳本
-- 每秒生成token數(shù)量即token生成速度
local rate = tonumber(ARGV[1])
-- 桶容量
local capacity = tonumber(ARGV[2])
-- 當(dāng)前時間戳
local now = tonumber(ARGV[3])
-- 當(dāng)前請求token數(shù)量
local requested = tonumber(ARGV[4])
-- 需要多少秒才能填滿桶
local fill_time = capacity/rate
-- 向下取整,ttl為填滿時間的2倍
local ttl = math.floor(fill_time*2)
-- 當(dāng)前時間桶容量
local last_tokens = tonumber(redis.call("get", KEYS[1]))
-- 如果當(dāng)前桶容量為0,說明是第一次進入,則默認容量為桶的最大容量
if last_tokens == nil then
last_tokens = capacity
end
-- 上一次刷新的時間
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
-- 第一次進入則設(shè)置刷新時間為0
if last_refreshed == nil then
last_refreshed = 0
end
-- 距離上次請求的時間跨度
local delta = math.max(0, now-last_refreshed)
-- 距離上次請求的時間跨度,總共能生產(chǎn)token的數(shù)量,如果超多最大容量則丟棄多余的token
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 本次請求token數(shù)量是否足夠
local allowed = filled_tokens >= requested
-- 桶剩余數(shù)量
local new_tokens = filled_tokens
-- 允許本次token申請,計算剩余數(shù)量
if allowed then
new_tokens = filled_tokens - requested
end
-- 設(shè)置剩余token數(shù)量
redis.call("setex", KEYS[1], ttl, new_tokens)
-- 設(shè)置刷新時間
redis.call("setex", KEYS[2], ttl, now)
return allowed
令牌桶限流器定義
type TokenLimiter struct {
// 每秒生產(chǎn)速率
rate int
// 桶容量
burst int
// 存儲容器
store *redis.Redis
// redis key
tokenKey string
// 桶刷新時間key
timestampKey string
// lock
rescueLock sync.Mutex
// redis健康標(biāo)識
redisAlive uint32
// redis故障時采用進程內(nèi) 令牌桶限流器
rescueLimiter *xrate.Limiter
// redis監(jiān)控探測任務(wù)標(biāo)識
monitorStarted bool
}
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
tokenKey := fmt.Sprintf(tokenFormat, key)
timestampKey := fmt.Sprintf(timestampFormat, key)
return &TokenLimiter{
rate: rate,
burst: burst,
store: store,
tokenKey: tokenKey,
timestampKey: timestampKey,
redisAlive: 1,
rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
}
}
獲取令牌
func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
// 判斷redis是否健康
// redis故障時采用進程內(nèi)限流器
// 兜底保障
if atomic.LoadUint32(&lim.redisAlive) == 0 {
return lim.rescueLimiter.AllowN(now, n)
}
// 執(zhí)行腳本獲取令牌
resp, err := lim.store.Eval(
script,
[]string{
lim.tokenKey,
lim.timestampKey,
},
[]string{
strconv.Itoa(lim.rate),
strconv.Itoa(lim.burst),
strconv.FormatInt(now.Unix(), 10),
strconv.Itoa(n),
})
// redis allowed == false
// Lua boolean false -> r Nil bulk reply
// 特殊處理key不存在的情況
if err == redis.Nil {
return false
} else if err != nil {
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
// 執(zhí)行異常,開啟redis健康探測任務(wù)
// 同時采用進程內(nèi)限流器作為兜底
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
code, ok := resp.(int64)
if !ok {
logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
// redis allowed == true
// Lua boolean true -> r integer reply with value of 1
return code == 1
}
redis 故障時兜底策略
兜底策略的設(shè)計考慮得非常細節(jié)煮仇,當(dāng) redis
不可用的時候劳跃,啟動單機版的 ratelimit
做備用限流,確闭愕妫基本的限流可用刨仑,服務(wù)不會被沖垮。
// 開啟redis健康探測
func (lim *TokenLimiter) startMonitor() {
lim.rescueLock.Lock()
defer lim.rescueLock.Unlock()
// 防止重復(fù)開啟
if lim.monitorStarted {
return
}
// 設(shè)置任務(wù)和健康標(biāo)識
lim.monitorStarted = true
atomic.StoreUint32(&lim.redisAlive, 0)
// 健康探測
go lim.waitForRedis()
}
// redis健康探測定時任務(wù)
func (lim *TokenLimiter) waitForRedis() {
ticker := time.NewTicker(pingInterval)
// 健康探測成功時回調(diào)此函數(shù)
defer func() {
ticker.Stop()
lim.rescueLock.Lock()
lim.monitorStarted = false
lim.rescueLock.Unlock()
}()
for range ticker.C {
// ping屬于redis內(nèi)置健康探測命令
if lim.store.Ping() {
// 健康探測成功夹姥,設(shè)置健康標(biāo)識
atomic.StoreUint32(&lim.redisAlive, 1)
return
}
}
}
項目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero
并 star 支持我們杉武!
微信交流群
關(guān)注『微服務(wù)實踐』公眾號并點擊 交流群 獲取社區(qū)群二維碼。