背景
負(fù)責(zé)的項目中有一個爬蟲調(diào)度項目卿吐∨越ⅲ基礎(chǔ)的模型就是利用平臺提供的 Token
通過相關(guān)的數(shù)據(jù) API
從平臺獲取數(shù)據(jù)。
對于每個 Token
均存在一個短時間內(nèi)調(diào)用的上限嗡官。一旦超出限制箭窜,將在一段時間內(nèi)不能進(jìn)行繼續(xù)獲取。
之前對于這個限制的管理比較簡單衍腥,當(dāng)任務(wù)觸發(fā)時磺樱,會直接發(fā)起數(shù)據(jù)獲取請求。通過檢查返回信息婆咸,判斷是否超限竹捉,如果超限,設(shè)置一個等待時間之后進(jìn)行重試尚骄。但發(fā)現(xiàn)這樣沒有最大化的利用到 Token
. 因為發(fā)起請求本身就是對 Token
的一種消耗块差。
最近跟組長進(jìn)行討論相關(guān)細(xì)節(jié)時,他提到可以利用 Nginx
的流量限制來進(jìn)行改進(jìn)。研究之后發(fā)現(xiàn)令牌桶算法
很合適這個需求憾儒。
令牌桶算法
簡介
令牌桶(token bucket
)算法是 Nginx
進(jìn)行流量限制的一種常用算法询兴。常用于控制發(fā)送到網(wǎng)絡(luò)上的數(shù)據(jù)的數(shù)量乃沙,并允許突發(fā)數(shù)據(jù)的發(fā)送起趾。
基礎(chǔ)流程圖
當(dāng)數(shù)據(jù)請求來臨時,算法通過檢查當(dāng)前桶的令牌量警儒,如果令牌量足以支持消耗训裆,即會進(jìn)行接下來的處理。
如果令牌不足蜀铲,則會將請求拋棄(獲取緩存边琉,看相關(guān)需求)
使用
在當(dāng)前的需求中,對每一個 Token
實例添加一個容量桶记劝。存儲當(dāng)前的可調(diào)用次數(shù)。當(dāng)有 worker
發(fā)起請求時厌丑,先檢查當(dāng)前的可調(diào)用余量。
如果余量足夠砍鸠,則返回可調(diào)用狀態(tài)耕驰,并設(shè)置當(dāng)前的處理時間。當(dāng)請求完畢時朦肘,對桶進(jìn)行主動更新。如果當(dāng)前余量不足以進(jìn)行請求媒抠,則可以返回需要等待的時間,或者執(zhí)行切換 Token
實例等操作夫嗓。
簡單實現(xiàn)
import time
class TokenBucket:
def __init__(self, rate=0.1, capacity=100):
"""
此 為 單例
初始化時 應(yīng)設(shè)置 當(dāng)前的容量為 總?cè)萘? :param rate: 速率 秒為單位
:param capacity: 總?cè)萘? """
self._rate = rate
self._capacity = capacity
self.current_amount = capacity
self._last_consumed_at = int(time.time())
def consume(self, need_amount=1):
"""
進(jìn)行消費
:param need_amount:
:return:
"""
increments = (int(time.time()) - self._last_consumed_at) * self._rate
self.current_amount = min(
self.current_amount + increments, self._capacity
)
if need_amount > self.current_amount:
return False
self.current_amount = self.current_amount - need_amount
self._last_consumed_at = int(time.time())
return True
def update(self, amount):
"""
存在一個更新操作舍咖,用于 Token 余量狀態(tài)主動返回
并重新計算 最后消費時間
:param amount: 主動發(fā)送的數(shù)量
:return:
"""
self.current_amount = min(amount, self._capacity)
self._last_consumed_at = int(time.time())
源碼可訪問 code