基于redis和lua的分布式限流器設計與實現(xiàn)

前言

之前這篇文章中,我大致介紹了一下google guava庫中的RateLimiter的實現(xiàn)以及它背后的令牌桶算法原理脯倚。但是也有新的問題渔彰,在分布式的環(huán)境中,我們?nèi)绾吾槍Χ鄼C環(huán)境做限流呢推正?在查閱了一些資料和其他人的博客之后恍涂,我采用了redis來作為限流器的實現(xiàn)基礎。
原因主要有以下幾點:

  • redis作為高性能緩存系統(tǒng)植榕,性能上能夠滿足多機之間高并發(fā)訪問的要求
  • redis有比較好的api來支持限流器令牌桶算法的實現(xiàn)
  • 對于我們的系統(tǒng)來說再沧,通過spring data redis來操作比較簡單和常見,避免了引入新的中間件帶來的風險

但是我們也知道尊残,限流器在每次請求令牌和放入令牌操作中炒瘸,存在一個協(xié)同的問題,即獲取令牌操作要盡可能保證原子性寝衫,否則無法保證限流器是否能正常工作顷扩。在RateLimiter的實現(xiàn)中使用了mutex作為互斥鎖來保證操作的原子性,那么在redis中就需要一個類似于事務的機制來保證獲取令牌中多重操作的原子性慰毅。
面對這樣的需求隘截,我們有幾個選擇:

  • 用redis實現(xiàn)分布式鎖來保證操作的原子性,這個方案實現(xiàn)起來應該比較簡單汹胃,分布式鎖有現(xiàn)成的例子婶芭,然后就是把Rate Limiter的代碼套用分布式鎖就行了,但是這樣的話效率會顯得不太高统台,特別是在大量訪問的情況下雕擂。
  • 用redis的transaction啡邑,在我查閱redis官方文檔和stackoverflow之后發(fā)現(xiàn)redis的transaction官方并不推薦贱勃,并且有可能在未來取消事務,因此不可取。
  • 通過redis分布式鎖和本地鎖組成一個雙層結構贵扰,每次分布式獲取鎖之后可以預支一部分令牌量仇穗,然后放到本地通過本地的鎖來分配這些令牌,消耗完之后再到請求redis戚绕。這樣的好處是相比第一個方案纹坐,網(wǎng)絡訪問延遲開銷會比較好,但是實現(xiàn)難度和復雜程度比較難估量舞丛,而且這樣的做法如果在多機不能保證均勻分配流量的情況下并不理想
  • 通過將獲取鎖封裝到lua腳本中耘子,提交給redis進行eval和evalsha操作來完成lua腳本的執(zhí)行,由于lua腳本在redis中天然的原子性球切,我們的需求能夠比較好的滿足谷誓,問題是將業(yè)務邏輯封裝在lua中,對于開發(fā)人員自身的能力和調(diào)試存在一定的問題吨凑。

經(jīng)過權衡捍歪,我采用了第四種方式,通過redis和lua來編寫令牌桶算法來完成分布式限流的需求鸵钝。

lua腳本

話不多說糙臼,先貼出lua代碼


-- 返回碼 1:操作成功 0:未配置 -1: 獲取失敗 -2:修改錯誤,建議重新初始化 -500:不支持的操作
-- redis hashmap 中存放的內(nèi)容:
-- last_mill_second 上次放入令牌或者初始化的時間
-- stored_permits 目前令牌桶中的令牌數(shù)量
-- max_permits 令牌桶容量
-- interval 放令牌間隔
-- app 一個標志位恩商,表示對于當前key有沒有限流存在

local SUCCESS = 1
local NO_LIMIT = 0
local ACQUIRE_FAIL = -1
local MODIFY_ERROR = -2
local UNSUPPORT_METHOD = -500

local ratelimit_info = redis.pcall("HMGET",KEYS[1], "last_mill_second", "stored_permits", "max_permits", "interval", "app")
local last_mill_second = ratelimit_info[1]
local stored_permits = tonumber(ratelimit_info[2])
local max_permits = tonumber(ratelimit_info[3])
local interval = tonumber(ratelimit_info[4])
local app = ratelimit_info[5]

