一文帶你快速入門Canal,看這篇就夠了肠骆!


前言

???????? ?我們在做實時數倉時數據往往都是保存到數據庫中例如MySQL算途,當有一條數據新增或修改需要馬上將數據同步到kafka中或其他的數據庫中,這時候我們需要借助阿里開源出來的Canal,來實現我們功能蚀腿。

一嘴瓤、什么是Canal

我們看下官網的描述:

canal [k?'n?l]扫外,譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析廓脆,提供增量數據訂閱和消費

二筛谚、Canal能干什么

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業(yè)務 cache 刷新
  • 帶業(yè)務邏輯的增量數據處理

注意: 當前Canal支持的MySQL版本有 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

三停忿、Canal工作原理

  • MySQL master 將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events驾讲,可以通過 show binlog events 進行查看)
  • MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
  • MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據

canal 工作原理

  • canal 模擬 MySQL slave 的交互協(xié)議席赂,偽裝自己為 MySQL slave 吮铭,向 MySQL master 發(fā)送dump 協(xié)議
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對象(原始為 byte 流)

四颅停、部署Canal

4.1 安裝MySQL

???????? ?我之前發(fā)過如何部署MySQL我在這就不在寫一遍了谓晌,如果你的機器中沒有安裝MySQL那可以去看這篇—> https://blog.csdn.net/qq_43791724/article/details/108196454

開啟MySQL的 binary log 日志

???????? 當我們在安裝成功MySQL成功后會有一個my.cnf文件需要添加一下內容

[mysqld]
log-bin=/var/lib/mysql/mysql-bin?#?開啟?binlog
binlog-format=ROW?#?選擇?ROW?模式
server_id=1?#?配置?MySQL?replaction?需要定義,不要和?canal?的?slaveId?重復

???????? 注意: 當我們在開啟了binary log日志模式后會在我們log-bin目錄下創(chuàng)建 mysql-bin.* 的文件癞揉。當我們數據庫中的數據發(fā)生改變時就會mysql-bin.*文件中生成記錄纸肉。

4.2 安裝Canal

去官下載需要的版本 https://github.com/alibaba/canal/releases我在這里使用的版本為:1.0.24

  1. 將下載好的gz包上傳到指定的目錄下
  2. 創(chuàng)建個文件夾
mkdir?canal
  1. 解壓gz包
tar?-zxvf?canal.deployer-1.0.24.tar.gz??-C?../servers/canal/
  1. 配置 canal.properties

common 屬性前四個配置項:

canal.id=?1
canal.ip=
canal.port=?11111
canal.zkServers=

canal.id是canal的編號,在集群環(huán)境下喊熟,不同canal的id不同毁靶,注意它和mysql的server_id不同。ip這里不指定逊移,默認為本機,比如上面是192.168.100.201龙填,端口號是11111胳泉。zk用于canal cluster。5. 再看下canal.propertiesdestinations相關的配置:

#################################################
#########???????destinations????????#############?
#################################################
canal.destinations?=?example
canal.conf.dir?=?../conf
canal.auto.scan?=?true
canal.auto.scan.interval?=?5
canal.instance.global.mode?=?spring?
canal.instance.global.lazy?=?false
canal.instance.global.spring.xml?=?classpath:spring/file-instance.xml

這里的canal.destinations = example可以設置多個岩遗,比如example1,example2扇商,則需要創(chuàng)建對應的兩個文件夾,并且每個文件夾下都有一個instance.properties文件宿礁。全局的canal實例管理用spring案铺,這里的file-instance.xml最終會實例化所有的destinations instances:\

  1. 全局的canal實例管理用spring,這里的file-instance.xml最終會實例化所有的destinations instances:
<!--?properties?-->
<bean?class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer"?lazy-init="false">
?<property?name="ignoreResourceNotFound"?value="true"?/>
????<property?name="systemPropertiesModeName"?value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!--?允許system覆蓋?-->
????<property?name="locationNames">
?????<list>
?????????<value>classpath:canal.properties</value>?????????????????????<value>classpath:${canal.instance.destination:}/instance.properties</value>
?????????</list>
????</property>
</bean>

<bean?id="socketAddressEditor"?class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor"?/>
<bean?class="org.springframework.beans.factory.config.CustomEditorConfigurer">?
???<property?name="propertyEditorRegistrars">
????<list>
??????<ref?bean="socketAddressEditor"?/>
???????</list>
???</property>
</bean>
<bean?id="instance"?class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
?<property?name="destination"?value="${canal.instance.destination}"?/>
????<property?name="eventParser">
?????<ref?local="eventParser"?/>
????</property>
????<property?name="eventSink">
????????<ref?local="eventSink"?/>
????</property>
????<property?name="eventStore">
????????<ref?local="eventStore"?/>
????</property>
????<property?name="metaManager">
????????<ref?local="metaManager"?/>
????</property>
????<property?name="alarmHandler">
????????<ref?local="alarmHandler"?/>
????</property>
</bean>

