零幅疼、前戲
最近在“玩”demo的過程中,就一直好奇昼接。插件信息等元數(shù)據(jù)爽篷,soul是怎么從soul-admin同步到soul-bootstrap中的。
本著大膽瞎猜慢睡,小心求證的心態(tài)逐工,就有今天的關(guān)于websocket這篇文章。
一漂辐、服務(wù)端--soul-admin
- 首先從jar入手泪喊,soul-admin中的只有一個spring-boot-starter-websocket官方j(luò)ar包。
猜測:服務(wù)端實現(xiàn)是在寫在soul-admin項目內(nèi)部
- 通過全文搜索“websocket”髓涯,找到了DataSyncConfiguration
涉及websocket的配置初始化代碼如下:
與yml中的配置項對應(yīng)
- 進入WebsocketCollector中一探究竟
@ServerEndpoint("/websocket")科普帖子:https://blog.csdn.net/weixin_39793752/article/details/80949128
@ServerEndpoint("/websocket")
public class WebsocketCollector {
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
private static final String SESSION_KEY = "sessionKey";
/**
* On open.
*
* @param session the session
*/
@OnOpen
public void onOpen(final Session session) {
log.info("websocket on open successful....");
SESSION_SET.add(session);
}
/**
* On message.
*
* @param message the message
* @param session the session
*/
@OnMessage
public void onMessage(final String message, final Session session) {
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
try {
ThreadLocalUtil.put(SESSION_KEY, session);
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
} finally {
ThreadLocalUtil.clear();
}
}
}
/**
* On close.
*
* @param session the session
*/
@OnClose
public void onClose(final Session session) {
SESSION_SET.remove(session);
ThreadLocalUtil.clear();
}
/**
* On error.
*
* @param session the session
* @param error the error
*/
@OnError
public void onError(final Session session, final Throwable error) {
SESSION_SET.remove(session);
ThreadLocalUtil.clear();
log.error("websocket collection error: ", error);
}
/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
if (DataEventTypeEnum.MYSELF == type) {
try {
Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
if (session != null) {
session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
}
@OnOpen:連接建立成功調(diào)用的方法
@OnClose:連接關(guān)閉調(diào)用方法
@OnMessage:收到客戶端消息后調(diào)用的方法
@OnError:發(fā)送錯誤時調(diào)用
- 當(dāng)服務(wù)端收到一條MYSELF類型的消息袒啼,將同步全量元數(shù)據(jù)信息
- admin 會推送一次全量數(shù)據(jù),后續(xù)如果配置數(shù)據(jù)發(fā)生變更則將增量數(shù)據(jù)通過 websocket 主動推送給 soul-web
詳細機制此處暫留坑复凳,后文再續(xù)
二瘤泪、客戶端--soul-bootstrap
從pom文件入手,找到了soul-spring-boot-starter-sync-data-websocket中的WebsocketSyncDataConfiguration類
- 深入WebsocketSyncDataService
- 按照配置soul-admin的個數(shù)育八,創(chuàng)建線程池大小
- 通過線程池的定時任務(wù)对途,實現(xiàn)心跳機制----保持心跳
- /使用調(diào)度線程池進行斷線重連,30秒進行一次髓棋。如果連接斷開实檀,將client.connectBlocking(3000, TimeUnit.MILLISECONDS);重連惶洲,重新發(fā)送MYSELF類型信息,進行全量更新膳犹。
線程池科普貼:https://blog.csdn.net/weixin_35756522/article/details/81707276
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
command:執(zhí)行線程
initialDelay:初始化延時
period:兩次開始執(zhí)行最小間隔時間
unit:計時單位
- SoulWebsocketClient 是具體處理websocket類
- 在與服務(wù)端建立連接時恬吕,會發(fā)送MYSELF類型的消息。****如上文所述须床,服務(wù)端接收****MYSELF類****型消息后將發(fā)送全量消息用于同步铐料。
小結(jié):
- soul-admin與soul-bootstrap基本通信鏈路,基本捋順豺旬。相關(guān)數(shù)據(jù)處理的細節(jié)钠惩,將在后續(xù)文章中淺析。
- 日拱一卒族阅,每天進步一點點