apache-shenyu之注冊中心整合與元數(shù)據(jù)同步

(apache-shenyu 2.4.3版本)本文一起學習apache-shyu的設計與實現(xiàn)÷牛看看shenyu如何支持多種注冊中心谦炒,多種數(shù)據(jù)同步協(xié)議的。
apache-shenyu數(shù)據(jù)同步以及注冊邏輯圖

####### 當然通過 shenyu的disruptor應用
了解到所有數(shù)據(jù)同步风喇,實例以及元數(shù)據(jù)注冊的發(fā)送宁改,接收都使用了disruptor解耦并提高了可用性。

本文以 SyncDataService/DataChangedListener:websocket魂莫,以及ShenyuClientRegisterRepository/ShenyuClientServerRegisterRepository:nacos為主線來研究apache-shenyu的注冊中心整合以及元數(shù)據(jù)同步邏輯透且。

注冊中心整合

注冊中心邏輯的模塊

shenyu-register-center 作為注冊中心模塊,主要有三個模塊需要了解

  1. shenyu-register-client:ShenyuClientRegisterRepository,作為后端服務的注冊邏輯封裝秽誊,也就是我們業(yè)務服務會引入這個包的jar根據(jù)配置來選擇consul鲸沮,etcd,http锅论,nacos讼溺,zookeeper目前支持的5種注冊中心。(包括元數(shù)據(jù)最易,uri的注冊)
  2. shenyu-register-client-server:ShenyuClientServerRegisterRepository怒坯,作為業(yè)務服務上報的數(shù)據(jù)接收端,接收上報過來的數(shù)據(jù)然后處理藻懒。包括初始化剔猿,建立監(jiān)聽等,包括consul嬉荆,etcd归敬,nacos,zookeeper的子類供選擇鄙早,http比較特殊并木有這個模塊汪茧,http直接通過shenyu-admin的ShenyuClientHttpRegistryController類提供了兩個post接口用于上報元數(shù)據(jù)及uri
  3. shenyu-register-instance:用于注冊業(yè)務服務實例,shenyu這里的核心類ShenyuInstanceRegisterRepository只有consul限番,zookeeper舱污,etcd三個實現(xiàn)類,在這個模塊使用了spring的WebServerInitializedEvent通過web容器初始化事件觀察者進行實例注冊弥虐,可以提供給shenyu網(wǎng)關調用扩灯,也可以實現(xiàn)負載均衡等邏輯,但是這里為什么只有這三個注冊中心會加這個實例注冊邏輯目前不太了解...
下面來看看代碼
spring-boot入口

入口為ShenyuClientCommonBeanConfiguration類霜瘪。通過spring.factories指定驴剔,然后初始化一個注冊中心策略
ShenyuClientRegisterRepositoryFactory

public final class ShenyuClientRegisterRepositoryFactory {
// 可以看到 是可以支持多注冊中心邏輯
    private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
    public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
        if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// 通過RegisterType 來初始化 注冊中心子類實現(xiàn)
            ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
            result.init(shenyuRegisterCenterConfig);
            ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
            REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
            return result;
        }
        return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
    }
}

可以看到我們的業(yè)務服務只要引入了apache-shenyu的pom文件,就會通過spring-boot自動注入一個ShenyuClientRegisterRepository通過如下配置類

@Configuration
public class ShenyuClientCommonBeanConfiguration {
    
    /**
     * Register the register repository for http client bean post processor.
     *
     * @param config the config
     * @return the client register repository
     */
    @Bean
    public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
        return ShenyuClientRegisterRepositoryFactory.newInstance(config);
    }
    
    /**
     * Shenyu Register Center Config.
     *
     * @return the Register Center Config
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.register")
    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
        return new ShenyuRegisterCenterConfig();
    }
    
    /**
     * Shenyu client config shenyu client config.
     *
     * @return the shenyu client config
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu")
    public ShenyuClientConfig shenyuClientConfig() {
        return new ShenyuClientConfig();
    }
}
核心api接口粥庄,ShenyuClientRegisterRepository
@SPI
public interface ShenyuClientRegisterRepository {

    /**
     * Init.
     * 初始化業(yè)務服務的注冊客戶端
     * @param config the config
     */
    default void init(ShenyuRegisterCenterConfig config) {
    }
    
    /**
     * Persist metadata.
     * 注冊controller層接口的入口
     * @param metadata metadata
     */
    void persistInterface(MetaDataRegisterDTO metadata);
    
    /**
     * Persist uri.
     * 注冊當前業(yè)務服務的ip及端口信息
     * @param registerDTO the register dto
     */
    default void persistURI(URIRegisterDTO registerDTO) {
    }
    
    /**
     * Close.
     */
    default void close() {
// 關閉
    }
}