local method = ARGV[1]

--獲取當前毫秒
--考慮主從策略和腳本回放機制变逃,這個time由客戶端獲取傳入
--local curr_time_arr = redis.call('TIME')
--local curr_timestamp = curr_time_arr[1] * 1000 + curr_time_arr[2]/1000
local curr_timestamp = tonumber(ARGV[2])


-- 當前方法為初始化
if method == 'init' then
    --如果app不為null說明已經(jīng)初始化過,不要重復初始化
    if(type(app) ~='boolean' and app ~=nil) then
        return SUCCESS
    end

    redis.pcall("HMSET", KEYS[1],
        "last_mill_second", curr_timestamp,
        "stored_permits", ARGV[3],
        "max_permits", ARGV[4],
        "interval", ARGV[5],
        "app", ARGV[6])
    --始終返回成功
    return SUCCESS
end

-- 當前方法為修改配置
if method == "modify" then
    if(type(app) =='boolean' or app ==nil) then
        return MODIFY_ERROR
    end
    --只能修改max_permits和interval
    redis.pcall("HMSET", KEYS[1],
        "max_permits", ARGV[3],
        "interval", ARGV[4])

    return SUCCESS

end

-- 當前方法為刪除
if method == "delete" then
    --已經(jīng)清除完畢
    if(type(app) =='boolean' or app ==nil) then
        return SUCCESS
    end
    redis.pcall("DEL", KEYS[1])
    return SUCCESS
end

-- 嘗試獲取permits
if method == "acquire" then
    -- 如果app為null說明沒有對這個進行任何配置痕届,返回0代表不限流
    if(type(app) =='boolean' or app ==nil) then
        return NO_LIMIT
    end
    --需要獲取令牌數(shù)量
    local acquire_permits = tonumber(ARGV[3])
    --計算上一次放令牌到現(xiàn)在的時間間隔中韧献,一共應該放入多少令牌
    local reserve_permits = math.max(0, math.floor((curr_timestamp - last_mill_second) / interval))
    
    local new_permits = math.min(max_permits, stored_permits + reserve_permits)
    local result = ACQUIRE_FAIL
    --如果桶中令牌數(shù)量夠則放行
    if new_permits >= acquire_permits then
        result = SUCCESS
        new_permits = new_permits - acquire_permits
    end
    --更新當前桶中的令牌數(shù)量 
    redis.pcall("HSET", KEYS[1], "stored_permits", new_permits)
    --如果這次有放入令牌,則更新時間
    if reserve_permits > 0 then
        redis.pcall("HSET", KEYS[1], "last_mill_second", curr_timestamp)
    end
    return result
end


return UNSUPPORT_METHOD

絕大部分邏輯在注釋里面都已經(jīng)寫清楚了(我java客戶端用的代碼刪掉了所有的注釋研叫,因為提交上去報編譯錯誤锤窑,但是redis-cli調(diào)試就沒問題,我也沒太關注原因)嚷炉。
大致上渊啰,我在這個腳本中編寫了4種函數(shù):

  • init 初始化限流器
  • modify 修改限流器配置(主要針對限流器的桶大小和放令牌間隔,即1/QPS)
  • delete 刪除限流器配置
  • acquire 嘗試獲取制定數(shù)目的令牌

代碼基本上仿照了Guava RateLimiter的邏輯申屹,實現(xiàn)了觸發(fā)式的放令牌策略绘证。
由于我的需求中不需要像guava RateLimiter那樣的預支令牌的邏輯,因此如果當前沒有令牌可供服務哗讥,我就直接返回獲取失敗了嚷那。
還有一點需要注意的是,我本來在腳本中寫了獲取redis服務器當前時間的代碼杆煞,但是我通過redis-cli執(zhí)行的時候報錯了:

Write commands not allowed after non deterministic commands.

