1. 先談?wù)凬ginx分流
對(duì),要考慮限流先得假設(shè)訪問(wèn)量達(dá)到了一定得程度,再高并發(fā)得前提下,請(qǐng)求過(guò)多很有可能導(dǎo)致某天服務(wù)器承受不了導(dǎo)致死機(jī)健爬。
在這個(gè)前提下,我相信你最先想到的一定是nginx么介,使用nginx分流讓所有的請(qǐng)求不要直接到達(dá)某一個(gè)服務(wù)器,當(dāng)并發(fā)量繼續(xù)上升的時(shí)候蜕衡,我提供更多的服務(wù)器視乎就能解決問(wèn)題了壤短,這想法看上去很對(duì),而且很多行業(yè)就是這么做的慨仿,比如下面的nginx配置文件久脯,很簡(jiǎn)單,我相信你一看就能懂
#user nobody;
worker_processes 1;
events {
worker_connections 1024;
}
http {
upstream enjoy{
server 127.0.0.1:8080;
server 127.0.0.1:8081;
}
server {
listen 80;
location / {
proxy_pass http://enjoy;
}
}
}
在上面案例中我配置了個(gè)代理镰吆,當(dāng)高并發(fā)的請(qǐng)求過(guò)來(lái)的時(shí)候帘撰,會(huì)把請(qǐng)求分發(fā)給8080與8081兩個(gè)端口的服務(wù)器。
但現(xiàn)在我們假設(shè)一個(gè)極端的情況万皿,這個(gè)時(shí)候由于業(yè)務(wù)有個(gè)秒殺要求摧找,請(qǐng)求過(guò)大一下就讓這兩個(gè)服務(wù)器爆了,這個(gè)時(shí)候在我繼續(xù)增加服務(wù)器之前這整個(gè)秒殺業(yè)務(wù)幾乎都處在癱瘓狀態(tài)牢硅,而這突然的訪問(wèn)量是你始料未及的蹬耘,也就是說(shuō)你根本就沒(méi)法事先準(zhǔn)備好足夠的服務(wù)器來(lái)解決這種情況。
這個(gè)時(shí)候我相信你已經(jīng)看出了如果僅僅做分流依然會(huì)出現(xiàn)很?chē)?yán)重的問(wèn)題减余,怎么辦呢综苔?這個(gè)時(shí)候你就需要限流了。
2. 再說(shuō)下限流
限流依然是再高并發(fā)的前提下,如果某個(gè)服務(wù)器承受不了數(shù)目過(guò)多的請(qǐng)求量的一種限制機(jī)制如筛,不同的是分流是讓請(qǐng)求分發(fā)的其他服務(wù)器堡牡,而限流是達(dá)到某個(gè)閾值后直接不讓你訪問(wèn)了
如果想完成限流的功能其實(shí)是有一些解決方案的(算法),比如來(lái)說(shuō)杨刨,基于令牌桶的悴侵,程序計(jì)數(shù)器以及漏桶算法;
今天挑一個(gè)最簡(jiǎn)單的計(jì)數(shù)器方式來(lái)講講限流
3. 解決方案
計(jì)數(shù)器的解決方式是最簡(jiǎn)單最容易實(shí)現(xiàn)的一種解決方案拭嫁,假設(shè)有一個(gè)接口可免,要求1分鐘的訪問(wèn)量不能超過(guò)10次
這樣當(dāng)有任何請(qǐng)求過(guò)來(lái),我可以讓計(jì)數(shù)器+1做粤;如果這個(gè)計(jì)數(shù)器的值大于10浇借,而且和第一次的請(qǐng)求相比,時(shí)間間隔在1分鐘以?xún)?nèi)怕品,那么久能說(shuō)明該請(qǐng)求訪問(wèn)過(guò)多妇垢。
如果這個(gè)請(qǐng)求與第一次請(qǐng)求的訪問(wèn)時(shí)間之間的間隔超過(guò)了1分鐘,那么該計(jì)數(shù)器的值久還是限流范圍之內(nèi)肉康,接下來(lái)久只要重置計(jì)數(shù)器就好闯估;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class EnjoyCountLimit {
private int limtCount = 60;// 限制最大訪問(wèn)的容量
AtomicInteger atomicInteger = new AtomicInteger(0); // 每秒鐘 實(shí)際請(qǐng)求的數(shù)量
private long start = System.currentTimeMillis();// 獲取當(dāng)前系統(tǒng)時(shí)間
private int interval = 60*1000;// 間隔時(shí)間60秒
public boolean acquire() {
long newTime = System.currentTimeMillis();
if (newTime > (start + interval)) {
// 判斷是否是一個(gè)周期
start = newTime;
atomicInteger.set(0); // 清理為0
return true;
}
atomicInteger.incrementAndGet();// i++;
return atomicInteger.get() <= limtCount;
}
static EnjoyCountLimit limitService = new EnjoyCountLimit();
public static void main(String[] args) {
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
for (int i = 1; i < 100; i++) {
final int tempI = i;
newCachedThreadPool.execute(new Runnable() {
public void run() {
if (limitService.acquire()) {
System.out.println("你沒(méi)有被限流,可以正常訪問(wèn)邏輯 i:" + tempI);
} else {
System.out.println("你已經(jīng)被限流呢 i:" + tempI);
}
}
});
}
}
}
這個(gè)計(jì)數(shù)器的限流方式很簡(jiǎn)單吧,但這樣問(wèn)題嗎吼和?好好想想……
還是以60運(yùn)行訪問(wèn)10次請(qǐng)求為例涨薪,在第一次0-58秒之內(nèi),沒(méi)有訪問(wèn)請(qǐng)求炫乓,在59秒之內(nèi)突然來(lái)了10次請(qǐng)求刚夺,這個(gè)時(shí)候會(huì)做什么,由于已經(jīng)到了1分鐘計(jì)數(shù)器會(huì)重置末捣。
這個(gè)時(shí)候第二次的1秒內(nèi)(1分0秒)又了10請(qǐng)求侠姑,這個(gè)時(shí)候是不是就在2秒之內(nèi)有20個(gè)請(qǐng)求被放行了呢?(59秒箩做,1分0秒)莽红,如果某個(gè)服務(wù)器的訪問(wèn)量只能是10次請(qǐng)求,那這種限流方式已經(jīng)導(dǎo)致服務(wù)器掛了邦邦;
4. 滑動(dòng)窗口計(jì)數(shù)器
前面已經(jīng)知道簡(jiǎn)單的計(jì)數(shù)器的實(shí)現(xiàn)方式安吁,也知道他會(huì)出現(xiàn)的一些問(wèn)題,雖然這些問(wèn)題舉得有些極端圃酵,但還是有更好得解決方案柳畔,這方案就是使用滑動(dòng)窗口計(jì)數(shù)器
滑動(dòng)窗口計(jì)數(shù)器得原理是在沒(méi)錯(cuò)請(qǐng)求過(guò)來(lái)得時(shí)候,先判斷前面N個(gè)單位內(nèi)得總訪問(wèn)量是否操過(guò)得閾值郭赐,并且在當(dāng)前得時(shí)間單位得請(qǐng)求數(shù)上+1
舉例來(lái)說(shuō)薪韩,要求1分鐘的訪問(wèn)量不能超過(guò)10次
可以把1分鐘看成是6個(gè)10秒鐘的時(shí)間确沸,0-9秒的訪問(wèn)數(shù)記錄到第一個(gè)格子,10-19秒的訪問(wèn)數(shù)記錄數(shù)記錄到第二個(gè)格子以此內(nèi)推俘陷,每次統(tǒng)計(jì)將6個(gè)格子里面的數(shù)據(jù)求和罗捎,如果超過(guò)了10次就不允許訪問(wèn)。
import java.util.concurrent.atomic.AtomicInteger;
public class EnjoySlidingWindow {
private AtomicInteger[] timeSlices;
/* 隊(duì)列的總長(zhǎng)度 */
private final int timeSliceSize;
/* 每個(gè)時(shí)間片的時(shí)長(zhǎng) */
private final long timeMillisPerSlice;
/* 窗口長(zhǎng)度 */
private final int windowSize;
/* 當(dāng)前所使用的時(shí)間片位置 */
private AtomicInteger cursor = new AtomicInteger(0);
public static enum Time {
MILLISECONDS(1),
SECONDS(1000),
MINUTES(SECONDS.getMillis() * 60),
HOURS(MINUTES.getMillis() * 60),
DAYS(HOURS.getMillis() * 24),
WEEKS(DAYS.getMillis() * 7);
private long millis;
Time(long millis) {
this.millis = millis;
}
public long getMillis() {
return millis;
}
}
public EnjoySlidingWindow(int windowSize, Time timeSlice) {
this.timeMillisPerSlice = timeSlice.millis;
this.windowSize = windowSize;
// 保證存儲(chǔ)在至少兩個(gè)window
this.timeSliceSize = windowSize * 2 + 1;
init();
}
/**
* 初始化
*/
private void init() {
AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicInteger(0);
}
timeSlices = localTimeSlices;
}
private int locationIndex() {
long time = System.currentTimeMillis();
return (int) ((time / timeMillisPerSlice) % timeSliceSize);
}
/**
* <p>對(duì)時(shí)間片計(jì)數(shù)+1拉盾,并返回窗口中所有的計(jì)數(shù)總和
* <p>該方法只要調(diào)用就一定會(huì)對(duì)某個(gè)時(shí)間片進(jìn)行+1
* @return
*/
public int incrementAndSum() {
int index = locationIndex();
int sum = 0;
// cursor等于index桨菜,返回true
// cursor不等于index,返回false捉偏,并會(huì)將cursor設(shè)置為index
int oldCursor = cursor.getAndSet(index);
if (oldCursor == index) {
// 在當(dāng)前時(shí)間片里繼續(xù)+1
sum += timeSlices[index].incrementAndGet();
} else {
//輪到新的時(shí)間片倒得,置0,可能有其它線程也置了該值夭禽,容許
timeSlices[index].set(0);
// 清零霞掺,訪問(wèn)量不大時(shí)會(huì)有時(shí)間片跳躍的情況
clearBetween(oldCursor, index);
sum += timeSlices[index].incrementAndGet();
}
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
return sum;
}
/**
* 判斷是否允許進(jìn)行訪問(wèn),未超過(guò)閾值的話(huà)才會(huì)對(duì)某個(gè)時(shí)間片+1
* @param threshold
* @return
*/
public boolean allow(int threshold) {
int index = locationIndex();
int sum = 0;
int oldCursor = cursor.getAndSet(index);
if (oldCursor != index) {
timeSlices[index].set(0);
clearBetween(oldCursor, index);
}
for (int i = 0; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
// 閾值判斷
if (sum < threshold) {
// 未超過(guò)閾值才+1
timeSlices[index].incrementAndGet();
return true;
}
return false;
}
/**
* <p>將fromIndex~toIndex之間的時(shí)間片計(jì)數(shù)都清零
* <p>極端情況下讹躯,當(dāng)循環(huán)隊(duì)列已經(jīng)走了超過(guò)1個(gè)timeSliceSize以上菩彬,這里的清零并不能如期望的進(jìn)行
* @param fromIndex 不包含
* @param toIndex 不包含
*/
private void clearBetween(int fromIndex, int toIndex) {
for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) {
timeSlices[index].set(0);
}
}
public static void main(String[] args) {
EnjoySlidingWindow window = new EnjoySlidingWindow(5, Time.MILLISECONDS);
for (int i = 0; i < 10; i++) {
System.out.println(window.allow(7));
}
}
}