流量整形淺析

漏斗算法

它有點(diǎn)像我們生活中用到的漏斗席怪,液體倒進(jìn)去以后畜普,總是從下端的小口中以固定速率流出奇昙,漏斗算法也類(lèi)似护侮,不管突然流量有多大,漏斗都保證了流量的常速率輸出储耐,也可以類(lèi)比于調(diào)用量概行,比如,不管服務(wù)調(diào)用多么不穩(wěn)定弧岳,我們只固定進(jìn)行服務(wù)輸出凳忙,比如每10毫秒接受一次服務(wù)調(diào)用。既然是一個(gè)桶禽炬,那就肯定有容量涧卵,由于調(diào)用的消費(fèi)速率已經(jīng)固定,那么當(dāng)桶的容量堆滿了腹尖,則只能丟棄了柳恐,漏斗算法如下圖:

image.png

缺點(diǎn)

漏斗算法其實(shí)是悲觀的,因?yàn)樗鼑?yán)格限制了系統(tǒng)的吞吐量热幔,從某種角度上來(lái)說(shuō)乐设,它的效果和并發(fā)量限流很類(lèi)似。漏斗算法也可以用于大多數(shù)場(chǎng)景绎巨,但由于它對(duì)服務(wù)吞吐量有著嚴(yán)格固定的限制近尚,如果在某個(gè)大的服務(wù)網(wǎng)絡(luò)中只對(duì)某些服務(wù)進(jìn)行漏斗算法限流,這些服務(wù)可能會(huì)成為瓶頸场勤。其實(shí)對(duì)于可擴(kuò)展的大型服務(wù)網(wǎng)絡(luò)戈锻,上游的服務(wù)壓力可以經(jīng)過(guò)多重下游服務(wù)進(jìn)行擴(kuò)散歼跟,過(guò)多的漏斗限流似乎意義不大。

實(shí)現(xiàn):

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * <p>
 * 對(duì)請(qǐng)求做限速流控
 * </p>
 *
 * @author wangguangdong
 * @version 1.0
 * @Date 17/3/3
 */
public class LimitRequestByTime {
    long limit = 1000;

    Map<Long, AtomicLong> map = Collections.synchronizedMap(new LinkedHashMap<>(8));

    public boolean limitReq() {
        // 統(tǒng)計(jì)當(dāng)前秒數(shù)
        long currentTimeMillis = System.currentTimeMillis() / 1000;
        map.putIfAbsent(currentTimeMillis, new AtomicLong(0));
        // 獲取秒速流控
        AtomicLong currentAtomicLong = map.get(currentTimeMillis);
        return !(currentAtomicLong.incrementAndGet() >= limit);
    }
    public static void main(String[] args) {
        LimitRequestByTime limitRequestByTime = new LimitRequestByTime();
        // 統(tǒng)計(jì)所有的請(qǐng)求次數(shù)
        Map<Long, AtomicLong> totalMap = new ConcurrentHashMap<>();

        Map<Long, AtomicLong> successTotalMap = new ConcurrentHashMap<>();
        for (int i = 0; i < 100000000; i++) {
            // 統(tǒng)計(jì)當(dāng)前這一秒的請(qǐng)求書(shū)
            long currentTimeMillis = System.currentTimeMillis() / 1000;
            totalMap.putIfAbsent(currentTimeMillis, new AtomicLong(0));
            // 自增加1
            totalMap.get(currentTimeMillis).incrementAndGet();

            successTotalMap.putIfAbsent(currentTimeMillis, new AtomicLong(0));
            if (limitRequestByTime.limitReq()) {
                successTotalMap.get(currentTimeMillis).incrementAndGet();
            }
        }

        for (Map.Entry<Long, AtomicLong> total : totalMap.entrySet()) {
            Long totalKey = total.getKey();
            System.out.println(String
                .format("在%d這一秒一共發(fā)送了%d次請(qǐng)求格遭,通過(guò)的請(qǐng)求數(shù)量為%d", totalKey, totalMap.get(totalKey).get(),
                    successTotalMap.get(totalKey).get()));
        }
    }
}

令牌桶算法

