背景
關于限流Go官方通過一個采用令牌池的算法的實現(xiàn):golang.org/x/time/rate,但是狮杨,這個限制的是每秒的請求數(shù),有的時候我們希望限制的是系統(tǒng)并發(fā)處理的請求數(shù)量兴垦,類似線程池的功能茵汰,需求如下:
- 設置一個最大的請求處理數(shù)量,當請求超過時绑榴,后續(xù)請求將等待哪轿,直到有請求處理完后被喚醒。
- 請求的等待時間能夠指定翔怎,超出等待時間就返回窃诉,提示給客戶端杨耙。
- 等待請求的個數(shù)需要能夠限制,數(shù)量超過時就直接返回飘痛,提示給客戶端珊膜。
設計
設計思路是實現(xiàn)一個Ticket池(NumLimiter),每個請求首先需要向NumLimiter申請一個ticket敦冬,當請求處理結束后辅搬,需要被回收唯沮。
獲取不到ticket的請求就等待現(xiàn)有的ticket釋放脖旱,所以會有兩個核心對象:
- NumLimiter:數(shù)量限制器(ticket 池)
- Ticket:入場券,請求需要先申請一個Ticket
先不考慮細節(jié)介蛉,可以設計如下:
package numlimiter
// 數(shù)量限制器
type NumLimiter struct {
maxTicket int // 最大請求數(shù)
maxWait int // 最大等待數(shù)
...
}
// 釋放Ticket
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
...
}
// 預訂Ticket
func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
...
}
// 創(chuàng)建一個tocket池
func New(maxTicket) *NumLimiter {
l := &NumLimiter{
maxTicket: maxTicket,
}
return l
}
// 入場券
type Ticket struct {
l *NumLimiter
reqKey int64
}
// 釋放入場券
func (r *Ticket) Close() {
r.l.releaseTicket(r)
}
NumLimiter有兩個核心的方法:
- Reserve - 申請Ticket:每個請求處理前需要先調(diào)用該方法獲取一個ticket萌庆,如果當前頒發(fā)的ticket數(shù)已經(jīng)是大于等于 maxTicket時,請求就pending等待Ticket釋放币旧。 該方法接收一個context践险,作用是傳遞外部超時或取消的信號,結束等待吹菱。
- releaseTicket - 釋放Ticket:當請求處理完就需要把持有的ticket釋放巍虫,該方法不直接暴露給外部,提供給ticket的Close方法調(diào)用鳍刷。
Ticket就只有一個Close方法:
- Close:調(diào)用NumLimiter的releaseTicket釋放Ticket
客戶端使用:
每次處理請求需要先調(diào)用Reserve獲取Ticket占遥,獲取到后才執(zhí)行具體的業(yè)務邏輯,執(zhí)行完畢后調(diào)用Close方法釋放Ticket
l := numlimiter.New(2)
func Do(req Request) error { // 模擬請求request
tk, err := l.Reserve(context.Background()) // 申請Ticket
if err != nil { // 異常
return err
}
defer tk.Close() // 釋放Ticket
// 處理請求req
...
}
整個框架定義好了输瓜,接著開始擼具體實現(xiàn)
首先瓦胎,需要給每個ticket標識一個唯一標識,我們定義一個reqKey序列尤揣,通過nextReqKeyLocked方法自增搔啊,調(diào)用時需要加鎖,保證在NumLimiter實例生成的key是唯一北戏,代碼如下:
type NumLimiter struct {
nextKey int64 // 下一個請求的Key
...
}
// 每次調(diào)用nextKey自動+1负芋,調(diào)用的時候需要加鎖,保證協(xié)程安全
func (r *NumLimiter) nextReqKeyLocked() int64 {
next := r.nextKey
r.nextKey++
return next
}
接著嗜愈,我們開始實現(xiàn)核心的Reserve()方法旧蛾,梳理后的邏輯如下:
- 當頒發(fā)的Ticket數(shù)量小于maxTicket時,創(chuàng)建一個Ticket直接返回芝硬。
- 如果Ticket數(shù)量大于等于maxTicket蚜点,就先判斷當前wait請求數(shù)是否超過maxWait,如果”是“拌阴,直接返回相應的error绍绘。
- 如果wait數(shù)沒超過,就pending等待Ticket釋放,同時還得監(jiān)聽是否超時陪拘。
實現(xiàn)邏輯之前需要考慮:
- Ticket如何管理厂镇。想要統(tǒng)一管理已經(jīng)發(fā)放的Ticket數(shù)量,就需要有地方存儲左刽,還能對NumLimiter中所有方法可見捺信,所以在NumLimiter中增加一個tickets屬性,類型為 :map[int64]*Ticket(注:key 為請求的key欠痴,value對應的是已經(jīng)頒發(fā)的Ticket)
- 管理等待Ticket迄靠。同樣等待Ticket的請求需要被存儲,并且能夠被喚醒喇辽。于是也可以在NumLimiter增加一個屬性:waitTickets掌挚,類型為:map[int64]chan struct{}(注:key同樣是請求的key,值比較特殊菩咨,使用chan吠式,目的是為了其他協(xié)程能安全訪問,當沒數(shù)據(jù)時讀取會pending抽米,被close后會繼續(xù)特占,chan的類型我們不關注,所以直接使用空結構體struct{})
- 另外云茸,為了保護這些共享資源是目,還需要一個鎖:mu sync.Mutex:
type NumLimiter struct {
maxTicket int // 最大請求數(shù)
maxWait int // 最大等待數(shù)量
mu sync.Mutex
nextKey int64 // 下一個請求的Key
tickets map[int64]*Ticket
waitTickets map[int64]chan struct{}
...
}
接下來就可以開始實現(xiàn)Reserve方法:
func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
r.mu.Lock()
reqKey := r.nextReqKeyLocked()
t := &Ticket{l: r, reqKey: reqKey, lg: r.lg, create: time.Now()}
// 當請求數(shù)量大于maxTicket就放到waitTickets中等待
if len(r.tickets) >= r.maxTicket {
if len(waitTickets) > r.maxWait {
return nil, errors.New("waiting exceed max wait")
}
req := make(chan struct{})
now := time.Now()
r.lg.Warnf("request num exceed %d, reqkey [%d] waiting for ticket, req processing num = %d, total wait num = %d", r.maxTicket, reqKey, len(r.tickets), len(r.waitTickets)+1)
r.waitTickets[reqKey] = req
r.mu.Unlock() // 需要立即解鎖,否則會導致其他協(xié)程調(diào)用Reserve或releaseTicket方法獲取不到鎖
select {
case <-ctx.Done():
r.lg.Errorf("limiter wait timeout: key = %d, cost = %f", reqKey, time.Now().Sub(now).Seconds())
r.mu.Lock()
delete(r.waitTickets, reqKey)
r.mu.Unlock()
select {
default:
case <-req:
t.Close() // 返回ticket
}
return nil, ctx.Err()
case <-req:
r.mu.Lock()
r.tickets[reqKey] = t
r.mu.Unlock()
r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
return t, nil
}
}
r.tickets[reqKey] = t
r.mu.Unlock()
return t, nil
}
雖然代碼看著比較長查辩,但是整個實現(xiàn)沒太多復雜邏輯胖笛,核心代碼就是等待ticket和被喚醒部分:
req := make(chan struct{})
r.waitTickets[reqKey] = req
r.mu.Unlock() // 需要立即解鎖,否則會導致其他協(xié)程調(diào)用Reserve或releaseTicket方法獲取不到鎖
select {
...
case <-req:
r.mu.Lock()
r.tickets[reqKey] = t
r.mu.Unlock()
r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
return t, nil
}
這里是利用chan特性宜岛,當要pending等待時长踊,會創(chuàng)建一個請求chan:req := make(chan struct{}),然后放到waitTickets后就立即解鎖(目的是讓其他協(xié)程能獲取到鎖)萍倡,chan在沒數(shù)據(jù)寫入或chan沒有被關閉的情況下會pending身弊,如果一旦有ticket釋放,會通過close這個chan方式通知繼續(xù)列敲。
另外阱佛,超時的實現(xiàn)是借助context來實現(xiàn),通過監(jiān)聽ctx.Done()方法戴而,同時還要注意并發(fā)問題凑术,超時的時候還是有可能獲取到鎖,所以還是得再檢查一下case <-req是否成立所意,成立就說明超時的同時也正好獲取到ticket淮逊,但是由于超時了催首,ticket就沒用了,直接釋放t.Close()泄鹏。
接著郎任,我們來實現(xiàn)ticket釋放邏輯:
- 刪除tickets中對應的數(shù)據(jù)。(從tickets移除了备籽,所以相當于將ticket釋放了)
- 如果waitTickets沒有數(shù)據(jù)就直接返回舶治。len(tickets)數(shù)量已經(jīng)-1,相當于ticket釋放到池中车猬。
- 如果waitTickets有等待ticket的請求霉猛,就直接通知其中的一個等待ticket的請求可以繼續(xù),然后等待請求從waitTickets刪除诈唬,相當于將要釋放的ticket直接移交給等待ticket的請求韩脏。
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
r.mu.Lock()
defer r.mu.Unlock()
// 刪除tickets中對應的數(shù)據(jù)
releaseSuccess := true
if _, ok := r.tickets[t.reqKey]; ok {
delete(r.tickets, t.reqKey)
} else {
releaseSuccess = false
}
// 如果waitTickets有等待ticket的請求
if len(r.waitTickets) > 0 {
var req chan struct{}
var reqKey int64
// 取出一條
for reqKey, req = range r.waitTickets {
break
}
close(req) // 通過close方式,通知等待ticket的協(xié)程繼續(xù)
delete(r.waitTickets, reqKey)// 從waitTickets刪除
}
return releaseSuccess
}
這里的通知方式采用close(req)的方式傳輸信號铸磅,相應在Reserve()方法的select case <-req等待的請求就會收到信號,繼續(xù)執(zhí)行杭朱,同時將獲取到的ticket保存在tickets中阅仔,返回對應的ticket后,客戶端獲取到ticket就可以繼續(xù)請求的處理弧械。
另外八酒,實際上releaseTicket方法是不直接暴露給客戶端,而是提供給ticket的close方法調(diào)用:
func (r *Ticket) Close() {
if !r.l.releaseTicket(r) {
r.lg.Errorf("limiter ticket release error: req key = %d", r.reqKey)
}
}
這樣當獲得到ticket后刃唐,客戶端可以把這ticket對象傳到方法羞迷,釋放的時候就直接調(diào)用ticket的close方法,就不需要管NumLimiter對象画饥。
最后增加一個初始化方法衔瓮,方便實例化NumLimiter:
func New(maxTicket, maxWait int) *NumLimiter {
l := &NumLimiter{
waitTickets: map[int64]chan struct{}{},
tickets: map[int64]*Ticket{},
maxTicket: maxTicket,
maxWait: maxWait,
}
return l
}
這樣一個完整限量的功能就完成了。
總結
限量的實現(xiàn)是參考database/sql 設計抖甘,核心的思想是如何合理管理ticket热鞍,超出時借助chan實現(xiàn)等待,還有context實現(xiàn)超時衔彻,當ticket釋放薇宠,通過close chan來實現(xiàn)廣播,通知對應的等待請求可以繼續(xù)艰额。
我的博客:https://itart.cn/blogs/2022/practice/num-limiter-library.html