我們只看nacos的實現(xiàn)


nacos的客戶端注冊
@Join
public class NacosClientRegisterRepository implements ShenyuClientRegisterRepository {

    private static final Logger LOGGER = LoggerFactory.getLogger(NacosClientRegisterRepository.class);
//用于獲取nacos name space 配置的key
    private static final String NAMESPACE = "nacosNameSpace";
// 用于傳遞metadata 數(shù)據(jù)的key
    private static final String URI_META_DATA = "uriMetadata";
    // nacos 暴露出來的 操作configServer的接口
    private ConfigService configService;
        // nacos 暴露出來的 操作namingServer的接口
    private NamingService namingService;
// 用于元數(shù)據(jù)的緩存,我們的業(yè)務服務會通過nacos的configServer上報關于當前服務的接口的元數(shù)據(jù)豺妓,通過當前類緩存惜互,每次都會全量上報
    private final ConcurrentLinkedQueue<String> metadataCache = new ConcurrentLinkedQueue<>();

    public NacosClientRegisterRepository() { }

    public NacosClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
        init(config);
    }

    @Override
    public void init(final ShenyuRegisterCenterConfig config) {
// 去和nacos建立
        String serverAddr = config.getServerLists();
        Properties properties = config.getProps();
        Properties nacosProperties = new Properties();
        nacosProperties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);

        nacosProperties.put(PropertyKeyConst.NAMESPACE, properties.getProperty(NAMESPACE));
        // the nacos authentication username
        nacosProperties.put(PropertyKeyConst.USERNAME, properties.getProperty(PropertyKeyConst.USERNAME, ""));
        // the nacos authentication password
        nacosProperties.put(PropertyKeyConst.PASSWORD, properties.getProperty(PropertyKeyConst.PASSWORD, ""));
        // access key for namespace
        nacosProperties.put(PropertyKeyConst.ACCESS_KEY, properties.getProperty(PropertyKeyConst.ACCESS_KEY, ""));
        // secret key for namespace
        nacosProperties.put(PropertyKeyConst.SECRET_KEY, properties.getProperty(PropertyKeyConst.SECRET_KEY, ""));
        try {
            this.configService = ConfigFactory.createConfigService(nacosProperties);
            this.namingService = NamingFactory.createNamingService(nacosProperties);
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
    }

    @Override
    public void close() {
        try {
            configService.shutDown();
            namingService.shutDown();
        } catch (NacosException e) {
            LOGGER.error("NacosClientRegisterRepository close error!", e);
        }
    }

    @Override
    public void persistInterface(final MetaDataRegisterDTO metadata) {
        String rpcType = metadata.getRpcType();
        String contextPath = ContextPathUtils.buildRealNode(metadata.getContextPath(), metadata.getAppName());
// 注冊接口元數(shù)據(jù)
        registerConfig(rpcType, contextPath, metadata);
    }
    
    /**
     * Persist uri.
     * 注冊 ip以及端口
     * @param registerDTO the register dto
     */
    @Override
    public void persistURI(final URIRegisterDTO registerDTO) {
        String rpcType = registerDTO.getRpcType();
        String contextPath = ContextPathUtils.buildRealNode(registerDTO.getContextPath(), registerDTO.getAppName());
        String host = registerDTO.getHost();
        int port = registerDTO.getPort();
        registerService(rpcType, contextPath, host, port, registerDTO);
    }
