并發(fā)請求量限制組件分享

背景

關于限流Go官方通過一個采用令牌池的算法的實現(xiàn):golang.org/x/time/rate,但是狮杨,這個限制的是每秒的請求數(shù),有的時候我們希望限制的是系統(tǒng)并發(fā)處理的請求數(shù)量兴垦,類似線程池的功能茵汰,需求如下:

  1. 設置一個最大的請求處理數(shù)量,當請求超過時绑榴,后續(xù)請求將等待哪轿,直到有請求處理完后被喚醒。
  2. 請求的等待時間能夠指定翔怎,超出等待時間就返回窃诉,提示給客戶端杨耙。
  3. 等待請求的個數(shù)需要能夠限制,數(shù)量超過時就直接返回飘痛,提示給客戶端珊膜。

設計

設計思路是實現(xiàn)一個Ticket池(NumLimiter),每個請求首先需要向NumLimiter申請一個ticket敦冬,當請求處理結束后辅搬,需要被回收唯沮。

獲取不到ticket的請求就等待現(xiàn)有的ticket釋放脖旱,所以會有兩個核心對象:

  1. NumLimiter:數(shù)量限制器(ticket 池)
  2. Ticket:入場券,請求需要先申請一個Ticket

先不考慮細節(jié)介蛉,可以設計如下:

package numlimiter

// 數(shù)量限制器
type NumLimiter struct {
  maxTicket   int // 最大請求數(shù)
  maxWait       int // 最大等待數(shù)
    ...
}
// 釋放Ticket
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
    ...
}
// 預訂Ticket
func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
    ...
}
// 創(chuàng)建一個tocket池
func New(maxTicket) *NumLimiter {
    l := &NumLimiter{
        maxTicket:   maxTicket,
    }
    return l
}
// 入場券
type Ticket struct {
    l      *NumLimiter
    reqKey int64
}
// 釋放入場券
func (r *Ticket) Close() {
    r.l.releaseTicket(r)
}

NumLimiter有兩個核心的方法:

  1. Reserve - 申請Ticket:每個請求處理前需要先調(diào)用該方法獲取一個ticket萌庆,如果當前頒發(fā)的ticket數(shù)已經(jīng)是大于等于 maxTicket時,請求就pending等待Ticket釋放币旧。 該方法接收一個context践险,作用是傳遞外部超時或取消的信號,結束等待吹菱。
  2. releaseTicket - 釋放Ticket:當請求處理完就需要把持有的ticket釋放巍虫,該方法不直接暴露給外部,提供給ticket的Close方法調(diào)用鳍刷。

Ticket就只有一個Close方法:

  1. Close:調(diào)用NumLimiter的releaseTicket釋放Ticket

客戶端使用

每次處理請求需要先調(diào)用Reserve獲取Ticket占遥,獲取到后才執(zhí)行具體的業(yè)務邏輯,執(zhí)行完畢后調(diào)用Close方法釋放Ticket

l := numlimiter.New(2) 
func Do(req Request) error { // 模擬請求request
  tk, err := l.Reserve(context.Background()) // 申請Ticket
  if err != nil { // 異常
    return err
  }
  defer tk.Close() // 釋放Ticket
  // 處理請求req
  ...
}

整個框架定義好了输瓜,接著開始擼具體實現(xiàn)

首先瓦胎,需要給每個ticket標識一個唯一標識,我們定義一個reqKey序列尤揣,通過nextReqKeyLocked方法自增搔啊,調(diào)用時需要加鎖,保證在NumLimiter實例生成的key是唯一北戏,代碼如下:

type NumLimiter struct {
  nextKey     int64 // 下一個請求的Key
    ...
}
// 每次調(diào)用nextKey自動+1负芋,調(diào)用的時候需要加鎖,保證協(xié)程安全
func (r *NumLimiter) nextReqKeyLocked() int64 {
    next := r.nextKey
    r.nextKey++
    return next
}

接著嗜愈,我們開始實現(xiàn)核心的Reserve()方法旧蛾,梳理后的邏輯如下:

  1. 當頒發(fā)的Ticket數(shù)量小于maxTicket時,創(chuàng)建一個Ticket直接返回芝硬。
  2. 如果Ticket數(shù)量大于等于maxTicket蚜点,就先判斷當前wait請求數(shù)是否超過maxWait,如果”是“拌阴,直接返回相應的error绍绘。
  3. 如果wait數(shù)沒超過,就pending等待Ticket釋放,同時還得監(jiān)聽是否超時陪拘。

