前言
在之前這篇文章中,我大致介紹了一下google guava庫中的RateLimiter的實現(xiàn)以及它背后的令牌桶算法原理脯倚。但是也有新的問題渔彰,在分布式的環(huán)境中,我們?nèi)绾吾槍Χ鄼C環(huán)境做限流呢推正?在查閱了一些資料和其他人的博客之后恍涂,我采用了redis來作為限流器的實現(xiàn)基礎。
原因主要有以下幾點:
- redis作為高性能緩存系統(tǒng)植榕,性能上能夠滿足多機之間高并發(fā)訪問的要求
- redis有比較好的api來支持限流器令牌桶算法的實現(xiàn)
- 對于我們的系統(tǒng)來說再沧,通過spring data redis來操作比較簡單和常見,避免了引入新的中間件帶來的風險
但是我們也知道尊残,限流器在每次請求令牌和放入令牌操作中炒瘸,存在一個協(xié)同的問題,即獲取令牌操作要盡可能保證原子性寝衫,否則無法保證限流器是否能正常工作顷扩。在RateLimiter的實現(xiàn)中使用了mutex作為互斥鎖來保證操作的原子性,那么在redis中就需要一個類似于事務的機制來保證獲取令牌中多重操作的原子性慰毅。
面對這樣的需求隘截,我們有幾個選擇:
- 用redis實現(xiàn)分布式鎖來保證操作的原子性,這個方案實現(xiàn)起來應該比較簡單汹胃,分布式鎖有現(xiàn)成的例子婶芭,然后就是把Rate Limiter的代碼套用分布式鎖就行了,但是這樣的話效率會顯得不太高统台,特別是在大量訪問的情況下雕擂。
- 用redis的transaction啡邑,在我查閱redis官方文檔和stackoverflow之后發(fā)現(xiàn)redis的transaction官方并不推薦贱勃,并且有可能在未來取消事務,因此不可取。
- 通過redis分布式鎖和本地鎖組成一個雙層結構贵扰,每次分布式獲取鎖之后可以預支一部分令牌量仇穗,然后放到本地通過本地的鎖來分配這些令牌,消耗完之后再到請求redis戚绕。這樣的好處是相比第一個方案纹坐,網(wǎng)絡訪問延遲開銷會比較好,但是實現(xiàn)難度和復雜程度比較難估量舞丛,而且這樣的做法如果在多機不能保證均勻分配流量的情況下并不理想
- 通過將獲取鎖封裝到lua腳本中耘子,提交給redis進行eval和evalsha操作來完成lua腳本的執(zhí)行,由于lua腳本在redis中天然的原子性球切,我們的需求能夠比較好的滿足谷誓,問題是將業(yè)務邏輯封裝在lua中,對于開發(fā)人員自身的能力和調(diào)試存在一定的問題吨凑。
經(jīng)過權衡捍歪,我采用了第四種方式,通過redis和lua來編寫令牌桶算法來完成分布式限流的需求鸵钝。
lua腳本
話不多說糙臼,先貼出lua代碼
-- 返回碼 1:操作成功 0:未配置 -1: 獲取失敗 -2:修改錯誤,建議重新初始化 -500:不支持的操作
-- redis hashmap 中存放的內(nèi)容:
-- last_mill_second 上次放入令牌或者初始化的時間
-- stored_permits 目前令牌桶中的令牌數(shù)量
-- max_permits 令牌桶容量
-- interval 放令牌間隔
-- app 一個標志位恩商,表示對于當前key有沒有限流存在
local SUCCESS = 1
local NO_LIMIT = 0
local ACQUIRE_FAIL = -1
local MODIFY_ERROR = -2
local UNSUPPORT_METHOD = -500
local ratelimit_info = redis.pcall("HMGET",KEYS[1], "last_mill_second", "stored_permits", "max_permits", "interval", "app")
local last_mill_second = ratelimit_info[1]
local stored_permits = tonumber(ratelimit_info[2])
local max_permits = tonumber(ratelimit_info[3])
local interval = tonumber(ratelimit_info[4])
local app = ratelimit_info[5]
local method = ARGV[1]
--獲取當前毫秒
--考慮主從策略和腳本回放機制变逃,這個time由客戶端獲取傳入
--local curr_time_arr = redis.call('TIME')
--local curr_timestamp = curr_time_arr[1] * 1000 + curr_time_arr[2]/1000
local curr_timestamp = tonumber(ARGV[2])
-- 當前方法為初始化
if method == 'init' then
--如果app不為null說明已經(jīng)初始化過,不要重復初始化
if(type(app) ~='boolean' and app ~=nil) then
return SUCCESS
end
redis.pcall("HMSET", KEYS[1],
"last_mill_second", curr_timestamp,
"stored_permits", ARGV[3],
"max_permits", ARGV[4],
"interval", ARGV[5],
"app", ARGV[6])
--始終返回成功
return SUCCESS
end
-- 當前方法為修改配置
if method == "modify" then
if(type(app) =='boolean' or app ==nil) then
return MODIFY_ERROR
end
--只能修改max_permits和interval
redis.pcall("HMSET", KEYS[1],
"max_permits", ARGV[3],
"interval", ARGV[4])
return SUCCESS
end
-- 當前方法為刪除
if method == "delete" then
--已經(jīng)清除完畢
if(type(app) =='boolean' or app ==nil) then
return SUCCESS
end
redis.pcall("DEL", KEYS[1])
return SUCCESS
end
-- 嘗試獲取permits
if method == "acquire" then
-- 如果app為null說明沒有對這個進行任何配置痕届,返回0代表不限流
if(type(app) =='boolean' or app ==nil) then
return NO_LIMIT
end
--需要獲取令牌數(shù)量
local acquire_permits = tonumber(ARGV[3])
--計算上一次放令牌到現(xiàn)在的時間間隔中韧献,一共應該放入多少令牌
local reserve_permits = math.max(0, math.floor((curr_timestamp - last_mill_second) / interval))
local new_permits = math.min(max_permits, stored_permits + reserve_permits)
local result = ACQUIRE_FAIL
--如果桶中令牌數(shù)量夠則放行
if new_permits >= acquire_permits then
result = SUCCESS
new_permits = new_permits - acquire_permits
end
--更新當前桶中的令牌數(shù)量
redis.pcall("HSET", KEYS[1], "stored_permits", new_permits)
--如果這次有放入令牌,則更新時間
if reserve_permits > 0 then
redis.pcall("HSET", KEYS[1], "last_mill_second", curr_timestamp)
end
return result
end
return UNSUPPORT_METHOD
絕大部分邏輯在注釋里面都已經(jīng)寫清楚了(我java客戶端用的代碼刪掉了所有的注釋研叫,因為提交上去報編譯錯誤锤窑,但是redis-cli調(diào)試就沒問題,我也沒太關注原因)嚷炉。
大致上渊啰,我在這個腳本中編寫了4種函數(shù):
- init 初始化限流器
- modify 修改限流器配置(主要針對限流器的桶大小和放令牌間隔,即1/QPS)
- delete 刪除限流器配置
- acquire 嘗試獲取制定數(shù)目的令牌
代碼基本上仿照了Guava RateLimiter的邏輯申屹,實現(xiàn)了觸發(fā)式的放令牌策略绘证。
由于我的需求中不需要像guava RateLimiter那樣的預支令牌的邏輯,因此如果當前沒有令牌可供服務哗讥,我就直接返回獲取失敗了嚷那。
還有一點需要注意的是,我本來在腳本中寫了獲取redis服務器當前時間的代碼杆煞,但是我通過redis-cli執(zhí)行的時候報錯了:
Write commands not allowed after non deterministic commands.
這個錯誤的原因大家可以參見這篇文章魏宽,大致原因跟redis集群的重放和備份策略有關腐泻,相當于我調(diào)用TIME操作,會在主從各執(zhí)行一次队询,得到的結果肯定會存在差異派桩,這個差異就給最終邏輯正確性帶來了不確定性。在redis 4.0之后引入了redis.replicate_commands()來放開限制蚌斩。但我考慮了幾個因素之后铆惑,還是采用網(wǎng)上大部分人的做法,在執(zhí)行前先行獲取到redis的時間戳送膳,然后當做參數(shù)傳上去员魏。
lua調(diào)試
對lua調(diào)試最開始花掉了我不少時間,主要對于redis-cli命令不太熟悉叠聋。大家有一樣問題的可以參見這篇文章逆趋。大致來說就是將寫好的腳本放到redis所在文件夾下(我是windows環(huán)境),然后在cmd下執(zhí)行 redis-cli.exe --eval rate_limit.lua test2(key晒奕,可重復) , (逗號分隔) init 10101 100 100 10 test2 (后跟參數(shù)闻书,空格隔開)。
java集成
在完成了lua的調(diào)試工作之后脑慧,我們就開始java部分的集成代碼編寫魄眉,我們使用的是spring boot來完成開發(fā)。
第一部分是redis配置:
@Bean("rateLimitLua")
public DefaultRedisScript<Long> getRateLimitScript() {
DefaultRedisScript<Long> rateLimitLua = new DefaultRedisScript<>();
rateLimitLua.setLocation(new ClassPathResource("rate_limit.lua"));
rateLimitLua.setResultType(Long.class);
return rateLimitLua;
}
然后是一些與lua適配的枚舉和一些bean:
/**
* @author: Yuanqing Luo
* @date: 2018/10/22
*
* 限流的具體方法
*/
public enum RateLimitMethod {
//initialize rate limiter
init,
//modify rate limiter parameter
modify,
//delete rate limiter
delete,
//acquire permits
acquire;
}
/**
* @author: Yuanqing Luo
* @date: 2018/10/22
* rate limite result
**/
public enum RateLimitResult {
SUCCESS(1L),
NO_LIMIT(0L),
ACQUIRE_FAIL(-1L),
MODIFY_ERROR(-2L),
UNSUPPORT_METHOD(-500L),
ERROR(-505L);
private Long code;
RateLimitResult(Long code){
this.code = code;
}
public static RateLimitResult getResult(Long code){
for(RateLimitResult enums: RateLimitResult.values()){
if(enums.code.equals(code)){
return enums;
}
}
throw new IllegalArgumentException("unknown rate limit return code:" + code);
}
}
/**
* @author: Yuanqing Luo
* @date: 2018/10/22
**/
@Getter
@Setter
public class RateLimitVo {
private String url;
private boolean isLimit;
private Double interval;
private Integer maxPermits;
private Integer initialPermits;
}
第三部分就是限流器的調(diào)用組裝部分:
/**
* @author: Yuanqing Luo
* @date: 2018/10/22
**/
@Service
@Slf4j
public class RateLimitClient {
private static final String RATE_LIMIT_PREFIX = "ratelimit:";
@Autowired
StringRedisTemplate redisTemplate;
@Resource
@Qualifier("rateLimitLua")
RedisScript<Long> rateLimitScript;
public RateLimitResult init(String key, RateLimitVo rateLimitInfo){
return exec(key, RateLimitMethod.init,
rateLimitInfo.getInitialPermits(),
rateLimitInfo.getMaxPermits(),
rateLimitInfo.getInterval(),
key);
}
public RateLimitResult modify(String key, RateLimitVo rateLimitInfo){
return exec(key, RateLimitMethod.modify, key,
rateLimitInfo.getMaxPermits(),
rateLimitInfo.getInterval());
}
public RateLimitResult delete(String key){
return exec(key, RateLimitMethod.delete);
}
public RateLimitResult acquire(String key){
return acquire(key, 1);
}
public RateLimitResult acquire(String key, Integer permits){
return exec(key, RateLimitMethod.acquire, permits);
}
/**
* 執(zhí)行redis的具體方法闷袒,限制method,保證沒有其他的東西進來
* @param key
* @param method
* @param params
* @return
*/
private RateLimitResult exec(String key, RateLimitMethod method, Object... params){
try {
Long timestamp = getRedisTimestamp();
String[] allParams = new String[params.length + 2];
allParams[0] = method.name();
allParams[1] = timestamp.toString();
for(int index = 0;index < params.length; index++){
allParams[2 + index] = params[index].toString();
}
Long result = redisTemplate.execute(rateLimitScript,
Collections.singletonList(getKey(key)),
allParams);
return RateLimitResult.getResult(result);
} catch (Exception e){
log.error("execute redis script fail, key:{}, method:{}",
key, method.name(), e);
return RateLimitResult.ERROR;
}
}
private Long getRedisTimestamp(){
Long currMillSecond = redisTemplate.execute(
(RedisCallback<Long>) redisConnection -> redisConnection.time()
);
return currMillSecond;
}
private String getKey(String key){
return RATE_LIMIT_PREFIX + key;
}
}
java代碼這塊比較簡單了坑律,基本就是封裝了之前l(fā)ua腳本中的4項操作。
第四部分就是測試代碼:
/**
* @author: Yuanqing Luo
* @date: 2018/10/22
**/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OpenApiGatewayApplication.class)
public class RateLimitTest {
@Autowired
private RateLimitClient rateLimitClient;
@Test
public void testInit(){
RateLimitVo vo = new RateLimitVo();
vo.setInitialPermits(500);
vo.setMaxPermits(500);
vo.setInterval(2.0);
rateLimitClient.init("test", vo);
}
@Test
public void testAcquire() throws InterruptedException {
//10個線程
ExecutorService executorService = Executors.newFixedThreadPool(20);
Subject<RateLimitSummary, RateLimitSummary> writeSubject = new SerializedSubject<RateLimitSummary, RateLimitSummary>(PublishSubject.<RateLimitSummary>create());
Observable<RateLimitSummary> readSubject = writeSubject.share();
Observable<RateLimitSummary> bucketStream = Observable.defer(()->{
return readSubject.window(200, TimeUnit.MILLISECONDS)
.flatMap(
observable->
observable.reduce(new RateLimitSummary(0,0,0),
(a, b)-> a.reduce(b))
);
});
Observable<RateLimitSummary> rollingBucketStream = bucketStream.window(5, 1)
.flatMap(observable->observable.reduce(new RateLimitSummary(0, 0, 0),
(a, b)-> a.reduce(b)));
Runnable acquire = () -> {
Random random = new Random();
while(true){
try {
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
RateLimitResult result = rateLimitClient.acquire("test");
writeSubject.onNext(new RateLimitSummary(result));
}
};
//初始時間
final long currentMillis = System.currentTimeMillis();
rollingBucketStream.subscribe(summary->{
double timestamp = (System.currentTimeMillis() - currentMillis)/1000.0;
System.out.println("time:"+ timestamp + ", acquired:" + summary.acquire +
", reject " + summary.reject + ", error: " + summary.error);
});
for(int i=0;i<20;i++){
executorService.submit(acquire);
}
while(true){
Thread.sleep(5000);
}
}
private static class RateLimitSummary{
public int acquire;
public int reject;
public int error;
public RateLimitSummary(RateLimitResult result){
this.acquire = result == RateLimitResult.SUCCESS?1:0;
this.reject = result == RateLimitResult.ACQUIRE_FAIL?1:0;
this.error = result == RateLimitResult.ERROR?1:0;
}
public RateLimitSummary(int acquire, int reject, int error){
this.acquire = acquire;
this.reject = reject;
this.error = error;
}
public RateLimitSummary reduce(RateLimitSummary toAdd){
return new RateLimitSummary(this.acquire + toAdd.acquire,
this.reject + toAdd.reject,
this.error + toAdd.error);
}
}
}
這一段代碼我仿照了Hystrix中的熔斷統(tǒng)計的代碼囊骤,通過一個subject來存放獲取令牌結果晃择,然后通過第一層bucketStream來將令牌結果按照200ms來分組并且reduce成一個結果。接著通過rollingBucketStream來將200ms的分組組合成一個一秒的時間窗(即5個為一組)也物,并且以200ms為步長滾動宫屠。最后統(tǒng)計出來的結果通過subscribe來打印結果。之前的init代碼我們看已經(jīng)初始化了一個大小為500的令牌桶滑蚯,存放令牌的時間間隔為2.0ms浪蹂,所以支持的QPS為500。接著我們執(zhí)行這段代碼告材,并截取一部分輸出:
time:75.857, acquired:460, reject 8, error: 0
time:76.056, acquired:483, reject 36, error: 0
time:76.268, acquired:506, reject 52, error: 0
time:76.454, acquired:503, reject 59, error: 0
time:76.707, acquired:457, reject 69, error: 0
time:76.854, acquired:417, reject 66, error: 0
time:77.054, acquired:454, reject 36, error: 0
time:77.255, acquired:459, reject 54, error: 0
time:77.453, acquired:458, reject 77, error: 0
time:77.658, acquired:474, reject 103, error: 0
time:77.858, acquired:490, reject 132, error: 0
可以看到坤次,這個結果基本每200ms輸出一次,然后一秒鐘內(nèi)的獲取了令牌數(shù)目最大值跟500接近斥赋,并且能夠很好地處理reject缰猴。有一部分結果一秒鐘獲取的令牌數(shù)與500差距較大,我分析的原因是因為請求重復時間段比較多疤剑,很多請求發(fā)生在前一個獲取了令牌之后的2ms內(nèi)滑绒,產(chǎn)生了reject胰舆。
結語
通過redis和lua,我實現(xiàn)了一個簡單的分布式限流器蹬挤。通過上述代碼,大家能看到一個大致的實現(xiàn)框架棘幸,并且通過測試代碼完成了驗證焰扳。如果各位看官有什么問題歡迎留言,希望能跟大家共同學習误续。