// 這是一個 同步方法
    private synchronized void registerService(final String rpcType,
                                              final String contextPath,
                                              final String host,
                                              final int port,
                                              final URIRegisterDTO registerDTO) {
// 注冊當前實例的 ip以及 端口
        Instance instance = new Instance();
        instance.setEphemeral(true);
        instance.setIp(host);
        instance.setPort(port);
        Map<String, String> metadataMap = new HashMap<>();
        metadataMap.put(Constants.CONTEXT_PATH, contextPath);
        metadataMap.put(URI_META_DATA, GsonUtils.getInstance().toJson(registerDTO));
        instance.setMetadata(metadataMap);

        String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType);
        try {
// 通過nacos的 namingServer注冊當前服務實例
            namingService.registerInstance(serviceName, instance);
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
        LOGGER.info("register service uri success: {}", serviceName);
    }

    private synchronized void registerConfig(final String rpcType,
                                             final String contextPath,
                                             final MetaDataRegisterDTO metadata) {
        metadataCache.add(GsonUtils.getInstance().toJson(metadata));
        String configName = RegisterPathConstants.buildServiceConfigPath(rpcType, contextPath);
        try {
            final String defaultGroup = NacosPathConstants.GROUP;
// 通過configServer注冊 接口元數(shù)據(jù)
            if (configService.publishConfig(configName, defaultGroup, GsonUtils.getInstance().toJson(metadataCache))) {
                LOGGER.info("register metadata success: {}", metadata.getRuleName());
            } else {
                throw new ShenyuException("register metadata fail , please check ");
            }
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
    }
}

然后我們來看一看 shenyu-admin側接收這些數(shù)據(jù)的邏輯,核心類-> ShenyuClientServerRegisterRepository

先來看位于shenyu-admin模塊中的配置類

@Configuration
public class RegisterCenterConfiguration {

