【Canal源碼分析】Canal Server的啟動和停止過程

本文主要解析下canal server的啟動過程敛苇,希望能有所收獲妆绞。

一、序列圖

1.1 啟動

啟動序列圖.png

1.2 停止

停止序列圖.png

二枫攀、源碼分析

整個server啟動的過程比較復(fù)雜括饶,看圖難以理解,需要輔以文字說明来涨。

首先程序的入口在CanalLauncher的main方法中图焰。

2.1 加載配置文件

String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
    conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
    properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
    properties.load(new FileInputStream(conf));
}

從canal.properties文件中l(wèi)oad所有的配置信息,加載到上下文中蹦掐。不再贅述技羔。

2.2 構(gòu)造CanalController

根據(jù)配置文件來構(gòu)造CanalController僵闯,這塊的代碼比較多,主要分為七個步驟藤滥,具體如下鳖粟。

2.2.1 初始化全局參數(shù)配置

調(diào)用initGlobalConfig方法,過程如下:

  • 判斷運行模式拙绊,是從spring加載還是manager加載向图,目前開源版本建議使用spring
  • 獲取是否懶加載
  • 如果是manager模式啟動,獲取manager的ip地址标沪;如果是spring模式啟動张漂,獲取spring xml的文件地址,加載到全部配置中
  • 構(gòu)造一個實例構(gòu)造器CanalInstanceGenerator谨娜,我們用到的就是在spring的beanFactory中加上destination的bean航攒,這個destination就是canal instance的名稱

這塊邏輯在CanalController的initGlobalConfig方法中。

2.2.2 初始化實例配置

這塊的邏輯是從instance.properties里面初始化實例趴梢。

private void initInstanceConfig(Properties properties) {
    String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
    String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);

    for (String destination : destinations) {
        InstanceConfig config = parseInstanceConfig(properties, destination);
        InstanceConfig oldConfig = instanceConfigs.put(destination, config);

        if (oldConfig != null) {
            logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
                    oldConfig, config });
        }
    }
}

從這段代碼中可以看出漠畜,我們在一個canal.properties文件中,可以配置多個destination坞靶,也就是可以配置多個instance憔狞,不同的instance以逗號隔開。這里主要看的是parseInstanceConfig()方法彰阴,里面的邏輯如下:

  • 獲取啟動模式瘾敢,是manager還是spring,我們這邊默認(rèn)都是spring尿这。
  • 獲取懶加載字段
  • 獲取spring xml配置文件地址

2.2.3 初始SocketChannel

從配置文件中獲取canal.socketChannel字段簇抵,放到全局變量中。

2.2.4 準(zhǔn)備canal server

從配置文件中分別獲取canal.id射众、ip碟摆、port(對外提供socket服務(wù)的端口),獲取一個內(nèi)存級的server單例叨橱,同時也獲取一個對外提供Netty服務(wù)的單例典蜕。

cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
ip = getProperty(properties, CanalConstants.CANAL_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設(shè)置自定義的instanceGenerator
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);

2.2.5 初始化系統(tǒng)目錄

從配置文件中獲取zk地址(canal.zkServers),啟動一個zk客戶端罗洗,然后初始化兩個系統(tǒng)目錄愉舔,分別是:

  • /otter/canal/destinations
  • /otter/canal/cluster

2.2.6 初始化系統(tǒng)監(jiān)控

根據(jù)destination構(gòu)造運行時監(jiān)控,其實就是根據(jù)instance名來構(gòu)造ServerRunningMonitor伙菜。其實就是實現(xiàn)了ServerRunningListener中的一些方法轩缤。

public interface ServerRunningListener {

    /**
     * 啟動時回調(diào)做點事情
     */
    public void processStart();

    /**
     * 關(guān)閉時回調(diào)做點事情
     */
    public void processStop();

    /**
     * 觸發(fā)現(xiàn)在輪到自己做為active,需要載入上一個active的上下文數(shù)據(jù)
     */
    public void processActiveEnter();

    /**
     * 觸發(fā)一下當(dāng)前active模式失敗
     */
    public void processActiveExit();

}

然后初始化一下ServerRunningMonitor。

runningMonitor.init();

這個init方法跟蹤的結(jié)果典奉,其實就是執(zhí)行了ServerRunningListener中的processStart方法躺翻。

