本文主要解析下canal server的啟動過程敛苇,希望能有所收獲妆绞。
一、序列圖
1.1 啟動
1.2 停止
二枫攀、源碼分析
整個server啟動的過程比較復(fù)雜括饶,看圖難以理解,需要輔以文字說明来涨。
首先程序的入口在CanalLauncher的main方法中图焰。
2.1 加載配置文件
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
properties.load(new FileInputStream(conf));
}
從canal.properties文件中l(wèi)oad所有的配置信息,加載到上下文中蹦掐。不再贅述技羔。
2.2 構(gòu)造CanalController
根據(jù)配置文件來構(gòu)造CanalController僵闯,這塊的代碼比較多,主要分為七個步驟藤滥,具體如下鳖粟。
2.2.1 初始化全局參數(shù)配置
調(diào)用initGlobalConfig方法,過程如下:
- 判斷運行模式拙绊,是從spring加載還是manager加載向图,目前開源版本建議使用spring
- 獲取是否懶加載
- 如果是manager模式啟動,獲取manager的ip地址标沪;如果是spring模式啟動张漂,獲取spring xml的文件地址,加載到全部配置中
- 構(gòu)造一個實例構(gòu)造器CanalInstanceGenerator谨娜,我們用到的就是在spring的beanFactory中加上destination的bean航攒,這個destination就是canal instance的名稱
這塊邏輯在CanalController的initGlobalConfig方法中。
2.2.2 初始化實例配置
這塊的邏輯是從instance.properties里面初始化實例趴梢。
private void initInstanceConfig(Properties properties) {
String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
for (String destination : destinations) {
InstanceConfig config = parseInstanceConfig(properties, destination);
InstanceConfig oldConfig = instanceConfigs.put(destination, config);
if (oldConfig != null) {
logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
oldConfig, config });
}
}
}
從這段代碼中可以看出漠畜,我們在一個canal.properties文件中,可以配置多個destination坞靶,也就是可以配置多個instance憔狞,不同的instance以逗號隔開。這里主要看的是parseInstanceConfig()方法彰阴,里面的邏輯如下:
- 獲取啟動模式瘾敢,是manager還是spring,我們這邊默認(rèn)都是spring尿这。
- 獲取懶加載字段
- 獲取spring xml配置文件地址
2.2.3 初始SocketChannel
從配置文件中獲取canal.socketChannel字段簇抵,放到全局變量中。
2.2.4 準(zhǔn)備canal server
從配置文件中分別獲取canal.id射众、ip碟摆、port(對外提供socket服務(wù)的端口),獲取一個內(nèi)存級的server單例叨橱,同時也獲取一個對外提供Netty服務(wù)的單例典蜕。
cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
ip = getProperty(properties, CanalConstants.CANAL_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設(shè)置自定義的instanceGenerator
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);
2.2.5 初始化系統(tǒng)目錄
從配置文件中獲取zk地址(canal.zkServers),啟動一個zk客戶端罗洗,然后初始化兩個系統(tǒng)目錄愉舔,分別是:
- /otter/canal/destinations
- /otter/canal/cluster
2.2.6 初始化系統(tǒng)監(jiān)控
根據(jù)destination構(gòu)造運行時監(jiān)控,其實就是根據(jù)instance名來構(gòu)造ServerRunningMonitor伙菜。其實就是實現(xiàn)了ServerRunningListener中的一些方法轩缤。
public interface ServerRunningListener {
/**
* 啟動時回調(diào)做點事情
*/
public void processStart();
/**
* 關(guān)閉時回調(diào)做點事情
*/
public void processStop();
/**
* 觸發(fā)現(xiàn)在輪到自己做為active,需要載入上一個active的上下文數(shù)據(jù)
*/
public void processActiveEnter();
/**
* 觸發(fā)一下當(dāng)前active模式失敗
*/
public void processActiveExit();
}
然后初始化一下ServerRunningMonitor。
runningMonitor.init();
這個init方法跟蹤的結(jié)果典奉,其實就是執(zhí)行了ServerRunningListener中的processStart方法躺翻。
public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
initCid(path);
zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
首先獲取了/otter/canal/destinations/{destination}/cluster/ip:port的內(nèi)容丧叽,其實就是server的地址卫玖,最后一個ip:port是個zk的臨時節(jié)點。然后訂閱一下節(jié)點事件踊淳,當(dāng)節(jié)點有事件推送過來后假瞬,做一些動作。
2.2.7 初始化配置文件監(jiān)控
如果canal.auto.scan配置為true(默認(rèn)為true)迂尝,首先定義一個InstanceAction脱茉,包含了啟動、停止垄开、重啟instance的動作琴许。
定義一個SpringInstanceConfigMonitor,配置定時掃描的事件為canal.auto.scan.interval溉躲,默認(rèn)5s榜田,掃描canal.conf.dir目錄下的文件,與上面定義的InstanceAction結(jié)合起來锻梳。
2.3 啟動CanalController
上面的構(gòu)造方法其實就是定義一些必要的內(nèi)容箭券,真正的啟動在這個方法中。
2.3.1 創(chuàng)建工作節(jié)點
創(chuàng)建臨時節(jié)點/otter/canal/cluster/ip:port疑枯,同時啟動監(jiān)聽器.
2.3.2 啟動embeded服務(wù)
embededCanalServer.start();
這個start里面辩块,一個是將當(dāng)前server的running狀態(tài)置為true,同時根據(jù)destination構(gòu)建CanalInstance荆永。
2.3.3 HA啟動
遍歷Map<String, InstanceConfig>中的InstanceConfig废亭,如果CanalInsance還沒啟動,如果不是懶加載的話具钥,直接HA啟動ServerRunningMonitor滔以。
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
public synchronized void start() {
super.start();
try {
processStart();
if (zkClient != null) {
// 如果需要盡可能釋放instance資源,不需要監(jiān)聽running節(jié)點氓拼,不然即使stop了這臺機(jī)器你画,另一臺機(jī)器立馬會start
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
} else {
processActiveEnter();// 沒有zk,直接啟動
}
} catch (Exception e) {
logger.error("start failed", e);
// 沒有正常啟動桃漾,重置一下狀態(tài)坏匪,避免干擾下一次start
stop();
}
}
這里面啟動的內(nèi)容我們來看看。
- 首先調(diào)用super.start()把當(dāng)前的running狀態(tài)置為true撬统。
- 然后啟動zk節(jié)點的監(jiān)聽(這邊的processStart是否多余了适滓?)。
- 監(jiān)聽路徑/otter/canal/destinations/{destination}/running節(jié)點的變化
zkClient.subscribeDataChanges(path, dataListener);
- 這里的dataListener是ServerRunningMonitor構(gòu)造函數(shù)中定義的恋追,就是定義一些zk節(jié)點監(jiān)聽的動作凭迹。
- 如果有數(shù)據(jù)變化罚屋,如果running節(jié)點中的內(nèi)容ServerRunningData發(fā)生了變化,字段active變?yōu)榱薴alse嗅绸,而且address就是本機(jī)脾猛,說明本機(jī)出現(xiàn)了主動釋放,需要釋放運行時狀態(tài)鱼鸠。此時需要調(diào)用到processActiveExit方法猛拴,其實就是停止了本機(jī)的server中destination對應(yīng)的instance。
- 如果節(jié)點發(fā)生了刪除動作蚀狰,如果上一次active的狀態(tài)就是本機(jī)愉昆,則即時觸發(fā)一下active搶占,調(diào)用initRunning()方法麻蹋,當(dāng)然跛溉,如果啟動失敗,也不是立即切換扮授,而是會等待5s芳室,再嘗試啟動。這個啟動方法中糙箍,主要調(diào)用的是processActiveEnter()方法渤愁,來啟動了embededCanalServer.start(destination)。其實就是啟動canalInstance深夯,這塊后續(xù)再分析抖格。
- 其實除了監(jiān)聽器,在本身的ServerRunningMonitor的start方法中咕晋,也有initRunning方法雹拄。這塊啟動canalInstance的方法,我們下一篇文章分析掌呜。
2.3.4 instance文件掃描啟動
在掃描之前滓玖,把destination和InstanceAction綁定到緩存中。
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
首先啟動一個全局掃描质蕉,然后再對應(yīng)的destination配置文件的掃描势篡。
if (autoScan) {
instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (!monitor.isStart()) {
monitor.start();
}
}
}
這個start方法啟動了一個定時器,默認(rèn)5s掃描一次模暗。掃描的內(nèi)容就是配置文件路徑下的內(nèi)容禁悠,針對文件的新增、刪除兑宇、修改碍侦,對應(yīng)InstanceAction中的start,stop和reload方法。也就是說瓷产,我們在canal運行的過程中站玄,通過動態(tài)修改配置文件,來實現(xiàn)動態(tài)調(diào)整運行時參數(shù)濒旦,主要可以用來進(jìn)行重復(fù)消費株旷,位點的遷移等等。
2.3.5 網(wǎng)絡(luò)接口啟動
CanalServerWithNetty的啟動疤估,首先需要啟動CanalServerWithEmbedded灾常,主要的業(yè)務(wù)邏輯在SessionHandler中霎冯。這塊其實是暴露外部服務(wù)铃拇,給canal client進(jìn)行調(diào)用。
2.4 增加關(guān)閉hook
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the canal server");
controller.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal Server:", e);
} finally {
logger.info("## canal server is down.");
}
}
});
在server停止時沈撞,調(diào)用controller.stop()方法慷荔。
public void stop() throws Throwable {
canalServer.stop();
if (autoScan) {
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (monitor.isStart()) {
monitor.stop();
}
}
}
for (ServerRunningMonitor runningMonitor : ServerRunningMonitors.getRunningMonitors().values()) {
if (runningMonitor.isStart()) {
runningMonitor.stop();
}
}
// 釋放canal的工作節(jié)點
releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
logger.info("## stop the canal server[{}:{}]", ip, port);
if (zkclientx != null) {
zkclientx.close();
}
}
主要是停止controller,server相關(guān)的monitor缠俺,instance相關(guān)的monitor显晶,然后釋放zk節(jié)點,關(guān)閉zk連接壹士。