實現(xiàn)邏輯之前需要考慮:

  1. Ticket如何管理厂镇。想要統(tǒng)一管理已經(jīng)發(fā)放的Ticket數(shù)量,就需要有地方存儲左刽,還能對NumLimiter中所有方法可見捺信,所以在NumLimiter中增加一個tickets屬性,類型為 :map[int64]*Ticket(注:key 為請求的key欠痴,value對應的是已經(jīng)頒發(fā)的Ticket)
  2. 管理等待Ticket迄靠。同樣等待Ticket的請求需要被存儲,并且能夠被喚醒喇辽。于是也可以在NumLimiter增加一個屬性:waitTickets掌挚,類型為:map[int64]chan struct{}(注:key同樣是請求的key,值比較特殊菩咨,使用chan吠式,目的是為了其他協(xié)程安全訪問,當沒數(shù)據(jù)時讀取會pending抽米,被close后會繼續(xù)特占,chan的類型我們不關注,所以直接使用空結構體struct{})
  3. 另外云茸,為了保護這些共享資源是目,還需要一個鎖:mu sync.Mutex:
type NumLimiter struct {
  maxTicket   int // 最大請求數(shù)
  maxWait   int // 最大等待數(shù)量
  mu          sync.Mutex
  nextKey     int64 // 下一個請求的Key
  tickets     map[int64]*Ticket
  waitTickets map[int64]chan struct{}
    ...
}

接下來就可以開始實現(xiàn)Reserve方法

func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
    r.mu.Lock()
    reqKey := r.nextReqKeyLocked()
    t := &Ticket{l: r, reqKey: reqKey, lg: r.lg, create: time.Now()}
    // 當請求數(shù)量大于maxTicket就放到waitTickets中等待
    if len(r.tickets) >= r.maxTicket {
        if len(waitTickets) > r.maxWait {
            return nil, errors.New("waiting exceed max wait")
        }
        req := make(chan struct{})
        now := time.Now()
        r.lg.Warnf("request num exceed %d, reqkey [%d] waiting for ticket, req processing num = %d, total wait num = %d", r.maxTicket, reqKey, len(r.tickets), len(r.waitTickets)+1)
        r.waitTickets[reqKey] = req
        r.mu.Unlock() // 需要立即解鎖,否則會導致其他協(xié)程調(diào)用Reserve或releaseTicket方法獲取不到鎖

        select {
        case <-ctx.Done():
            r.lg.Errorf("limiter wait timeout: key = %d, cost = %f", reqKey, time.Now().Sub(now).Seconds())
            r.mu.Lock()
            delete(r.waitTickets, reqKey)
            r.mu.Unlock()
            select {
            default:
            case <-req:
                t.Close() // 返回ticket
            }
            return nil, ctx.Err()
        case <-req:
            r.mu.Lock()
            r.tickets[reqKey] = t
            r.mu.Unlock()
            r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
            return t, nil
        }
    }
    r.tickets[reqKey] = t
    r.mu.Unlock()
    return t, nil
}

雖然代碼看著比較長查辩,但是整個實現(xiàn)沒太多復雜邏輯胖笛,核心代碼就是等待ticket和被喚醒部分:

req := make(chan struct{})
r.waitTickets[reqKey] = req
r.mu.Unlock() // 需要立即解鎖,否則會導致其他協(xié)程調(diào)用Reserve或releaseTicket方法獲取不到鎖
select {
  ...
  case <-req:
  r.mu.Lock()
  r.tickets[reqKey] = t
  r.mu.Unlock()
  r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
  return t, nil
}

這里是利用chan特性宜岛,當要pending等待時长踊,會創(chuàng)建一個請求chan:req := make(chan struct{}),然后放到waitTickets后就立即解鎖(目的是讓其他協(xié)程能獲取到鎖)萍倡,chan在沒數(shù)據(jù)寫入或chan沒有被關閉的情況下會pending身弊,如果一旦有ticket釋放,會通過close這個chan方式通知繼續(xù)列敲。
另外阱佛,超時的實現(xiàn)是借助context來實現(xiàn),通過監(jiān)聽ctx.Done()方法戴而,同時還要注意并發(fā)問題凑术,超時的時候還是有可能獲取到鎖,所以還是得再檢查一下case <-req是否成立所意,成立就說明超時的同時也正好獲取到ticket淮逊,但是由于超時了催首,ticket就沒用了,直接釋放t.Close()泄鹏。

