漏斗算法
它有點(diǎn)像我們生活中用到的漏斗席怪,液體倒進(jìn)去以后畜普,總是從下端的小口中以固定速率流出奇昙,漏斗算法也類(lèi)似护侮,不管突然流量有多大,漏斗都保證了流量的常速率輸出储耐,也可以類(lèi)比于調(diào)用量概行,比如,不管服務(wù)調(diào)用多么不穩(wěn)定弧岳,我們只固定進(jìn)行服務(wù)輸出凳忙,比如每10毫秒接受一次服務(wù)調(diào)用。既然是一個(gè)桶禽炬,那就肯定有容量涧卵,由于調(diào)用的消費(fèi)速率已經(jīng)固定,那么當(dāng)桶的容量堆滿了腹尖,則只能丟棄了柳恐,漏斗算法如下圖:
缺點(diǎn)
漏斗算法其實(shí)是悲觀的,因?yàn)樗鼑?yán)格限制了系統(tǒng)的吞吐量热幔,從某種角度上來(lái)說(shuō)乐设,它的效果和并發(fā)量限流很類(lèi)似。漏斗算法也可以用于大多數(shù)場(chǎng)景绎巨,但由于它對(duì)服務(wù)吞吐量有著嚴(yán)格固定的限制近尚,如果在某個(gè)大的服務(wù)網(wǎng)絡(luò)中只對(duì)某些服務(wù)進(jìn)行漏斗算法限流,這些服務(wù)可能會(huì)成為瓶頸场勤。其實(shí)對(duì)于可擴(kuò)展的大型服務(wù)網(wǎng)絡(luò)戈锻,上游的服務(wù)壓力可以經(jīng)過(guò)多重下游服務(wù)進(jìn)行擴(kuò)散歼跟,過(guò)多的漏斗限流似乎意義不大。
實(shí)現(xiàn):
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* <p>
* 對(duì)請(qǐng)求做限速流控
* </p>
*
* @author wangguangdong
* @version 1.0
* @Date 17/3/3
*/
public class LimitRequestByTime {
long limit = 1000;
Map<Long, AtomicLong> map = Collections.synchronizedMap(new LinkedHashMap<>(8));
public boolean limitReq() {
// 統(tǒng)計(jì)當(dāng)前秒數(shù)
long currentTimeMillis = System.currentTimeMillis() / 1000;
map.putIfAbsent(currentTimeMillis, new AtomicLong(0));
// 獲取秒速流控
AtomicLong currentAtomicLong = map.get(currentTimeMillis);
return !(currentAtomicLong.incrementAndGet() >= limit);
}
public static void main(String[] args) {
LimitRequestByTime limitRequestByTime = new LimitRequestByTime();
// 統(tǒng)計(jì)所有的請(qǐng)求次數(shù)
Map<Long, AtomicLong> totalMap = new ConcurrentHashMap<>();
Map<Long, AtomicLong> successTotalMap = new ConcurrentHashMap<>();
for (int i = 0; i < 100000000; i++) {
// 統(tǒng)計(jì)當(dāng)前這一秒的請(qǐng)求書(shū)
long currentTimeMillis = System.currentTimeMillis() / 1000;
totalMap.putIfAbsent(currentTimeMillis, new AtomicLong(0));
// 自增加1
totalMap.get(currentTimeMillis).incrementAndGet();
successTotalMap.putIfAbsent(currentTimeMillis, new AtomicLong(0));
if (limitRequestByTime.limitReq()) {
successTotalMap.get(currentTimeMillis).incrementAndGet();
}
}
for (Map.Entry<Long, AtomicLong> total : totalMap.entrySet()) {
Long totalKey = total.getKey();
System.out.println(String
.format("在%d這一秒一共發(fā)送了%d次請(qǐng)求格遭,通過(guò)的請(qǐng)求數(shù)量為%d", totalKey, totalMap.get(totalKey).get(),
successTotalMap.get(totalKey).get()));
}
}
}
令牌桶算法
令牌桶算法從某種程度上來(lái)說(shuō)是漏桶算法的一種改進(jìn)哈街,漏桶算法能夠強(qiáng)行限制請(qǐng)求調(diào)用的速率,而令牌桶算法能夠在限制調(diào)用的平均速率的同時(shí)還允許某種程度的突發(fā)調(diào)用拒迅。在令牌桶算法中骚秦,桶中會(huì)有一定數(shù)量的令牌,每次請(qǐng)求調(diào)用需要去桶中拿取一個(gè)令牌璧微,拿到令牌后才有資格執(zhí)行請(qǐng)求調(diào)用骤竹,否則只能等待能拿到足夠的令牌數(shù),大家看到這里往毡,可能就認(rèn)為是不是可以把令牌比喻成信號(hào)量,那和前面說(shuō)的并發(fā)量限流不是沒(méi)什么區(qū)別嘛靶溜?其實(shí)不然开瞭,令牌桶算法的精髓就在于“拿令牌”和“放令牌”的方式,這和單純的并發(fā)量限流有明顯區(qū)別罩息,采用并發(fā)量限流時(shí)嗤详,當(dāng)一個(gè)調(diào)用到來(lái)時(shí),會(huì)先獲取一個(gè)信號(hào)量瓷炮,當(dāng)調(diào)用結(jié)束時(shí)葱色,會(huì)釋放一個(gè)信號(hào)量,但令牌桶算法不同娘香,因?yàn)槊看握?qǐng)求獲取的令牌數(shù)不是固定的苍狰,比如當(dāng)桶中的令牌數(shù)還比較多時(shí),每次調(diào)用只需要獲取一個(gè)令牌烘绽,隨著桶中的令牌數(shù)逐漸減少淋昭,當(dāng)?shù)搅钆频氖褂寐剩词褂弥械牧钆茢?shù)/令牌總數(shù))達(dá)某個(gè)比例,可能一次請(qǐng)求需要獲取兩個(gè)令牌安接,當(dāng)令牌使用率到了一個(gè)更高的比例翔忽,可能一次請(qǐng)求調(diào)用需要獲取更多的令牌數(shù)。同時(shí)盏檐,當(dāng)調(diào)用使用完令牌后歇式,有兩種令牌生成方法,第一種就是直接往桶中放回使用的令牌數(shù)胡野,第二種就是不做任何操作材失,有另一個(gè)額外的令牌生成步驟來(lái)將令牌勻速放回桶中。如下圖:
代碼實(shí)現(xiàn)
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
/**
* <p>
* 令牌桶算法
* </p>
*
* @author wangguangdong
* @version 1.0
* @Date 17/3/23
*/
public class TokenBucket {
// 默認(rèn)桶大小個(gè)數(shù) 即最大瞬間流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 一個(gè)桶的單位是1字節(jié)
private int everyTokenSize = 1;
// 瞬間最大流量
private int maxFlowRate;
// 平均流量
private int avgFlowRate;
// 隊(duì)列來(lái)緩存桶數(shù)量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = 'a';
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已經(jīng)滿了硫豆,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(A_CHAR);
}
}
public TokenBucket build() {
start();
return this;
}
/**
* 獲取足夠的令牌個(gè)數(shù)
*
* @return
*/
public boolean getTokens(byte[] dataSize) {
Preconditions.checkNotNull(dataSize);
Preconditions.checkArgument(isStart, "please invoke start method first !");
int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內(nèi)容大小對(duì)應(yīng)的桶個(gè)數(shù)
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數(shù)量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
public void start() {
// 初始化桶隊(duì)列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化令牌生產(chǎn)者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
isStart = true;
}
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException, InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
String data = "xxxx";// 四個(gè)字節(jié)
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write("token pass --- index:" + i1);
System.out.println("token pass --- index:" + i1);
} else {
bufferedWriter.write("token rejuect --- index" + i1);
System.out.println("token rejuect --- index" + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
漏斗算法和令牌桶的異同點(diǎn)
漏斗算法會(huì)限制平均的qps豺憔,對(duì)每個(gè)時(shí)間段的流控都是一樣的额获,如果突然一瞬間的大流量進(jìn)來(lái),那么有可能會(huì)有大量請(qǐng)求被攔截住恭应,
令牌桶算法的話除了能夠限制數(shù)據(jù)的平均傳輸速率外抄邀,還允許一定時(shí)間內(nèi)的大流量涌入,相當(dāng)于漏斗算法的升級(jí)版本昼榛。