引子
從官網(wǎng) clone 代碼下來后菜谣,依次啟動(dòng) soul-admin饱溢、soul-bootstrap 和 soul-examples-http 模塊异逐,啟動(dòng)成功后查看 soul-admin 會(huì)發(fā)現(xiàn) soul-examples-http 模塊里的代理配置和路由規(guī)則已經(jīng)自動(dòng)同步到了 soul-admin 上,通過 postman 調(diào)用發(fā)現(xiàn)配置也自動(dòng)生效蜈膨。
soul-examples-http 核心配置
soul:
http:
adminUrl: http://localhost:9095
port: 8188
contextPath: /http
appName: http
full: false
soul-examples-http 主要接口
soul-admin 自動(dòng)更新到的配置
devide 配置全局
選擇器詳情
可以看到選擇器已經(jīng)讀取了 soul-examples-http 配置文件中的信息悼沿,并且 soul-admin 自動(dòng)獲取到了soul-examples-http 服務(wù)的 IP 和端口信息赖瞒。
postman 驗(yàn)證
直接請(qǐng)求 soul-examples-http 服務(wù)
通過網(wǎng)關(guān)轉(zhuǎn)發(fā)到 soul-examples-http 服務(wù)
結(jié)果符合預(yù)期干跛,證明 soul-examples-http 的配置自動(dòng)同步給了 soul-admin 和 soul-bootstrap公你,接下來就研究一下 soul 是怎么實(shí)現(xiàn)這一塊邏輯的志电。
客戶端向 soul-admin 同步
配置分析
通過查看 soul-examples-http 的配置文件,可以看到有配置 soul-admin 的地址
soul:
http:
adminUrl: http://localhost:9095
跟進(jìn)這個(gè)配置映射 Java 實(shí)體拉讯,找到了soul-client-springmvc 下的SoulSpringMvcConfig類,這個(gè)類定值了配置相關(guān)的選擇
public class SoulSpringMvcConfig {
/**
* soul-admin 地址
*/
private String adminUrl;
/**
* 代理服務(wù)上下文地址
*/
private String contextPath;
/**
* 服務(wù)名
*/
private String appName;
/**
* 是否全局代理
*/
private boolean full;
/**
* 服務(wù) host 地址
*/
private String host;
/**
* 服務(wù)端口
*/
private Integer port;
}
客戶端源碼解析
查看配置類的調(diào)用方鳖藕,追蹤到兩個(gè)類魔慷,分別是 ContextRegisterListener 和 SpringMvcClientBeanPostProcessor,分別在項(xiàng)目啟動(dòng)完成后執(zhí)行和 bean 掃描后執(zhí)行著恩。具體代碼和解析如下:
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
private final AtomicBoolean registered = new AtomicBoolean(false);
private final String url;
private final SoulSpringMvcConfig soulSpringMvcConfig;
/**
* Instantiates a new Context register listener.
*
* @param soulSpringMvcConfig the soul spring mvc config
*/
public ContextRegisterListener(final SoulSpringMvcConfig soulSpringMvcConfig) {
ValidateUtils.validate(soulSpringMvcConfig);
this.soulSpringMvcConfig = soulSpringMvcConfig;
url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
}
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
if (!registered.compareAndSet(false, true)) {
return;
}
if (soulSpringMvcConfig.isFull()) {
RegisterUtils.doRegister(buildJsonParams(), url, RpcTypeEnum.HTTP);
}
}
private String buildJsonParams() {
String contextPath = soulSpringMvcConfig.getContextPath();
String appName = soulSpringMvcConfig.getAppName();
Integer port = soulSpringMvcConfig.getPort();
String path = contextPath + "/**";
String configHost = soulSpringMvcConfig.getHost();
String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost;
SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder()
.context(contextPath)
.host(host)
.port(port)
.appName(appName)
.path(path)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(path)
.build();
return OkHttpTools.getInstance().getGson().toJson(registerDTO);
}
}
核心代碼邏輯是當(dāng)配置為全局代理的時(shí)候院尔,調(diào)用 soul-admin 的/soul-client/springmvc-register 接口,將配置信息同步給 soul-admin喉誊。
package org.dromara.soul.client.springmvc.init;
/**
* The type Soul spring mvc client bean post processor.
*
* @author xiaoyu(Myth)
*/
@Slf4j
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
private final ThreadPoolExecutor executorService;
private final String url;
private final SoulSpringMvcConfig soulSpringMvcConfig;
public SpringMvcClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
ValidateUtils.validate(soulSpringMvcConfig);
this.soulSpringMvcConfig = soulSpringMvcConfig;
url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
}
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
if (soulSpringMvcConfig.isFull()) {
return bean;
}
Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
RestController restController = AnnotationUtils.findAnnotation(bean.getClass(), RestController.class);
RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
if (controller != null || restController != null || requestMapping != null) {
SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
String prePath = "";
if (Objects.nonNull(clazzAnnotation)) {
if (clazzAnnotation.path().indexOf("*") > 1) {
String finalPrePath = prePath;
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
RpcTypeEnum.HTTP));
return bean;
}
prePath = clazzAnnotation.path();
}
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
if (Objects.nonNull(soulSpringMvcClient)) {
String finalPrePath = prePath;
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
RpcTypeEnum.HTTP));
}
}
}
return bean;
}
private String buildJsonParams(final SoulSpringMvcClient soulSpringMvcClient, final String prePath) {
String contextPath = soulSpringMvcConfig.getContextPath();
String appName = soulSpringMvcConfig.getAppName();
Integer port = soulSpringMvcConfig.getPort();
String path = contextPath + prePath + soulSpringMvcClient.path();
String desc = soulSpringMvcClient.desc();
String configHost = soulSpringMvcConfig.getHost();
String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost;
String configRuleName = soulSpringMvcClient.ruleName();
String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName;
SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder()
.context(contextPath)
.host(host)
.port(port)
.appName(appName)
.path(path)
.pathDesc(desc)
.rpcType(soulSpringMvcClient.rpcType())
.enabled(soulSpringMvcClient.enabled())
.ruleName(ruleName)
.registerMetaData(soulSpringMvcClient.registerMetaData())
.build();
return OkHttpTools.getInstance().getGson().toJson(registerDTO);
}
}
代碼核心邏輯是
- 校驗(yàn)代碼是否有 @Controller 或者 @RestController 或者 @RequestMapping 注解
- 如果有邀摆,則校驗(yàn)是否有 @SoulSpringMvcClient 注解
- 檢查路徑配置是否有 1 個(gè)以上的 * 號(hào),如果有伍茄,則直接發(fā)送配置信息到 soul-admin
- 如果不包含 1 個(gè)以上 * 號(hào)栋盹,則遍歷類中每一個(gè)方法,找到有 @SoulSpringMvcClient 注解的方法敷矫,發(fā)送配置到 soul-admin
soul-admin 源碼解析
研究服務(wù)端注冊(cè)接口 /soul-client/springmvc-register 例获,其實(shí)現(xiàn)類為 SoulClientController 汉额。
具體實(shí)現(xiàn)代碼是下面部分
@Override
@Transactional
public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
if (dto.isRegisterMetaData()) {
MetaDataDO exist = metaDataMapper.findByPath(dto.getPath());
if (Objects.isNull(exist)) {
saveSpringMvcMetaData(dto);
}
}
String selectorId = handlerSpringMvcSelector(dto);
handlerSpringMvcRule(selectorId, dto);
return SoulResultMessage.SUCCESS;
}
- 根據(jù)配置地址查庫校驗(yàn)是否已存在
- 如果不存在,則根據(jù)收到的配置保存元數(shù)據(jù)
- 校驗(yàn) Selector 是否存在榨汤,不存在則保存
- 校驗(yàn) Rule 是否存在蠕搜,不存在則保存
小結(jié)
至此,客戶端向 soul-admin 同步代碼的邏輯就梳理清楚了收壕,其核心配置是 yml 文件中的配置文件和 @SoulSpringMvcClient 注解妓灌,發(fā)送配置實(shí)現(xiàn)的核心代碼為 ContextRegisterListener 類和 SpringMvcClientBeanPostProcessor 類,接收配置核心類為 SoulClientController蜜宪。
soul-admin 向 soul-bootstrap 同步
數(shù)據(jù)同步流程圖
閱讀官方文檔虫埂,發(fā)現(xiàn)數(shù)據(jù)同步流程圖如下
核心依賴
從圖中可以看到,數(shù)據(jù)同步核心為粉色框部分端壳,四種方式的 SPI告丢,然后 soul-gateway 去pull/watch 數(shù)據(jù)同步到本地。
查看 soul-gateway 的依賴损谦,找到依賴模塊為
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<!--soul data sync start use websocket-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
<!--soul data sync start use http-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
<version>${project.version}</version>
</dependency>
<!-- soul data sync start use nacos -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
<version>${project.version}</version>
</dependency>
查看他們的 SPI 定義岖免,代碼位置在下圖
代碼解析
根據(jù) SpringBoot 的 SPI 規(guī)范,查看模塊對(duì)應(yīng)的 spring.factories 描述文件照捡,以 websocket 為例颅湘,配置了同步的配置文件如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
找到其代碼實(shí)現(xiàn)為
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
主要是 new 了 WebsocketSyncDataService 服務(wù),說明核心服務(wù)在 WebsocketSyncDataService 類中栗精。繼續(xù)跟蹤代碼研究具體實(shí)現(xiàn)邏輯闯参。
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
- 讀取配置中的同步地址列表,根據(jù)地址數(shù)量創(chuàng)建定時(shí)調(diào)度連接池
- 每個(gè)同步地址都發(fā)起一個(gè) websocket 連接悲立,核心為放入了3個(gè)訂閱者
- 每個(gè) socket 連接每 30 秒檢測(cè)一次健康度鹿寨,并打印日志
- 線程池的線程發(fā)起一個(gè)每 30 秒觸發(fā)一次的請(qǐng)求,檢查連接是否成功薪夕,并打印日志
很明顯脚草,核心為第二步的3個(gè)訂閱者,這里采用了訂閱者模式原献,發(fā)布者一定是在 soul-admin 中進(jìn)行的馏慨。
數(shù)據(jù)發(fā)布實(shí)現(xiàn)
觀察 soul-admin 代碼,找到如下代碼
四種不同的同步方式姑隅,都是對(duì) DataChangedListener 的不同實(shí)現(xiàn)写隶,發(fā)送的自定義事件為DataChangedEvent。
DataChangedListener 定義了5種響應(yīng)時(shí)間讲仰,分別是 AppAuth 變更慕趴、Plugin 變更、Selector 變更、MetaData 變更和 Rule 變更秩贰。
DataChangedEvent 定義了 2 中枚舉類型霹俺,分別是操作類型(新增、變更等)和數(shù)據(jù)類型(Plugin毒费、Rule等)丙唧。
一次發(fā)送事件的實(shí)例代碼
代碼源于org.dromara.soul.admin.listener.zookeeper.HttpServiceDiscovery:line171
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(selectorData)));
小結(jié)
soul-admin 向 soul-gateway 同步核心原理為 SPI 機(jī)制,SPI 的定義在soul-spring-boot-starter-sync-data-center 模塊下的spring.factories描述文件中觅玻。
SPI 具體實(shí)現(xiàn)在 soul-sync-data-center 模塊下想际,本文以 websocket 方式為例解析了代碼,其它方式同步在具體實(shí)現(xiàn)方式上略有不同溪厘。
網(wǎng)關(guān)獲取數(shù)據(jù)采用發(fā)布訂閱模式胡本,訂閱者為 soul-gateway 網(wǎng)關(guān),發(fā)布者為 soul-admin畸悬,發(fā)布者的實(shí)現(xiàn)邏輯在 org.dromara.soul.admin.listener 包下面侧甫。