令牌桶算法從某種程度上來(lái)說(shuō)是漏桶算法的一種改進(jìn)哈街,漏桶算法能夠強(qiáng)行限制請(qǐng)求調(diào)用的速率,而令牌桶算法能夠在限制調(diào)用的平均速率的同時(shí)還允許某種程度的突發(fā)調(diào)用拒迅。在令牌桶算法中骚秦,桶中會(huì)有一定數(shù)量的令牌,每次請(qǐng)求調(diào)用需要去桶中拿取一個(gè)令牌璧微,拿到令牌后才有資格執(zhí)行請(qǐng)求調(diào)用骤竹,否則只能等待能拿到足夠的令牌數(shù),大家看到這里往毡,可能就認(rèn)為是不是可以把令牌比喻成信號(hào)量,那和前面說(shuō)的并發(fā)量限流不是沒(méi)什么區(qū)別嘛靶溜?其實(shí)不然开瞭,令牌桶算法的精髓就在于“拿令牌”和“放令牌”的方式,這和單純的并發(fā)量限流有明顯區(qū)別罩息,采用并發(fā)量限流時(shí)嗤详,當(dāng)一個(gè)調(diào)用到來(lái)時(shí),會(huì)先獲取一個(gè)信號(hào)量瓷炮,當(dāng)調(diào)用結(jié)束時(shí)葱色,會(huì)釋放一個(gè)信號(hào)量,但令牌桶算法不同娘香,因?yàn)槊看握?qǐng)求獲取的令牌數(shù)不是固定的苍狰,比如當(dāng)桶中的令牌數(shù)還比較多時(shí),每次調(diào)用只需要獲取一個(gè)令牌烘绽,隨著桶中的令牌數(shù)逐漸減少淋昭,當(dāng)?shù)搅钆频氖褂寐剩词褂弥械牧钆茢?shù)/令牌總數(shù))達(dá)某個(gè)比例,可能一次請(qǐng)求需要獲取兩個(gè)令牌安接,當(dāng)令牌使用率到了一個(gè)更高的比例翔忽,可能一次請(qǐng)求調(diào)用需要獲取更多的令牌數(shù)。同時(shí)盏檐,當(dāng)調(diào)用使用完令牌后歇式,有兩種令牌生成方法,第一種就是直接往桶中放回使用的令牌數(shù)胡野,第二種就是不做任何操作材失,有另一個(gè)額外的令牌生成步驟來(lái)將令牌勻速放回桶中。如下圖:

image1.png

代碼實(shí)現(xiàn)


import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.base.Preconditions;
/**
 * <p>
 *     令牌桶算法
 * </p>
 *
 * @author wangguangdong
 * @version 1.0
 * @Date 17/3/23
 */
public class TokenBucket  {

    // 默認(rèn)桶大小個(gè)數(shù) 即最大瞬間流量是64M
    private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;

    // 一個(gè)桶的單位是1字節(jié)
    private int everyTokenSize = 1;

    // 瞬間最大流量
    private int maxFlowRate;

    // 平均流量
    private int avgFlowRate;

    // 隊(duì)列來(lái)緩存桶數(shù)量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
    private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);

    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    private volatile boolean isStart = false;

    private ReentrantLock lock = new ReentrantLock(true);

    private static final byte A_CHAR = 'a';

    public TokenBucket() {
    }

    public TokenBucket(int maxFlowRate, int avgFlowRate) {
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
        this.everyTokenSize = everyTokenSize;
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public void addTokens(Integer tokenNum) {

        // 若是桶已經(jīng)滿了硫豆,就不再家如新的令牌
        for (int i = 0; i < tokenNum; i++) {
            tokenQueue.offer(A_CHAR);
        }
    }

    public TokenBucket build() {

        start();
        return this;
    }

    /**
     * 獲取足夠的令牌個(gè)數(shù)
     *
     * @return
     */
    public boolean getTokens(byte[] dataSize) {

        Preconditions.checkNotNull(dataSize);
        Preconditions.checkArgument(isStart, "please invoke start method first !");

        int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內(nèi)容大小對(duì)應(yīng)的桶個(gè)數(shù)

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數(shù)量
            if (!result) {
                return false;
            }

            int tokenCount = 0;
            for (int i = 0; i < needTokenNum; i++) {
                Byte poll = tokenQueue.poll();
                if (poll != null) {
                    tokenCount++;
                }
            }

            return tokenCount == needTokenNum;
        } finally {
            lock.unlock();
        }
    }


    public void start() {

        // 初始化桶隊(duì)列大小
        if (maxFlowRate != 0) {
            tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
        }

        // 初始化令牌生產(chǎn)者
        TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
        scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
        isStart = true;

    }


    public void stop() {
        isStart = false;
        scheduledExecutorService.shutdown();
    }


    public boolean isStarted() {
        return isStart;
    }

    class TokenProducer implements Runnable {

        private int avgFlowRate;
        private TokenBucket tokenBucket;

        public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
            this.avgFlowRate = avgFlowRate;
            this.tokenBucket = tokenBucket;
        }

        @Override
        public void run() {
            tokenBucket.addTokens(avgFlowRate);
        }
    }

    public static TokenBucket newBuilder() {
        return new TokenBucket();
    }

    public TokenBucket everyTokenSize(int everyTokenSize) {
        this.everyTokenSize = everyTokenSize;
        return this;
    }

    public TokenBucket maxFlowRate(int maxFlowRate) {
        this.maxFlowRate = maxFlowRate;
        return this;
    }

    public TokenBucket avgFlowRate(int avgFlowRate) {
        this.avgFlowRate = avgFlowRate;
        return this;
    }

    private String stringCopy(String data, int copyNum) {

        StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);

        for (int i = 0; i < copyNum; i++) {
            sbuilder.append(data);
        }

        return sbuilder.toString();

    }

    public static void main(String[] args) throws IOException, InterruptedException {

        tokenTest();
    }

    private static void arrayTest() {
        ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        System.out.println(tokenQueue.size());
        System.out.println(tokenQueue.remainingCapacity());
    }

    private static void tokenTest() throws InterruptedException, IOException {
        TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();

        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
        String data = "xxxx";// 四個(gè)字節(jié)
        for (int i = 1; i <= 1000; i++) {

            Random random = new Random();
            int i1 = random.nextInt(100);
            boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
            TimeUnit.MILLISECONDS.sleep(100);
            if (tokens) {
                bufferedWriter.write("token pass --- index:" + i1);
                System.out.println("token pass --- index:" + i1);
            } else {
                bufferedWriter.write("token rejuect --- index" + i1);
                System.out.println("token rejuect --- index" + i1);
            }

            bufferedWriter.newLine();
            bufferedWriter.flush();
        }

        bufferedWriter.close();
    }

}

