Preface
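When building a real-time data warehouse, the data is usually stored in a database such as MySQL. Whenever a row is inserted or modified, the change has to be synchronized immediately to Kafka or to another database. For this we can use Canal, open-sourced by Alibaba.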
1. What Is Canal
Here is the description from the official site:
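canal [kə'næl], meaning waterway/pipe/channel, is mainly used to parse the incremental logs of a MySQL database and provide incremental data subscription and consumption.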
2. What Canal Can Do
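- Database mirroring
- Real-time database backup
- Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
- Business cache refresh
- Incremental data processing with business logic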
Note: the MySQL versions Canal currently supports are 5.1.x, 5.5.x, 5.6.x, 5.7.x and 8.0.x.
3. How Canal Works
How MySQL master/slave replication works:
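- The MySQL master writes data changes to its binary log (binlog). The records in it are called binary log events and can be viewed with show binlog events.
- The MySQL slave copies the master's binary log events into its relay log.
- The MySQL slave replays the events in the relay log, applying the data changes to its own data.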
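How canal works:
- canal emulates the MySQL slave interaction protocol, disguises itself as a MySQL slave, and sends a dump request to the MySQL master.
- The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal).
- canal parses the binary log objects (originally a byte stream).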
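The dump request in the second step is a single MySQL protocol command, COM_BINLOG_DUMP. As a rough illustration, and not canal's own code, the sketch below builds that command's payload using the layout in the MySQL client/server protocol documentation:

- a 0x12 command byte
- a 4-byte binlog position
- 2-byte flags
- the 4-byte server id the "slave" announces (for canal, its slaveId)
- the binlog file name, which runs to the end of the packet

Integers are little-endian, and the 4-byte packet header is left out. The class name BinlogDumpPayload is hypothetical. The file mysql-bin.000001 and position 4 (just after the binlog's 4-byte magic header) are example values.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: builds the COM_BINLOG_DUMP payload a replica sends to the master. */
public final class BinlogDumpPayload {
    static final byte COM_BINLOG_DUMP = 0x12;

    public static byte[] build(String binlogFile, long position, int flags, long serverId) {
        byte[] name = binlogFile.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 2 + 4 + name.length)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.put(COM_BINLOG_DUMP);     // command byte
        buf.putInt((int) position);   // binlog-pos: where the master should start sending events
        buf.putShort((short) flags);  // flags (0 = block and wait for new events)
        buf.putInt((int) serverId);   // server-id the replica identifies itself with
        buf.put(name);                // binlog-filename, runs to the end of the packet
        return buf.array();
    }

    public static void main(String[] args) {
        // Example values: first binlog file, position 4, canal slaveId 1234 from instance.properties
        byte[] payload = build("mysql-bin.000001", 4L, 0, 1234L);
        StringBuilder hex = new StringBuilder();
        for (byte b : payload) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(payload.length + " bytes: " + hex.toString().trim());
    }
}
```

A real replica also performs the handshake and authentication and frames this payload in a packet header before sending it. The sketch only shows what the dump request itself carries.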
4. Deploying Canal
4.1 Installing MySQL
I have already published a post on deploying MySQL, so I won't repeat it here. If MySQL is not yet installed on your machine, see this post -> https://blog.csdn.net/qq_43791724/article/details/108196454
Enabling MySQL binary logging
Once MySQL is installed there is a my.cnf file. Add the following to it:
[mysqld]
log-bin=/var/lib/mysql/mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not equal canal's slaveId
Note: once binary logging is enabled, files named mysql-bin.* are created in the log-bin directory. Whenever data in the database changes, the change is recorded in the mysql-bin.* files.
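Before starting canal, it can help to check these settings mechanically. The hypothetical BinlogConfigCheck class below is not part of MySQL or canal. It does the following:

- scans the [mysqld] section of a my.cnf text
- drops # comments
- treats - and _ in option names as equivalent, as MySQL does
- reports a missing log-bin, a binlog-format other than ROW, or a missing server_id

It only checks the text against what this article sets up. MySQL 8.0 enables the binlog with ROW format by default, so on 8.0 a missing line is not necessarily a problem. The values a running server actually uses can be confirmed with SHOW VARIABLES.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical checker for the canal-related options in a my.cnf text. */
public final class BinlogConfigCheck {

    /** Collects key=value options of the [mysqld] section; '-' and '_' are treated alike. */
    static Map<String, String> parseMysqld(String cnf) {
        Map<String, String> opts = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\\R")) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim(); // drop # comments
            if (line.isEmpty()) continue;
            if (line.startsWith("[")) {            // a new [group] starts
                inMysqld = line.equals("[mysqld]");
                continue;
            }
            if (!inMysqld) continue;
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            opts.put(key, value);
        }
        return opts;
    }

    /** Returns human-readable problems; an empty list means the settings look usable. */
    public static List<String> check(String cnf) {
        Map<String, String> o = parseMysqld(cnf);
        List<String> problems = new ArrayList<>();
        if (!o.containsKey("log_bin")) problems.add("log-bin is not set");
        if (!"ROW".equalsIgnoreCase(o.get("binlog_format"))) problems.add("binlog-format is not ROW");
        if (!o.containsKey("server_id")) problems.add("server_id is not set");
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\n"
                + "log-bin=/var/lib/mysql/mysql-bin # enable binlog\n"
                + "binlog-format=ROW # use ROW mode\n"
                + "server_id=1\n";
        List<String> problems = check(cnf);
        System.out.println(problems.isEmpty() ? "OK" : String.join("; ", problems));
    }
}
```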
4.2 Installing Canal
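Download the version you need from the official releases page https://github.com/alibaba/canal/releases. The version used here is 1.0.24.
- Upload the downloaded .tar.gz package to the target directory
- Create a folder
mkdir canal
- Extract the package
tar -zxvf canal.deployer-1.0.24.tar.gz -C ../servers/canal/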
- Configure canal.properties
The first four settings in its common section:
canal.id = 1
canal.ip =
canal.port = 11111
canal.zkServers =
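canal.id is the ID of this canal server. In a cluster, each canal must have a different id. Note that it is not the same thing as MySQL's server_id. canal.ip is not set here, so it defaults to the local machine (192.168.100.201 in this setup). The port is 11111. canal.zkServers is used for a canal cluster.
- Next, look at the destinations-related settings in canal.properties: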
#################################################
#########       destinations        #############
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.destinations = example can list several names, for example example1,example2. In that case you need to create two matching folders, each with its own instance.properties file.
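Every name in canal.destinations needs its own folder with an instance.properties file. A quick sanity check before startup is to list the files canal will look for.

The hypothetical DestinationLayout class below works in three steps:

- it reads canal.properties with java.util.Properties
- it splits canal.destinations on commas
- it resolves <canal.conf.dir>/<destination>/instance.properties for each name

Which directory canal.conf.dir (../conf) is relative to is an assumption here, so the sketch takes it as the baseDir parameter. /opt/canal/bin is only an example path.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: where each destination's instance.properties is expected to live. */
public final class DestinationLayout {

    /**
     * @param canalProperties contents of canal.properties
     * @param baseDir directory that canal.conf.dir is taken relative to (an assumption; adjust to your setup)
     */
    public static List<Path> instanceFiles(Reader canalProperties, Path baseDir) {
        Properties p = new Properties();
        try {
            p.load(canalProperties);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // java.util.Properties keeps trailing spaces in values, so trim them
        String confDir = p.getProperty("canal.conf.dir", "../conf").trim();
        List<Path> files = new ArrayList<>();
        for (String dest : p.getProperty("canal.destinations", "").split(",")) {
            String name = dest.trim();
            if (!name.isEmpty()) {
                files.add(baseDir.resolve(confDir).resolve(name).resolve("instance.properties").normalize());
            }
        }
        return files;
    }

    public static void main(String[] args) {
        String props = "canal.destinations = example1,example2\ncanal.conf.dir = ../conf\n";
        for (Path f : instanceFiles(new StringReader(props), Paths.get("/opt/canal/bin"))) {
            System.out.println(f); // on Linux: /opt/canal/conf/example1/instance.properties, ...
        }
    }
}
```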
- Global canal instance management uses Spring. The file-instance.xml configured here eventually instantiates all the destination instances:
<!-- properties -->
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
    <property name="ignoreResourceNotFound" value="true" />
    <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- allow system properties to override -->
    <property name="locationNames">
        <list>
            <value>classpath:canal.properties</value>
            <value>classpath:${canal.instance.destination:}/instance.properties</value>
        </list>
    </property>
</bean>
<bean id="socketAddressEditor" class="com.alibaba.otter.canal.instance.spring.support.SocketAddressEditor" />
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
    <property name="propertyEditorRegistrars">
        <list>
            <ref bean="socketAddressEditor" />
        </list>
    </property>
</bean>
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
    <property name="destination" value="${canal.instance.destination}" />
    <property name="eventParser">
        <ref local="eventParser" />
    </property>
    <property name="eventSink">
        <ref local="eventSink" />
    </property>
    <property name="eventStore">
        <ref local="eventStore" />
    </property>
    <property name="metaManager">
        <ref local="metaManager" />
    </property>
    <property name="alarmHandler">
        <ref local="alarmHandler" />
    </property>
</bean>
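The lookup order this placeholder bean sets up can be modeled in a few lines. This assumes canal's PropertyPlaceholderConfigurer behaves like Spring's standard one:

- later locations override earlier ones
- SYSTEM_PROPERTIES_MODE_OVERRIDE checks JVM system properties first

Under that assumption, a value comes from system properties first, then instance.properties, then canal.properties. Failing all three, the default after the colon in ${key:default} is used. PlaceholderSketch is a hypothetical, simplified illustration rather than canal or Spring code, and it does not handle nested placeholders.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified model of ${key:default} resolution with system properties taking priority. */
public final class PlaceholderSketch {
    // ${key} or ${key:default}; the default may be empty, as in ${canal.instance.destination:}
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    /** Lookup order: system, then instance.properties, then canal.properties, then the default. */
    public static String resolve(String text, Properties system, Properties instance, Properties canal) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String value = system.getProperty(key);
            if (value == null) value = instance.getProperty(key);
            if (value == null) value = canal.getProperty(key);
            if (value == null) value = m.group(2); // null when no ":default" part was written
            if (value == null) {
                throw new IllegalArgumentException("unresolved placeholder ${" + key + "}");
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties system = new Properties();
        system.setProperty("canal.instance.destination", "example");
        System.out.println(resolve("classpath:${canal.instance.destination:}/instance.properties",
                system, new Properties(), new Properties()));
    }
}
```

Run as is, main prints classpath:example/instance.properties, which matches the example that follows in the text.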
For example, if canal.instance.destination equals example, the example/instance.properties configuration file is loaded.
- Modify the instance configuration file:
## mysql serverId: this slaveId must not equal any server_id already used in the MySQL cluster
canal.instance.mysql.slaveId = 1234
# change these to match your own database
#################################################
...
canal.instance.master.address=192.168.1.120:3306
# username/password: the database user name and password
...
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
#################################################
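A typo in canal.instance.master.address, or a slaveId that collides with the MySQL server_id, is easy to miss. The hypothetical InstanceConfigCheck below is a pre-flight sketch, not something canal runs itself:

- it loads instance.properties with java.util.Properties
- it parses master.address into a host and port with InetSocketAddress.createUnresolved, which does no DNS lookup and opens no connection
- it rejects a slaveId equal to the server_id you pass in (1 in the my.cnf above)

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Properties;

/** Hypothetical pre-flight check for conf/<destination>/instance.properties. */
public final class InstanceConfigCheck {

    public static Properties load(Reader reader) {
        Properties p = new Properties();
        try {
            p.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** Parses canal.instance.master.address ("host:port") without resolving or connecting. */
    public static InetSocketAddress masterAddress(Properties p) {
        String addr = p.getProperty("canal.instance.master.address", "").trim();
        int colon = addr.lastIndexOf(':');
        if (colon <= 0 || colon == addr.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got '" + addr + "'");
        }
        int port = Integer.parseInt(addr.substring(colon + 1).trim());
        return InetSocketAddress.createUnresolved(addr.substring(0, colon).trim(), port);
    }

    /** Throws if canal's slaveId equals the MySQL server's server_id. */
    public static void checkSlaveId(Properties p, long mysqlServerId) {
        long slaveId = Long.parseLong(p.getProperty("canal.instance.mysql.slaveId", "").trim());
        if (slaveId == mysqlServerId) {
            throw new IllegalStateException(
                    "canal.instance.mysql.slaveId must differ from server_id " + mysqlServerId);
        }
    }

    public static void main(String[] args) {
        Properties p = load(new StringReader(
                "canal.instance.mysql.slaveId = 1234\n"
                + "canal.instance.master.address=192.168.1.120:3306\n"));
        checkSlaveId(p, 1); // server_id=1 in my.cnf
        InetSocketAddress master = masterAddress(p);
        System.out.println("master " + master.getHostString() + ", port " + master.getPort());
    }
}
```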
- Start
sh bin/startup.sh
- Stop
sh bin/stop.sh
- Check the service status with jps
[root@node01 ~]# jps
2133 CanalLauncher
4184 Jps
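Besides jps, you can confirm that the canal server accepts client connections on canal.port. The hypothetical CanalPortCheck below just opens a TCP connection with a timeout. The defaults 192.168.100.201 and 11111 are the values used in this article, so replace them with yours. A successful connect only shows that something is listening on that port, not that the example instance is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port (e.g. canal.port 11111) accepts connections. */
public final class CanalPortCheck {

    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws if the host is unknown, nothing is listening, or the timeout expires
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the values used in this article; pass host and port as arguments to override
        String host = args.length > 0 ? args[0] : "192.168.100.201";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 11111;
        boolean up = isListening(host, port, 2000);
        System.out.println(host + ":" + port + (up ? " accepts connections" : " is not reachable"));
    }
}
```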
At this point the service is configured, and we can write a client in Java to test it.
5. Writing a Canal Client in Java
5.1 Adding the dependencies
<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
</dependencies>
5.2 Writing the test class
package com.canal.Test;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Tests whether the canal configuration works.
 *
 * @author 大數據老哥
 * @version V1.0
 * @Package com.canal.Test
 * @File :CanalTest.java
 * @date 2021/1/11 21:54
 */
public class CanalTest {
    public static void main(String[] args) throws InterruptedException {
        // 1. Create the connector: canal server address, destination, username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.100.201", 11111), "example", "", "");
        // Maximum number of entries to read per batch
        int batchSize = 1000;
        // Run flag
        boolean running = true;
        try {
            // 2. Open the connection
            connector.connect();
            // Subscribe using the filter configured on the server (instance.properties)
            connector.subscribe();
            // Roll back to the last acknowledged position so un-acked data is fetched again, not lost
            connector.rollback();
            while (running) {
                // Fetch up to batchSize entries without acknowledging them yet
                Message message = connector.getWithoutAck(batchSize);
                // Batch id; -1 means there was no new data
                long batchId = message.getId();
                // Number of binlog entries in this batch
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing new: pause briefly instead of busy-looping
                    Thread.sleep(1000);
                } else {
                    printSummary(message);
                }
                // Confirm that this batchId has been consumed successfully
                connector.ack(batchId);
            }
        } finally {
            // Release the connection when the loop ends or an exception is thrown
            connector.disconnect();
        }
    }

    private static void printSummary(Message message) {
        // Iterate over every binlog entry in the batch
        for (CanalEntry.Entry entry : message.getEntries()) {
            // Skip transaction begin/end markers; they carry no row data
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.Header header = entry.getHeader();
            // binlog file name
            String logfileName = header.getLogfileName();
            // Offset within the binlog file
            long logfileOffset = header.getLogfileOffset();
            // Timestamp at which the SQL statement was executed
            long executeTime = header.getExecuteTime();
            // Database name
            String schemaName = header.getSchemaName();
            // Table name
            String tableName = header.getTableName();
            // Event type: insert/update/delete
            CanalEntry.EventType eventType = header.getEventType();
            System.out.println("logfileName:" + logfileName);
            System.out.println("logfileOffset:" + logfileOffset);
            System.out.println("executeTime:" + executeTime);
            System.out.println("schemaName:" + schemaName);
            System.out.println("tableName:" + tableName);
            System.out.println("eventTypeName:" + eventType.toString().toLowerCase());

            CanalEntry.RowChange rowChange;
            try {
                // Parse the stored binary value into a RowChange object
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                // Without a RowChange there is nothing to print; fail here instead of a later NullPointerException
                throw new RuntimeException("Failed to parse binlog entry: " + entry, e);
            }
            // Iterate over every changed row
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A delete only has the row's values before the change
                    System.out.println("---delete---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    // An update has both the old and the new values
                    System.out.println("---update---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                    printColumnList(rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An insert only has the new values
                    System.out.println("---insert---");
                    printColumnList(rowData.getAfterColumnsList());
                    System.out.println("---");
                }
            }
        }
    }

    // Print every column name and value
    private static void printColumnList(List<CanalEntry.Column> columnList) {
        for (CanalEntry.Column column : columnList) {
            System.out.println(column.getName() + "\t" + column.getValue());
        }
    }
}
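The getWithoutAck / ack / rollback calls in the client loop give at-least-once delivery: entries that were fetched but not acknowledged are sent again after rollback(). The toy in-memory AckModel below mimics that contract so the behaviour can be tried without a canal server. It is not canal's implementation, and all its names are hypothetical. A list of strings stands in for binlog entries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the getWithoutAck/ack/rollback contract (not canal's code). */
public final class AckModel {
    /** One fetched batch; id == -1 means "no new data", as in the client loop. */
    public static final class Batch {
        public final long id;
        public final List<String> entries;

        Batch(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    private final List<String> log = new ArrayList<>();               // stands in for binlog entries
    private final Map<Long, Integer> inFlight = new LinkedHashMap<>(); // batchId -> end index, oldest first
    private int acked = 0;   // entries before this index are acknowledged
    private int fetched = 0; // entries before this index have been handed out
    private long nextId = 1;

    public void append(String entry) {
        log.add(entry);
    }

    /** Hands out up to max entries after the fetch cursor without acknowledging them. */
    public Batch getWithoutAck(int max) {
        int end = Math.min(log.size(), fetched + max);
        if (end == fetched) {
            return new Batch(-1, new ArrayList<>());
        }
        Batch b = new Batch(nextId++, new ArrayList<>(log.subList(fetched, end)));
        inFlight.put(b.id, end);
        fetched = end;
        return b;
    }

    /** Confirms the batch and every earlier in-flight batch; unknown ids (e.g. -1) are ignored. */
    public void ack(long batchId) {
        Integer end = inFlight.get(batchId);
        if (end == null) {
            return;
        }
        acked = end;
        Iterator<Long> it = inFlight.keySet().iterator();
        while (it.hasNext()) {
            long id = it.next();
            it.remove();
            if (id == batchId) {
                break;
            }
        }
    }

    /** Forgets un-acked batches so their entries are delivered again. */
    public void rollback() {
        inFlight.clear();
        fetched = acked;
    }
}
```

Suppose the client prints a batch but crashes before ack. The rollback() on the next run makes it receive that batch again, so a downstream writer (for example one that forwards rows to Kafka) should tolerate duplicates.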
5.3 Running the test
Summary
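Today I introduced Canal, whose main job is incremental data synchronization. Later posts will use Canal to build a real-time data warehouse. I also provide big data resources; if you need them, download them from the GitHub link below. Believe in yourself: effort and sweat always pay off. I'm 大數據老哥, see you next time~~~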
Resources: Flink, Spark, Hive, Hadoop and Docker interview questions, essential software for programmers, resume templates and more can be downloaded from GitHub at https://github.com/lhh2002/Framework-Of-BigData or from Gitee at https://gitee.com/li_hey_hey/dashboard/projects