(apache-shenyu 2.4.3版本)本文一起學習apache-shyu的設計與實現(xiàn)÷牛看看shenyu如何支持多種注冊中心谦炒,多種數(shù)據(jù)同步協(xié)議的。
####### 當然通過 shenyu的disruptor應用
了解到所有數(shù)據(jù)同步风喇,實例以及元數(shù)據(jù)注冊的發(fā)送宁改,接收都使用了disruptor解耦并提高了可用性。
本文以 SyncDataService/DataChangedListener:websocket魂莫,以及ShenyuClientRegisterRepository/ShenyuClientServerRegisterRepository:nacos為主線來研究apache-shenyu的注冊中心整合以及元數(shù)據(jù)同步邏輯透且。
注冊中心整合
shenyu-register-center 作為注冊中心模塊,主要有三個模塊需要了解
- shenyu-register-client:ShenyuClientRegisterRepository,作為后端服務的注冊邏輯封裝秽誊,也就是我們業(yè)務服務會引入這個包的jar根據(jù)配置來選擇consul鲸沮,etcd,http锅论,nacos讼溺,zookeeper目前支持的5種注冊中心。(包括元數(shù)據(jù)最易,uri的注冊)
- shenyu-register-client-server:ShenyuClientServerRegisterRepository怒坯,作為業(yè)務服務上報的數(shù)據(jù)接收端,接收上報過來的數(shù)據(jù)然后處理藻懒。包括初始化剔猿,建立監(jiān)聽等,包括consul嬉荆,etcd归敬,nacos,zookeeper的子類供選擇鄙早,http比較特殊并木有這個模塊汪茧,http直接通過shenyu-admin的ShenyuClientHttpRegistryController類提供了兩個post接口用于上報元數(shù)據(jù)及uri
- shenyu-register-instance:用于注冊業(yè)務服務實例,shenyu這里的核心類ShenyuInstanceRegisterRepository只有consul限番,zookeeper舱污,etcd三個實現(xiàn)類,在這個模塊使用了spring的WebServerInitializedEvent通過web容器初始化事件觀察者進行實例注冊弥虐,可以提供給shenyu網(wǎng)關調用扩灯,也可以實現(xiàn)負載均衡等邏輯,但是這里為什么只有這三個注冊中心會加這個實例注冊邏輯目前不太了解...
下面來看看代碼
入口為ShenyuClientCommonBeanConfiguration類霜瘪。通過spring.factories指定驴剔,然后初始化一個注冊中心策略
ShenyuClientRegisterRepositoryFactory
public final class ShenyuClientRegisterRepositoryFactory {
// 可以看到 是可以支持多注冊中心邏輯
private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通過RegisterType 來初始化 注冊中心子類實現(xiàn)
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}
可以看到我們的業(yè)務服務只要引入了apache-shenyu的pom文件,就會通過spring-boot自動注入一個ShenyuClientRegisterRepository通過如下配置類
@Configuration
public class ShenyuClientCommonBeanConfiguration {
/**
* Register the register repository for http client bean post processor.
*
* @param config the config
* @return the client register repository
*/
@Bean
public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
return ShenyuClientRegisterRepositoryFactory.newInstance(config);
}
/**
* Shenyu Register Center Config.
*
* @return the Register Center Config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}
/**
* Shenyu client config shenyu client config.
*
* @return the shenyu client config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu")
public ShenyuClientConfig shenyuClientConfig() {
return new ShenyuClientConfig();
}
}
核心api接口粥庄,ShenyuClientRegisterRepository
@SPI
public interface ShenyuClientRegisterRepository {
/**
* Init.
* 初始化業(yè)務服務的注冊客戶端
* @param config the config
*/
default void init(ShenyuRegisterCenterConfig config) {
}
/**
* Persist metadata.
* 注冊controller層接口的入口
* @param metadata metadata
*/
void persistInterface(MetaDataRegisterDTO metadata);
/**
* Persist uri.
* 注冊當前業(yè)務服務的ip及端口信息
* @param registerDTO the register dto
*/
default void persistURI(URIRegisterDTO registerDTO) {
}
/**
* Close.
*/
default void close() {
// 關閉
}
}
我們只看nacos的實現(xiàn)
@Join
public class NacosClientRegisterRepository implements ShenyuClientRegisterRepository {
private static final Logger LOGGER = LoggerFactory.getLogger(NacosClientRegisterRepository.class);
//用于獲取nacos name space 配置的key
private static final String NAMESPACE = "nacosNameSpace";
// 用于傳遞metadata 數(shù)據(jù)的key
private static final String URI_META_DATA = "uriMetadata";
// nacos 暴露出來的 操作configServer的接口
private ConfigService configService;
// nacos 暴露出來的 操作namingServer的接口
private NamingService namingService;
// 用于元數(shù)據(jù)的緩存,我們的業(yè)務服務會通過nacos的configServer上報關于當前服務的接口的元數(shù)據(jù)豺妓,通過當前類緩存惜互,每次都會全量上報
private final ConcurrentLinkedQueue<String> metadataCache = new ConcurrentLinkedQueue<>();
public NacosClientRegisterRepository() { }
public NacosClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
init(config);
}
@Override
public void init(final ShenyuRegisterCenterConfig config) {
// 去和nacos建立
String serverAddr = config.getServerLists();
Properties properties = config.getProps();
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
nacosProperties.put(PropertyKeyConst.NAMESPACE, properties.getProperty(NAMESPACE));
// the nacos authentication username
nacosProperties.put(PropertyKeyConst.USERNAME, properties.getProperty(PropertyKeyConst.USERNAME, ""));
// the nacos authentication password
nacosProperties.put(PropertyKeyConst.PASSWORD, properties.getProperty(PropertyKeyConst.PASSWORD, ""));
// access key for namespace
nacosProperties.put(PropertyKeyConst.ACCESS_KEY, properties.getProperty(PropertyKeyConst.ACCESS_KEY, ""));
// secret key for namespace
nacosProperties.put(PropertyKeyConst.SECRET_KEY, properties.getProperty(PropertyKeyConst.SECRET_KEY, ""));
try {
this.configService = ConfigFactory.createConfigService(nacosProperties);
this.namingService = NamingFactory.createNamingService(nacosProperties);
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
@Override
public void close() {
try {
configService.shutDown();
namingService.shutDown();
} catch (NacosException e) {
LOGGER.error("NacosClientRegisterRepository close error!", e);
}
}
@Override
public void persistInterface(final MetaDataRegisterDTO metadata) {
String rpcType = metadata.getRpcType();
String contextPath = ContextPathUtils.buildRealNode(metadata.getContextPath(), metadata.getAppName());
// 注冊接口元數(shù)據(jù)
registerConfig(rpcType, contextPath, metadata);
}
/**
* Persist uri.
* 注冊 ip以及端口
* @param registerDTO the register dto
*/
@Override
public void persistURI(final URIRegisterDTO registerDTO) {
String rpcType = registerDTO.getRpcType();
String contextPath = ContextPathUtils.buildRealNode(registerDTO.getContextPath(), registerDTO.getAppName());
String host = registerDTO.getHost();
int port = registerDTO.getPort();
registerService(rpcType, contextPath, host, port, registerDTO);
}
// 這是一個 同步方法
private synchronized void registerService(final String rpcType,
final String contextPath,
final String host,
final int port,
final URIRegisterDTO registerDTO) {
// 注冊當前實例的 ip以及 端口
Instance instance = new Instance();
instance.setEphemeral(true);
instance.setIp(host);
instance.setPort(port);
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put(Constants.CONTEXT_PATH, contextPath);
metadataMap.put(URI_META_DATA, GsonUtils.getInstance().toJson(registerDTO));
instance.setMetadata(metadataMap);
String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType);
try {
// 通過nacos的 namingServer注冊當前服務實例
namingService.registerInstance(serviceName, instance);
} catch (NacosException e) {
throw new ShenyuException(e);
}
LOGGER.info("register service uri success: {}", serviceName);
}
private synchronized void registerConfig(final String rpcType,
final String contextPath,
final MetaDataRegisterDTO metadata) {
metadataCache.add(GsonUtils.getInstance().toJson(metadata));
String configName = RegisterPathConstants.buildServiceConfigPath(rpcType, contextPath);
try {
final String defaultGroup = NacosPathConstants.GROUP;
// 通過configServer注冊 接口元數(shù)據(jù)
if (configService.publishConfig(configName, defaultGroup, GsonUtils.getInstance().toJson(metadataCache))) {
LOGGER.info("register metadata success: {}", metadata.getRuleName());
} else {
throw new ShenyuException("register metadata fail , please check ");
}
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
}
然后我們來看一看 shenyu-admin側接收這些數(shù)據(jù)的邏輯,核心類-> ShenyuClientServerRegisterRepository
先來看位于shenyu-admin模塊中的配置類
@Configuration
public class RegisterCenterConfiguration {
/**
* Shenyu register center config shenyu register center config.
*
* @return the shenyu register center config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}
@Bean(destroyMethod = "close")
public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,
final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
String registerType = shenyuRegisterCenterConfig.getRegisterType();
// 根據(jù)配置獲取到 spi對應的實現(xiàn)
ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);
// 這里是從注冊中心接收到數(shù)據(jù)后琳拭,沒有立即調用shenyu內部邏輯训堆,而是通過一層disruptor解耦,并提供了高可用的一個削峰
RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();
Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
publisher.start(registerServiceMap);
registerRepository.init(publisher, shenyuRegisterCenterConfig);
return registerRepository;
}
}
然后來看NacosClientServerRegisterRepository
@Join
public class NacosClientServerRegisterRepository implements ShenyuClientServerRegisterRepository {
private static final Logger LOGGER = LoggerFactory.getLogger(NacosClientServerRegisterRepository.class);
// 被注冊的業(yè)務服務白嘁,目前支持RpcTypeEnum.GRPC, RpcTypeEnum.HTTP, RpcTypeEnum.TARS, RpcTypeEnum.SPRING_CLOUD, RpcTypeEnum.DUBBO這些協(xié)議的后端
private static final List<RpcTypeEnum> RPC_URI_TYPE_SET = RpcTypeEnum.acquireSupportURIs();
// nacos 默認的分組
private final String defaultGroup = NacosPathConstants.GROUP;
// nacos 暴露出來的 configServer操作接口
private ConfigService configService;
// nacos 暴露出來的 namingServer操作接口
private NamingService namingService;
// 接收到的數(shù)據(jù)通過disruptor解耦坑鱼,并可以削峰
private ShenyuClientServerRegisterPublisher publisher;
// 通過跳表實現(xiàn)的線程安全的可排序的容器,用于接收到各種業(yè)務服務的實例的uri
private final ConcurrentSkipListSet<String> metadataConfigCache = new ConcurrentSkipListSet<>();
// key(不同的后端接口協(xié)議,例如grpc,springcloud等) -> 對應協(xié)議的所有后端服務實例的 uri
private final ConcurrentMap<String, ConcurrentSkipListSet<String>> uriServiceCache = new ConcurrentHashMap<>();
@Override
public void close() {
publisher.close();
}
@Override
public void init(final ShenyuClientServerRegisterPublisher publisher,
final ShenyuRegisterCenterConfig config) {
this.publisher = publisher;
String serverAddr = config.getServerLists();
Properties properties = config.getProps();
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
nacosProperties.put(PropertyKeyConst.NAMESPACE, properties.getProperty("nacosNameSpace"));
// the nacos authentication username
nacosProperties.put(PropertyKeyConst.USERNAME, properties.getProperty(PropertyKeyConst.USERNAME, ""));
// the nacos authentication password
nacosProperties.put(PropertyKeyConst.PASSWORD, properties.getProperty(PropertyKeyConst.PASSWORD, ""));
// access key for namespace
nacosProperties.put(PropertyKeyConst.ACCESS_KEY, properties.getProperty(PropertyKeyConst.ACCESS_KEY, ""));
// secret key for namespace
nacosProperties.put(PropertyKeyConst.SECRET_KEY, properties.getProperty(PropertyKeyConst.SECRET_KEY, ""));
try {
// 和nacos建立連接
this.configService = ConfigFactory.createConfigService(nacosProperties);
this.namingService = NamingFactory.createNamingService(nacosProperties);
} catch (NacosException e) {
throw new ShenyuException(e);
}
// 對每一個 后端實例進行訂閱鲁沥,并訂閱對應實例的config(config是apache-shenyu自己用來同步業(yè)務服務接口元數(shù)據(jù)用的)
subscribe();
}
private void subscribe() {
// 每種協(xié)議都訂閱一遍 RpcTypeEnum.acquireSupportMetadatas().forEach(this::subscribeRpcTypeService);
}
private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
try {
Map<String, List<URIRegisterDTO>> services = new HashMap<>();
// 目前可以從nacos 的namingServer查到的可用實例
List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
healthyInstances.forEach(healthyInstance -> {
String contextPath = healthyInstance.getMetadata().get("contextPath");
String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
// 定于每個實例的 config信息呼股,shenyu用于同步接口元數(shù)據(jù)用
subscribeMetadata(serviceConfigName);
metadataConfigCache.add(serviceConfigName);
String metadata = healthyInstance.getMetadata().get("uriMetadata");
URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
// 收集起來
services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
// 放入緩存
uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
});
// 如果是apache-shenyu支持的接口協(xié)議,則通過disruptor發(fā)布真正的接收到nacos信息的事件
if (RPC_URI_TYPE_SET.contains(rpcType)) {
services.values().forEach(this::publishRegisterURI);
}
LOGGER.info("subscribe uri : {}", serviceName);
// 通過nacos的 namingService提供的監(jiān)聽器(長輪詢)來監(jiān)聽業(yè)務服務實例的上下線變化,再進行刷新,如果變化了画恰,會刷新本地緩存并重新調用當前方法彭谁,重新查詢一次服務實例,和config中的元數(shù)據(jù)進行刷新
namingService.subscribe(serviceName, event -> {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event).getInstances();
instances.forEach(instance -> {
String contextPath = instance.getMetadata().get("contextPath");
uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
});
refreshURIService(rpcType, serviceName);
}
});
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
// 元數(shù)據(jù)訂閱
private void subscribeMetadata(final String serviceConfigName) {
// 注冊 接口元數(shù)據(jù)允扇,從nacos的configService讀取
registerMetadata(readData(serviceConfigName));
LOGGER.info("subscribe metadata: {}", serviceConfigName);
try {
configService.addListener(serviceConfigName, defaultGroup, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
// 通過configService提供的監(jiān)聽 如果config有變化則進行注冊
@Override
public void receiveConfigInfo(final String config) {
registerMetadata(config);
}
});
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
@SuppressWarnings("unchecked")
private void registerMetadata(final String metadataConfig) {
List<String> metadataList = GsonUtils.getInstance().fromJson(metadataConfig, List.class);
metadataList.forEach(this::publishMetadata);
}
private void publishMetadata(final String data) {
LOGGER.info("publish metadata: {}", data);
publisher.publish(Lists.newArrayList(GsonUtils.getInstance().fromJson(data, MetaDataRegisterDTO.class)));
}
private void refreshURIService(final RpcTypeEnum rpcType, final String serviceName) {
Optional.ofNullable(uriServiceCache.get(serviceName)).ifPresent(services -> services.forEach(contextPath -> registerURI(contextPath, serviceName, rpcType)));
}
// 注冊服務實例的邏輯缠局,和注冊對應協(xié)議的服務實例邏輯有點像
private void registerURI(final String contextPath, final String serviceName, final RpcTypeEnum rpcType) {
try {
List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
List<URIRegisterDTO> registerDTOList = new ArrayList<>();
healthyInstances.forEach(healthyInstance -> {
if (contextPath.equals(healthyInstance.getMetadata().get("contextPath"))) {
String metadata = healthyInstance.getMetadata().get("uriMetadata");
URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
registerDTOList.add(uriRegisterDTO);
String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
// 本地緩存沒有的 配置信息(接口元數(shù)據(jù))才會進行訂閱
if (!metadataConfigCache.contains(serviceConfigName)) {
subscribeMetadata(serviceConfigName);
metadataConfigCache.add(serviceConfigName);
}
}
});
if (!RPC_URI_TYPE_SET.contains(rpcType)) {
return;
}
if (registerDTOList.isEmpty()) {
URIRegisterDTO uriRegisterDTO = URIRegisterDTO.builder()
.contextPath(Constants.PATH_SEPARATOR + contextPath)
.rpcType(rpcType.getName()).build();
registerDTOList.add(uriRegisterDTO);
}
// 注冊服務實例的 ip port
publishRegisterURI(registerDTOList);
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
private void publishRegisterURI(final List<URIRegisterDTO> registerDTOList) {
LOGGER.info("publish uri: {}", registerDTOList);
publisher.publish(registerDTOList);
}
private String readData(final String configName) {
try {
return configService.getConfig(configName, defaultGroup, 5000);
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
}
上面關于業(yè)務服務的實例信息,接口元數(shù)據(jù)如何同步考润,通過查看其中一個nacos的實現(xiàn)狭园,看到了業(yè)務服務如何通過shenyu-client如果注冊到nacos,而shenyu-admin如何從nacos接收到數(shù)據(jù)
然后我們看看 shenyu-admin 與 shenyu-bootstrap如何同步這些數(shù)據(jù)糊治,我們主要從 websocket來看唱矛,主角是SyncDataService
如果使用spring-boot,我們引入了對應的jar包俊戳,這里實際直接通過spring.factories & 配置方式注入
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
// 這里通過配置注入
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
public class WebsocketSyncDataConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncDataConfiguration.class);
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
LOGGER.info("you use websocket sync shenyu data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Config websocket config.
*
* @return the websocket config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig();
}
}
可以看到和之前client的區(qū)別揖赴,client的注入是根據(jù)配置針對性注入一個實現(xiàn)(當然其實也可以支持多實現(xiàn),但邏輯上沒有必要)也就是通過@Bean注入一個接口的實現(xiàn)抑胎,而數(shù)據(jù)同步的注入是分別在自己的模塊通過 spring.factories & 配置注入燥滑,通過配置是可以同時存在多個數(shù)據(jù)同步方式的。
同時shenyu抽象出來需要同步數(shù)據(jù)的幾個維度阿逃,包括AuthDataSubscriber權限數(shù)據(jù)铭拧,MetaDataSubscriber接口元數(shù)據(jù),PluginDataSubscriber插件數(shù)據(jù)
上述這些數(shù)據(jù)需要同步恃锉,如果分別是這些interface的方法入口搀菩,上述接口是抽象出來給shenyu-bootstrap使用,實現(xiàn)都是通過apache-shenyu的插件架構實現(xiàn)破托,而數(shù)據(jù)變化的生產者是通過DataChangedListener&DataChangedInit接口抽象在admin端進行發(fā)送肪跋,而bootstrap通過它進行訂閱感知數(shù)據(jù)變化后的操作。而這些數(shù)據(jù)變化的消費者在對應數(shù)據(jù)同步的子類模塊中如下
在shenyu-admin有數(shù)據(jù)同步的包
核心處理接口為 DataHandler#handle(String json, String eventType)方法土砂,AbstractDataHandler為基礎實現(xiàn)州既,分別對應元數(shù)據(jù),插件萝映,rule規(guī)則吴叶,selectorData,權限有自己的實現(xiàn)序臂,業(yè)務處理的入口類為WebsocketDataHandler
public class WebsocketDataHandler {
// 通過枚舉 + map 消除 if else 或者 switch
private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
/**
* Instantiates a new Websocket data handler.
*
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}
/**
* Executor.
*
* @param type the type
* @param json the json
* @param eventType the event type
*/
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
ENUM_MAP.get(type).handle(json, eventType);
}
}
ShenyuWebsocketClient作為websocket協(xié)議和內部業(yè)務處理的適配器
public final class ShenyuWebsocketClient extends WebSocketClient {
/**
* logger.
*/
private static final Logger LOG = LoggerFactory.getLogger(ShenyuWebsocketClient.class);
private volatile boolean alreadySync = Boolean.FALSE;
private final WebsocketDataHandler websocketDataHandler;
private final Timer timer;
private TimerTask timerTask;
/**
* Instantiates a new shenyu websocket client.
*
* @param serverUri the server uri
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers
) {
super(serverUri);
this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
this.timer = WheelTimerFactory.getSharedTimer();
this.connection();
}
private void connection() {
this.connectBlocking();
// 通過 timer線程進行心跳檢測
this.timer.add(timerTask = new AbstractRoundTask(null, TimeUnit.SECONDS.toMillis(10)) {
@Override
public void doRun(final String key, final TimerTask timerTask) {
// 心跳檢測
healthCheck();
}
});
}
@Override
public boolean connectBlocking() {
// websocket協(xié)議的連接方法回調
boolean success = false;
try {
success = super.connectBlocking();
} catch (Exception ignored) {
}
if (success) {
LOG.info("websocket connection server[{}] is successful.....", this.getURI().toString());
} else {
LOG.warn("websocket connection server[{}] is error.....", this.getURI().toString());
}
return success;
}
@Override
public void onOpen(final ServerHandshake serverHandshake) {
// websocket協(xié)議的打開連接 回調蚌卤,代表當前連接正常開啟了
if (!alreadySync) {
send(DataEventTypeEnum.MYSELF.name());
alreadySync = true;
}
}
@Override
public void onMessage(final String result) {
// websocket協(xié)議的接收消息回調
handleResult(result);
}
@Override
public void onClose(final int i, final String s, final boolean b) {
// websocket協(xié)議的關閉連接回調
this.close();
}
@Override
public void onError(final Exception e) {
this.close();
}
@Override
public void close() {
alreadySync = false;
if (this.isOpen()) {
super.close();
}
}
/**
* Now close.
* now close. will cancel the task execution.
*/
public void nowClose() {
this.close();
timerTask.cancel();
}
// 看來是shenyu-bootstrap側進行主動通信,如果是關閉狀態(tài)會一直嘗試連接到shenyu-admin的webscoket,開啟狀態(tài)進行心跳檢測
private void healthCheck() {
try {
if (!this.isOpen()) {
this.reconnectBlocking();
} else {
this.sendPing();
LOG.debug("websocket send to [{}] ping message successful", this.getURI().toString());
}
} catch (Exception e) {
LOG.error("websocket connect is error :{}", e.getMessage());
}
}
@SuppressWarnings("ALL")
private void handleResult(final String result) {
LOG.info("handleResult({})", result);
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());
// 業(yè)務入口
websocketDataHandler.executor(groupEnum, json, eventType);
}
}
從上述代碼可以看到 shenyu-bootstrap網(wǎng)關通過SyncDataService下沉到對應不同的子實現(xiàn)的模塊后逊彭,通過抽象不同類型的數(shù)據(jù)接口進行操作珍逸。那么這些數(shù)據(jù)變化的事件源頭則是shenyu-admin通過DataChangedInit和DataChangedListenner兩個抽象的接口來操作媳荒。
DataChangedInit 是在服務啟動后初始化邏輯,沒有websocket的子實現(xiàn),其實就是在服務啟動后的一個鉤子,因為nacos仁烹,consul缺脉,etcd哨坪,zookeeper相當于都需要一個中間件服務來同步數(shù)據(jù)界拦,需要每次啟動會進行一次同步,而websocket則是直連锦聊,互相能感知到上下線歹嘹,可以直接在上下線中處理
public abstract class AbstractDataChangedInit implements DataChangedInit {
/**
* SyncDataService, sync all data.
*/
@Resource
private SyncDataService syncDataService;
//通過 CommandLineRunner 服務啟動的鉤子來初始化
@Override
public void run(final String... args) throws Exception {
if (notExist()) {
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}
turn boolean.
*/
protected abstract boolean notExist();
}
而所有數(shù)據(jù)變化的邏輯是通過spring提供的觀察者機制,通過ApplicationEvent發(fā)布事件孔庭,由事件監(jiān)聽者來同步數(shù)據(jù)
分發(fā)事件的類為DataChangedEventDispatcher類
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
private final ApplicationContext applicationContext;
// 可以看到也支持多種數(shù)據(jù)同步協(xié)議
private List<DataChangedListener> listeners;
public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
//不同的數(shù)據(jù)類型進行一次強轉尺上,調用類是保證類型安全的。
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
@Override
public void afterPropertiesSet() {
Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
}
}
然后我們來看websocket關于DataChangedListener的實現(xiàn)
全部委托給WebsocketCollector處理
public class WebsocketDataChangedListener implements DataChangedListener {
@Override
public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
WebsocketData<PluginData> websocketData =
new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
@Override
public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
WebsocketData<RuleData> configData =
new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
@Override
public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) {
WebsocketData<AppAuthData> configData =
new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
@Override
public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
WebsocketData<MetaData> configData =
new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
}
我們來看WebsocketCollector,它也是一個websocket客戶端類
// shenyu-admin服務通過當前websocket與 shenyu-bootstrap建立連接圆到,由admin側的ShenyuWebsocketClient類的timer task心跳和嘗試連接
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {
private static final Logger LOG = LoggerFactory.getLogger(WebsocketCollector.class);
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
private static final String SESSION_KEY = "sessionKey";
@OnOpen
public void onOpen(final Session session) {
LOG.info("websocket on client[{}] open successful,maxTextMessageBufferSize:{}",
getClientIp(session), session.getMaxTextMessageBufferSize());
SESSION_SET.add(session);
}
private static String getClientIp(final Session session) {
Map<String, Object> userProperties = session.getUserProperties();
if (MapUtils.isEmpty(userProperties)) {
return StringUtils.EMPTY;
}
return Optional.ofNullable(userProperties.get(WebsocketListener.CLIENT_IP_NAME))
.map(Object::toString)
.orElse(StringUtils.EMPTY);
}
/**
* On message.
*
* @param message the message
* @param session the session
*/
@OnMessage
public void onMessage(final String message, final Session session) {
if (!Objects.equals(message, DataEventTypeEnum.MYSELF.name())) {
return;
}
try {
ThreadLocalUtils.put(SESSION_KEY, session);
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
} finally {
ThreadLocalUtils.clear();
}
}
@OnClose
public void onClose(final Session session) {
clearSession(session);
LOG.warn("websocket close on client[{}]", getClientIp(session));
}
@OnError
public void onError(final Session session, final Throwable error) {
clearSession(session);
LOG.error("websocket collection on client[{}] error: ", getClientIp(session), error);
}
/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isBlank(message)) {
return;
}
if (DataEventTypeEnum.MYSELF == type) {
Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
if (Objects.nonNull(session)) {
sendMessageBySession(session, message);
}
} else {
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
}
}
// 發(fā)送同步的數(shù)據(jù)時都是 使用同步方法發(fā)送
private static synchronized void sendMessageBySession(final Session session, final String message) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
LOG.error("websocket send result is exception: ", e);
}
}
private void clearSession(final Session session) {
SESSION_SET.remove(session);
ThreadLocalUtils.clear();
}
}
至此上面的發(fā)送和接收邏輯也看到了怎抛,那么總結
注冊邏輯
- 客戶端(業(yè)務后端)ShenyuClientRegisterRepository 上報數(shù)據(jù)
- 服務端(shenyu-admin)ShenyuClientServerRegisterRepository 接收數(shù)據(jù)
- disruptor 削峰解耦注冊中心的數(shù)據(jù) ShenyuClientServerRegisterPublisher
- 如果是元數(shù)據(jù)發(fā)布 spring的event同步到shenyu-bootstrap
(因為本文只研究了nacos,springcloud協(xié)議的網(wǎng)關的后端實例列表都是通過springCloud中l(wèi)oadbalancer邏輯實現(xiàn)芽淡,ribbon通過一個定時線程從注冊中心拉取服務實例列表)其他非nacos未了解
數(shù)據(jù)同步
- 啟動時利用DataChangedInit從注冊中心同步全量數(shù)據(jù)(websocket除外马绝,websocket直連,可以直接感知到連接和斷開)
- 通過DataChangedEventDispatcher利用spring的applicationEvent監(jiān)聽到數(shù)據(jù)變化事件
- 調用DataChangedListener 數(shù)據(jù)變化邏輯