前言
最近在看sentinel的一些資料和代碼镰官,github請參見這個網(wǎng)址巫俺,看過代碼之后感覺sentinel在限流熔斷上相較于Hystrix可能會更好一點采缚,一方面是他沒有用多余的線程池锄贼,通過滾動數(shù)組來記錄了當(dāng)前流量來完成限流邏輯兵琳,比Hystrix完全通過并發(fā)線程數(shù)來限流功能更好一點勃蜘,另外一方面是他沒有用RxJava來完成自己的邏輯硕噩,從代碼閱讀上門檻低了不少,并且通過類似于責(zé)任鏈模式形成了一個slot的chain缭贡,即提升了代碼的可讀性也增強了可擴展性炉擅。示意圖如下(轉(zhuǎn)自github):
本文作為Sentinel學(xué)習(xí)系列第一篇文章需要分析的代碼針對的是流量統(tǒng)計相關(guān)辉懒,對應(yīng)于上圖是存在于StatisticSlot中。
流量統(tǒng)計
本來第一篇文章應(yīng)該從TreeNode這個Slot開始谍失,但是確實我目前現(xiàn)在對于Sentinel中Context和Node的具體關(guān)系沒有特別理清眶俩,所以就先直接跳過直接到了流量統(tǒng)計這一塊來了。對于這一塊需要知道的背景知識的話可能就是一個Node代表的就是請求的一個資源快鱼,在StatisticSlot中針對某一個Node通過滾動數(shù)組算法來計算他的流量颠印。這也跟前言中的圖一致。
代碼結(jié)構(gòu)
首先得稱贊一句阿里的代碼組織非常好抹竹,這是通過github clone下來的項目截圖线罕,紅框中就是我們需要關(guān)注的流量統(tǒng)計相關(guān)代碼的所在了:
StatisticSlot 入口
StatisticSlot 代碼如下:
/**
* <p>
* A processor slot that dedicates to real time statistics.
* When entering this slot, we need to separately count the following
* information:
* <ul>
* <li>{@link ClusterNode}: total statistics of a cluster node of the resource id </li>
* <li> origin node: statistics of a cluster node from different callers/origins.</li>
* <li> {@link DefaultNode}: statistics for specific resource name in the specific context.
* <li> Finally, the sum statistics of all entrances.</li>
* </ul>
* </p>
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
/**
* StatisticSlot在責(zé)任鏈中的調(diào)用入口
* 在demo中調(diào)用SphO.entry進入獲取token邏輯
* 通過前面的Slot后到達這里
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param node resource node
* @param count tokens needed
* @param args parameters of the original call
* @throws Throwable
*/
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {
try {
// 直接出發(fā)下游的slot entry操作
fireEntry(context, resourceWrapper, node, count, args);
// 如果到達這里說明獲取token成功,可以繼續(xù)操作
// 首先增加訪問資源的并發(fā)線程數(shù)
node.increaseThreadNum();
// 在增加當(dāng)前秒鐘pass的請求數(shù)
node.addPassRequest();
// 如果在調(diào)用entry之前指定了調(diào)用的origin窃判,即調(diào)用方
if (context.getCurEntry().getOriginNode() != null) {
// 則會有一個originNode钞楼,我們也需要做上面兩個增加操作
// 方便針對調(diào)用方的統(tǒng)計,為后續(xù)的限流做準備
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest();
}
// 這里應(yīng)該是一個全局的統(tǒng)計吧
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest();
}
// 這里我沒過多關(guān)注了
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setError(e);
// 如果觸發(fā)了BlockException袄琳,則說明獲取token失敗询件,被限流
// 因此增加當(dāng)前秒Block的請求數(shù)
// Add block count.
node.increaseBlockQps();
//這里是針對調(diào)用方origin的統(tǒng)計
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps();
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
// 如果觸發(fā)了exception
// 增加這個請求當(dāng)前秒Exception的數(shù)目
// Should not happen
node.increaseExceptionQps();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps();
}
throw e;
}
}
/**
* 在demo中調(diào)用SphO.exit進入獲取token邏輯
* 通過前面的Slot后到達這里
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param count tokens needed
* @param args parameters of the original call
*/
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
// 記錄當(dāng)前請求的round trip time,即調(diào)用時間
node.rt(rt);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().rt(rt);
}
// 減少當(dāng)前資源的并發(fā)線程數(shù)
node.decreaseThreadNum();
// 按調(diào)用方減少資源的并發(fā)線程數(shù)
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
// 記錄全局的round trip time
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.rt(rt);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
// 調(diào)用下游的slot exit方法
fireExit(context, resourceWrapper, count);
}
}
我在上面的代碼中增加了一些注釋唆樊,我們可以知道宛琅,StaticticSlot只是責(zé)任鏈中的一環(huán),他通過調(diào)用DefaultNode的統(tǒng)計相關(guān)方法來完成流量的統(tǒng)計逗旁。我們接下來看看DefaultNode是怎么做的嘿辟。
DefaultNode
/**
* <p>
* A {@link Node} use to hold statistics for specific resource name in the specific context.
* Each distinct resource in each distinct {@link Context} will corresponding to a {@link DefaultNode}.
* </p>
* <p>
* This class may have a list of sub {@link DefaultNode}s. sub-node will be created when
* call {@link SphU}#entry() or {@link SphO}@entry() multi times in the same {@link Context}.
* </p>
*
* @author qinan.qn
* @see NodeSelectorSlot
*/
public class DefaultNode extends StatisticNode {
private ResourceWrapper id;
private volatile HashSet<Node> childList = new HashSet<Node>();
private ClusterNode clusterNode;
public DefaultNode(ResourceWrapper id, ClusterNode clusterNode) {
this.id = id;
this.clusterNode = clusterNode;
}
public ResourceWrapper getId() {
return id;
}
public ClusterNode getClusterNode() {
return clusterNode;
}
public void setClusterNode(ClusterNode clusterNode) {
this.clusterNode = clusterNode;
}
public void addChild(Node node) {
if (!childList.contains(node)) {
synchronized (this) {
if (!childList.contains(node)) {
HashSet<Node> newSet = new HashSet<Node>(childList.size() + 1);
newSet.addAll(childList);
newSet.add(node);
childList = newSet;
}
}
RecordLog.info(String.format("Add child %s to %s", ((DefaultNode)node).id.getName(), id.getName()));
}
}
public void removeChildList() {
this.childList = new HashSet<Node>();
}
public Set<Node> getChildList() {
return childList;
}
@Override
public void increaseBlockQps() {
super.increaseBlockQps();
this.clusterNode.increaseBlockQps();
}
@Override
public void increaseExceptionQps() {
super.increaseExceptionQps();
this.clusterNode.increaseExceptionQps();
}
@Override
public void rt(long rt) {
super.rt(rt);
this.clusterNode.rt(rt);
}
...
我們看到DefaultNode實際上在統(tǒng)計相關(guān)的調(diào)用中使用了super的對應(yīng)方法,我們繼續(xù)看他的父類StatisticNode
/**
* @author qinan.qn
* @author jialiang.linjl
*/
public class StatisticNode implements Node {
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 60);
private AtomicInteger curThreadNum = new AtomicInteger(0);
private long lastFetchTime = -1;
...
@Override
public long maxSuccessQps() {
return rollingCounterInSecond.maxSuccess() * SampleCountProperty.SAMPLE_COUNT;
}
@Override
public long avgRt() {
long successCount = rollingCounterInSecond.success();
if (successCount == 0) {
return 0;
}
return rollingCounterInSecond.rt() / successCount;
}
@Override
public long minRt() {
return rollingCounterInSecond.minRt();
}
@Override
public int curThreadNum() {
return curThreadNum.get();
}
@Override
public void addPassRequest() {
rollingCounterInSecond.addPass();
rollingCounterInMinute.addPass();
}
...
這里我們看到在他的內(nèi)部使用了兩個ArrayMetric來做最終的統(tǒng)計痢艺,一個是基于以一秒為單位統(tǒng)計(即QPS)仓洼,一個以一分鐘為單位統(tǒng)計(total開頭的),這個從兩個變量的名字就能感受出來:
private transient volatile Metric rollingCounterInSecond ...
private transient Metric rollingCounterInMinute ...
接著就去看ArrayMetric的代碼
ArrayMetric
/**
* The basic metric class in Sentinel using a {@link MetricsLeapArray} internal.
*
* @author jialiang.linjl
* @author Eric Zhao
*/
public class ArrayMetric implements Metric {
private final MetricsLeapArray data;
/**
* Constructor
*
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param intervalInSec the total time span of this {@link ArrayMetric} in seconds.
*/
public ArrayMetric(int windowLengthInMs, int intervalInSec) {
this.data = new MetricsLeapArray(windowLengthInMs, intervalInSec);
}
/**
* For unit test.
*/
public ArrayMetric(MetricsLeapArray array) {
this.data = array;
}
@Override
public long success() {
data.currentWindow();
long success = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
success += window.success();
}
return success;
}
....
@Override
public void addBlock() {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addBlock();
}
@Override
public void addSuccess() {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addSuccess();
}
...
上面的代碼中有兩點需要我們注意:
- ArrayMetric將真正的信息放在了MetricsLeapArray中堤舒。創(chuàng)建MetricsLeapArray需要兩個參數(shù)色建。
- windowLengthInMs代表的是滾動窗口的大小,以毫秒為單位
- intervalInSec代表的是整個統(tǒng)計的時長舌缤,以秒為單位箕戳。
- 每個方法調(diào)用的第一個操作都是data.currentWindow(),這個操作是什么意義呢国撵?
帶著這些疑問陵吸,我們來到了MetricsLeapArray
MetricsLeapArray
/**
* The fundamental data structure for metric statistics in a time window.
*
* @see LeapArray
* @author jialiang.linjl
* @author Eric Zhao
*/
public class MetricsLeapArray extends LeapArray<MetricBucket> {
/**
* Constructor
*
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param intervalInSec the total time span of this {@link MetricsLeapArray} in seconds.
*/
public MetricsLeapArray(int windowLengthInMs, int intervalInSec) {
super(windowLengthInMs, intervalInSec);
}
@Override
public MetricBucket newEmptyBucket() {
return new MetricBucket();
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
w.resetTo(startTime);
w.value().reset();
return w;
}
}
MetricsLeapArray繼承了LeapArray<MetricBucket>,并包含兩個方法:
- newEmptyBucket 創(chuàng)建一個新的空的Bucket(統(tǒng)計桶)
- resetWindowTo 通過傳入的時間戳來重置滾動窗口和它所包含的統(tǒng)計桶
這幾個方法和變量命名都非常易懂介牙,這里也不多展開壮虫,我們終于來到了最終的統(tǒng)計所在LeapArray<MetricBucket>:
LeapArray
首先我們看LeapArray的成員變量和構(gòu)造函數(shù):
public abstract class LeapArray<T> {
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
protected final AtomicReferenceArray<WindowWrap<T>> array;
private final ReentrantLock updateLock = new ReentrantLock();
/**
* The total bucket count is: {@link #sampleCount} = intervalInSec * 1000 / windowLengthInMs.
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param intervalInSec the total time span of this {@link LeapArray} in seconds.
*/
public LeapArray(int windowLengthInMs, int intervalInSec) {
this.windowLengthInMs = windowLengthInMs;
this.intervalInMs = intervalInSec * 1000;
this.sampleCount = intervalInMs / windowLengthInMs;
this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
}
從這些代碼我們可以知道:
- windowLengthInMs 跟之前說的一樣,是滾動窗口中每個窗口的長度,以毫秒為單位
- invervalInMs 即整個統(tǒng)計時長囚似,以毫秒為單位
- sampleCount 即在整個統(tǒng)計時長中需要有多少個采樣窗口
- array 通過AtomicReferenceArray來存儲一個WindowWrap的原子數(shù)組剩拢,是存放滾動窗口的物理實現(xiàn)
接著我們來看剛剛提到的currentWindow:
/**
* Get the window at current timestamp.
*
* @return the window at current timestamp
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
/**
* Get window at provided timestamp.
*
* @param time a valid timestamp
* @return the window at provided timestamp
*/
public WindowWrap<T> currentWindow(long time) {
// 獲取當(dāng)前毫秒對應(yīng)到window length的一個id
long timeId = time / windowLengthInMs;
// Calculate current index.
// 獲取這個id對應(yīng)到滾動數(shù)組中的具體index
// 通過mod操作完成了數(shù)組的滾動
int idx = (int)(timeId % array.length());
// Cut the time to current window start.
// 計算出這個window對應(yīng)的開始時間戳
time = time - time % windowLengthInMs;
// 自旋循環(huán)開始
while (true) {
// 獲取index對應(yīng)的窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 如果是null, 說明出于滾動窗口初始化階段
// 創(chuàng)建一個新的窗口饶唤,通過調(diào)用newEmptyBucket來獲取新的統(tǒng)計桶
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, time, newEmptyBucket());
// CAS 設(shè)置 AtomicReferenceArray里面對應(yīng)的元素
if (array.compareAndSet(idx, null, window)) {
// 如果設(shè)置成功就返回當(dāng)前的window
return window;
} else {
// 如果不成功調(diào)用 線程讓步(這里不太明白)
// 進入下一次自旋循環(huán)
Thread.yield();
}
} else if (time == old.windowStart()) {
// 如果開始時間與現(xiàn)存的窗口的開始時間一致
// 表明請求時間戳與現(xiàn)存的窗口匹配徐伐,因此直接返回
return old;
} else if (time > old.windowStart()) {
// 如果請求的時間戳大于現(xiàn)存的窗口的開始時間
// 說明當(dāng)前的窗口已經(jīng)是陳舊的,也就是屬于已經(jīng)過去的一個統(tǒng)計時長之外的數(shù)據(jù)
// 因此需要重置窗口的數(shù)據(jù)
if (updateLock.tryLock()) {
try {
// 嘗試獲取update鎖成功
// 調(diào)用resetWindowTo方法重置
// if (old is deprecated) then [LOCK] resetTo currentTime.
return resetWindowTo(old, time);
} finally {
updateLock.unlock();
}
} else {
// 如果獲取鎖失敗募狂,說明已經(jīng)有其他線程獲取鎖并進行更新
// 因此調(diào)用線程讓步 并進入下一次自旋循環(huán)
Thread.yield();
}
} else if (time < old.windowStart()) {
// 如果請求的時間比現(xiàn)存的還小办素,直接返回一個空的,說明這次請求的時間戳已經(jīng)陳舊了
// Cannot go through here.
return new WindowWrap<T>(windowLengthInMs, time, newEmptyBucket());
}
}
}
關(guān)于這段代碼的講解我已經(jīng)寫在了注釋里面祸穷,需要注意的可能有幾點:
- 通過while(true) 的自旋運算盡可能的減少了鎖的使用性穿,增強了線程的吞吐量
- 在一些沖突的情況下使用了thread yield方法,我查資料得到這個方法類似于讓線程讓步粱哼,但是調(diào)度器可以不理會季二,所以有可能不會有任何影響,這里是處于怎樣的考慮可能需要大家提示一下揭措,我理解的話可能是在沖突的時候盡可能留出時間給winner做好它該做的,然后loser在讓步結(jié)束之后能夠完成它該做的
另外刻蚯,值得注意的一點是這里獲取當(dāng)前時間戳使用了一個TimeUtil绊含,而不是System.currentTimeMillis,我們看看這個TimeUtil的實現(xiàn):
/**
* Provides millisecond-level time of OS.
*
* @author qinan.qn
*/
public final class TimeUtil {
private static volatile long currentTimeMillis;
static {
currentTimeMillis = System.currentTimeMillis();
Thread daemon = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
currentTimeMillis = System.currentTimeMillis();
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (Throwable e) {
}
}
}
});
daemon.setDaemon(true);
daemon.setName("sentinel-time-tick-thread");
daemon.start();
}
public static long currentTimeMillis() {
return currentTimeMillis;
}
}
這段代碼就很簡單了炊汹,相當(dāng)于啟動了一個線程每sleep 1ms喚醒并且調(diào)用System.currentTimeMillis記錄當(dāng)前時間戳到volatile變量中躬充。這段代碼我理解是通過這個線程來更新時間戳,這樣每秒調(diào)用System.currentTimeMillis的次數(shù)穩(wěn)定為1000次讨便,如果不通過這個Util的話調(diào)用的次數(shù)無法估計充甚,有可能遠大于1000次,是否是有耗時等性能上的考慮霸褒?這個也歡迎大家提出意見伴找。
通過上面的代碼我們就可以知道,每次操作調(diào)用currentWindow相當(dāng)于是一次對齊操作废菱,無論是增加計數(shù)還是統(tǒng)計技矮,調(diào)用currentWindow之后保證了我們底層存儲的AtomicReferenceArray中對應(yīng)index存放的肯定是當(dāng)前時間戳對應(yīng)的window,而絕不可能是陳舊的信息殊轴。
接下來我們再回過頭看看增加計數(shù)的代碼(ArrayMetric中):
@Override
public void addSuccess() {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addSuccess();
}
其實就很好理解了衰倦,首先獲取當(dāng)前時間戳對應(yīng)的window信息,然后通過addSuccess來做到原子增旁理。這里內(nèi)部使用了阿里自己開發(fā)的一個LongAddr樊零,由于時間有限,我沒有對這個進行深入分析了孽文,可以看做是一個AtomicLong驻襟,應(yīng)該性能會提高不少夺艰。
然后我們再看看一個統(tǒng)計代碼(ArrayMetric中):
public long success() {
data.currentWindow();
long success = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
success += window.success();
}
return success;
}
這里可以理解的是通過調(diào)用底層LeapArray的values方法獲取到了滾動數(shù)組中所有的“有效”窗口,然后通過累加這些窗口的success的數(shù)量得到整個統(tǒng)計時長的總success數(shù)塑悼,并返回劲适,完成了統(tǒng)計功能。這里有個問題厢蒜,什么叫有效窗口霞势?我們接著看LeapArray中的values方法:
public List<T> values() {
// 結(jié)果容器
List<T> result = new ArrayList<T>();
for (int i = 0; i < array.length(); i++) {
// 遍歷底層AtomicReferenceArray的元素
WindowWrap<T> windowWrap = array.get(i);
// 如果當(dāng)前時間窗為空或者已經(jīng)無效則無視之
if (windowWrap == null || isWindowDeprecated(windowWrap)) {
continue;
}
// 否則添加到結(jié)果中
result.add(windowWrap.value());
}
return result;
}
private boolean isWindowDeprecated(WindowWrap<T> windowWrap) {
// 如果當(dāng)前時間與對應(yīng)時間窗開始時間的差值大于整個統(tǒng)計時長
// 說明這個時間窗已經(jīng)陳舊,無需納入統(tǒng)計中
return TimeUtil.currentTimeMillis() - windowWrap.windowStart() >= intervalInMs;
}
通過上述代碼我添加的注釋就已經(jīng)很清楚了斑鸦,isWindowDeprecated方法用來判斷時間窗的有效性愕贡,values通過遍歷底層滾動數(shù)組中每個時間窗元素,并判斷其有效性巷屿,最后返回在統(tǒng)計時長內(nèi)有效的統(tǒng)計數(shù)固以。
為了更加清晰的說明整個流程,大家可以參考下圖來理解:
結(jié)語
至此嘱巾,我已經(jīng)完成了對Sentinel中流量統(tǒng)計部分代碼的分析憨琳,希望大吉能夠喜歡,對于文中講的不清楚或者不正確的地方希望大家指正旬昭,共同進步篙螟!