漏斗算法和令牌桶的異同點(diǎn)

漏斗算法會(huì)限制平均的qps豺憔,對(duì)每個(gè)時(shí)間段的流控都是一樣的额获,如果突然一瞬間的大流量進(jìn)來(lái),那么有可能會(huì)有大量請(qǐng)求被攔截住恭应,
令牌桶算法的話除了能夠限制數(shù)據(jù)的平均傳輸速率外抄邀,還允許一定時(shí)間內(nèi)的大流量涌入,相當(dāng)于漏斗算法的升級(jí)版本昼榛。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末境肾,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子胆屿,更是在濱河造成了極大的恐慌奥喻,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件非迹,死亡現(xiàn)場(chǎng)離奇詭異环鲤,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)憎兽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)冷离,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人纯命,你說(shuō)我怎么就攤上這事西剥。” “怎么了亿汞?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵瞭空,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我疗我,道長(zhǎng)咆畏,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任吴裤,我火速辦了婚禮鳖眼,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嚼摩。我一直安慰自己钦讳,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布枕面。 她就那樣靜靜地躺著愿卒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪潮秘。 梳的紋絲不亂的頭發(fā)上琼开,一...
    開(kāi)封第一講書(shū)人閱讀 49,111評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音枕荞,去河邊找鬼柜候。 笑死搞动,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的渣刷。 我是一名探鬼主播鹦肿,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼辅柴!你這毒婦竟也來(lái)了箩溃?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤碌嘀,失蹤者是張志新(化名)和其女友劉穎涣旨,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體股冗,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡霹陡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了止状。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片烹棉。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖导俘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情剔蹋,我是刑警寧澤旅薄,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站泣崩,受9級(jí)特大地震影響少梁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜矫付,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一凯沪、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧买优,春花似錦妨马、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至脂崔,卻和暖如春滤淳,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背砌左。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工脖咐, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留铺敌,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓屁擅,卻偏偏與公主長(zhǎng)得像偿凭,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子煤蹭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 聊聊高并發(fā)系統(tǒng)限流特技-1來(lái)自開(kāi)濤的博客 在開(kāi)發(fā)高并發(fā)系統(tǒng)時(shí)有三把利器用來(lái)保護(hù)系統(tǒng):緩存笔喉、降級(jí)和限流。緩存的目的是...
    meng_philip123閱讀 6,619評(píng)論 1 20
  • 摘要:在開(kāi)發(fā)高并發(fā)系統(tǒng)時(shí)有三把利器用來(lái)保護(hù)系統(tǒng):緩存硝皂、降級(jí)和限流常挚。而有些場(chǎng)景并不能用緩存和降級(jí)來(lái)解決,因此需有一種...
    落羽成霜丶閱讀 2,147評(píng)論 0 18
  • 最近一直都在研究壓力測(cè)試客戶端的問(wèn)題稽物,如果突破客戶端壓力測(cè)試線程奄毡,端口等問(wèn)題,如果服務(wù)器端處理網(wǎng)絡(luò)請(qǐng)求處理不過(guò)來(lái)贝或,...
    望月成三人閱讀 8,631評(píng)論 1 25
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理吼过,服務(wù)發(fā)現(xiàn),斷路器咪奖,智...
    卡卡羅2017閱讀 134,600評(píng)論 18 139
  • 在百度貼吧看到個(gè)帖子:一對(duì)約40歲的夫妻闲昭,女兒已16歲,他們想要二孩靡挥,但是女兒強(qiáng)烈反對(duì)序矩,以離家出走,不讀書(shū)為要挾跋破,...
    散月的月閱讀 274評(píng)論 0 1