    /**
     * Shenyu register center config shenyu register center config.
     *
     * @return the shenyu register center config
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.register")
    public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
        return new ShenyuRegisterCenterConfig();
    }
    
    @Bean(destroyMethod = "close")
    public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,
                                                                               final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
        String registerType = shenyuRegisterCenterConfig.getRegisterType();
// 根據(jù)配置獲取到 spi對應的實現(xiàn)
        ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);
// 這里是從注冊中心接收到數(shù)據(jù)后琳拭,沒有立即調用shenyu內部邏輯训堆,而是通過一層disruptor解耦,并提供了高可用的一個削峰
        RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();
        Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
        publisher.start(registerServiceMap);
        registerRepository.init(publisher, shenyuRegisterCenterConfig);
        return registerRepository;
    }
}

然后來看NacosClientServerRegisterRepository

@Join
public class NacosClientServerRegisterRepository implements ShenyuClientServerRegisterRepository {

    private static final Logger LOGGER = LoggerFactory.getLogger(NacosClientServerRegisterRepository.class);
// 被注冊的業(yè)務服務白嘁,目前支持RpcTypeEnum.GRPC, RpcTypeEnum.HTTP, RpcTypeEnum.TARS, RpcTypeEnum.SPRING_CLOUD, RpcTypeEnum.DUBBO這些協(xié)議的后端
    private static final List<RpcTypeEnum> RPC_URI_TYPE_SET = RpcTypeEnum.acquireSupportURIs();
// nacos 默認的分組
    private final String defaultGroup = NacosPathConstants.GROUP;
// nacos 暴露出來的 configServer操作接口
    private ConfigService configService;
// nacos 暴露出來的 namingServer操作接口
    private NamingService namingService;
// 接收到的數(shù)據(jù)通過disruptor解耦坑鱼,并可以削峰
    private ShenyuClientServerRegisterPublisher publisher;
// 通過跳表實現(xiàn)的線程安全的可排序的容器,用于接收到各種業(yè)務服務的實例的uri
    private final ConcurrentSkipListSet<String> metadataConfigCache = new ConcurrentSkipListSet<>();
// key(不同的后端接口協(xié)議,例如grpc,springcloud等) -> 對應協(xié)議的所有后端服務實例的 uri
    private final ConcurrentMap<String, ConcurrentSkipListSet<String>> uriServiceCache = new ConcurrentHashMap<>();

    @Override
    public void close() {
        publisher.close();
    }

    @Override
    public void init(final ShenyuClientServerRegisterPublisher publisher,
                     final ShenyuRegisterCenterConfig config) {
        this.publisher = publisher;
        String serverAddr = config.getServerLists();
        Properties properties = config.getProps();
        Properties nacosProperties = new Properties();
        nacosProperties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        nacosProperties.put(PropertyKeyConst.NAMESPACE, properties.getProperty("nacosNameSpace"));
        // the nacos authentication username
        nacosProperties.put(PropertyKeyConst.USERNAME, properties.getProperty(PropertyKeyConst.USERNAME, ""));
        // the nacos authentication password
        nacosProperties.put(PropertyKeyConst.PASSWORD, properties.getProperty(PropertyKeyConst.PASSWORD, ""));
        // access key for namespace
        nacosProperties.put(PropertyKeyConst.ACCESS_KEY, properties.getProperty(PropertyKeyConst.ACCESS_KEY, ""));
        // secret key for namespace
        nacosProperties.put(PropertyKeyConst.SECRET_KEY, properties.getProperty(PropertyKeyConst.SECRET_KEY, ""));

        try {
// 和nacos建立連接
            this.configService = ConfigFactory.createConfigService(nacosProperties);
            this.namingService = NamingFactory.createNamingService(nacosProperties);
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
// 對每一個 后端實例進行訂閱鲁沥,并訂閱對應實例的config(config是apache-shenyu自己用來同步業(yè)務服務接口元數(shù)據(jù)用的)
        subscribe();
    }

    private void subscribe() {
// 每種協(xié)議都訂閱一遍        RpcTypeEnum.acquireSupportMetadatas().forEach(this::subscribeRpcTypeService);
    }

    private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
        final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
        try {
            Map<String, List<URIRegisterDTO>> services = new HashMap<>();
// 目前可以從nacos 的namingServer查到的可用實例
            List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
            healthyInstances.forEach(healthyInstance -> {
                String contextPath = healthyInstance.getMetadata().get("contextPath");
                String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
// 定于每個實例的 config信息呼股,shenyu用于同步接口元數(shù)據(jù)用
                subscribeMetadata(serviceConfigName);
                metadataConfigCache.add(serviceConfigName);
                String metadata = healthyInstance.getMetadata().get("uriMetadata");
                URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
// 收集起來
                services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
// 放入緩存
                uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
            });
// 如果是apache-shenyu支持的接口協(xié)議,則通過disruptor發(fā)布真正的接收到nacos信息的事件
            if (RPC_URI_TYPE_SET.contains(rpcType)) {
                services.values().forEach(this::publishRegisterURI);
            }
            LOGGER.info("subscribe uri : {}", serviceName);
// 通過nacos的 namingService提供的監(jiān)聽器(長輪詢)來監(jiān)聽業(yè)務服務實例的上下線變化,再進行刷新,如果變化了画恰,會刷新本地緩存并重新調用當前方法彭谁,重新查詢一次服務實例,和config中的元數(shù)據(jù)進行刷新
            namingService.subscribe(serviceName, event -> {
                if (event instanceof NamingEvent) {
                    List<Instance> instances = ((NamingEvent) event).getInstances();
                    instances.forEach(instance -> {
                        String contextPath = instance.getMetadata().get("contextPath");
                        uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
                    });
                    refreshURIService(rpcType, serviceName);
                }
            });
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
    }
// 元數(shù)據(jù)訂閱
    private void subscribeMetadata(final String serviceConfigName) {
// 注冊 接口元數(shù)據(jù)允扇,從nacos的configService讀取
        registerMetadata(readData(serviceConfigName));
        LOGGER.info("subscribe metadata: {}", serviceConfigName);
        try {
            configService.addListener(serviceConfigName, defaultGroup, new Listener() {

                @Override
                public Executor getExecutor() {
                    return null;
                }
// 通過configService提供的監(jiān)聽 如果config有變化則進行注冊
                @Override
                public void receiveConfigInfo(final String config) {
                    registerMetadata(config);
                }
            });
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
    }

    @SuppressWarnings("unchecked")
    private void registerMetadata(final String metadataConfig) {
        List<String> metadataList = GsonUtils.getInstance().fromJson(metadataConfig, List.class);
        metadataList.forEach(this::publishMetadata);
    }

    private void publishMetadata(final String data) {
        LOGGER.info("publish metadata: {}", data);
        publisher.publish(Lists.newArrayList(GsonUtils.getInstance().fromJson(data, MetaDataRegisterDTO.class)));
    }

    private void refreshURIService(final RpcTypeEnum rpcType, final String serviceName) {
        Optional.ofNullable(uriServiceCache.get(serviceName)).ifPresent(services -> services.forEach(contextPath -> registerURI(contextPath, serviceName, rpcType)));
    }
// 注冊服務實例的邏輯缠局,和注冊對應協(xié)議的服務實例邏輯有點像
    private void registerURI(final String contextPath, final String serviceName, final RpcTypeEnum rpcType) {
        try {
            List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
            List<URIRegisterDTO> registerDTOList = new ArrayList<>();
            healthyInstances.forEach(healthyInstance -> {
                if (contextPath.equals(healthyInstance.getMetadata().get("contextPath"))) {
                    String metadata = healthyInstance.getMetadata().get("uriMetadata");
                    URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
                    registerDTOList.add(uriRegisterDTO);

                    String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
// 本地緩存沒有的 配置信息(接口元數(shù)據(jù))才會進行訂閱
                    if (!metadataConfigCache.contains(serviceConfigName)) {
                        subscribeMetadata(serviceConfigName);
                        metadataConfigCache.add(serviceConfigName);
                    }
                }
            });
            if (!RPC_URI_TYPE_SET.contains(rpcType)) {
                return;
            }
            if (registerDTOList.isEmpty()) {
                URIRegisterDTO uriRegisterDTO = URIRegisterDTO.builder()
                        .contextPath(Constants.PATH_SEPARATOR + contextPath)
                        .rpcType(rpcType.getName()).build();
                registerDTOList.add(uriRegisterDTO);
            }
// 注冊服務實例的 ip port
            publishRegisterURI(registerDTOList);
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
    }

    private void publishRegisterURI(final List<URIRegisterDTO> registerDTOList) {
        LOGGER.info("publish uri: {}", registerDTOList);
        publisher.publish(registerDTOList);
    }

    private String readData(final String configName) {
        try {
            return configService.getConfig(configName, defaultGroup, 5000);
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
    }
}

上面關于業(yè)務服務的實例信息,接口元數(shù)據(jù)如何同步考润,通過查看其中一個nacos的實現(xiàn)狭园,看到了業(yè)務服務如何通過shenyu-client如果注冊到nacos,而shenyu-admin如何從nacos接收到數(shù)據(jù)

然后我們看看 shenyu-admin 與 shenyu-bootstrap如何同步這些數(shù)據(jù)糊治,我們主要從 websocket來看唱矛,主角是SyncDataService
spring-boot

如果使用spring-boot,我們引入了對應的jar包俊戳,這里實際直接通過spring.factories & 配置方式注入

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
// 這里通過配置注入
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
public class WebsocketSyncDataConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncDataConfiguration.class);

    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        LOGGER.info("you use websocket sync shenyu data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * Config websocket config.
     *
     * @return the websocket config
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }

}

可以看到和之前client的區(qū)別揖赴,client的注入是根據(jù)配置針對性注入一個實現(xiàn)(當然其實也可以支持多實現(xiàn),但邏輯上沒有必要)也就是通過@Bean注入一個接口的實現(xiàn)抑胎,而數(shù)據(jù)同步的注入是分別在自己的模塊通過 spring.factories & 配置注入燥滑,通過配置是可以同時存在多個數(shù)據(jù)同步方式的。

同時shenyu抽象出來需要同步數(shù)據(jù)的幾個維度阿逃,包括AuthDataSubscriber權限數(shù)據(jù)铭拧,MetaDataSubscriber接口元數(shù)據(jù),PluginDataSubscriber插件數(shù)據(jù)
所有抽象的維度

上述這些數(shù)據(jù)需要同步恃锉,如果分別是這些interface的方法入口搀菩,上述接口是抽象出來給shenyu-bootstrap使用,實現(xiàn)都是通過apache-shenyu的插件架構實現(xiàn)破托,而數(shù)據(jù)變化的生產者是通過DataChangedListener&DataChangedInit接口抽象在admin端進行發(fā)送肪跋,而bootstrap通過它進行訂閱感知數(shù)據(jù)變化后的操作。而這些數(shù)據(jù)變化的消費者在對應數(shù)據(jù)同步的子類模塊中如下


websocket的模塊中的類
在shenyu-admin有數(shù)據(jù)同步的包
數(shù)據(jù)同步的所有實現(xiàn)以及左邊的包
核心處理接口為 DataHandler#handle(String json, String eventType)方法土砂,AbstractDataHandler為基礎實現(xiàn)州既,分別對應元數(shù)據(jù),插件萝映,rule規(guī)則吴叶,selectorData,權限有自己的實現(xiàn)序臂,業(yè)務處理的入口類為WebsocketDataHandler
public class WebsocketDataHandler {
// 通過枚舉 + map 消除 if else 或者 switch
    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    /**
     * Instantiates a new Websocket data handler.
     *
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    /**
     * Executor.
     *
     * @param type      the type
     * @param json      the json
     * @param eventType the event type
     */
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}
ShenyuWebsocketClient作為websocket協(xié)議和內部業(yè)務處理的適配器
public final class ShenyuWebsocketClient extends WebSocketClient {
    