接著郎任,我們來實現(xiàn)ticket釋放邏輯

  1. 刪除tickets中對應的數(shù)據(jù)。(從tickets移除了备籽,所以相當于將ticket釋放了)
  2. 如果waitTickets沒有數(shù)據(jù)就直接返回舶治。len(tickets)數(shù)量已經(jīng)-1,相當于ticket釋放到池中车猬。
  3. 如果waitTickets有等待ticket的請求霉猛,就直接通知其中的一個等待ticket的請求可以繼續(xù),然后等待請求從waitTickets刪除诈唬,相當于將要釋放的ticket直接移交給等待ticket的請求韩脏。
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
    r.mu.Lock()
    defer r.mu.Unlock()
  // 刪除tickets中對應的數(shù)據(jù)
    releaseSuccess := true
    if _, ok := r.tickets[t.reqKey]; ok {
        delete(r.tickets, t.reqKey)
    } else {
        releaseSuccess = false
    }
  // 如果waitTickets有等待ticket的請求
    if len(r.waitTickets) > 0 {
        var req chan struct{}
        var reqKey int64
    // 取出一條
        for reqKey, req = range r.waitTickets {
            break
        }
        close(req) // 通過close方式,通知等待ticket的協(xié)程繼續(xù)
        delete(r.waitTickets, reqKey)// 從waitTickets刪除
    }
    return releaseSuccess
}

這里的通知方式采用close(req)的方式傳輸信號铸磅,相應在Reserve()方法的select case <-req等待的請求就會收到信號,繼續(xù)執(zhí)行杭朱,同時將獲取到的ticket保存在tickets中阅仔,返回對應的ticket后,客戶端獲取到ticket就可以繼續(xù)請求的處理弧械。

另外八酒,實際上releaseTicket方法是不直接暴露給客戶端,而是提供給ticket的close方法調(diào)用:

func (r *Ticket) Close() {
    if !r.l.releaseTicket(r) {
        r.lg.Errorf("limiter ticket release error: req key = %d", r.reqKey)
    }
}

這樣當獲得到ticket后刃唐,客戶端可以把這ticket對象傳到方法羞迷,釋放的時候就直接調(diào)用ticket的close方法,就不需要管NumLimiter對象画饥。

最后增加一個初始化方法衔瓮,方便實例化NumLimiter:

func New(maxTicket, maxWait int) *NumLimiter {
    l := &NumLimiter{
        waitTickets: map[int64]chan struct{}{},
        tickets:     map[int64]*Ticket{},
        maxTicket:   maxTicket,
    maxWait:         maxWait,
    }
    return l
}

這樣一個完整限量的功能就完成了。

總結

限量的實現(xiàn)是參考database/sql 設計抖甘,核心的思想是如何合理管理ticket热鞍,超出時借助chan實現(xiàn)等待,還有context實現(xiàn)超時衔彻,當ticket釋放薇宠,通過close chan來實現(xiàn)廣播,通知對應的等待請求可以繼續(xù)艰额。

我的博客https://itart.cn/blogs/2022/practice/num-limiter-library.html

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末澄港,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子柄沮,更是在濱河造成了極大的恐慌回梧,老刑警劉巖逐工,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異漂辐,居然都是意外死亡泪喊,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進店門髓涯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來袒啼,“玉大人,你說我怎么就攤上這事纬纪◎驹伲” “怎么了?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵包各,是天一觀的道長摘仅。 經(jīng)常有香客問我,道長问畅,這世上最難降的妖魔是什么娃属? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮护姆,結果婚禮上矾端,老公的妹妹穿的比我還像新娘。我一直安慰自己卵皂,他們只是感情好秩铆,可當我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著灯变,像睡著了一般殴玛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上添祸,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天滚粟,我揣著相機與錄音,去河邊找鬼膝捞。 笑死坦刀,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的蔬咬。 我是一名探鬼主播鲤遥,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼林艘!你這毒婦竟也來了盖奈?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤狐援,失蹤者是張志新(化名)和其女友劉穎钢坦,沒想到半個月后究孕,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡爹凹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年厨诸,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片禾酱。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡微酬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出颤陶,到底是詐尸還是另有隱情颗管,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布滓走,位于F島的核電站垦江,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏搅方。R本人自食惡果不足惜比吭,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望腰懂。 院中可真熱鬧梗逮,春花似錦、人聲如沸绣溜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽怖喻。三九已至,卻和暖如春岁诉,著一層夾襖步出監(jiān)牢的瞬間锚沸,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工涕癣, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留哗蜈,地道東北人。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓坠韩,卻偏偏與公主長得像距潘,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子只搁,可洞房花燭夜當晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容