public void processStart() {
    try {
        if (zkclientx != null) {
            final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
            initCid(path);
            zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception {

                }

                public void handleNewSession() throws Exception {
                    initCid(path);
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    logger.error("failed to connect to zookeeper", error);
                }
            });
        }
    } finally {
        MDC.remove(CanalConstants.MDC_DESTINATION);
    }
}

首先獲取了/otter/canal/destinations/{destination}/cluster/ip:port的內(nèi)容丧叽,其實就是server的地址卫玖,最后一個ip:port是個zk的臨時節(jié)點。然后訂閱一下節(jié)點事件踊淳,當(dāng)節(jié)點有事件推送過來后假瞬,做一些動作。

2.2.7 初始化配置文件監(jiān)控

如果canal.auto.scan配置為true(默認(rèn)為true)迂尝,首先定義一個InstanceAction脱茉,包含了啟動、停止垄开、重啟instance的動作琴许。

定義一個SpringInstanceConfigMonitor,配置定時掃描的事件為canal.auto.scan.interval溉躲,默認(rèn)5s榜田,掃描canal.conf.dir目錄下的文件,與上面定義的InstanceAction結(jié)合起來锻梳。

2.3 啟動CanalController

上面的構(gòu)造方法其實就是定義一些必要的內(nèi)容箭券,真正的啟動在這個方法中。

2.3.1 創(chuàng)建工作節(jié)點

創(chuàng)建臨時節(jié)點/otter/canal/cluster/ip:port疑枯,同時啟動監(jiān)聽器.

2.3.2 啟動embeded服務(wù)

embededCanalServer.start();

這個start里面辩块,一個是將當(dāng)前server的running狀態(tài)置為true,同時根據(jù)destination構(gòu)建CanalInstance荆永。

2.3.3 HA啟動

遍歷Map<String, InstanceConfig>中的InstanceConfig废亭,如果CanalInsance還沒啟動,如果不是懶加載的話具钥,直接HA啟動ServerRunningMonitor滔以。

ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
    runningMonitor.start();
}

public synchronized void start() {
    super.start();
    try {
        processStart();
        if (zkClient != null) {
            // 如果需要盡可能釋放instance資源,不需要監(jiān)聽running節(jié)點氓拼,不然即使stop了這臺機(jī)器你画,另一臺機(jī)器立馬會start
            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
            zkClient.subscribeDataChanges(path, dataListener);

            initRunning();
        } else {
            processActiveEnter();// 沒有zk,直接啟動
        }
    } catch (Exception e) {
        logger.error("start failed", e);
        // 沒有正常啟動桃漾,重置一下狀態(tài)坏匪,避免干擾下一次start
        stop();
    }

}

這里面啟動的內(nèi)容我們來看看。

  • 首先調(diào)用super.start()把當(dāng)前的running狀態(tài)置為true撬统。
  • 然后啟動zk節(jié)點的監(jiān)聽(這邊的processStart是否多余了适滓?)。
  • 監(jiān)聽路徑/otter/canal/destinations/{destination}/running節(jié)點的變化
zkClient.subscribeDataChanges(path, dataListener);
  • 這里的dataListener是ServerRunningMonitor構(gòu)造函數(shù)中定義的恋追,就是定義一些zk節(jié)點監(jiān)聽的動作凭迹。
    • 如果有數(shù)據(jù)變化罚屋,如果running節(jié)點中的內(nèi)容ServerRunningData發(fā)生了變化,字段active變?yōu)榱薴alse嗅绸,而且address就是本機(jī)脾猛,說明本機(jī)出現(xiàn)了主動釋放,需要釋放運行時狀態(tài)鱼鸠。此時需要調(diào)用到processActiveExit方法猛拴,其實就是停止了本機(jī)的server中destination對應(yīng)的instance。
    • 如果節(jié)點發(fā)生了刪除動作蚀狰,如果上一次active的狀態(tài)就是本機(jī)愉昆,則即時觸發(fā)一下active搶占,調(diào)用initRunning()方法麻蹋,當(dāng)然跛溉,如果啟動失敗,也不是立即切換扮授,而是會等待5s芳室,再嘗試啟動。這個啟動方法中糙箍,主要調(diào)用的是processActiveEnter()方法渤愁,來啟動了embededCanalServer.start(destination)。其實就是啟動canalInstance深夯,這塊后續(xù)再分析抖格。
  • 其實除了監(jiān)聽器,在本身的ServerRunningMonitor的start方法中咕晋,也有initRunning方法雹拄。這塊啟動canalInstance的方法,我們下一篇文章分析掌呜。

