用 go 實現(xiàn)一個分布式限流器

項目中需要對 api 的接口進(jìn)行限流,但是麻煩的是草慧,api 可能有多個節(jié)點(diǎn)桶蛔,傳統(tǒng)的本地限流無法處理這個問題。限流的算法有很多漫谷,比如計數(shù)器法仔雷,漏斗法,令牌桶法舔示,等等碟婆。各有利弊,相關(guān)博文網(wǎng)上很多惕稻,這里不再贅述竖共。

項目的要求主要有以下幾點(diǎn):

  1. 支持本地/分布式限流,接口統(tǒng)一
  2. 支持多種限流算法的切換
  3. 方便配置俺祠,配置方式不確定

go 語言不是很支持 OOP公给,我在實現(xiàn)的時候是按 Java 的思路走的,所以看起來有點(diǎn)不倫不類锻煌,希望能拋磚引玉妓布。

1. 接口定義

package ratelimit

import "time"

// 限流器接口
type Limiter interface {
    Acquire() error
    TryAcquire() bool
}

// 限流定義接口
type Limit interface {
    Name() string
    Key() string
    Period() time.Duration
    Count() int32
    LimitType() LimitType
}

// 支持 burst
type BurstLimit interface {
    Limit
    BurstCount() int32
}

// 分布式定義的 burst
type DistLimit interface {
    Limit
    ClusterNum() int32
}

type LimitType int32
const (
    CUSTOM LimitType = iota
    IP
)

Limiter 接口參考了 Google 的 guava 包里的 Limiter 實現(xiàn)。Acquire 接口是阻塞接口宋梧,其實還需要加上 context 來保證調(diào)用鏈安全匣沼,因為實際項目中并沒有用到 Acquire 接口,所以沒有實現(xiàn)完善捂龄;同理释涛,超時時間的支持也可以通過添加新接口繼承自 Limiter 接口來實現(xiàn)。TryAcquire 會立即返回倦沧。

Limit 抽象了一個限流定義唇撬,Key() 方法返回這個 Limit 的唯一標(biāo)識,Name() 僅作輔助展融,Period() 表示周期窖认,單位是秒,Count() 表示周期內(nèi)的最大次數(shù)告希,LimitType()表示根據(jù)什么來做區(qū)分扑浸,如 IP,默認(rèn)是 CUSTOM.
BurstLimit 提供突發(fā)的能力燕偶,一般是配合令牌桶算法喝噪。DistLimit 新增 ClusterNum() 方法,因為 mentor 要求分布式遇到錯誤的時候指么,需要退化為單機(jī)版本酝惧,退化的策略即是:2 節(jié)點(diǎn)總共 100QPS榴鼎,如果出現(xiàn)分區(qū),每個節(jié)點(diǎn)需要調(diào)整為各 50QPS

2. LocalCounterLimiter

package ratelimit

import (
    "errors"
    "fmt"
    "math"
    "sync"
    "sync/atomic"
    "time"
)

// todo timer 需要 stop
type localCounterLimiter struct {
    limit Limit

    limitCount int32 // 內(nèi)部使用晚唇,對 limit.count 做了 <0 時的轉(zhuǎn)換

    ticker *time.Ticker
    quit chan bool

    lock sync.Mutex
    newTerm *sync.Cond
    count int32
}

func (lim *localCounterLimiter) init() {
    lim.newTerm = sync.NewCond(&lim.lock)
    lim.limitCount = lim.limit.Count()

    if lim.limitCount < 0 {
        lim.limitCount = math.MaxInt32 // count 永遠(yuǎn)不會大于 limitCount巫财,后面的寫法保證溢出也沒問題
    } else if lim.limitCount == 0  {
        // 禁止訪問, 會無限阻塞
    } else {
        lim.ticker = time.NewTicker(lim.limit.Period())
        lim.quit = make(chan bool, 1)

        go func() {
            for {
                select {
                case <- lim.ticker.C:
                    fmt.Println("ticker .")
                    atomic.StoreInt32(&lim.count, 0)
                    lim.newTerm.Broadcast()

                    //lim.newTerm.L.Unlock()
                case <- lim.quit:
                    fmt.Println("work well .")
                    lim.ticker.Stop()
                    return
                }
            }
        }()
    }
}

// todo 需要機(jī)制來防止無限阻塞, 不超時也應(yīng)該有個極限時間
func (lim *localCounterLimiter) Acquire() error {
    if lim.limitCount == 0 {
        return errors.New("rate limit is 0, infinity wait")
    }

    lim.newTerm.L.Lock()
    for lim.count >= lim.limitCount {
        // block instead of spinning
        lim.newTerm.Wait()
        //fmt.Println(count, lim.limitCount)
    }
    lim.count++
    lim.newTerm.L.Unlock()

    return nil
}

