Incremental MySQL data synchronization with canal
Environment
- OS: CentOS release 6.5 (Final)
- MySQL version: 10.0.33-MariaDB-wsrep
- JDK version: 1.8 (strictly required; otherwise ES and canal-adapter will fail to start)
- ElasticSearch version: 6.8.0
- canal version: 1.1.3
- zookeeper
Solution overview
- Enable MySQL binary logging
- Set the MySQL binary log format to ROW
- canal-server acts as a slave of the MySQL cluster and pulls the master's binary log
- canal-server pushes the binary log events to canal-adapter
- canal-server and canal-adapter are deployed as multiple nodes for high availability
- canal-adapter writes the data into the ES cluster
MySQL configuration
- Enable binary logging on the master and set the format to ROW
log-bin=mysql-bin   # enable binary logging
binlog-format=ROW   # use ROW format
server_id=1         # required for MySQL replication; must not clash with canal's slaveId
- canal works by posing as a MySQL slave, so the canal account needs the usual slave replication privileges:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
Installing ES
Download the package
- Go to the Elasticsearch download page
https://www.elastic.co/cn/downloads/elasticsearch
- If you do not want the latest release, pick one from the historical versions
- This walkthrough uses version 6.8.0
- Download the package
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.0.tar.gz
Create a system user
- elasticsearch refuses to start as root, so create a dedicated user
# create the user
adduser es
# set a password for it
passwd es
# switch to the new user
su es
- Move the package into /home/es/elasticsearch
mv elasticsearch-6.8.0.tar.gz /home/es/elasticsearch
Unpack the package
cd /home/es/elasticsearch
tar -xzvf elasticsearch-6.8.0.tar.gz
Edit the configuration file
vi config/elasticsearch.yml
# cluster name; must be identical on every node of the cluster
cluster.name: okami-application
# name of this node
node.name: node-1
# this node is eligible to become master
node.master: true
# this node stores data
node.data: true
# number of shards per index
#index.number_of_shards: 5
# number of replicas per shard
#index.number_of_replicas: 3
# bind address; may be IPv4 or IPv6
network.bind_host: 0.0.0.0
# address other nodes use to talk to this node
network.publish_host: 192.168.10.1
# sets both bind_host and publish_host at once
network.host: 192.168.10.1
# port for inter-node transport
transport.tcp.port: 9300
# compress data exchanged over the transport layer
transport.tcp.compress: true
# HTTP port for external clients
http.port: 9200
# maximum size of an HTTP request body
http.max_content_length: 100mb
# whether to expose the HTTP service
http.enabled: true
# how many master-eligible nodes this node must see; defaults to 1, larger clusters should raise it (2-4)
discovery.zen.minimum_master_nodes: 1
# ping timeout for automatic node discovery
discovery.zen.ping_timeout: 120s
# whether to use multicast discovery
#discovery.zen.ping.multicast.enabled: true
# initial list of master nodes, used to discover newly joined nodes
discovery.zen.ping.unicast.hosts: ["192.168.10.1:9300"]
path.data: /usr/hdp/2.5.0.0-1245/esdata
path.logs: /usr/hdp/2.5.0.0-1245/eslog
http.cors.enabled: true
http.cors.allow-origin: "*"
#--------------------------------------------------------------------------------
#index.analysis.analyzer.ik.type: "ik"
Start ES
- ES requires at least Java 1.8, so check the Java version first and upgrade it if it is too old
[es@xxx elasticsearch-6.8.0]$ java -version
java version "1.8.0_172"
Java(TM) SE Runtime Environment (build 1.8.0_172-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)
- Start ES (the -d flag runs it in the background)
/home/es/elasticsearch/elasticsearch-6.8.0/bin/elasticsearch -d
- Check that the ES node came up
[es@xxx ~]$ curl http://127.0.0.1:9200
{
"name" : "node-1",
"cluster_name" : "okami-application",
"cluster_uuid" : "Q00-w01oQT6vsXx7E6KIeA",
"version" : {
"number" : "6.8.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "65b6179",
"build_date" : "2019-05-15T20:06:13.172855Z",
"build_snapshot" : false,
"lucene_version" : "7.7.0",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}
Deploy the remaining hosts
- Install ES on the other hosts in the same network segment following the steps above
Check the cluster status
[es@xxx ~]$ curl http://127.0.0.1:9200/_cluster/health
{"cluster_name":"okami-application","status":"green","timed_out":false,"number_of_nodes":3,"number_of_data_nodes":3,"active_primary_shards":0,"active_shards":0,"relocating_shards":0,"initializing_shards":0,"unassigned_shards":0,"delayed_unassigned_shards":0,"number_of_pending_tasks":0,"number_of_in_flight_fetch":0,"task_max_waiting_in_queue_millis":0,"active_shards_percent_as_number":100.0}
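The one-line health response is easier to inspect programmatically. A minimal Python sketch (the embedded JSON is a trimmed copy of the sample response above) that checks the fields worth watching:

```python
import json

# Trimmed copy of the /_cluster/health response shown above.
health_json = ('{"cluster_name":"okami-application","status":"green",'
               '"timed_out":false,"number_of_nodes":3,"number_of_data_nodes":3,'
               '"active_shards":0,"active_shards_percent_as_number":100.0}')

health = json.loads(health_json)

# "green" means every primary and replica shard is allocated;
# "yellow" means replicas are missing; "red" means primaries are missing.
assert health["status"] == "green"
# All three nodes joined the cluster and all of them hold data.
assert health["number_of_nodes"] == health["number_of_data_nodes"] == 3
print(health["status"], health["active_shards_percent_as_number"])
```

In practice the JSON would come from `curl http://127.0.0.1:9200/_cluster/health` rather than a literal string.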
Problems encountered during installation
- max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
  - The per-process open-file limit is too small; check the current values with
    ulimit -Hn
    ulimit -Sn
  - Add the following to /etc/security/limits.conf; it takes effect after the user logs out and back in
    * soft nofile 65536
    * hard nofile 65536
- max number of threads [3818] for user [es] is too low, increase to at least [4096]
  - Same kind of problem: the maximum thread count is too low. Check it with
    ulimit -Hu
    ulimit -Su
  - Add the following to /etc/security/limits.conf
    * soft nproc 4096
    * hard nproc 4096
- max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
  - Add vm.max_map_count=262144 to /etc/sysctl.conf, then apply it
    vi /etc/sysctl.conf
    sysctl -p
Installing canal-server
Download canal
- (You can download the release package or build from source; here we download the release.) If you already have it, just copy it into the install directory
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
- Move the downloaded file to your install path
mv canal.deployer-1.1.3.tar.gz /opt/app/canal
Unpack
tar zxvf canal.deployer-1.1.3.tar.gz
Edit the configuration
- vi /opt/app/canal/canal_server/conf/canal.properties
canal.id = 1 # unique id of this canal server instance; currently informational only
canal.ip = 192.111.112.103 # local IP the canal server binds to; if unset, a local IP is picked automatically
canal.port = 11111 # port for the canal server socket service
canal.metrics.pull.port = 11112
canal.zkServers = 192.168.1.111:2181 # zookeeper connection string for the canal server
# flush data to zk
canal.zookeeper.flush.period = 1000 # how often canal flushes state to zookeeper, in milliseconds
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = password
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
######### destinations #############
#################################################
canal.destinations = example_01,example_02 # list of instances deployed on this server
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring # how the global instance configuration is loaded
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =
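canal.properties uses Java-style `key = value` syntax. A small Python sketch, fed a couple of the lines above, shows how the inline `#` annotations and the comma-separated `canal.destinations` list break down (this toy parser is a simplification for illustration, not canal's actual loader):

```python
def parse_properties(text):
    """Parse java-style 'key = value' lines, dropping blank lines,
    full-line comments, and inline # annotations."""
    props = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()   # strip comments
        if not line or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props

sample = """
# destinations
canal.destinations = example_01,example_02 # instances on this server
canal.serverMode = tcp
canal.auto.scan = true
"""

props = parse_properties(sample)
destinations = [d.strip() for d in props["canal.destinations"].split(",")]
print(destinations)   # ['example_01', 'example_02']
```

Note that in a real canal.properties file the inline comments should be removed; they appear here only as annotation.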
- Each destination configured in canal.destinations needs a matching directory under conf
mkdir conf/example_01
mkdir conf/example_02
- In each directory, write the configuration file
instance.properties
canal.instance.mysql.slaveId=99
canal.instance.gtidon=false
# position info
canal.instance.master.address=
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
# username/password
canal.instance.dbUsername=username
canal.instance.dbPassword=password
canal.instance.defaultDatabaseName=dbName
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex
canal.instance.filter.regex=.*\\..*
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
Configuration notes
Where canal starts reading the binlog
canal.instance.master.journal.name + canal.instance.master.position: start from an exact binlog position
canal.instance.master.timestamp: start from a timestamp; canal walks the mysql binlog until it finds the matching position
If nothing is specified: start from the database's current position
instance.xml configuration files
- memory-instance.xml: every component (parser, sink, store) runs in memory mode and positions are kept in memory too, so a restart goes back to the initial position
- default-instance.xml: the store stays in memory, but the positions used by parser/sink are persisted, currently to zookeeper, so they are shared across the cluster
- group-instance.xml: for multi-database merging; combines several physical instances into one logical instance exposed to clients
Configuring multiple destinations
- List them in canal.destinations in canal.properties, separated by commas
- Under conf, create a directory per destination and add the matching instance.properties
How to write canal.instance.filter.regex
1. all tables: .* or .*\\..*
2. all tables in the canal schema: canal\\..*
3. tables in the canal schema whose name starts with canal: canal\\.canal.*
4. a single table in the canal schema: canal.test1
5. several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)
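The patterns are Java regexes, with `\\.` in the properties file escaping to a literal dot. A quick Python sketch (Python's `re` behaves like Java's regex engine for these simple patterns) shows which `schema.table` names rules 2 and 3 above admit:

```python
import re

def table_matches(pattern, full_name):
    """True if one comma-separated entry of canal.instance.filter.regex
    matches a schema.table name."""
    return re.fullmatch(pattern, full_name) is not None

# Rule 2: every table in the canal schema
assert table_matches(r"canal\..*", "canal.test1")
assert not table_matches(r"canal\..*", "mysql.test1")

# Rule 3: tables in the canal schema whose name starts with "canal"
assert table_matches(r"canal\.canal.*", "canal.canal_meta")
assert not table_matches(r"canal\.canal.*", "canal.test1")

# Rule 5: a comma-separated combination is tried entry by entry
combined = "canal\\..*,mysql.test1,mysql.test2"
patterns = combined.split(",")
assert any(table_matches(p, "mysql.test1") for p in patterns)
print("all filter rules behave as expected")
```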
Startup
- The bin directory contains a few scripts
canal.pid    # records the service's process ID
restart.sh   # restart the service
startup.sh   # start the service
stop.sh      # stop the service
- Run ./startup.sh to start the server
Check the logs
Server startup log: logs/canal/canal.log
Instance log: logs/example/example.log
Installing canal-adapter
Download the package
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
Unpack
tar xzvf canal.adapter-1.1.3.tar.gz
Edit the configuration
- Edit conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp
zookeeperHosts: 192.111.111.173:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.1.100:3306/test?useUnicode=true
username: username
password: password
defaultDS2:
url: jdbc:mysql://192.168.1.101:3306/test?useUnicode=true
username: username
password: password
canalAdapters:
- instance: example_01
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es
hosts: 192.168.1.110:9300
properties:
cluster.name: okami-application
- instance: example_02
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es
hosts: 192.168.1.111:9300
properties:
cluster.name: okami-application
- Add the mapping files example_01.yml and example_02.yml under conf/es/
vi conf/es/example_01.yml
dataSourceKey: defaultDS
destination: example_01
groupId: g1
esMapping:
_index: indexName
_type: typeName
_id: _id
upsert: true
# pk: id
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_time as _c_time from user a
left join role b on b.id=a.role_id"
# objFields:
# _labels: array:;
# etlCondition: "where a.c_time>='{0}'"
commitBatch: 3000
vi conf/es/example_02.yml
dataSourceKey: defaultDS2
destination: example_02
groupId: g1
esMapping:
_index: indexName
_type: typeName
_id: _id
upsert: true
# pk: id
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_time as _c_time from user a
left join role b on b.id=a.role_id"
# objFields:
# _labels: array:;
# etlCondition: "where a.c_time>='{0}'"
commitBatch: 3000
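Conceptually, canal-adapter runs the esMapping sql for the changed rows, then turns each result row into an ES document: the column aliases become document fields and the `_id` alias becomes the document id. A rough Python illustration of that mapping step (the row values are made up; this mirrors the idea, not canal's actual code path):

```python
def row_to_es_doc(row):
    """Split one joined SQL result row into (doc_id, doc_body),
    the way the aliases in esMapping.sql become ES fields."""
    body = dict(row)
    doc_id = body.pop("_id")   # esMapping._id names the id column
    return doc_id, body

# A row as the adapter's sql (user LEFT JOIN role) might return it.
row = {
    "_id": 42,
    "_name": "alice",
    "_role_id": 7,
    "_role_name": "admin",
    "_c_time": "2019-06-01 12:00:00",
}
doc_id, body = row_to_es_doc(row)
print(doc_id, body["_role_name"])   # 42 admin
```

With `upsert: true`, an existing document with that id is updated in place instead of rejected.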
Configuration notes
- The same data can be consumed by several groups at once; groups run in parallel with each other, while the outerAdapters inside one group run serially
Startup
- The bin directory contains the same scripts as canal-server
canal.pid    # records the service's process ID
restart.sh   # restart the service
startup.sh   # start the service
stop.sh      # stop the service
- Run ./startup.sh to start the adapter
Check the logs
tail -f logs/adapter/adapter.log
Management over HTTP
- Query all canal instances subscribed for sync:
http://112.33.11.124:8081/destinations
[
{
"destination": "example_01",
"status": "on"
},
{
"destination": "example_02",
"status": "on"
}
]
- Query the sync switch state of an instance:
http://112.33.11.124:8081/syncSwitch/example_02
{
"stauts": "off"
}
- Toggle the sync switch (PUT):
http://112.33.11.124:8081/syncSwitch/example_01/on
{
"code": 20000,
"message": "instance: example_01 sync switched on successfully"
}
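These management endpoints are plain HTTP, so they are easy to script. A small helper that builds the request URLs (the host and port come from the adapter's `server.port` in application.yml; the helper itself is illustrative, not part of canal):

```python
def adapter_url(base, action, destination=None, state=None):
    """Build a canal-adapter management URL.
    action: 'destinations' (GET) or 'syncSwitch' (GET to query,
    PUT with a trailing /on or /off to toggle)."""
    if action == "destinations":
        return f"{base}/destinations"
    url = f"{base}/syncSwitch/{destination}"
    if state is not None:        # 'on' / 'off' must be sent with PUT
        url += f"/{state}"
    return url

base = "http://112.33.11.124:8081"
print(adapter_url(base, "destinations"))
print(adapter_url(base, "syncSwitch", "example_02"))
print(adapter_url(base, "syncSwitch", "example_01", "on"))
```

The first two URLs can be fetched with a plain GET (e.g. `curl`), while the last one must be sent as a PUT, matching the examples above.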