一:目標(biāo):實(shí)現(xiàn)任意滑動(dòng)計(jì)數(shù)器的抽象,保證高性能
此抽象可以提供給限流 熔斷 防刷等統(tǒng)計(jì)模塊使用,進(jìn)行任意時(shí)間的統(tǒng)計(jì)與計(jì)數(shù)氧苍,高性能,線程安全是基本要求
二:算法如下
- 數(shù)組模擬時(shí)間輪
- 當(dāng)前使用的數(shù)據(jù)為當(dāng)前時(shí)間對(duì)數(shù)組大小進(jìn)行取模確定.
累加邏輯為:當(dāng)前數(shù)組元素往前n個(gè)大小進(jìn)行累加 - 清理邏輯分兩種實(shí)現(xiàn)
3.1 數(shù)組大小=滑動(dòng)窗口大小+未來(lái)n個(gè)時(shí)間單位的大小,定時(shí)任務(wù)提前清理未來(lái)n個(gè)時(shí)間單位大小
3.2 使用時(shí)清理,不使用buffer.需要一個(gè)紀(jì)錄最后一次使用位置的指針,當(dāng)使用時(shí)清理最后一次位置到當(dāng)前位置的數(shù)據(jù)包括當(dāng)前位置,考慮并發(fā)調(diào)用的 情況,此清理過(guò)程只能一個(gè)線程進(jìn)行清理,
清理后把最后一次的指針指向當(dāng)前位置,釋放鎖,其他線程才能進(jìn)入再次執(zhí)行次邏輯,通常其他線程進(jìn)行后發(fā)現(xiàn)當(dāng)前指針跟自己持有的指針相同, 不清理跳過(guò)备图。統(tǒng)計(jì)邏輯也類似
3.3 3.1邏輯較為簡(jiǎn)單,3.2比較復(fù)雜,但是少了提前預(yù)留的n個(gè)大小
可以用次抽象實(shí)現(xiàn)任意單位時(shí)間的n秒滑動(dòng)計(jì)數(shù)器
三:實(shí)現(xiàn)
提前清理
public class TimeRollingScheduleCleanCounter {
private static final ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
private static final ConcurrentLinkedQueue<TimeRollingScheduleCleanCounter> queue = new ConcurrentLinkedQueue<>();
private static final int nextBufferSize=10;
public static void register(TimeRollingScheduleCleanCounter timeRollingCounterScheduleClean) {
queue.add(timeRollingCounterScheduleClean);
}
static {
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (TimeRollingScheduleCleanCounter clean : queue) {
int index = currentIndex(clean.getBucketsSize());
for(int i=0;i<nextBufferSize;i++){
clean.buckets[(index+i+clean.getBucketsSize())%clean.getBucketsSize()].set(0);
}
}
}
}, 1, 1, TimeUnit.SECONDS);
}
/**
* 滑動(dòng)計(jì)數(shù)器對(duì)應(yīng)的bucket
*/
private AtomicLong[] buckets;
private int bucketsSize;
public TimeRollingScheduleCleanCounter(int size) {
bucketsSize=size+nextBufferSize;
buckets = new AtomicLong[bucketsSize];
for (int i = 0; i < buckets.length; i++) {
buckets[i] = new AtomicLong(0);
}
register(this);
}
public void add(Long count) {
buckets[currentIndex(bucketsSize)].getAndAdd(count);
}
public long accumulative(int n) {
if (n > bucketsSize-nextBufferSize) {
throw new IllegalArgumentException("params is too large");
}
int start = currentIndex(bucketsSize);
long total = 0;
for (int i = 0; i < n; i++) {
total += buckets[(start - i+bucketsSize) % bucketsSize].get();
}
return total;
}
public int getBucketsSize() {
return bucketsSize;
}
private static int currentIndex(int bucketsSize) {
long currentTime = System.currentTimeMillis() / 1000;
return (int) (currentTime % bucketsSize);
}
public long increment() {
return buckets[currentIndex(bucketsSize)].incrementAndGet();
}
}
使用時(shí)清理
public class TimeRollingCounter {
private static Logger logger = LoggerFactory.getLogger(TimeRollingCounter.class);
/**
* 滑動(dòng)計(jì)數(shù)器對(duì)應(yīng)的bucket
*/
private AtomicInteger[] buckets;
//private LongAdder[] longAdders;
/**
* bucket對(duì)應(yīng)的時(shí)間
*/
private AtomicLong currentBucketTime;
/**
* 桶的默認(rèn)大小
*/
private int bucketSize =1;
private AtomicBoolean windowRolling = new AtomicBoolean(false);
private static final long MILLIS_IN_ONE_SECOND=1000L;//1秒等于1000毫秒
/**
* 給當(dāng)前時(shí)間的計(jì)數(shù)器增加1
* 如果當(dāng)前時(shí)間和桶的當(dāng)前時(shí)間差大于BUCKET_SIZE,重置所有的bucket
* @return
*/
public int increment(){
final long targetBucketTime = getNowSecond();//秒轉(zhuǎn)換成毫秒
final int targetBucketIndex = this.calIndex(targetBucketTime);
//窗口滑動(dòng),重置計(jì)數(shù)器
this.windowRolling(targetBucketTime,this.currentBucketTime.get());
//增加當(dāng)前計(jì)數(shù)
buckets[targetBucketIndex].incrementAndGet();
//返回當(dāng)前計(jì)數(shù)值
return buckets[targetBucketIndex].intValue();
}
/**
* 獲取之前N秒的計(jì)數(shù)之和
* @param n
* @return
*/
public int accumulative(int n){
if(n> bucketSize){
n= bucketSize;
}
final long targetBucketTime = getNowSecond();
final int targetBucketIndex = this.calIndex(targetBucketTime);
//窗口滑動(dòng),重置計(jì)數(shù)器
this.windowRolling(targetBucketTime,this.currentBucketTime.get());
AtomicInteger beforeSum = new AtomicInteger(0);
for(int i=0;i<n;i++){
beforeSum.addAndGet(buckets[(targetBucketIndex+ bucketSize -i)% bucketSize].intValue());
}
return beforeSum.intValue();
}
/**
* 初始化一個(gè)當(dāng)前時(shí)間的全0計(jì)數(shù)器,并可以設(shè)置bucket大小
*/
public TimeRollingCounter(int size, TimeUnit timeUnit){
final long currentTime = getNowSecond();
bucketSize =size;
buckets=new AtomicInteger[bucketSize];
for(int i = 0; i< bucketSize; i++){
buckets[i]=new AtomicInteger(0);
}
currentBucketTime=new AtomicLong(currentTime);
}
/**
* 重置計(jì)數(shù)器
*/
public void reset(){
final long currentTime = getNowSecond();;
for(AtomicInteger counter:buckets){
counter.set(0);
}
currentBucketTime.set(currentTime);
}
/**
* 窗口滑動(dòng),使用自旋滑動(dòng)
* @param targetBucketTime
* @param currentBucketTime
*/
private void windowRolling(final long targetBucketTime,Long currentBucketTime){
if(targetBucketTime > currentBucketTime){
while(!windowRolling.compareAndSet(false,true)){
}
try {
//重新獲取值
currentBucketTime = this.currentBucketTime.get();
//計(jì)數(shù)器已經(jīng)被其它線程滑動(dòng)了,不再滑動(dòng),防止誤操作數(shù)據(jù)
if (currentBucketTime >= targetBucketTime) {
return ;
}
final int sep = this.calSep(currentBucketTime, targetBucketTime);
final int currentIndex = this.calIndex(currentBucketTime);
//重置計(jì)數(shù)器
this.resetBucket(currentIndex, sep);
//重新設(shè)置當(dāng)前時(shí)間
this.currentBucketTime.set(targetBucketTime);
} catch(Exception e){
logger.error("Window rolling error",e);
}finally {
windowRolling.set(false);
}
}
}
/**
* 時(shí)間差大于BUCKET_SIZE,重置bucket
* 否則重置兩個(gè)窗口區(qū)間的臟數(shù)據(jù),注意這是一個(gè)循環(huán)數(shù)組
* @param currentBucketIndex
* @param sep
*/
private void resetBucket(final int currentBucketIndex,int sep){
//時(shí)間差大于buck_size,重置所有的bucket
if(sep>= bucketSize){
for(AtomicInteger counter:buckets){
counter.set(0);
}
}else {
//循環(huán)隊(duì)列
if(sep<0){
sep= bucketSize +sep;
}
//時(shí)間差在1到sep+1秒之間,重置之間的區(qū)間,包括新使用的本身
for(int i=1;i<sep+1;i++){
buckets[(currentBucketIndex+i)% bucketSize].set(0);
}
}
}
/**
* 計(jì)算指定時(shí)間對(duì)應(yīng)的bucket索引
* @param currentTime
* @return
*/
private int calIndex(Long currentTime){
int currIndex = Long.valueOf(currentTime% bucketSize).intValue();
return currIndex;
}
/**
* 計(jì)算兩個(gè)指定時(shí)間差,以秒為單位,可能為負(fù)數(shù)
* @param beforeTime
* @param currentTime
* @return
*/
private int calSep(Long beforeTime,Long currentTime){
return Long.valueOf(currentTime-beforeTime).intValue();
}
public int getBucketSize() {
return bucketSize;
}
/**
* 獲取當(dāng)前的秒數(shù)
* @return
*/
public static long getNowSecond(){
return System.currentTimeMillis()/MILLIS_IN_ONE_SECOND;
}
}
三:選擇哪種
1.理解成本
代碼更多的時(shí)間是在做維護(hù)震桶,上面復(fù)雜的寫法很多時(shí)候在做解釋,review更是耗費(fèi)巨大時(shí)間再沧,修改一行代碼也設(shè)計(jì)成本很高尼夺,以前的實(shí)現(xiàn)也有bug,參考我之前的分析
[janus本地防刷代碼問(wèn)題分析]
2.gc成本
創(chuàng)建是注冊(cè)到隊(duì)列上炒瘸,無(wú)法回收掉淤堵,因?yàn)闊o(wú)法判斷何時(shí)移除。當(dāng)然可以換一種實(shí)現(xiàn)顷扩,提供一個(gè)可以拿到當(dāng)前使用所有計(jì)數(shù)器的接口拐邪。
我們目前場(chǎng)景用完后都會(huì)放緩存隊(duì)列,暫時(shí)不存在該問(wèn)題
3.清理?yè)p耗
定時(shí)器一秒清理未來(lái)buffer 數(shù)量隘截,但我們場(chǎng)景存在百萬(wàn)級(jí)別的數(shù)量扎阶,是否能快速清理掉,需要測(cè)試1s的清理速度婶芭,理論上兩次清理完成間隔相差十秒就滿足條件
根據(jù)我們場(chǎng)景东臀,大概會(huì)有將近百萬(wàn)的計(jì)數(shù)器(統(tǒng)計(jì)ip session的防刷)本地測(cè)試能在1s內(nèi)清理掉,但cpu卻一致高負(fù)載犀农,此方案在百萬(wàn)級(jí)別的情況下會(huì)損耗不少cpu資源
數(shù)量增大到5百萬(wàn)惰赋,有其他使用cpu時(shí),清理時(shí)間不穩(wěn)定呵哨,但在預(yù)期內(nèi)可以清理掉赁濒。
測(cè)試過(guò)程遇到卡住,為old區(qū)滿掉孟害,頻繁gc但無(wú)空間所致拒炎。
綜上,3的cpu消耗不能容忍挨务,只能選擇較為復(fù)雜的方案實(shí)現(xiàn)