    /**
     * logger.
     */
    private static final Logger LOG = LoggerFactory.getLogger(ShenyuWebsocketClient.class);
    
    private volatile boolean alreadySync = Boolean.FALSE;
    
    private final WebsocketDataHandler websocketDataHandler;
    
    private final Timer timer;
    
    private TimerTask timerTask;
    
    /**
     * Instantiates a new shenyu websocket client.
     *
     * @param serverUri            the server uri
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                                 final List<MetaDataSubscriber> metaDataSubscribers,
                                 final List<AuthDataSubscriber> authDataSubscribers
    ) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
        this.timer = WheelTimerFactory.getSharedTimer();
        this.connection();
    }
    
    private void connection() {
        this.connectBlocking();
        // 通過 timer線程進行心跳檢測
        this.timer.add(timerTask = new AbstractRoundTask(null, TimeUnit.SECONDS.toMillis(10)) {
            @Override
            public void doRun(final String key, final TimerTask timerTask) {
                // 心跳檢測
                healthCheck();
            }
        });
    }
    
    @Override
    public boolean connectBlocking() {
        // websocket協(xié)議的連接方法回調
        boolean success = false;
        try {
            success = super.connectBlocking();
        } catch (Exception ignored) {
        }
        if (success) {
            LOG.info("websocket connection server[{}] is successful.....", this.getURI().toString());
        } else {
            LOG.warn("websocket connection server[{}] is error.....", this.getURI().toString());
        }
        return success;
    }
    
    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        // websocket協(xié)議的打開連接 回調蚌卤,代表當前連接正常開啟了
        if (!alreadySync) {
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }
    
    @Override
    public void onMessage(final String result) {
        // websocket協(xié)議的接收消息回調
        handleResult(result);
    }
    
    @Override
    public void onClose(final int i, final String s, final boolean b) {
        // websocket協(xié)議的關閉連接回調
        this.close();
    }
    
    @Override
    public void onError(final Exception e) {
        this.close();
    }
    
    @Override
    public void close() {
        alreadySync = false;
        if (this.isOpen()) {
            super.close();
        }
    }
    
    /**
     * Now close.
     * now close. will cancel the task execution.
     */
    public void nowClose() {
        this.close();
        timerTask.cancel();
    }
    // 看來是shenyu-bootstrap側進行主動通信,如果是關閉狀態(tài)會一直嘗試連接到shenyu-admin的webscoket,開啟狀態(tài)進行心跳檢測
    private void healthCheck() {
        try {
            if (!this.isOpen()) {
                this.reconnectBlocking();
            } else {
                this.sendPing();
                LOG.debug("websocket send to [{}] ping message successful", this.getURI().toString());
            }
        } catch (Exception e) {
            LOG.error("websocket connect is error :{}", e.getMessage());
        }
    }
    
    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        LOG.info("handleResult({})", result);
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        // 業(yè)務入口
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}