func (lim *localCounterLimiter) TryAcquire() bool {
    count := atomic.AddInt32(&lim.count, 1)
    if count > lim.limitCount {
        return false
    } else {
        return true
    }
}

代碼很簡單,就不多說了

3. LocalTokenBucketLimiter

golang 的官方庫里提供了一個 ratelimiter缺亮,就是采用令牌桶的算法翁涤。所以這里并沒有重復(fù)造輪子桥言,直接代理了 ratelimiter萌踱。

package ratelimit

import (
    "context"
    "golang.org/x/time/rate"
    "math"
)

type localTokenBucketLimiter struct {
    limit Limit

    limiter *rate.Limiter // 直接復(fù)用令牌桶的
}

func (lim *localTokenBucketLimiter) init() {
    burstCount := lim.limit.Count()
    if burstLimit, ok := lim.limit.(BurstLimit); ok {
        burstCount = burstLimit.BurstCount()
    }

    count := lim.limit.Count()
    if count < 0 {
        count = math.MaxInt32
    }

    f := float64(count) / lim.limit.Period().Seconds()
    if f < 0 {
        f = float64(rate.Inf) // 無限
    } else if f == 0 {
        panic("為 0 的時候,底層實現(xiàn)有問題")
    }

    lim.limiter = rate.NewLimiter(rate.Limit(f), int(burstCount))
}

func (lim *localTokenBucketLimiter) Acquire() error {
    err := lim.limiter.Wait(context.TODO())
    return err
}

func (lim *localTokenBucketLimiter) TryAcquire() bool {
    return lim.limiter.Allow()
}

4. RedisCounterLimiter

package ratelimit

import (
    "math"
    "sync"
    "xg-go/log"
    "xg-go/xg/common"
)

type redisCounterLimiter struct {
    limit      DistLimit
    limitCount int32 // 內(nèi)部使用号阿,對 limit.count 做了 <0 時的轉(zhuǎn)換

    redisClient *common.RedisClient

    once sync.Once // 退化為本地計數(shù)器的時候使用
    localLim Limiter

    //script string
}

func (lim *redisCounterLimiter) init() {
    lim.limitCount = lim.limit.Count()
    if lim.limitCount < 0 {
        lim.limitCount = math.MaxInt32
    }

    //lim.script = buildScript()
}

//func buildScript() string {
//  sb := strings.Builder{}
//
//  sb.WriteString("local c")
//  sb.WriteString("\nc = redis.call('get',KEYS[1])")
//  // 調(diào)用不超過最大值并鸵,則直接返回
//  sb.WriteString("\nif c and tonumber(c) > tonumber(ARGV[1]) then")
//  sb.WriteString("\nreturn c;")
//  sb.WriteString("\nend")
//  // 執(zhí)行計算器自加
//  sb.WriteString("\nc = redis.call('incr',KEYS[1])")
//  sb.WriteString("\nif tonumber(c) == 1 then")
//  sb.WriteString("\nredis.call('expire',KEYS[1],ARGV[2])")
//  sb.WriteString("\nend")
//  sb.WriteString("\nif tonumber(c) == 1 then")
//  sb.WriteString("\nreturn c;")
//
//  return sb.String()
//}

func (lim *redisCounterLimiter) Acquire() error {
    panic("implement me")
}

func (lim *redisCounterLimiter) TryAcquire() (success bool) {
    defer func() {
        // 一般是 redis 連接斷了,會觸發(fā)空指針
        if err := recover(); err != nil {
            //log.Errorw("TryAcquire err", common.ERR, err)
            //success = lim.degradeTryAcquire()
            //return
            success = true
        }

        // 沒有錯誤扔涧,判斷是否開啟了 local 如果開啟了园担,把它停掉
        //if lim.localLim != nil {
        //  // stop 線程安全
        //  lim.localLim.Stop()
        //}
    }()

    count, err := lim.redisClient.IncrBy(lim.limit.Key(), 1)
    //panic("模擬 redis 出錯")
    if err != nil {
        log.Errorw("TryAcquire err", common.ERR, err)
        panic(err)
    }

    // *2 是為了保留久一點(diǎn),便于觀察
    err = lim.redisClient.Expire(lim.limit.Key(), int(2 * lim.limit.Period().Seconds()))
    if err != nil {
        log.Errorw("TryAcquire error", common.ERR, err)
        panic(err)
    }

    // 業(yè)務(wù)正確的情況下 確認(rèn)超限
    if int32(count) > lim.limitCount {
        return false
    }

    return true

    //keys := []string{lim.limit.Key()}
    //
    //log.Errorw("TryAcquire ", keys, lim.limit.Count(), lim.limit.Period().Seconds())
    //count, err := lim.redisClient.Eval(lim.script, keys, lim.limit.Count(), lim.limit.Period().Seconds())
    //if err != nil {
    //  log.Errorw("TryAcquire error", common.ERR, err)
    //  return false
    //}
    //
    //
    //typeName := reflect.TypeOf(count).Name()
    //log.Errorw(typeName)
    //
    //if count != nil && count.(int32) <= lim.limitCount {
    //
    //  return true
    //}
    //return false
}