這個錯誤的原因大家可以參見這篇文章魏宽,大致原因跟redis集群的重放和備份策略有關腐泻,相當于我調(diào)用TIME操作,會在主從各執(zhí)行一次队询,得到的結果肯定會存在差異派桩,這個差異就給最終邏輯正確性帶來了不確定性。在redis 4.0之后引入了redis.replicate_commands()來放開限制蚌斩。但我考慮了幾個因素之后铆惑,還是采用網(wǎng)上大部分人的做法,在執(zhí)行前先行獲取到redis的時間戳送膳,然后當做參數(shù)傳上去员魏。

lua調(diào)試

對lua調(diào)試最開始花掉了我不少時間,主要對于redis-cli命令不太熟悉叠聋。大家有一樣問題的可以參見這篇文章逆趋。大致來說就是將寫好的腳本放到redis所在文件夾下(我是windows環(huán)境),然后在cmd下執(zhí)行 redis-cli.exe --eval rate_limit.lua test2(key晒奕,可重復) , (逗號分隔) init 10101 100 100 10 test2 (后跟參數(shù)闻书,空格隔開)。

java集成

在完成了lua的調(diào)試工作之后脑慧,我們就開始java部分的集成代碼編寫魄眉,我們使用的是spring boot來完成開發(fā)。
第一部分是redis配置:

    @Bean("rateLimitLua")
    public DefaultRedisScript<Long> getRateLimitScript() {
        DefaultRedisScript<Long> rateLimitLua = new DefaultRedisScript<>();
        rateLimitLua.setLocation(new ClassPathResource("rate_limit.lua"));
        rateLimitLua.setResultType(Long.class);
        return rateLimitLua;
    }

然后是一些與lua適配的枚舉和一些bean:

/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 *
 * 限流的具體方法
 */
public enum RateLimitMethod {

    //initialize rate limiter
    init,

    //modify rate limiter parameter
    modify,

    //delete rate limiter
    delete,

    //acquire permits
    acquire;
}
/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 * rate limite result
 **/
public enum RateLimitResult {

    SUCCESS(1L),
    NO_LIMIT(0L),
    ACQUIRE_FAIL(-1L),
    MODIFY_ERROR(-2L),
    UNSUPPORT_METHOD(-500L),
    ERROR(-505L);

    private Long code;

    RateLimitResult(Long code){
        this.code = code;
    }

    public static RateLimitResult getResult(Long code){
        for(RateLimitResult enums: RateLimitResult.values()){
            if(enums.code.equals(code)){
                return enums;
            }
        }
        throw new IllegalArgumentException("unknown rate limit return code:" + code);
    }
}
/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 **/
@Getter
@Setter
public class RateLimitVo {

    private String url;

    private boolean isLimit;

    private Double interval;

    private Integer maxPermits;

    private Integer initialPermits;

}

第三部分就是限流器的調(diào)用組裝部分:

/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 **/
@Service
@Slf4j
public class RateLimitClient {

    private static final String RATE_LIMIT_PREFIX = "ratelimit:";

    @Autowired
    StringRedisTemplate redisTemplate;

    @Resource
    @Qualifier("rateLimitLua")
    RedisScript<Long> rateLimitScript;

    public RateLimitResult init(String key, RateLimitVo rateLimitInfo){
        return exec(key, RateLimitMethod.init,
                rateLimitInfo.getInitialPermits(),
                rateLimitInfo.getMaxPermits(),
                rateLimitInfo.getInterval(),
                key);
    }

    public RateLimitResult modify(String key, RateLimitVo rateLimitInfo){
        return exec(key, RateLimitMethod.modify, key,
                rateLimitInfo.getMaxPermits(),
                rateLimitInfo.getInterval());
    }

    public RateLimitResult delete(String key){
        return exec(key, RateLimitMethod.delete);
    }

    public RateLimitResult acquire(String key){
        return acquire(key, 1);
    }

    public RateLimitResult acquire(String key, Integer permits){
        return exec(key, RateLimitMethod.acquire, permits);
    }