從上述代碼可以看到 shenyu-bootstrap網(wǎng)關通過SyncDataService下沉到對應不同的子實現(xiàn)的模塊后逊彭,通過抽象不同類型的數(shù)據(jù)接口進行操作珍逸。那么這些數(shù)據(jù)變化的事件源頭則是shenyu-admin通過DataChangedInit和DataChangedListenner兩個抽象的接口來操作媳荒。

DataChangedInit 是在服務啟動后初始化邏輯,沒有websocket的子實現(xiàn),其實就是在服務啟動后的一個鉤子,因為nacos仁烹,consul缺脉,etcd哨坪,zookeeper相當于都需要一個中間件服務來同步數(shù)據(jù)界拦,需要每次啟動會進行一次同步,而websocket則是直連锦聊,互相能感知到上下線歹嘹,可以直接在上下線中處理

public abstract class AbstractDataChangedInit implements DataChangedInit {

    /**
     * SyncDataService, sync all data.
     */
    @Resource
    private SyncDataService syncDataService;
//通過 CommandLineRunner 服務啟動的鉤子來初始化
    @Override
    public void run(final String... args) throws Exception {
        if (notExist()) {
            syncDataService.syncAll(DataEventTypeEnum.REFRESH);
        }
    }
turn boolean.
     */
    protected abstract boolean notExist();
}

