在開發(fā)高并發(fā)系統(tǒng)時(shí)有三把利器用來保護(hù)系統(tǒng):緩存昌抠、降級(jí)和限流
緩存:緩存的目的是提升系統(tǒng)訪問速度和增大系統(tǒng)處理容量
降級(jí):降級(jí)是當(dāng)服務(wù)出現(xiàn)問題或者影響到核心流程時(shí)雀久,需要暫時(shí)屏蔽掉,待高峰或者問題解決后再打開
限流:限流的目的是通過對(duì)并發(fā)訪問/請(qǐng)求進(jìn)行限速,或者對(duì)一個(gè)時(shí)間窗口內(nèi)的請(qǐng)求進(jìn)行限速來保護(hù)系統(tǒng),一旦達(dá)到限制速率則可以拒絕服務(wù)坎炼、排隊(duì)或等待、降級(jí)等處理
常見的應(yīng)用:短信/郵件的提供商需要限制每秒可以發(fā)多少封短信/郵件
當(dāng)前最主要的限流方式有兩種:漏桶算法和令牌桶算法
一拦键、漏桶算法
漏桶算法思路很簡(jiǎn)單谣光,水(請(qǐng)求)先進(jìn)入到漏桶里,漏桶以一定的速度出水芬为,當(dāng)水流入速度過大會(huì)直接溢出萄金,可以看出漏桶算法能強(qiáng)行限制數(shù)據(jù)的傳輸速率。
nginx 有兩個(gè)限流模塊碳柱,從 github 上 clone 代碼捡絮,位置在 nginx/src/http/modules 目錄下:
ngx_http_limit_req_module.c (nginx 的 limit_req 模塊,用來 限制時(shí)間窗口內(nèi)的平均速率)
ngx_http_limit_conn_module.c (nginx 的 limit_conn 模塊莲镣,用來限制并發(fā)連接數(shù))
兩者都是按照 IP 或者域名限制的
nginx Github地址:https://github.com/nginx/nginx/tree/master/src/http/modules
nginx 文檔:http://nginx.org/en/docs/http/ngx_http_limit_conn_module.html
limit_conn_zone $binary_remote_addr zone=addr:10m;
server {
location /download/ {
limit_conn addr 1;
}
limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;
server {
location /search/ {
limit_req zone=one burst=5 nodelay; #delay 并發(fā)數(shù) burst桶的大小
}
//limit,限流策略涎拉;hash瑞侮,記錄key的hash值;data鼓拧,記錄key的數(shù)據(jù)內(nèi)容半火;len,記錄key的數(shù)據(jù)長(zhǎng)度季俩;ep钮糖,待處理請(qǐng)求數(shù)目;account,是否是最后一條限流策略
static ngx_int_t ngx_http_limit_req_lookup(ngx_http_limit_req_limit_t *limit, ngx_uint_t hash, u_char *data, size_t len, ngx_uint_t *ep, ngx_uint_t account)
{
//紅黑樹查找指定界定,sentinel代表紅黑樹的NULL節(jié)點(diǎn)
while (node != sentinel) {
if (hash < node->key) {
node = node->left;
continue;
}
if (hash > node->key) {
node = node->right;
continue;
}
//hash值相等店归,比較數(shù)據(jù)是否相等
lr = (ngx_http_limit_req_node_t *) &node->color;
rc = ngx_memn2cmp(data, lr->data, len, (size_t) lr->len);
//查找到
if (rc == 0) {
ngx_queue_remove(&lr->queue);
ngx_queue_insert_head(&ctx->sh->queue, &lr->queue); //將記錄移動(dòng)到LRU隊(duì)列頭部
ms = (ngx_msec_int_t) (now - lr->last); //當(dāng)前時(shí)間減去上次訪問時(shí)間
if (ms < -60000) {
ms = 1;
} else if (ms < 0) {
ms = 0;
}
//漏桶算法
excess = lr->excess - ctx->rate * ms / 1000 + 1000; //待處理請(qǐng)求書-限流速率*時(shí)間段+1個(gè)請(qǐng)求(速率阎抒,請(qǐng)求數(shù)等都乘以1000了)
//當(dāng)前積壓令牌數(shù) = 上次積壓令牌數(shù) - 這段時(shí)間可以產(chǎn)生的令牌數(shù) + 本次請(qǐng)求(1 個(gè)令牌)
if (excess < 0) {
excess = 0;
}
*ep = excess;
//待處理數(shù)目超過burst(等待隊(duì)列大小)消痛,返回NGX_BUSY拒絕請(qǐng)求(沒有配置burst時(shí)且叁,值為0)
if ((ngx_uint_t) excess > limit->burst) {
return NGX_BUSY;
}
if (account) { //如果是最后一條限流策略,則更新上次訪問時(shí)間秩伞,待處理請(qǐng)求數(shù)目逞带,返回NGX_OK
lr->excess = excess;
lr->last = now;
return NGX_OK;
}
//訪問次數(shù)遞增
lr->count++;
ctx->node = lr;
return NGX_AGAIN; //非最后一條限流策略,返回NGX_AGAIN纱新,繼續(xù)校驗(yàn)下一條限流策略
}
node = (rc < 0) ? node->left : node->right;
}
//假如沒有查找到節(jié)點(diǎn)展氓,需要新建一條記錄
*ep = 0;
size = offsetof(ngx_rbtree_node_t, color)
+ offsetof(ngx_http_limit_req_node_t, data)
+ len;
//嘗試淘汰記錄(LRU)
ngx_http_limit_req_expire(ctx, 1);
node = ngx_slab_alloc_locked(ctx->shpool, size);
if (node == NULL) { //空間不足,分配失敗
ngx_http_limit_req_expire(ctx, 0); //強(qiáng)制淘汰記錄
node = ngx_slab_alloc_locked(ctx->shpool, size);
if (node == NULL) { //分配失敗脸爱,返回NGX_ERROR
return NGX_ERROR;
}
}
node->key = hash;
lr = (ngx_http_limit_req_node_t *) &node->color;
lr->len = (u_char) len;
lr->excess = 0;
ngx_memcpy(lr->data, data, len);
ngx_rbtree_insert(&ctx->sh->rbtree, node); //插入記錄到紅黑樹與LRU隊(duì)列
ngx_queue_insert_head(&ctx->sh->queue, &lr->queue);
if (account) { //如果是最后一條限流策略带饱,則更新上次訪問時(shí)間,待處理請(qǐng)求數(shù)目阅羹,返回NGX_OK
lr->last = now;
lr->count = 0;
return NGX_OK;
}
lr->last = 0;
lr->count = 1;
ctx->node = lr;
return NGX_AGAIN; //非最后一條限流策略勺疼,返回NGX_AGAIN,繼續(xù)校驗(yàn)下一條限流策略
}
當(dāng)一個(gè)新請(qǐng)求進(jìn)入 Nginx 的限流流程大致如下:
計(jì)算當(dāng)前請(qǐng)求 IP 地址 hash 值(hash 值相等后進(jìn)而使用 IP 內(nèi)容判斷)捏鱼,在存放請(qǐng)求 IP 的紅黑樹中查找對(duì)應(yīng)位置
計(jì)算當(dāng)前請(qǐng)求和上次請(qǐng)求時(shí)間 (保存在紅黑樹節(jié)點(diǎn)的 value 中) 的差值 ms
根據(jù)公式 “excess = lr->excess - ctx->rate * ms / 1000 + 1000” 計(jì)算(漏桶算法的核心)
更新當(dāng)前節(jié)點(diǎn)信息(上一次請(qǐng)求時(shí)間等)执庐,根據(jù)限流結(jié)果返回響應(yīng)
二、令牌桶算法
令牌桶算法的原理是系統(tǒng)會(huì)以一個(gè)恒定的速度往桶里放入令牌导梆,而如果請(qǐng)求需要被處理轨淌,則需要先從桶里獲取一個(gè)令牌,當(dāng)桶里沒有令牌可取時(shí)看尼,則拒絕服務(wù)递鹉。
RateLimiter
令牌桶比較代表的實(shí)現(xiàn)方法是Guava下的RateLimiter
RateLimiter有兩種實(shí)現(xiàn)方式,SmoothBursty(非預(yù)熱)及SmoothWarmingUp(預(yù)熱藏斩,冷啟動(dòng))躏结,這里主要討論的是非預(yù)熱的方式,預(yù)熱的方式暫時(shí)沒看明白
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by yangzaining on 2020-10-14.
*/
@Slf4j
public class TestDemo {
/**
* QPS:5/s 兩個(gè)線程狰域,各間隔50ms獲取一次媳拴,每次獲取一個(gè)令牌
* <p>
* 疑問:
* 先前五個(gè)為什么不是都獲取成功?
* 剛初始化兆览,所以沒有令牌屈溉,所以獲取到一個(gè)則返回一個(gè)
*/
public static void test1() throws InterruptedException {
testRateLimiter(2, 11, 50, 5, 0, false);
}
/**
* QPS:5/s 兩個(gè)線程,各間隔50ms獲取一次抬探,每次獲取一個(gè)令牌子巾,令牌桶初始化1s
* <p>
* 疑問:
* 令牌存儲(chǔ)的是5個(gè),為啥前6次都成功了?
* 頭五次用的是令牌桶中的令牌线梗,第6次用的是下一個(gè)刻度的令牌椰于,延遲計(jì)算
*/
public static void test2() throws InterruptedException {
testRateLimiter(2, 11, 50, 5, 1000, false);
}
/**
*
* QPS:5/s 兩個(gè)線程,各間隔50ms獲取一次缠导,每次獲取一個(gè)令牌廉羔,令牌桶初始化1s
* 清楚的看見后面的訪問等待的時(shí)間逐步增加
*/
public static void test3() throws InterruptedException {
testRateLimiter(2, 11, 50, 5, 1000, true);
}
private static void testRateLimiter(int threadNumber, int count, int taskGapTime, int qps, int sleepTime, boolean useAcquire) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);
ZonedDateTime start = ZonedDateTime.now();
RateLimiter rateLimiter = RateLimiter.create(qps);
log.info("StartTime = {}", DateTimeFormatter.ISO_INSTANT.format(start));
if (sleepTime != 0) {
Thread.sleep(sleepTime);
}
for (int i = 0; i < threadNumber; i++) {
executorService.submit(() -> {
Boolean flag = null;
double waitTime = 0;
for (int j = 0; j < count; j++) {
if (useAcquire) {
waitTime = rateLimiter.acquire();
} else {
flag = rateLimiter.tryAcquire();
}
ZonedDateTime endTime = ZonedDateTime.now();
try {
Thread.sleep(taskGapTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (useAcquire) {
log.info("waitTime = {}, time = {}", waitTime, DateTimeFormatter.ISO_INSTANT.format(endTime));
} else {
log.info("canAcquire = {}, time = {}", flag, DateTimeFormatter.ISO_INSTANT.format(endTime));
}
}
});
}
}
public static void main(String[] args) throws InterruptedException {
// test1();
// test2();
test3();
}
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);//重新計(jì)算當(dāng)前存儲(chǔ)的令牌數(shù)(距離上次計(jì)算的時(shí)間),nextFreeTicketMicros = nowMicros
long returnValue = nextFreeTicketMicros; //nowMicros
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);//存儲(chǔ)令牌數(shù)
double freshPermits = requiredPermits - storedPermitsToSpend;//還需要的令牌數(shù)
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);//請(qǐng)求令牌需要等待的時(shí)間
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;//下次生成令牌的時(shí)間
this.storedPermits -= storedPermitsToSpend;//更新剩余令牌數(shù)
return returnValue; //nowMicros
}
如圖所示僻造,任務(wù)7若需要請(qǐng)求令牌憋他,就需要償還任務(wù)6獲取令牌所需等待時(shí)間
Redis+lua
RateLimiter只滿足了單機(jī)的限流方式,多臺(tái)機(jī)器的限流需要用到redis lua腳本的方式去實(shí)現(xiàn)
1.減少網(wǎng)絡(luò)開銷:本來多次網(wǎng)絡(luò)請(qǐng)求的操作髓削,可以用一個(gè)請(qǐng)求完成竹挡,原先多次請(qǐng)求的邏輯放在lua腳本中,通過redis lua解釋器去完成立膛。使用腳本揪罕,減少了網(wǎng)絡(luò)往返時(shí)延。
2.原子操作:Redis會(huì)將整個(gè)腳本作為一個(gè)整體執(zhí)行宝泵,中間不會(huì)被其他命令插入好啰。
@Override
public RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"http://速率
+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"http://過期時(shí)間
+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",//類型
Collections.<Object>singletonList(getName()), rate, unit.toMillis(rateInterval), type.ordinal());
}
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local rate = redis.call('hget', KEYS[1], 'rate');"
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
+ "local type = redis.call('hget', KEYS[1], 'type');"
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
+ "local valueName = KEYS[2];"
+ "if type == '1' then "
+ "valueName = KEYS[3];"
+ "end;"
+ "local currentValue = redis.call('get', valueName); "
+ "if currentValue ~= false then "http://當(dāng)前存有值
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "http://若比剩余數(shù)量小,則等待下一個(gè)時(shí)間片段
+ "return redis.call('pttl', valueName); "
+ "else "
+ "redis.call('decrby', valueName, ARGV[1]); "http://足夠的話儿奶,則減去相應(yīng)的令牌
+ "return nil; "
+ "end; "
+ "else "
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
+ "redis.call('set', valueName, rate, 'px', interval); "http://設(shè)置對(duì)應(yīng)的令牌數(shù)框往,同時(shí)扣除本次的令牌數(shù)量
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end;",
Arrays.<Object>asList(getName(), getValueName(), getClientValueName()),
value, commandExecutor.getConnectionManager().getId().toString());
}
總結(jié)
算法 | 特點(diǎn) | 內(nèi)容 |
---|---|---|
nginx req_limit_model(漏桶算法) | 把請(qǐng)求以平均速率消費(fèi),多出桶內(nèi)數(shù)量的請(qǐng)求則被拒絕 | 根據(jù)上一次消費(fèi)的時(shí)間計(jì)算本次消費(fèi)所堆積的數(shù)量闯捎,excess = lr->excess - ctx->rate * ms / 1000 + 1000 |
Guava rateLimiter | 同漏桶算法椰弊,額外支持突發(fā)的流量請(qǐng)求(存儲(chǔ)的令牌數(shù)),需要注意第一次消費(fèi)數(shù)量不受限 | 根據(jù)訪問的時(shí)間計(jì)算,當(dāng)前是否需要等待令牌產(chǎn)生瓤鼻,無需等待秉版,則后續(xù)的請(qǐng)求補(bǔ)償本次的等待時(shí)間 |
Redis lua | 通過lua腳本的形式保證原子性,適用集群的限流 | 通過以速率作為過期時(shí)間存儲(chǔ)令牌數(shù)茬祷,若剩余的數(shù)量不足時(shí)清焕,則返回對(duì)應(yīng)的等待時(shí)間,與rateLimiter不同的是牲迫,rateLimiter是邊初始化邊放令牌耐朴,而redis lua是一開始就存儲(chǔ)最大令牌數(shù) |
參考文獻(xiàn):https://segmentfault.com/a/1190000020272200?utm_source=tag-newest