比如canal.instance.destination等于example梆靖,就會加載example/instance.properties配置文件7. 修改instance 配置文件

##?mysql?serverId控汉,這里的slaveId不能和myql集群中已有的server_id一樣
canal.instance.mysql.slaveId?=?1234

#??按需修改成自己的數據庫信息
#################################################
...
canal.instance.master.address=192.168.1.120:3306
#?username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername?=?root
canal.instance.dbPassword?=?123456
#################################################
  1. 啟動
sh?bin/startup.sh
  1. 關閉
sh?bin/stop.sh
  1. 通過jps 查詢服務狀態(tài)
[root@node01?~]#?jps
2133?CanalLauncher
4184?Jps

到這里說明我們的服務就配好了,這時候我們可以使用java代碼創(chuàng)建一個客戶端來進行測試

五返吻、通過Java編寫Canal客戶端

5.1 導入依賴

?<dependencies>
????????<dependency>
????????????<groupId>com.alibaba.otter</groupId>
????????????<artifactId>canal.client</artifactId>
????????????<version>1.0.24</version>
????????</dependency>
????????<dependency>
????????????<groupId>com.alibaba</groupId>
????????????<artifactId>fastjson</artifactId>
????????????<version>1.2.58</version>
????????</dependency>
????</dependencies>

5.2 編寫測試類

package?com.canal.Test;

/**
?*?@author?大數據老哥
?*?@version?V1.0
?*?@Package?com.canal.Test
?*?@File?:CanalTest.java
?*?@date?2021/1/11?21:54?*/

import?com.alibaba.otter.canal.client.CanalConnector;
import?com.alibaba.otter.canal.client.CanalConnectors;
import?com.alibaba.otter.canal.protocol.CanalEntry;
import?com.alibaba.otter.canal.protocol.Message;
import?com.google.protobuf.InvalidProtocolBufferException;
import?java.net.InetSocketAddress;
import?java.util.List;

/**
?*?測試canal配置是否成功?*/
public?class?CanalTest?{

????public?static?void?main(String[]?args)?{
????????//1.創(chuàng)建連接
????????CanalConnector?connect?=?CanalConnectors.newSingleConnector(new?InetSocketAddress("192.168.100.201",?11111),
????????????????"example",?"",?"");????????//指定一次性讀取的條數
????????int?bachChSize?=?1000;
????????//?設置轉態(tài)
????????boolean?running?=?true;
????????while?(running)?{
????????????//2.建立連接
????????????connect.connect();
????????????//回滾上次請求的信息放置防止數據丟失
????????????connect.rollback();
????????????//?訂閱匹配日志
????????????connect.subscribe();
????????????while?(running)?{
????????????????Message?message?=?connect.getWithoutAck(bachChSize);
????????????????//?獲取batchId
????????????????long?batchId?=?message.getId();
????????????????//?獲取binlog數據的條數
????????????????int?size?=?message.getEntries().size();
????????????????if?(batchId?==?-1?||?size?==?0)?{

????????????????}?else?{
????????????????????printSummary(message);
????????????????}
????????????????//?確認指定的batchId已經消費成功
????????????????connect.ack(batchId);
????????????}
????????}
????}

????private?static?void?printSummary(Message?message)?{
????????//?遍歷整個batch中的每個binlog實體
????????for?(CanalEntry.Entry?entry?:?message.getEntries())?{
????????????//?事務開始
????????????if?(entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONBEGIN?||?entry.getEntryType()?==?CanalEntry.EntryType.TRANSACTIONEND)?{
????????????????continue;
????????????}

????????????//?獲取binlog文件名
????????????String?logfileName?=?entry.getHeader().getLogfileName();
????????????//?獲取logfile的偏移量
????????????long?logfileOffset?=?entry.getHeader().getLogfileOffset();
????????????//?獲取sql語句執(zhí)行時間戳
????????????long?executeTime?=?entry.getHeader().getExecuteTime();
????????????//?獲取數據庫名
????????????String?schemaName?=?entry.getHeader().getSchemaName();
????????????//?獲取表名
????????????String?tableName?=?entry.getHeader().getTableName();
????????????//?獲取事件類型?insert/update/delete
????????????String?eventTypeName?=?entry.getHeader().getEventType().toString().toLowerCase();

????????????System.out.println("logfileName"?+?":"?+?logfileName);
????????????System.out.println("logfileOffset"?+?":"?+?logfileOffset);
????????????System.out.println("executeTime"?+?":"?+?executeTime);
????????????System.out.println("schemaName"?+?":"?+?schemaName);
????????????System.out.println("tableName"?+?":"?+?tableName);
????????????System.out.println("eventTypeName"?+?":"?+?eventTypeName);

????????????CanalEntry.RowChange?rowChange?=?null;
????????????try?{
????????????????//?獲取存儲數據姑子,并將二進制字節(jié)數據解析為RowChange實體
????????????????rowChange?=?CanalEntry.RowChange.parseFrom(entry.getStoreValue());
????????????}?catch?(InvalidProtocolBufferException?e)?{
????????????????e.printStackTrace();
????????????}

????????????//?迭代每一條變更數據
????????????for?(CanalEntry.RowData?rowData?:?rowChange.getRowDatasList())?{
????????????????//?判斷是否為刪除事件
????????????????if?(entry.getHeader().getEventType()?==?CanalEntry.EventType.DELETE)?{
????????????????????System.out.println("---delete---");
????????????????????printColumnList(rowData.getBeforeColumnsList());
????????????????????System.out.println("---");
????????????????}
????????????????//?判斷是否為更新事件
????????????????else?if?(entry.getHeader().getEventType()?==?CanalEntry.EventType.UPDATE)?{
????????????????????System.out.println("---update---");
????????????????????printColumnList(rowData.getBeforeColumnsList());
????????????????????System.out.println("---");
????????????????????printColumnList(rowData.getAfterColumnsList());
????????????????}
????????????????//?判斷是否為插入事件
????????????????else?if?(entry.getHeader().getEventType()?==?CanalEntry.EventType.INSERT)?{
????????????????????System.out.println("---insert---");
????????????????????printColumnList(rowData.getAfterColumnsList());
????????????????????System.out.println("---");
????????????????}
????????????}
????????}
????}
????//?打印所有列名和列值
????private?static?void?printColumnList(List<CanalEntry.Column>?columnList)?{
????????for?(CanalEntry.Column?column?:?columnList)?{
????????????System.out.println(column.getName()?+?"\t"?+?column.getValue());
????????}
????}
}