而所有數(shù)據(jù)變化的邏輯是通過spring提供的觀察者機制,通過ApplicationEvent發(fā)布事件孔庭,由事件監(jiān)聽者來同步數(shù)據(jù)
分發(fā)事件的類為DataChangedEventDispatcher類

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private final ApplicationContext applicationContext;
// 可以看到也支持多種數(shù)據(jù)同步協(xié)議
    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
//不同的數(shù)據(jù)類型進行一次強轉尺上,調用類是保證類型安全的。
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}

然后我們來看websocket關于DataChangedListener的實現(xiàn)
全部委托給WebsocketCollector處理

public class WebsocketDataChangedListener implements DataChangedListener {

    @Override
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }

    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }

    @Override
    public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
        WebsocketData<RuleData> configData =
                new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

    @Override
    public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) {
        WebsocketData<AppAuthData> configData =
                new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

    @Override
    public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
        WebsocketData<MetaData> configData =
                new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }
}

我們來看WebsocketCollector,它也是一個websocket客戶端類

// shenyu-admin服務通過當前websocket與 shenyu-bootstrap建立連接圆到,由admin側的ShenyuWebsocketClient類的timer task心跳和嘗試連接
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {
    
    private static final Logger LOG = LoggerFactory.getLogger(WebsocketCollector.class);
    
    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
    
    private static final String SESSION_KEY = "sessionKey";
    
    @OnOpen
    public void onOpen(final Session session) {
        LOG.info("websocket on client[{}] open successful,maxTextMessageBufferSize:{}",
                getClientIp(session), session.getMaxTextMessageBufferSize());
        SESSION_SET.add(session);
    }
    
    private static String getClientIp(final Session session) {
        Map<String, Object> userProperties = session.getUserProperties();
        if (MapUtils.isEmpty(userProperties)) {
            return StringUtils.EMPTY;
        }
        
        return Optional.ofNullable(userProperties.get(WebsocketListener.CLIENT_IP_NAME))
                .map(Object::toString)
                .orElse(StringUtils.EMPTY);
    }
    
    /**
     * On message.
     *
     * @param message the message
     * @param session the session
     */
    @OnMessage
    public void onMessage(final String message, final Session session) {
        if (!Objects.equals(message, DataEventTypeEnum.MYSELF.name())) {
            return;
        }
        
        try {
            ThreadLocalUtils.put(SESSION_KEY, session);
            SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
        } finally {
            ThreadLocalUtils.clear();
        }

    }
    

    @OnClose
    public void onClose(final Session session) {
        clearSession(session);
        LOG.warn("websocket close on client[{}]", getClientIp(session));
    }
    

    @OnError
    public void onError(final Session session, final Throwable error) {
        clearSession(session);
        LOG.error("websocket collection on client[{}] error: ", getClientIp(session), error);
    }
    
    /**
     * Send.
     *
     * @param message the message
     * @param type    the type
     */
    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        
        if (DataEventTypeEnum.MYSELF == type) {
            Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
            if (Objects.nonNull(session)) {
                sendMessageBySession(session, message);
            }
        } else {
            SESSION_SET.forEach(session -> sendMessageBySession(session, message));
        }
        
    }
    // 發(fā)送同步的數(shù)據(jù)時都是 使用同步方法發(fā)送
    private static synchronized void sendMessageBySession(final Session session, final String message) {
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            LOG.error("websocket send result is exception: ", e);
        }
    }
    
    private void clearSession(final Session session) {
        SESSION_SET.remove(session);
        ThreadLocalUtils.clear();
    }
}