2.3.4 instance文件掃描啟動

在掃描之前滓玖,把destination和InstanceAction綁定到緩存中。

instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);

首先啟動一個全局掃描质蕉,然后再對應(yīng)的destination配置文件的掃描势篡。

if (autoScan) {
    instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
    for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
        if (!monitor.isStart()) {
            monitor.start();
        }
    }
}

這個start方法啟動了一個定時器,默認(rèn)5s掃描一次模暗。掃描的內(nèi)容就是配置文件路徑下的內(nèi)容禁悠,針對文件的新增、刪除兑宇、修改碍侦,對應(yīng)InstanceAction中的start,stop和reload方法。也就是說瓷产,我們在canal運行的過程中站玄,通過動態(tài)修改配置文件,來實現(xiàn)動態(tài)調(diào)整運行時參數(shù)濒旦,主要可以用來進(jìn)行重復(fù)消費株旷,位點的遷移等等。

2.3.5 網(wǎng)絡(luò)接口啟動

CanalServerWithNetty的啟動疤估,首先需要啟動CanalServerWithEmbedded灾常,主要的業(yè)務(wù)邏輯在SessionHandler中霎冯。這塊其實是暴露外部服務(wù)铃拇,給canal client進(jìn)行調(diào)用。

2.4 增加關(guān)閉hook

Runtime.getRuntime().addShutdownHook(new Thread() {

    public void run() {
        try {
            logger.info("## stop the canal server");
            controller.stop();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping canal Server:", e);
        } finally {
            logger.info("## canal server is down.");
        }
    }

});

在server停止時沈撞,調(diào)用controller.stop()方法慷荔。

public void stop() throws Throwable {
    canalServer.stop();

    if (autoScan) {
        for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
            if (monitor.isStart()) {
                monitor.stop();
            }
        }
    }

    for (ServerRunningMonitor runningMonitor : ServerRunningMonitors.getRunningMonitors().values()) {
        if (runningMonitor.isStart()) {
            runningMonitor.stop();
        }
    }

    // 釋放canal的工作節(jié)點
    releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
    logger.info("## stop the canal server[{}:{}]", ip, port);
        
    if (zkclientx != null) {
        zkclientx.close();
    }
}

主要是停止controller,server相關(guān)的monitor缠俺,instance相關(guān)的monitor显晶,然后釋放zk節(jié)點,關(guān)閉zk連接壹士。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末磷雇,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子躏救,更是在濱河造成了極大的恐慌唯笙,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件盒使,死亡現(xiàn)場離奇詭異崩掘,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)少办,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進(jìn)店門苞慢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人英妓,你說我怎么就攤上這事挽放。” “怎么了蔓纠?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵辑畦,是天一觀的道長。 經(jīng)常有香客問我贺纲,道長航闺,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮潦刃,結(jié)果婚禮上侮措,老公的妹妹穿的比我還像新娘。我一直安慰自己乖杠,他們只是感情好分扎,可當(dāng)我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著胧洒,像睡著了一般畏吓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上卫漫,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天菲饼,我揣著相機(jī)與錄音,去河邊找鬼列赎。 笑死宏悦,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的包吝。 我是一名探鬼主播饼煞,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诗越!你這毒婦竟也來了砖瞧?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤嚷狞,失蹤者是張志新(化名)和其女友劉穎块促,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體感耙,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡褂乍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了即硼。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片逃片。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖只酥,靈堂內(nèi)的尸體忽然破棺而出褥实,到底是詐尸還是另有隱情,我是刑警寧澤裂允,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布损离,位于F島的核電站,受9級特大地震影響绝编,放射性物質(zhì)發(fā)生泄漏僻澎。R本人自食惡果不足惜貌踏,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望窟勃。 院中可真熱鬧祖乳,春花似錦、人聲如沸秉氧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽汁咏。三九已至亚斋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間攘滩,已是汗流浹背帅刊。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留轰驳,地道東北人厚掷。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓弟灼,卻偏偏與公主長得像级解,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子田绑,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容