func (lim *redisCounterLimiter) Stop() {
    // 判斷是否開啟了 local 如果開啟了枯夜,把它停掉
    if lim.localLim != nil {
        // stop 線程安全
        lim.localLim.Stop()
    }
}

func (lim *redisCounterLimiter) degradeTryAcquire() bool {
    lim.once.Do(func() {
        count := lim.limit.Count() / lim.limit.ClusterNum()
        limit := LocalLimit {
            name: lim.limit.Name(),
            key: lim.limit.Key(),
            count: count,
            period: lim.limit.Period(),
            limitType: lim.limit.LimitType(),
        }

        lim.localLim = NewLimiter(&limit)
    })

    return lim.localLim.TryAcquire()
}

代碼里回退的部分注釋了弯汰,因為線上為了穩(wěn)定,實習(xí)生的代碼畢竟湖雹,所以先不跑咏闪。
本來原有的思路是直接用 lua 腳本在 redis 上保證原子操作,但是底層封裝的庫對于直接調(diào) eval 跑的時候摔吏,會拋錯鸽嫂,而且 source 是 go-redis 里面,趕 ddl 沒有時間去 debug征讲,所以只能用 incrBy + expire 分開來据某。

5. RedisTokenBucketLimiter

令牌桶的狀態(tài)變量得放在一個 線程安全/一致 的地方,redis 是不二人選诗箍。但是令牌桶的算法核心是個延遲計算得到令牌數(shù)量癣籽,這個是一個很長的臨界區(qū),所以要么用分布式鎖滤祖,要么直接利用 redis 的單線程以原子方式跑筷狼。一般業(yè)界是后者,即 lua 腳本維護(hù)令牌桶的狀態(tài)變量氨距、計算令牌桑逝。代碼類似這種

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local intval = tonumber(ARGV[5])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2) * intval

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
if allowed then
  new_tokens = filled_tokens - requested
end

redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed, new_tokens }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市俏让,隨后出現(xiàn)的幾起案子楞遏,更是在濱河造成了極大的恐慌茬暇,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,723評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件寡喝,死亡現(xiàn)場離奇詭異糙俗,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)预鬓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評論 2 382
  • 文/潘曉璐 我一進(jìn)店門巧骚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人格二,你說我怎么就攤上這事劈彪。” “怎么了顶猜?”我有些...
    開封第一講書人閱讀 152,998評論 0 344
  • 文/不壞的土叔 我叫張陵沧奴,是天一觀的道長。 經(jīng)常有香客問我长窄,道長滔吠,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,323評論 1 279
  • 正文 為了忘掉前任挠日,我火速辦了婚禮疮绷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嚣潜。我一直安慰自己冬骚,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評論 5 374
  • 文/花漫 我一把揭開白布郑原。 她就那樣靜靜地躺著唉韭,像睡著了一般。 火紅的嫁衣襯著肌膚如雪犯犁。 梳的紋絲不亂的頭發(fā)上属愤,一...
    開封第一講書人閱讀 49,079評論 1 285
  • 那天,我揣著相機(jī)與錄音酸役,去河邊找鬼住诸。 笑死,一個胖子當(dāng)著我的面吹牛涣澡,可吹牛的內(nèi)容都是我干的贱呐。 我是一名探鬼主播,決...
    沈念sama閱讀 38,389評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼入桂,長吁一口氣:“原來是場噩夢啊……” “哼奄薇!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起抗愁,我...
    開封第一講書人閱讀 37,019評論 0 259
  • 序言:老撾萬榮一對情侶失蹤馁蒂,失蹤者是張志新(化名)和其女友劉穎呵晚,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沫屡,經(jīng)...
    沈念sama閱讀 43,519評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡饵隙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了沮脖。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片金矛。...
    茶點(diǎn)故事閱讀 38,100評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖勺届,靈堂內(nèi)的尸體忽然破棺而出驶俊,到底是詐尸還是另有隱情,我是刑警寧澤涮因,帶...
    沈念sama閱讀 33,738評論 4 324
  • 正文 年R本政府宣布废睦,位于F島的核電站伺绽,受9級特大地震影響养泡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜奈应,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評論 3 307
  • 文/蒙蒙 一澜掩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧杖挣,春花似錦肩榕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至歌殃,卻和暖如春乔妈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背氓皱。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評論 1 262
  • 我被黑心中介騙來泰國打工路召, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人波材。 一個月前我還...
    沈念sama閱讀 45,547評論 2 354
  • 正文 我出身青樓股淡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親廷区。 傳聞我的和親對象是個殘疾皇子唯灵,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評論 2 345

推薦閱讀更多精彩內(nèi)容