    /**
     * 執(zhí)行redis的具體方法闷袒,限制method,保證沒有其他的東西進來
     * @param key
     * @param method
     * @param params
     * @return
     */
    private RateLimitResult exec(String key, RateLimitMethod method, Object... params){
        try {
            Long timestamp = getRedisTimestamp();
            String[] allParams = new String[params.length + 2];
            allParams[0] = method.name();
            allParams[1] = timestamp.toString();
            for(int index = 0;index < params.length; index++){
                allParams[2 + index] = params[index].toString();
            }
            Long result = redisTemplate.execute(rateLimitScript,
                    Collections.singletonList(getKey(key)),
                    allParams);
            return RateLimitResult.getResult(result);
        } catch (Exception e){
            log.error("execute redis script fail, key:{}, method:{}",
                    key, method.name(), e);
            return RateLimitResult.ERROR;
        }
    }

    private Long getRedisTimestamp(){
        Long currMillSecond = redisTemplate.execute(
                (RedisCallback<Long>) redisConnection -> redisConnection.time()
        );
        return currMillSecond;
    }
    private String getKey(String key){
        return RATE_LIMIT_PREFIX + key;
    }
}

java代碼這塊比較簡單了坑律,基本就是封裝了之前l(fā)ua腳本中的4項操作。

第四部分就是測試代碼:

/**
 * @author: Yuanqing Luo
 * @date: 2018/10/22
 **/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OpenApiGatewayApplication.class)
public class RateLimitTest {

    @Autowired
    private RateLimitClient rateLimitClient;

    @Test
    public void testInit(){
        RateLimitVo vo = new RateLimitVo();
        vo.setInitialPermits(500);
        vo.setMaxPermits(500);
        vo.setInterval(2.0);
        rateLimitClient.init("test", vo);
    }