5.3 啟動測試

小結

???????? ?今天給大家分享了Canle它的主要的功能做增量數據同步,后面會使用Canle進行做實時數倉测僵。我在這里為大家提供大數據的資源需要的朋友可以去下面GitHub去下載街佑,信自己,努力和汗水總會能得到回報的。我是大數據老哥沐旨,我們下期見~~~

資源獲取 獲取Flink面試題森逮,Spark面試題,程序員必備軟件磁携,hive面試題褒侧,Hadoop面試題,Docker面試題颜武,簡歷模板等資源請去GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigDataGitee 自行下載 ?https://gitee.com/li_hey_hey/dashboard/projects


?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末璃搜,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子鳞上,更是在濱河造成了極大的恐慌这吻,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件篙议,死亡現場離奇詭異唾糯,居然都是意外死亡,警方通過查閱死者的電腦和手機鬼贱,發(fā)現死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門移怯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人这难,你說我怎么就攤上這事舟误。” “怎么了姻乓?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵嵌溢,是天一觀的道長。 經常有香客問我蹋岩,道長赖草,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任剪个,我火速辦了婚禮秧骑,結果婚禮上,老公的妹妹穿的比我還像新娘扣囊。我一直安慰自己乎折,他們只是感情好,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布如暖。 她就那樣靜靜地躺著笆檀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪盒至。 梳的紋絲不亂的頭發(fā)上酗洒,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天士修,我揣著相機與錄音,去河邊找鬼樱衷。 笑死棋嘲,一個胖子當著我的面吹牛,可吹牛的內容都是我干的矩桂。 我是一名探鬼主播沸移,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼侄榴!你這毒婦竟也來了雹锣?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤癞蚕,失蹤者是張志新(化名)和其女友劉穎蕊爵,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體桦山,經...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡攒射,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了恒水。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片会放。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖钉凌,靈堂內的尸體忽然破棺而出咧最,到底是詐尸還是另有隱情,我是刑警寧澤御雕,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布窗市,位于F島的核電站,受9級特大地震影響饮笛,放射性物質發(fā)生泄漏。R本人自食惡果不足惜论熙,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一福青、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧脓诡,春花似錦无午、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至交惯,卻和暖如春次泽,著一層夾襖步出監(jiān)牢的瞬間穿仪,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工意荤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留啊片,地道東北人。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓玖像,卻偏偏與公主長得像紫谷,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子捐寥,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內容