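Cluster Flow Control Source Code Analysis (Part 2)
Next, we analyze the contents of the server package in the sentinel-cluster-server-default module.
1. Source directory structure
2. Source code analysis
2.1 Encoding and decoding (codec)
2.1.1 ServerEntityCodecProvider
A codec provider utility class, used to obtain the RequestEntityDecoder and the ResponseEntityWriter.
The source is as follows: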
public final class ServerEntityCodecProvider {
private static RequestEntityDecoder requestEntityDecoder = null;
private static ResponseEntityWriter responseEntityWriter = null;
static {
resolveInstance();
}
private static void resolveInstance() {
ResponseEntityWriter writer = SpiLoader.loadFirstInstance(ResponseEntityWriter.class);
if (writer == null) {
RecordLog.warn("[ServerEntityCodecProvider] No existing response entity writer, resolve failed");
} else {
responseEntityWriter = writer;
RecordLog.info(
"[ServerEntityCodecProvider] Response entity writer resolved: " + responseEntityWriter.getClass()
.getCanonicalName());
}
RequestEntityDecoder decoder = SpiLoader.loadFirstInstance(RequestEntityDecoder.class);
if (decoder == null) {
RecordLog.warn("[ServerEntityCodecProvider] No existing request entity decoder, resolve failed");
} else {
requestEntityDecoder = decoder;
RecordLog.info(
"[ServerEntityCodecProvider] Request entity decoder resolved: " + requestEntityDecoder.getClass()
.getCanonicalName());
}
}
public static RequestEntityDecoder getRequestEntityDecoder() {
return requestEntityDecoder;
}
public static ResponseEntityWriter getResponseEntityWriter() {
return responseEntityWriter;
}
}
Static fields
- requestEntityDecoder: the decoder for client requests
- responseEntityWriter: the encoder for server responses
Static methods
- resolveInstance: loads the RequestEntityDecoder and ResponseEntityWriter implementations through the SPI mechanism and stores them in the static fields.
- The implementations are DefaultRequestEntityDecoder and DefaultResponseEntityWriter respectively; they are declared in files under the META-INF/services directory.
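The "load the first SPI implementation, or fall back to null" behavior described above can be sketched with the JDK's own java.util.ServiceLoader. This is only an illustration under assumptions: DemoWriter and SpiFirstInstanceSketch are hypothetical names, and Sentinel's SpiLoader may differ in its details.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

final class SpiFirstInstanceSketch {

    /** Hypothetical service interface, standing in for ResponseEntityWriter. */
    interface DemoWriter {
        String name();
    }

    /** Returns the first provider found for the given type, or null when none is registered. */
    static <T> T loadFirstInstance(Class<T> type) {
        Iterator<T> it = ServiceLoader.load(type).iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        // No META-INF/services entry exists for DemoWriter in this sketch, so the lookup
        // yields null; this is the case the provider above logs as "resolve failed".
        DemoWriter writer = loadFirstInstance(DemoWriter.class);
        System.out.println(writer == null ? "no provider registered" : writer.name());
    }
}
```

Since the getters of the provider can return null, callers have to null-check the result, which is what the Netty handlers later in this article do.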
2.1.2 DefaultRequestEntityDecoder
The default decoder for ClusterRequest objects.
The wire format is as follows:
+--------+---------+---------+
| xid(4) | type(1) | data... |
+--------+---------+---------+
Source
@Override
public ClusterRequest decode(ByteBuf source) {
//1. Check that at least 5 bytes are readable (4 for xid, 1 for type)
if (source.readableBytes() >= 5) {
//2. Read xid: 4 bytes
int xid = source.readInt();
//3. Read type: 1 byte; there are three types (PING, FLOW, PARAM_FLOW), covered later
int type = source.readByte();
//4. Look up the decoder for this type in the registry
EntityDecoder<ByteBuf, ?> dataDecoder = RequestDataDecodeRegistry.getDecoder(type);
if (dataDecoder == null) {
RecordLog.warn("Unknown type of request data decoder: {0}", type);
return null;
}
Object data;
//5. Check again whether any bytes remain
if (source.readableBytes() == 0) {
data = null;
} else {
// 6. Decode the payload
data = dataDecoder.decode(source);
}
return new ClusterRequest<>(xid, type, data);
}
return null;
}
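To make the frame layout concrete, here is a minimal sketch of the same header parsing written against java.nio.ByteBuffer instead of Netty's ByteBuf, so that it runs without extra dependencies. RequestFrameSketch and Frame are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;

final class RequestFrameSketch {

    /** Decoded header plus the number of payload bytes that follow it. */
    static final class Frame {
        final int xid;
        final int type;
        final int payloadLength;

        Frame(int xid, int type, int payloadLength) {
            this.xid = xid;
            this.type = type;
            this.payloadLength = payloadLength;
        }
    }

    /** Builds a frame: xid (4 bytes), type (1 byte), then the raw payload. */
    static ByteBuffer encode(int xid, int type, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(5 + payload.length);
        buf.putInt(xid).put((byte) type).put(payload);
        buf.flip();
        return buf;
    }

    /** Mirrors the length check above: fewer than 5 readable bytes means no frame. */
    static Frame decode(ByteBuffer source) {
        if (source.remaining() < 5) {
            return null;
        }
        int xid = source.getInt();
        int type = source.get();
        return new Frame(xid, type, source.remaining());
    }

    public static void main(String[] args) {
        Frame frame = decode(encode(42, 1, new byte[12]));
        System.out.println(frame.xid + " " + frame.type + " " + frame.payloadLength);
    }
}
```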
2.1.3 DefaultResponseEntityWriter
The default encoder for ClusterResponse objects.
Source
@Override
public void writeTo(ClusterResponse response, ByteBuf out) {
//1. Get the response type and the matching response data writer
int type = response.getType();
EntityWriter<Object, ByteBuf> responseDataWriter = ResponseDataWriterRegistry.getWriter(type);
if (responseDataWriter == null) {
writeHead(response.setStatus(ClusterConstants.RESPONSE_STATUS_BAD), out);
RecordLog.warn("[NettyResponseEncoder] Cannot find matching writer for type <{0}>", response.getType());
return;
}
// 2. Write the header
writeHead(response, out);
// 3. Write the payload
responseDataWriter.writeTo(response.getData(), out);
}
private void writeHead(Response response, ByteBuf out) {
out.writeInt(response.getId());
out.writeByte(response.getType());
//Status; an error status is written here as well
out.writeByte(response.getStatus());
}
2.1.3 registry
- RequestDataDecodeRegistry: the registry of request data decoders
- ResponseDataWriterRegistry: the registry of response data writers
Source
public final class RequestDataDecodeRegistry {
//Map holding the EntityDecoder instances, keyed by type
private static final Map<Integer, EntityDecoder<ByteBuf, ?>> DECODER_MAP = new HashMap<>();
public static boolean addDecoder(int type, EntityDecoder<ByteBuf, ?> decoder) {
if (DECODER_MAP.containsKey(type)) {
return false;
}
DECODER_MAP.put(type, decoder);
return true;
}
public static EntityDecoder<ByteBuf, Object> getDecoder(int type) {
return (EntityDecoder<ByteBuf, Object>)DECODER_MAP.get(type);
}
public static boolean removeDecoder(int type) {
return DECODER_MAP.remove(type) != null;
}
}
public final class ResponseDataWriterRegistry {
//Map holding the EntityWriter instances, keyed by type
private static final Map<Integer, EntityWriter<Object, ByteBuf>> WRITER_MAP = new HashMap<>();
public static <T> boolean addWriter(int type, EntityWriter<T, ByteBuf> writer) {
if (WRITER_MAP.containsKey(type)) {
return false;
}
WRITER_MAP.put(type, (EntityWriter<Object, ByteBuf>)writer);
return true;
}
public static EntityWriter<Object, ByteBuf> getWriter(int type) {
return WRITER_MAP.get(type);
}
public static boolean remove(int type) {
return WRITER_MAP.remove(type) != null;
}
}
The source is simple; both classes are essentially utility classes. Where are the decoders and writers put into the maps? That is covered later, in the analysis of the init package.
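The "first registration wins" semantics of addDecoder can be tried out in isolation. The sketch below is a simplified stand-in, not the Sentinel class: it replaces EntityDecoder<ByteBuf, ?> with java.util.function.Function<byte[], Object>, and DecoderRegistrySketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DecoderRegistrySketch {

    // Keyed by message type, like DECODER_MAP in the listing above.
    private static final Map<Integer, Function<byte[], Object>> DECODERS = new HashMap<>();

    /** Registers a decoder unless one already exists for the type. */
    static boolean addDecoder(int type, Function<byte[], Object> decoder) {
        if (DECODERS.containsKey(type)) {
            return false;
        }
        DECODERS.put(type, decoder);
        return true;
    }

    static Function<byte[], Object> getDecoder(int type) {
        return DECODERS.get(type);
    }

    static boolean removeDecoder(int type) {
        return DECODERS.remove(type) != null;
    }

    public static void main(String[] args) {
        System.out.println(addDecoder(1, bytes -> bytes.length)); // first registration
        System.out.println(addDecoder(1, bytes -> "ignored"));    // rejected, type 1 is taken
        System.out.println(getDecoder(1).apply(new byte[3]));     // uses the first decoder
    }
}
```

Note that the map is a plain HashMap, so this style of registry is only safe if registration happens before concurrent reads begin, for example during initialization.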
2.1.4 netty
This package contains the handler classes that are placed in the ChannelPipeline; you can see this in the NettyTransportServer startup class.
Source analysis
public class NettyRequestDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//1. Get the RequestEntityDecoder
RequestEntityDecoder<ByteBuf, Request> requestDecoder = ServerEntityCodecProvider.getRequestEntityDecoder();
if (requestDecoder == null) {
RecordLog.warn("[NettyRequestDecoder] Cannot resolve the global request entity decoder, "
+ "dropping the request");
return;
}
// TODO: handle decode error here.
// 2. Decode the request data
Request request = requestDecoder.decode(in);
if (request != null) {
out.add(request);
}
}
}
public class NettyResponseEncoder extends MessageToByteEncoder<ClusterResponse> {
@Override
protected void encode(ChannelHandlerContext ctx, ClusterResponse response, ByteBuf out) throws Exception {
//1. Get the ResponseEntityWriter
ResponseEntityWriter<ClusterResponse, ByteBuf> responseEntityWriter = ServerEntityCodecProvider.getResponseEntityWriter();
if (responseEntityWriter == null) {
RecordLog.warn("[NettyResponseEncoder] Cannot resolve the global response entity writer, reply bad status");
//Write a bad-status header
writeBadStatusHead(response, out);
return;
}
//2. Write the data
responseEntityWriter.writeTo(response, out);
}
private void writeBadStatusHead(Response response, ByteBuf out) {
out.writeInt(response.getId());
out.writeByte(ClusterConstants.RESPONSE_STATUS_BAD);
out.writeByte(response.getStatus());
}
}
- NettyRequestDecoder extends ByteToMessageDecoder, which in turn extends ChannelInboundHandlerAdapter. Readers familiar with Netty know that extending ChannelInboundHandlerAdapter is the usual way to write a custom inbound handler.
- NettyResponseEncoder extends MessageToByteEncoder, which extends ChannelOutboundHandlerAdapter.
2.1.5 data
This package holds the EntityWriter and EntityDecoder implementations. The EntityDecoder implementations are FlowRequestDataDecoder, ParamFlowRequestDataDecoder, and PingRequestDataDecoder; the EntityWriter implementations are FlowResponseDataWriter and PingResponseDataWriter.
2.1.5.1 Decoding request data
The relevant classes are FlowRequestDataDecoder, ParamFlowRequestDataDecoder, and PingRequestDataDecoder.
They decode the data carried by a ClusterRequest.
- FlowRequestDataDecoder
The decoder for ordinary flow-control request data; the decoded object is a FlowRequestData. Read it together with FlowRequestDataWriter.
| flow ID (8) | count (4) | priority flag (1) |
Source
public class FlowRequestDataDecoder implements EntityDecoder<ByteBuf, FlowRequestData> {
@Override
public FlowRequestData decode(ByteBuf source) {
// Length check: at least 12 bytes are required (8 for the flow ID, 4 for the count)
if (source.readableBytes() >= 12) {
FlowRequestData requestData = new FlowRequestData()
.setFlowId(source.readLong())
.setCount(source.readInt());
//Check whether the optional priority flag is present
if (source.readableBytes() >= 1) {
requestData.setPriority(source.readBoolean());
}
return requestData;
}
return null;
}
}
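The optional trailing priority byte is the interesting part of this format: a 12-byte payload is still valid, and the flag then keeps its default. A minimal round-trip sketch with java.nio.ByteBuffer, assuming that false is the default and using the hypothetical names FlowRequestSketch and FlowRequest:

```java
import java.nio.ByteBuffer;

final class FlowRequestSketch {

    static final class FlowRequest {
        final long flowId;
        final int count;
        final boolean priority;

        FlowRequest(long flowId, int count, boolean priority) {
            this.flowId = flowId;
            this.count = count;
            this.priority = priority;
        }
    }

    /** Encodes flow ID (8) and count (4); the priority byte is written only when non-null. */
    static ByteBuffer encode(long flowId, int count, Boolean priority) {
        ByteBuffer buf = ByteBuffer.allocate(priority == null ? 12 : 13);
        buf.putLong(flowId).putInt(count);
        if (priority != null) {
            buf.put((byte) (priority ? 1 : 0));
        }
        buf.flip();
        return buf;
    }

    /** Requires 12 bytes; reads the priority flag only if one more byte is available. */
    static FlowRequest decode(ByteBuffer source) {
        if (source.remaining() < 12) {
            return null;
        }
        long flowId = source.getLong();
        int count = source.getInt();
        boolean priority = source.remaining() >= 1 && source.get() != 0;
        return new FlowRequest(flowId, count, priority);
    }

    public static void main(String[] args) {
        FlowRequest withFlag = decode(encode(1001L, 2, true));
        FlowRequest withoutFlag = decode(encode(1001L, 2, null));
        System.out.println(withFlag.priority + " " + withoutFlag.priority);
    }
}
```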
- ParamFlowRequestDataDecoder
The decoder for hot-parameter flow-control requests; the decoded object is a ParamFlowRequestData. Read it together with ParamFlowRequestDataWriter.
| flow ID (8) | count (4) | param count (4) |
Source
public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, ParamFlowRequestData> {
@Override
public ParamFlowRequestData decode(ByteBuf source) {
//At least 16 bytes are required
//| flow ID (8) | count (4) | param count (4) |
if (source.readableBytes() >= 16) {
ParamFlowRequestData requestData = new ParamFlowRequestData()
.setFlowId(source.readLong())
.setCount(source.readInt());
//Number of hot parameters
int amount = source.readInt();
if (amount > 0) {
List<Object> params = new ArrayList<>(amount);
for (int i = 0; i < amount; i++) {
//Decode one hot parameter
decodeParam(source, params);
}
requestData.setParams(params);
return requestData;
}
}
return null;
}
private boolean decodeParam(ByteBuf source, List<Object> params) {
//Type tag of the hot parameter
byte paramType = source.readByte();
switch (paramType) {
//int
case ClusterConstants.PARAM_TYPE_INTEGER:
params.add(source.readInt());
return true;
//string
case ClusterConstants.PARAM_TYPE_STRING:
//Read the string length first
int length = source.readInt();
byte[] bytes = new byte[length];
//Read the string bytes
source.readBytes(bytes);
// TODO: take care of charset?
params.add(new String(bytes));
return true;
//boolean
case ClusterConstants.PARAM_TYPE_BOOLEAN:
params.add(source.readBoolean());
return true;
//double
case ClusterConstants.PARAM_TYPE_DOUBLE:
params.add(source.readDouble());
return true;
//long
case ClusterConstants.PARAM_TYPE_LONG:
params.add(source.readLong());
return true;
//float
case ClusterConstants.PARAM_TYPE_FLOAT:
params.add(source.readFloat());
return true;
//byte
case ClusterConstants.PARAM_TYPE_BYTE:
params.add(source.readByte());
return true;
//short
case ClusterConstants.PARAM_TYPE_SHORT:
params.add(source.readShort());
return true;
default:
return false;
}
}
}
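Each parameter is written as a one-byte type tag followed by a value whose length depends on the tag; strings additionally carry a 4-byte length prefix. The sketch below round-trips three of the types with java.nio.ByteBuffer. The tag values and the class name ParamCodecSketch are assumptions made for this example, and UTF-8 is chosen explicitly here, whereas the listing above uses the platform default charset (see its TODO).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ParamCodecSketch {

    // Hypothetical tag values chosen for this sketch.
    static final byte TYPE_INTEGER = 0;
    static final byte TYPE_STRING = 1;
    static final byte TYPE_BOOLEAN = 2;

    /** Writes the parameter count, then each parameter as tag + value. */
    static ByteBuffer encode(List<Object> params) {
        ByteBuffer buf = ByteBuffer.allocate(256);
        buf.putInt(params.size());
        for (Object p : params) {
            if (p instanceof Integer) {
                buf.put(TYPE_INTEGER).putInt((Integer) p);
            } else if (p instanceof String) {
                byte[] bytes = ((String) p).getBytes(StandardCharsets.UTF_8);
                buf.put(TYPE_STRING).putInt(bytes.length).put(bytes);
            } else if (p instanceof Boolean) {
                buf.put(TYPE_BOOLEAN).put((byte) (((Boolean) p) ? 1 : 0));
            } else {
                throw new IllegalArgumentException("unsupported parameter type: " + p.getClass());
            }
        }
        buf.flip();
        return buf;
    }

    /** Reads the count, then dispatches on each tag to read the matching value. */
    static List<Object> decode(ByteBuffer source) {
        int amount = source.getInt();
        List<Object> params = new ArrayList<>(amount);
        for (int i = 0; i < amount; i++) {
            byte tag = source.get();
            switch (tag) {
                case TYPE_INTEGER:
                    params.add(source.getInt());
                    break;
                case TYPE_STRING:
                    byte[] bytes = new byte[source.getInt()];
                    source.get(bytes);
                    params.add(new String(bytes, StandardCharsets.UTF_8));
                    break;
                case TYPE_BOOLEAN:
                    params.add(source.get() != 0);
                    break;
                default:
                    throw new IllegalArgumentException("unknown tag: " + tag);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        List<Object> input = Arrays.asList(7, "vip", true);
        System.out.println(decode(encode(input)).equals(input));
    }
}
```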
- PingRequestDataDecoder
Test (ping) data.
2.1.5.2 Encoding response data
The writers are FlowResponseDataWriter and PingResponseDataWriter.
- FlowResponseDataWriter: the server-side response encoder; read it together with FlowResponseDataDecoder.
Source
public class FlowResponseDataWriter implements EntityWriter<FlowTokenResponseData, ByteBuf> {
@Override
public void writeTo(FlowTokenResponseData entity, ByteBuf out) {
//Remaining count
out.writeInt(entity.getRemainingCount());
//Wait time in milliseconds
out.writeInt(entity.getWaitInMs());
}
}
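Combining the head written by writeHead in DefaultResponseEntityWriter (id, type, status) with the body written here (remaining count, wait time), a complete flow response occupies 4 + 1 + 1 + 4 + 4 = 14 bytes. A small sketch of that layout with java.nio.ByteBuffer; FlowResponseSketch is a hypothetical name and the sample values are arbitrary:

```java
import java.nio.ByteBuffer;

final class FlowResponseSketch {

    /** Head: id (4), type (1), status (1). Body: remaining (4), waitInMs (4). */
    static ByteBuffer encode(int id, int type, int status, int remaining, int waitInMs) {
        ByteBuffer out = ByteBuffer.allocate(14);
        out.putInt(id).put((byte) type).put((byte) status);
        out.putInt(remaining).putInt(waitInMs);
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(42, 1, 0, 5, 0);
        // Absolute reads at the offsets implied by the layout above.
        System.out.println(buf.remaining() + " bytes, id=" + buf.getInt(0)
            + ", remaining=" + buf.getInt(6) + ", waitInMs=" + buf.getInt(10));
    }
}
```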
- PingResponseDataWriter
Test (ping) data.
2.2 Configuration (config)
This package mainly holds the server configuration, including constants and dynamic configuration.
2.2.1 The ServerTransportConfigObserver interface
Source
public interface ServerTransportConfigObserver {
/**
* Callback on server transport config (e.g. port) change.
*
* @param config new server transport config
*/
//Callback used to notify observers when the transport config changes
void onTransportConfigChange(ServerTransportConfig config);
}
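The callback is one half of an observer pattern: a manager keeps a list of observers and invokes each of them when the port actually changes. A stripped-down sketch follows; PortObserver and TransportObserverSketch are hypothetical names, the starting port is arbitrary, and CopyOnWriteArrayList is a substitution made here for thread safety, not a claim about the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class TransportObserverSketch {

    /** Hypothetical, simplified counterpart of ServerTransportConfigObserver. */
    interface PortObserver {
        void onPortChange(int newPort);
    }

    private static final List<PortObserver> OBSERVERS = new CopyOnWriteArrayList<>();
    private static volatile int port = 11111; // arbitrary starting value for the sketch

    static void addObserver(PortObserver observer) {
        OBSERVERS.add(observer);
    }

    /** Notifies observers only when the port really changes. */
    static void updatePort(int newPort) {
        if (newPort == port) {
            return;
        }
        port = newPort;
        for (PortObserver observer : OBSERVERS) {
            observer.onPortChange(newPort);
        }
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        addObserver(seen::add);
        updatePort(11111); // unchanged, no notification
        updatePort(22222); // changed, observer is called once
        System.out.println(seen);
    }
}
```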
2.2.2 ServerTransportConfig
Instance variables
- port: the port
- idleSeconds: the idle timeout in seconds
2.2.3 ServerFlowConfig
The flow-control configuration of the server, with its default values; see the source.
Source
public static final double DEFAULT_EXCEED_COUNT = 1.0d;
public static final double DEFAULT_MAX_OCCUPY_RATIO = 1.0d;
public static final int DEFAULT_INTERVAL_MS = 1000;
public static final int DEFAULT_SAMPLE_COUNT= 10;
public static final double DEFAULT_MAX_ALLOWED_QPS= 30000;
private final String namespace;
//Exceed count
private double exceedCount = DEFAULT_EXCEED_COUNT;
//Maximum occupy ratio
private double maxOccupyRatio = DEFAULT_MAX_OCCUPY_RATIO;
//Interval in ms
private int intervalMs = DEFAULT_INTERVAL_MS;
//Number of samples
private int sampleCount = DEFAULT_SAMPLE_COUNT;
//Maximum allowed QPS
private double maxAllowedQps = DEFAULT_MAX_ALLOWED_QPS;
2.2.4 ClusterServerConfigManager
The configuration manager for cluster flow control.
Static fields and their default values
private static boolean embedded = false;
/**
* Server global transport and scope config.
* Global transport config of the server.
*/
private static volatile int port = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT;
private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS;
private static volatile Set<String> namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE);
/**
* Server global flow config.
* Flow-control config of the server.
*/
private static volatile double exceedCount = ServerFlowConfig.DEFAULT_EXCEED_COUNT;
private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO;
private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS;
private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT;
private static volatile double maxAllowedQps = ServerFlowConfig.DEFAULT_MAX_ALLOWED_QPS;
Dynamic configuration initialization
/**
* Namespace-specific flow config for token server.
* Format: (namespace, config).
*/
//Per-namespace flow config of the token server
private static final Map<String, ServerFlowConfig> NAMESPACE_CONF = new ConcurrentHashMap<>();
//Observers of the server transport config
private static final List<ServerTransportConfigObserver> TRANSPORT_CONFIG_OBSERVERS = new ArrayList<>();
/**
* Property for cluster server global transport configuration.
*/
//Dynamic transport config property
private static SentinelProperty<ServerTransportConfig> transportConfigProperty = new DynamicSentinelProperty<>();
/**
* Property for cluster server namespace set.
*/
//Dynamic namespace set property
private static SentinelProperty<Set<String>> namespaceSetProperty = new DynamicSentinelProperty<>();
/**
* Property for cluster server global flow control configuration.
*/
//Dynamic global flow-control config property
private static SentinelProperty<ServerFlowConfig> globalFlowProperty = new DynamicSentinelProperty<>();
//Property listeners
private static final PropertyListener<ServerTransportConfig> TRANSPORT_PROPERTY_LISTENER
= new ServerGlobalTransportPropertyListener();
private static final PropertyListener<ServerFlowConfig> GLOBAL_FLOW_PROPERTY_LISTENER
= new ServerGlobalFlowPropertyListener();
private static final PropertyListener<Set<String>> NAMESPACE_SET_PROPERTY_LISTENER
= new ServerNamespaceSetPropertyListener();
//Executed when the class is initialized: register the listeners
static {
transportConfigProperty.addListener(TRANSPORT_PROPERTY_LISTENER);
globalFlowProperty.addListener(GLOBAL_FLOW_PROPERTY_LISTENER);
namespaceSetProperty.addListener(NAMESPACE_SET_PROPERTY_LISTENER);
}
//Part of the code is omitted
//Register an observer for transport config changes
public static void addTransportConfigChangeObserver(ServerTransportConfigObserver observer) {
AssertUtil.notNull(observer, "observer cannot be null");
TRANSPORT_CONFIG_OBSERVERS.add(observer);
}
Listener inner classes
//Namespace set listener
private static class ServerNamespaceSetPropertyListener implements PropertyListener<Set<String>> {
@Override
public synchronized void configLoad(Set<String> set) {
if (set == null || set.isEmpty()) {
RecordLog.warn("[ClusterServerConfigManager] WARN: empty initial server namespace set");
return;
}
//Apply the change
applyNamespaceSetChange(set);
}
@Override
public synchronized void configUpdate(Set<String> set) {
// TODO: should debounce?
applyNamespaceSetChange(set);
}
}
private static void applyNamespaceSetChange(Set<String> newSet) {
if (newSet == null) {
return;
}
RecordLog.info("[ClusterServerConfigManager] Server namespace set will be update to: " + newSet);
if (newSet.isEmpty()) {
ClusterServerConfigManager.namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE);
return;
}
newSet = new HashSet<>(newSet);
// Always add the `default` namespace to the namespace set.
newSet.add(ServerConstants.DEFAULT_NAMESPACE);
if (embedded) {
// In embedded server mode, the server itself is also a part of service,
// so it should be added to namespace set.
// By default, the added namespace is the appName.
//In embedded mode the server's own namespace is added as well
newSet.add(ConfigSupplierRegistry.getNamespaceSupplier().get());
}
//Apply the update
Set<String> oldSet = ClusterServerConfigManager.namespaceSet;
if (oldSet != null && !oldSet.isEmpty()) {
for (String ns : oldSet) {
// Remove the cluster rule property for deprecated namespace set.
if (!newSet.contains(ns)) {
ClusterFlowRuleManager.removeProperty(ns);
ClusterParamFlowRuleManager.removeProperty(ns);
}
}
}
ClusterServerConfigManager.namespaceSet = newSet;
//Register the rule properties (explained later)
for (String ns : newSet) {
// Register the rule property if needed.
ClusterFlowRuleManager.registerPropertyIfAbsent(ns);
ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns);
// Initialize the global QPS limiter for the namespace.
GlobalRequestLimiter.initIfAbsent(ns);
}
}
//Global transport config listener
private static class ServerGlobalTransportPropertyListener implements PropertyListener<ServerTransportConfig> {
@Override
public void configLoad(ServerTransportConfig config) {
if (config == null) {
RecordLog.warn("[ClusterServerConfigManager] Empty initial server transport config");
return;
}
applyConfig(config);
}
@Override
public void configUpdate(ServerTransportConfig config) {
//Apply the config
applyConfig(config);
}
private synchronized void applyConfig(ServerTransportConfig config) {
//Validate the config
if (!isValidTransportConfig(config)) {
RecordLog.warn(
"[ClusterServerConfigManager] Invalid cluster server transport config, ignoring: " + config);
return;
}
RecordLog.info("[ClusterServerConfigManager] Updating new server transport config: " + config);
if (config.getIdleSeconds() != idleSeconds) {
idleSeconds = config.getIdleSeconds();
}
//Update the token server
updateTokenServer(config);
}
}
private static void updateTokenServer(ServerTransportConfig config) {
int newPort = config.getPort();
AssertUtil.isTrue(newPort > 0, "token server port should be valid (positive)");
if (newPort == port) {
return;
}
ClusterServerConfigManager.port = newPort;
for (ServerTransportConfigObserver observer : TRANSPORT_CONFIG_OBSERVERS) {
//Notify each observer (observer pattern)
observer.onTransportConfigChange(config);
}
}
//Global flow config listener
private static class ServerGlobalFlowPropertyListener implements PropertyListener<ServerFlowConfig> {
@Override
public void configUpdate(ServerFlowConfig config) {
//Apply the update
applyGlobalFlowConfig(config);
}
@Override
public void configLoad(ServerFlowConfig config) {
applyGlobalFlowConfig(config);
}
private synchronized void applyGlobalFlowConfig(ServerFlowConfig config) {
//Validate the config
if (!isValidFlowConfig(config)) {
RecordLog.warn(
"[ClusterServerConfigManager] Invalid cluster server global flow config, ignoring: " + config);
return;
}
RecordLog.info("[ClusterServerConfigManager] Updating new server global flow config: " + config);
//Update only the values that changed
if (config.getExceedCount() != exceedCount) {
exceedCount = config.getExceedCount();
}
if (config.getMaxOccupyRatio() != maxOccupyRatio) {
maxOccupyRatio = config.getMaxOccupyRatio();
}
if (config.getMaxAllowedQps() != maxAllowedQps) {
maxAllowedQps = config.getMaxAllowedQps();
//Propagate the QPS change to GlobalRequestLimiter
GlobalRequestLimiter.applyMaxQpsChange(maxAllowedQps);
}
int newIntervalMs = config.getIntervalMs();
int newSampleCount = config.getSampleCount();
if (newIntervalMs != intervalMs || newSampleCount != sampleCount) {
if (newIntervalMs <= 0 || newSampleCount <= 0 || newIntervalMs % newSampleCount != 0) {
RecordLog.warn("[ClusterServerConfigManager] Ignoring invalid flow interval or sample count");
} else {
intervalMs = newIntervalMs;
sampleCount = newSampleCount;
// Reset all the metrics.
//Reset the metric statistics
ClusterMetricStatistics.resetFlowMetrics();
ClusterParamMetricStatistics.resetFlowMetrics();
}
}
}
}
public static boolean isValidTransportConfig(ServerTransportConfig config) {
return config != null && config.getPort() > 0 && config.getPort() <= 65535;
}
public static boolean isValidFlowConfig(ServerFlowConfig config) {
return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0
&& config.getMaxAllowedQps() >= 0
&& FlowRuleUtil.isWindowConfigValid(config.getSampleCount(), config.getIntervalMs());
}
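The interval and sample count checks in applyGlobalFlowConfig require both values to be positive and the interval to be an exact multiple of the sample count. One plausible reading, stated here as an assumption, is that the interval is split into sampleCount equally sized buckets, so the defaults of 1000 ms and 10 samples would give 100 ms per bucket. The sketch below restates that condition; WindowConfigSketch and its methods are hypothetical and not part of Sentinel.

```java
final class WindowConfigSketch {

    /** Same condition as the inline check above: both positive, interval divisible by sample count. */
    static boolean isWindowConfigValid(int sampleCount, int intervalMs) {
        return sampleCount > 0 && intervalMs > 0 && intervalMs % sampleCount == 0;
    }

    /** Length of one bucket in milliseconds; only meaningful for a valid config. */
    static int bucketLengthMs(int sampleCount, int intervalMs) {
        return intervalMs / sampleCount;
    }

    public static void main(String[] args) {
        System.out.println(isWindowConfigValid(10, 1000)); // defaults from ServerFlowConfig
        System.out.println(bucketLengthMs(10, 1000));
        System.out.println(isWindowConfigValid(3, 1000));  // 1000 is not divisible by 3
    }
}
```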
2.3 Connections (connection)
This package mainly manages cluster connections and connection groups.
2.3.1 The Connection interface
Source
public interface Connection extends AutoCloseable {
//Get the local SocketAddress
SocketAddress getLocalAddress();
//Remote port
int getRemotePort();
//Remote IP
String getRemoteIP();
//Refresh the last read time
void refreshLastReadTime(long lastReadTime);
//Get the last read time
long getLastReadTime();
//Get the connection key
String getConnectionKey();
}
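Because Connection extends the AutoCloseable interface, a connection can be released automatically when it is used in a try-with-resources statement; the file streams in the JDK have implemented this interface since version 1.7.
The interface declares six methods of its own, which are implemented by the implementing classes.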
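The automatic release only happens inside a try-with-resources statement; a plain local variable is never closed by itself. A minimal sketch with a hypothetical DemoConnection class:

```java
final class AutoCloseSketch {

    /** Hypothetical resource that only records whether close() was called. */
    static final class DemoConnection implements AutoCloseable {
        private boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Uses the connection in try-with-resources and returns it after the block has ended. */
    static DemoConnection useAndRelease() {
        DemoConnection conn = new DemoConnection();
        try (DemoConnection c = conn) {
            System.out.println("closed inside the block: " + c.isClosed());
        }
        return conn;
    }

    public static void main(String[] args) {
        System.out.println("closed after the block: " + useAndRelease().isClosed());
    }
}
```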
2.3.2 NettyConnection
Instance variables
- remoteIp: the remote IP
- remotePort: the remote port
- channel: the Netty Channel
- lastReadTime: the time of the last read
- pool: the connection pool
Source
//Netty-based connection
public class NettyConnection implements Connection {
private String remoteIp;
private int remotePort;
private Channel channel;
private long lastReadTime;
private ConnectionPool pool;
//Constructor; takes the channel and the pool (ConnectionPool is covered below)
public NettyConnection(Channel channel, ConnectionPool pool) {
this.channel = channel;
this.pool = pool;
//Get the remote socket address
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
this.remoteIp = socketAddress.getAddress().getHostAddress();
this.remotePort = socketAddress.getPort();
this.lastReadTime = System.currentTimeMillis();
}
@Override
public SocketAddress getLocalAddress() {
return channel.localAddress();
}
//Part of the code is omitted
@Override
public String getConnectionKey() {
//ip:port
return remoteIp + ":" + remotePort;
}
@Override
//Implements AutoCloseable.close(), so the resource can be closed automatically
public void close() {
// Remove from connection pool.
pool.remove(channel);
// Close the connection.
if (channel != null && channel.isActive()){
channel.close();
}
}
}
2.3.3 ConnectionPool
A generic pool that manages connections.
Fields
- TIMER: a scheduled executor with 2 core threads
- CONNECTION_MAP: holds the connections. Format: ("ip:port", connection)
- scanTaskFuture: the periodic scan task
Source
public class ConnectionPool {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);
/**
* Format: ("ip:port", connection)
*/
private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();
/**
* Periodic scan task.
*/
private ScheduledFuture scanTaskFuture = null;
//Create a connection
public void createConnection(Channel channel) {
if (channel != null) {
//Create it through the constructor
Connection connection = new NettyConnection(channel, this);
//Compute connKey and store the connection in CONNECTION_MAP
String connKey = getConnectionKey(channel);
CONNECTION_MAP.put(connKey, connection);
}
}
/**
* Start the scan task for long-idle connections.
*/
//Start the periodic scan task (the task itself is explained below)
private synchronized void startScan() {
if (scanTaskFuture == null
|| scanTaskFuture.isCancelled()
|| scanTaskFuture.isDone()) {
scanTaskFuture = TIMER.scheduleAtFixedRate(
new ScanIdleConnectionTask(this), 10, 30, TimeUnit.SECONDS);
}
}
/**
* Format to "ip:port".
*
* @param channel channel
* @return formatted key
*/
private String getConnectionKey(Channel channel) {
InetSocketAddress socketAddress = (InetSocketAddress)channel.remoteAddress();
String remoteIp = socketAddress.getAddress().getHostAddress();
int remotePort = socketAddress.getPort();
return remoteIp + ":" + remotePort;
}
private String getConnectionKey(String ip, int port) {
return ip + ":" + port;
}
//Refresh the last read time to the current time
public void refreshLastReadTime(Channel channel) {
if (channel != null) {
String connKey = getConnectionKey(channel);
Connection connection = CONNECTION_MAP.get(connKey);
if (connection != null) {
connection.refreshLastReadTime(System.currentTimeMillis());
}
}
}
//Get a connection directly from the map
public Connection getConnection(String remoteIp, int remotePort) {
String connKey = getConnectionKey(remoteIp, remotePort);
return CONNECTION_MAP.get(connKey);
}
public void remove(Channel channel) {
String connKey = getConnectionKey(channel);
CONNECTION_MAP.remove(connKey);
}
public List<Connection> listAllConnection() {
List<Connection> connections = new ArrayList<Connection>(CONNECTION_MAP.values());
return connections;
}
public int count() {
return CONNECTION_MAP.size();
}
public void clear() {
CONNECTION_MAP.clear();
}
//Close all connections
public void shutdownAll() throws Exception {
for (Connection c : CONNECTION_MAP.values()) {
c.close();
}
}
//Restart the idle scan task
public void refreshIdleTask() {
if (scanTaskFuture == null || scanTaskFuture.cancel(false)) {
startScan();
} else {
RecordLog.info("The result of canceling scanTask is error.");
}
}
}
2.3.4 ScanIdleConnectionTask
ScanIdleConnectionTask is the Runnable that the scheduled scan executes.
Source
public class ScanIdleConnectionTask implements Runnable {
private final ConnectionPool connectionPool;
//Constructor
public ScanIdleConnectionTask(ConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}
@Override
public void run() {
try {
//Get the idle timeout of a connection, 600s by default
int idleSeconds = ClusterServerConfigManager.getIdleSeconds();
long idleTimeMillis = idleSeconds * 1000;
if (idleTimeMillis < 0) {
idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SECONDS * 1000;
}
long now = System.currentTimeMillis();
//Get all connections
List<Connection> connections = connectionPool.listAllConnection();
for (Connection conn : connections) {
//If now minus the last read time exceeds the idle timeout, the connection can be closed
if ((now - conn.getLastReadTime()) > idleTimeMillis) {
RecordLog.info(
String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. "
+ "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds)
);
//Close the connection
conn.close();
}
}
} catch (Throwable t) {
RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t);
}
}
}
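The core of the task is the comparison "now minus last read time is greater than the idle timeout". The sketch below isolates that comparison so it can be checked with fixed timestamps. IdleScanSketch and findIdle are hypothetical names; the multiplication uses 1000L so the conversion to milliseconds is done in long arithmetic, which is a small deliberate difference from the listing above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdleScanSketch {

    /** Returns the keys whose last read time is older than the idle timeout. */
    static List<String> findIdle(Map<String, Long> lastReadTimes, long nowMillis, int idleSeconds) {
        long idleMillis = idleSeconds * 1000L;
        List<String> idle = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastReadTimes.entrySet()) {
            if (nowMillis - entry.getValue() > idleMillis) {
                idle.add(entry.getKey());
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        Map<String, Long> lastReadTimes = new LinkedHashMap<>();
        lastReadTimes.put("10.0.0.1:5001", 0L);       // last read 700 s before "now"
        lastReadTimes.put("10.0.0.2:5002", 500_000L); // last read 200 s before "now"
        // With a 600 s timeout only the first connection counts as idle.
        System.out.println(findIdle(lastReadTimes, 700_000L, 600));
    }
}
```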
2.3.5 ConnectionGroup
A connection group can be understood as a set of connections.
Instance variables
- namespace: the namespace
- connectionSet: a set of ConnectionDescriptor objects; a ConnectionDescriptor has the fields address and host.
- connectedCount: the number of connections, declared as an AtomicInteger
Source analysis
public class ConnectionGroup {
private final String namespace;
private final Set<ConnectionDescriptor> connectionSet = Collections.synchronizedSet(new HashSet<ConnectionDescriptor>());
private final AtomicInteger connectedCount = new AtomicInteger();
//Constructor taking a namespace
public ConnectionGroup(String namespace) {
AssertUtil.notEmpty(namespace, "namespace cannot be empty");
this.namespace = namespace;
}
//No-arg constructor
public ConnectionGroup() {
this(ServerConstants.DEFAULT_NAMESPACE);
}
//Add a connection
public ConnectionGroup addConnection(String address) {
AssertUtil.notEmpty(address, "address cannot be empty");
//Extract the host; for the ip:port form only the ip is needed
String[] ip = address.split(":");
String host;
if (ip != null && ip.length >= 1) {
host = ip[0];
} else {
host = address;
}
//ConnectionDescriptor overrides equals and hashCode
boolean newAdded = connectionSet.add(new ConnectionDescriptor().setAddress(address).setHost(host));
//If the descriptor was newly added, increment the connection count
if (newAdded) {
connectedCount.incrementAndGet();
}
return this;
}
//Remove a connection
public ConnectionGroup removeConnection(String address) {
AssertUtil.notEmpty(address, "address cannot be empty");
if (connectionSet.remove(new ConnectionDescriptor().setAddress(address))) {
connectedCount.decrementAndGet();
}
return this;
}
}
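The counter stays consistent because it changes only when the set itself reports a change: adding the same address twice increments once, and removing an unknown address does nothing. A simplified sketch that stores plain address strings instead of ConnectionDescriptor objects (ConnectionGroupSketch is a hypothetical name):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

final class ConnectionGroupSketch {

    private final Set<String> addresses = Collections.synchronizedSet(new HashSet<String>());
    private final AtomicInteger connectedCount = new AtomicInteger();

    /** For "ip:port" returns the ip part; otherwise returns the address unchanged. */
    static String hostOf(String address) {
        String[] parts = address.split(":");
        return parts.length >= 1 ? parts[0] : address;
    }

    ConnectionGroupSketch addConnection(String address) {
        // Set.add returns true only for a new element, so duplicates are not counted.
        if (addresses.add(address)) {
            connectedCount.incrementAndGet();
        }
        return this;
    }

    ConnectionGroupSketch removeConnection(String address) {
        if (addresses.remove(address)) {
            connectedCount.decrementAndGet();
        }
        return this;
    }

    int getConnectedCount() {
        return connectedCount.get();
    }

    public static void main(String[] args) {
        ConnectionGroupSketch group = new ConnectionGroupSketch();
        group.addConnection("10.0.0.1:8080").addConnection("10.0.0.1:8080");
        System.out.println(group.getConnectedCount() + " " + hostOf("10.0.0.1:8080"));
    }
}
```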
2.3.6 ConnectionManager
As the name suggests, ConnectionManager manages the connections.
Static fields
- CONN_MAP:Connection map (namespace, connection).
- NAMESPACE_MAP:namespace map (address, namespace).
Source analysis
public final class ConnectionManager {
/**
* Connection map (namespace, connection).
*/
private static final Map<String, ConnectionGroup> CONN_MAP = new ConcurrentHashMap<>();
/**
* namespace map (address, namespace).
*/
private static final Map<String, String> NAMESPACE_MAP = new ConcurrentHashMap<>();
/**
* Get connected count for specific namespace.
*
* @param namespace namespace to check
* @return connected count for specific namespace
*/
//Get the connected count of a namespace
public static int getConnectedCount(String namespace) {
AssertUtil.notEmpty(namespace, "namespace should not be empty");
ConnectionGroup group = CONN_MAP.get(namespace);
return group == null ? 0 : group.getConnectedCount();
}
//Get or create the connection group; note that no connection is created here
public static ConnectionGroup getOrCreateGroup(String namespace) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
ConnectionGroup group = CONN_MAP.get(namespace);
if (group == null) {
//Synchronize to prevent concurrent creation
synchronized (CREATE_LOCK) {
if ((group = CONN_MAP.get(namespace)) == null) {
//Create the group and store it in CONN_MAP for later lookup
group = new ConnectionGroup(namespace);
CONN_MAP.put(namespace, group);
}
}
}
return group;
}
//Remove a connection
public static void removeConnection(String address) {
AssertUtil.assertNotBlank(address, "address should not be empty");
String namespace = NAMESPACE_MAP.get(address);
if (namespace != null) {
ConnectionGroup group = CONN_MAP.get(namespace);
if (group == null) {
return;
}
//Delegates to ConnectionGroup.removeConnection (explained above), which removes the entry from its set
group.removeConnection(address);
RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace);
}
//Remove the address from the map
NAMESPACE_MAP.remove(address);
}
//Two-argument overload; skips the namespace lookup
public static void removeConnection(String namespace, String address) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
AssertUtil.assertNotBlank(address, "address should not be empty");
ConnectionGroup group = CONN_MAP.get(namespace);
if (group == null) {
return;
}
group.removeConnection(address);
NAMESPACE_MAP.remove(address);
RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace);
}
//Add a connection: the address is recorded in NAMESPACE_MAP and the connected count of the group increases
public static ConnectionGroup addConnection(String namespace, String address) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
AssertUtil.assertNotBlank(address, "address should not be empty");
ConnectionGroup group = getOrCreateGroup(namespace);
group.addConnection(address);
NAMESPACE_MAP.put(address, namespace);
RecordLog.info("[ConnectionManager] Client <{0}> registered with namespace <{1}>", address, namespace);
return group;
}
//Get or create the connection group
public static ConnectionGroup getOrCreateConnectionGroup(String namespace) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
ConnectionGroup group = getOrCreateGroup(namespace);
return group;
}
// Get the connection group
public static ConnectionGroup getConnectionGroup(String namespace) {
AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
ConnectionGroup group = CONN_MAP.get(namespace);
return group;
}
static void clear() {
CONN_MAP.clear();
NAMESPACE_MAP.clear();
}
private static final Object CREATE_LOCK = new Object();
private ConnectionManager() {}
}
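getOrCreateGroup uses a check, lock, check-again sequence so that only one group is created per namespace. On JDK 8 and later the same guarantee can be obtained from ConcurrentHashMap.computeIfAbsent. The following sketch shows that alternative; it is not what the listing above does, and GroupRegistrySketch and Group are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class GroupRegistrySketch {

    /** Hypothetical stand-in for ConnectionGroup. */
    static final class Group {
        final String namespace;

        Group(String namespace) {
            this.namespace = namespace;
        }
    }

    private static final Map<String, Group> GROUPS = new ConcurrentHashMap<>();

    /** Creates the group at most once per namespace, without an explicit lock object. */
    static Group getOrCreateGroup(String namespace) {
        return GROUPS.computeIfAbsent(namespace, Group::new);
    }

    public static void main(String[] args) {
        Group first = getOrCreateGroup("default");
        Group second = getOrCreateGroup("default");
        System.out.println(first == second);
    }
}
```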
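2.4 Processors (processor)
This package mainly holds the flow-control request processors: one for ordinary flow control and one for hot-parameter flow control.
2.4.1 The RequestProcessor interface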
public interface RequestProcessor<T, R> {
/**
* Process the cluster request.
*
* @param request Sentinel cluster request
* @return the response after processed
*/
//Process the request
//There are two implementations: FlowRequestProcessor and ParamFlowRequestProcessor
ClusterResponse<R> processRequest(ClusterRequest<T> request);
}
2.4.2 RequestProcessorProvider
A provider of request processors, similar to a factory class.
Static fields
- PROCESSOR_MAP: maps each request type to its request processor
- SERVICE_LOADER: loads the processor implementations through ServiceLoader
Source analysis
public final class RequestProcessorProvider {
private static final Map<Integer, RequestProcessor> PROCESSOR_MAP = new ConcurrentHashMap<>();
//The implementations configured by default are FlowRequestProcessor and ParamFlowRequestProcessor
private static final ServiceLoader<RequestProcessor> SERVICE_LOADER = ServiceLoaderUtil.getServiceLoader(
RequestProcessor.class);
//Static block, executed when the class is initialized
static {
loadAndInit();
}
private static void loadAndInit() {
for (RequestProcessor processor : SERVICE_LOADER) {
Integer type = parseRequestType(processor);
if (type != null) {
//Put the processor into the map
PROCESSOR_MAP.put(type, processor);
}
}
}
//Get the request type
private static Integer parseRequestType(RequestProcessor processor) {
//It is declared in an annotation on the class
RequestType requestType = processor.getClass().getAnnotation(RequestType.class);
if (requestType != null) {
return requestType.value();
} else {
return null;
}
}
//Get the RequestProcessor for a type
public static RequestProcessor getProcessor(int type) {
return PROCESSOR_MAP.get(type);
}
static void addProcessorIfAbsent(int type, RequestProcessor processor) {
// TBD: use putIfAbsent in JDK 1.8.
if (PROCESSOR_MAP.containsKey(type)) {
return;
}
PROCESSOR_MAP.put(type, processor);
}
static void addProcessor(int type, RequestProcessor processor) {
AssertUtil.notNull(processor, "processor cannot be null");
PROCESSOR_MAP.put(type, processor);
}
private RequestProcessorProvider() {}
}
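The type of each processor is read from a class-level annotation at runtime, which only works if the annotation has RUNTIME retention. The sketch below reproduces the lookup with hypothetical types (DemoRequestType, DemoProcessor, EchoProcessor, UntaggedProcessor) and registers the processors by hand instead of through ServiceLoader.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ProcessorLookupSketch {

    /** Hypothetical counterpart of @RequestType; RUNTIME retention makes it visible to reflection. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoRequestType {
        int value();
    }

    interface DemoProcessor {
        String process(String request);
    }

    @DemoRequestType(1)
    static final class EchoProcessor implements DemoProcessor {
        @Override
        public String process(String request) {
            return "echo:" + request;
        }
    }

    /** Has no annotation, so it is skipped at registration. */
    static final class UntaggedProcessor implements DemoProcessor {
        @Override
        public String process(String request) {
            return request;
        }
    }

    private static final Map<Integer, DemoProcessor> PROCESSORS = new ConcurrentHashMap<>();

    /** Returns the annotated type, or null when the class carries no annotation. */
    static Integer parseRequestType(DemoProcessor processor) {
        DemoRequestType type = processor.getClass().getAnnotation(DemoRequestType.class);
        if (type == null) {
            return null;
        }
        return type.value();
    }

    static void register(DemoProcessor processor) {
        Integer type = parseRequestType(processor);
        if (type != null) {
            PROCESSORS.put(type, processor);
        }
    }

    static DemoProcessor getProcessor(int type) {
        return PROCESSORS.get(type);
    }

    public static void main(String[] args) {
        register(new EchoProcessor());
        register(new UntaggedProcessor());
        System.out.println(getProcessor(1).process("ping"));
    }
}
```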
2.4.3 FlowRequestProcessor
Source analysis
@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
// Obtain the TokenService; the configured implementation is DefaultTokenService.
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
boolean prioritized = request.getData().isPriority();
// Request tokens from the token service.
TokenResult result = tokenService.requestToken(flowId, count, prioritized);
// Convert the token result into a response.
return toResponse(result, request);
}
private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
new FlowTokenResponseData()
.setRemainingCount(result.getRemaining())
.setWaitInMs(result.getWaitInMs())
);
}
}
2.4.4 ParamFlowRequestProcessor
Source code analysis
@RequestType(ClusterConstants.MSG_TYPE_PARAM_FLOW)
public class ParamFlowRequestProcessor implements RequestProcessor<ParamFlowRequestData, FlowTokenResponseData> {
@Override
public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<ParamFlowRequestData> request) {
TokenService tokenService = TokenServiceProvider.getService();
long flowId = request.getData().getFlowId();
int count = request.getData().getCount();
Collection<Object> args = request.getData().getParams();
// Request tokens for the hot-spot parameters.
TokenResult result = tokenService.requestParamToken(flowId, count, args);
// Convert the token result into a response.
return toResponse(result, request);
}
private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
new FlowTokenResponseData()
.setRemainingCount(result.getRemaining())
.setWaitInMs(0)
);
}
}
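Both processors follow the same pattern: read the fields from the request, ask the token service, then echo the request id and type back in the response so the client can match the response to its request. As an illustration of how a caller could combine getProcessor with processRequest, here is a simplified model. Everything in it is hypothetical (Request, Response, the handle method, the numeric type and status codes); the real server-side handler may differ.

```java
import java.util.HashMap;
import java.util.Map;

class DispatchSketch {
    // Illustrative codes only; they are assumptions, not Sentinel's constants.
    static final int TYPE_FLOW = 1;
    static final int STATUS_OK = 0;
    static final int STATUS_BAD_REQUEST = -4;

    static final class Request {
        final int id;
        final int type;
        Request(int id, int type) { this.id = id; this.type = type; }
    }

    static final class Response {
        final int id;
        final int type;
        final int status;
        Response(int id, int type, int status) {
            this.id = id;
            this.type = type;
            this.status = status;
        }
    }

    interface Processor {
        Response process(Request request);
    }

    static final Map<Integer, Processor> PROCESSORS = new HashMap<>();
    static {
        // This stub always grants the request; a real processor would
        // consult the token service before answering.
        PROCESSORS.put(TYPE_FLOW, r -> new Response(r.id, r.type, STATUS_OK));
    }

    // Looks up the processor by request type and delegates to it.
    static Response handle(Request request) {
        Processor processor = PROCESSORS.get(request.type);
        if (processor == null) {
            // Unknown type: still echo id and type so the caller can correlate.
            return new Response(request.id, request.type, STATUS_BAD_REQUEST);
        }
        return processor.process(request);
    }

    public static void main(String[] args) {
        Response ok = handle(new Request(7, TYPE_FLOW));
        System.out.println(ok.id + " " + ok.status);   // prints 7 0
        Response bad = handle(new Request(8, 99));
        System.out.println(bad.id + " " + bad.status); // prints 8 -4
    }
}
```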
The flow-control logic itself is covered in the next part.
2.5 Startup initializer DefaultClusterServerInitFunc
Source code
// Implements InitFunc; InitFunc implementations are loaded when the system starts.
public class DefaultClusterServerInitFunc implements InitFunc {
@Override
public void init() throws Exception {
// Register the request data decoders.
initDefaultEntityDecoders();
// Register the response data writers.
initDefaultEntityWriters();
// Initialize the request processors.
initDefaultProcessors();
// Eagerly-trigger the SPI pre-load of token service.
// By this point the TokenService has been loaded.
TokenServiceProvider.getService();
RecordLog.info("[DefaultClusterServerInitFunc] Default entity codec and processors registered");
}
private void initDefaultEntityWriters() {
//ping
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PING, new PingResponseDataWriter());
// Flow-control writer
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_FLOW, new FlowResponseDataWriter());
// Hot-spot parameter requests use the same writer as flow control.
ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PARAM_FLOW, new FlowResponseDataWriter());
}
private void initDefaultEntityDecoders() {
//ping
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PING, new PingRequestDataDecoder());
// Regular flow control
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_FLOW, new FlowRequestDataDecoder());
// Hot-spot parameters
RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PARAM_FLOW, new ParamFlowRequestDataDecoder());
}
private void initDefaultProcessors() {
// Eagerly-trigger the SPI pre-load.
// The lookup result is not used; the call only forces RequestProcessorProvider (covered above) to load.
RequestProcessorProvider.getProcessor(0);
}
}
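The calls TokenServiceProvider.getService() and RequestProcessorProvider.getProcessor(0) in init() discard their return values; their purpose is to make the JVM initialize the provider class, which in turn runs its static block. The sketch below demonstrates this standard Java behaviour with hypothetical class names (EagerInitDemo, LazyProvider); it is not taken from Sentinel.

```java
import java.util.ArrayList;
import java.util.List;

class EagerInitDemo {
    // Records the order in which things happen.
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical provider whose work happens in a static initializer.
    static class LazyProvider {
        private static final String SERVICE;
        static {
            EVENTS.add("LazyProvider initialized");
            SERVICE = "default-service";
        }
        static String getService() {
            return SERVICE;
        }
    }

    static void init() {
        EVENTS.add("init start");
        // The return value is ignored: the first call to a static method
        // triggers class initialization, later calls do not repeat it.
        LazyProvider.getService();
        EVENTS.add("init done");
    }

    public static void main(String[] args) {
        init();
        // prints [init start, LazyProvider initialized, init done]
        System.out.println(EVENTS);
    }
}
```

Doing this during startup moves the cost and any failure of the SPI lookup to initialization time instead of the first real request.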
2.6 TokenServiceProvider
Similar to RequestProcessorProvider; it suggests that the Sentinel authors habitually use a Provider class as a factory.
Source code
public final class TokenServiceProvider {
private static TokenService service = null;
static {
resolveTokenServiceSpi();
}
public static TokenService getService() {
return service;
}
private static void resolveTokenServiceSpi() {
// Load the TokenService via SPI; if none is found, fall back to DefaultTokenService, which is what is used here.
service = SpiLoader.loadFirstInstanceOrDefault(TokenService.class, DefaultTokenService.class);
if (service != null) {
RecordLog.info("[TokenServiceProvider] Global token service resolved: "
+ service.getClass().getCanonicalName());
} else {
RecordLog.warn("[TokenServiceProvider] Unable to resolve TokenService: no SPI found");
}
}
}
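The "first SPI implementation, else a default" lookup can be approximated with the JDK's own java.util.ServiceLoader. The sketch below is an assumption about the general idea, not the code of Sentinel's SpiLoader; TokenSource, DefaultTokenSource and loadFirstOrDefault are hypothetical names.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

class SpiFallbackDemo {
    // Hypothetical service interface standing in for TokenService.
    public interface TokenSource {
        String name();
    }

    // Hypothetical default implementation used when no provider is registered.
    public static class DefaultTokenSource implements TokenSource {
        public DefaultTokenSource() {}
        public String name() { return "default"; }
    }

    // Returns the first provider found through ServiceLoader, or a new
    // instance of the fallback class if none is registered.
    static <T> T loadFirstOrDefault(Class<T> spi, Class<? extends T> fallback) {
        Iterator<T> providers = ServiceLoader.load(spi).iterator();
        if (providers.hasNext()) {
            return providers.next();
        }
        try {
            return fallback.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // The caller has to handle a failed lookup, as the null check
            // in resolveTokenServiceSpi does.
            return null;
        }
    }

    public static void main(String[] args) {
        TokenSource source = loadFirstOrDefault(TokenSource.class, DefaultTokenSource.class);
        // With no provider file on the classpath this prints "default".
        System.out.println(source.name());
    }
}
```

Registering a custom implementation would then only require a provider configuration file under META-INF/services named after the interface, with no change to the calling code.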
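3. Other content
- ClusterTokenServer: the TokenServer classes will be covered later together with the client. They include NettyTransportServer, SentinelDefaultTokenServer and DefaultEmbeddedTokenServer, plus TokenServerHandler in the handler package.
- ClusterServerStatLogUtil in the log package is mainly used to write the cluster flow-control logs.
- The classes in the command.handler package mainly handle the interaction with the management console for querying and changing dynamic configuration, such as flow rules, monitoring metrics and cluster configuration.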