    @Test
    public void testAcquire() throws InterruptedException {
        //10個線程
        ExecutorService executorService = Executors.newFixedThreadPool(20);

        Subject<RateLimitSummary, RateLimitSummary> writeSubject = new SerializedSubject<RateLimitSummary, RateLimitSummary>(PublishSubject.<RateLimitSummary>create());
        Observable<RateLimitSummary> readSubject = writeSubject.share();
        Observable<RateLimitSummary> bucketStream = Observable.defer(()->{
            return readSubject.window(200, TimeUnit.MILLISECONDS)
                    .flatMap(
                            observable->
                                    observable.reduce(new RateLimitSummary(0,0,0),
                                            (a, b)-> a.reduce(b))
                    );
        });
        Observable<RateLimitSummary> rollingBucketStream = bucketStream.window(5, 1)
                .flatMap(observable->observable.reduce(new RateLimitSummary(0, 0, 0),
                        (a, b)-> a.reduce(b)));

        Runnable acquire = () -> {
            Random random = new Random();
            while(true){
                try {
                    Thread.sleep(30);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                RateLimitResult result = rateLimitClient.acquire("test");
                writeSubject.onNext(new RateLimitSummary(result));
            }
        };
        //初始時間
        final long currentMillis = System.currentTimeMillis();
        rollingBucketStream.subscribe(summary->{
            double timestamp = (System.currentTimeMillis() - currentMillis)/1000.0;
            System.out.println("time:"+ timestamp + ", acquired:" + summary.acquire +
                    ", reject " + summary.reject + ", error: " + summary.error);
        });
        for(int i=0;i<20;i++){
            executorService.submit(acquire);
        }
        while(true){
            Thread.sleep(5000);
        }
    }

    private static class RateLimitSummary{
        public int acquire;
        public int reject;
        public int error;

        public RateLimitSummary(RateLimitResult result){
            this.acquire = result == RateLimitResult.SUCCESS?1:0;
            this.reject = result == RateLimitResult.ACQUIRE_FAIL?1:0;
            this.error = result == RateLimitResult.ERROR?1:0;
        }

        public RateLimitSummary(int acquire, int reject, int error){
            this.acquire = acquire;
            this.reject = reject;
            this.error = error;
        }

        public RateLimitSummary reduce(RateLimitSummary toAdd){
            return new RateLimitSummary(this.acquire + toAdd.acquire,
                    this.reject + toAdd.reject,
                    this.error + toAdd.error);

        }
    }

}

這一段代碼我仿照了Hystrix中的熔斷統(tǒng)計的代碼囊骤,通過一個subject來存放獲取令牌結果晃择,然后通過第一層bucketStream來將令牌結果按照200ms來分組并且reduce成一個結果。接著通過rollingBucketStream來將200ms的分組組合成一個一秒的時間窗(即5個為一組)也物,并且以200ms為步長滾動宫屠。最后統(tǒng)計出來的結果通過subscribe來打印結果。之前的init代碼我們看已經(jīng)初始化了一個大小為500的令牌桶滑蚯,存放令牌的時間間隔為2.0ms浪蹂,所以支持的QPS為500。接著我們執(zhí)行這段代碼告材,并截取一部分輸出:

time:75.857, acquired:460, reject 8, error: 0
time:76.056, acquired:483, reject 36, error: 0
time:76.268, acquired:506, reject 52, error: 0
time:76.454, acquired:503, reject 59, error: 0
time:76.707, acquired:457, reject 69, error: 0
time:76.854, acquired:417, reject 66, error: 0
time:77.054, acquired:454, reject 36, error: 0
time:77.255, acquired:459, reject 54, error: 0
time:77.453, acquired:458, reject 77, error: 0
time:77.658, acquired:474, reject 103, error: 0
time:77.858, acquired:490, reject 132, error: 0

可以看到坤次,這個結果基本每200ms輸出一次,然后一秒鐘內(nèi)的獲取了令牌數(shù)目最大值跟500接近斥赋,并且能夠很好地處理reject缰猴。有一部分結果一秒鐘獲取的令牌數(shù)與500差距較大,我分析的原因是因為請求重復時間段比較多疤剑,很多請求發(fā)生在前一個獲取了令牌之后的2ms內(nèi)滑绒,產(chǎn)生了reject胰舆。

結語

通過redis和lua,我實現(xiàn)了一個簡單的分布式限流器蹬挤。通過上述代碼,大家能看到一個大致的實現(xiàn)框架棘幸,并且通過測試代碼完成了驗證焰扳。如果各位看官有什么問題歡迎留言,希望能跟大家共同學習误续。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吨悍,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子蹋嵌,更是在濱河造成了極大的恐慌育瓜,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件栽烂,死亡現(xiàn)場離奇詭異躏仇,居然都是意外死亡,警方通過查閱死者的電腦和手機腺办,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門焰手,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人怀喉,你說我怎么就攤上這事书妻。” “怎么了躬拢?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵躲履,是天一觀的道長。 經(jīng)常有香客問我聊闯,道長工猜,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任菱蔬,我火速辦了婚禮域慷,結果婚禮上,老公的妹妹穿的比我還像新娘汗销。我一直安慰自己犹褒,他們只是感情好,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布弛针。 她就那樣靜靜地躺著叠骑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪削茁。 梳的紋絲不亂的頭發(fā)上宙枷,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天掉房,我揣著相機與錄音,去河邊找鬼慰丛。 笑死卓囚,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的诅病。 我是一名探鬼主播哪亿,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼贤笆!你這毒婦竟也來了蝇棉?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤芥永,失蹤者是張志新(化名)和其女友劉穎篡殷,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體埋涧,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡板辽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了棘催。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片戳气。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖巧鸭,靈堂內(nèi)的尸體忽然破棺而出瓶您,到底是詐尸還是另有隱情,我是刑警寧澤纲仍,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布呀袱,位于F島的核電站,受9級特大地震影響郑叠,放射性物質(zhì)發(fā)生泄漏夜赵。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一乡革、第九天 我趴在偏房一處隱蔽的房頂上張望寇僧。 院中可真熱鬧,春花似錦沸版、人聲如沸嘁傀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽细办。三九已至,卻和暖如春蕾殴,著一層夾襖步出監(jiān)牢的瞬間笑撞,已是汗流浹背岛啸。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留茴肥,地道東北人坚踩。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像瓤狐,于是被迫代替她去往敵國和親瞬铸。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容