至此上面的發(fā)送和接收邏輯也看到了怎抛,那么總結

注冊邏輯

  1. 客戶端(業(yè)務后端)ShenyuClientRegisterRepository 上報數(shù)據(jù)
  2. 服務端(shenyu-admin)ShenyuClientServerRegisterRepository 接收數(shù)據(jù)
  3. disruptor 削峰解耦注冊中心的數(shù)據(jù) ShenyuClientServerRegisterPublisher
  4. 如果是元數(shù)據(jù)發(fā)布 spring的event同步到shenyu-bootstrap
    (因為本文只研究了nacos,springcloud協(xié)議的網(wǎng)關的后端實例列表都是通過springCloud中l(wèi)oadbalancer邏輯實現(xiàn)芽淡,ribbon通過一個定時線程從注冊中心拉取服務實例列表)其他非nacos未了解

數(shù)據(jù)同步

  1. 啟動時利用DataChangedInit從注冊中心同步全量數(shù)據(jù)(websocket除外马绝,websocket直連,可以直接感知到連接和斷開)
  2. 通過DataChangedEventDispatcher利用spring的applicationEvent監(jiān)聽到數(shù)據(jù)變化事件
  3. 調用DataChangedListener 數(shù)據(jù)變化邏輯
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末挣菲,一起剝皮案震驚了整個濱河市富稻,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌白胀,老刑警劉巖椭赋,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異或杠,居然都是意外死亡哪怔,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進店門向抢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來认境,“玉大人,你說我怎么就攤上這事笋额。” “怎么了篷扩?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵兄猩,是天一觀的道長。 經常有香客問我,道長枢冤,這世上最難降的妖魔是什么鸠姨? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮淹真,結果婚禮上讶迁,老公的妹妹穿的比我還像新娘。我一直安慰自己核蘸,他們只是感情好巍糯,可當我...
    茶點故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著客扎,像睡著了一般祟峦。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上徙鱼,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天宅楞,我揣著相機與錄音,去河邊找鬼袱吆。 笑死厌衙,一個胖子當著我的面吹牛,可吹牛的內容都是我干的绞绒。 我是一名探鬼主播婶希,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼处铛!你這毒婦竟也來了饲趋?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤撤蟆,失蹤者是張志新(化名)和其女友劉穎奕塑,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體家肯,經...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡龄砰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了讨衣。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片换棚。...
    茶點故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖反镇,靈堂內的尸體忽然破棺而出固蚤,到底是詐尸還是另有隱情,我是刑警寧澤歹茶,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布夕玩,位于F島的核電站你弦,受9級特大地震影響,放射性物質發(fā)生泄漏燎孟。R本人自食惡果不足惜禽作,卻給世界環(huán)境...
    茶點故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望揩页。 院中可真熱鬧旷偿,春花似錦、人聲如沸爆侣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽累提。三九已至尘喝,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間斋陪,已是汗流浹背朽褪。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留无虚,地道東北人缔赠。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓,卻偏偏與公主長得像友题,于是被迫代替她去往敵國和親嗤堰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,647評論 2 354

推薦閱讀更多精彩內容