CDC介紹
關于CDC的介紹可參考 cdc(程序CDC類)_百度百科 (baidu.com),CDC 是change data capture,即變化數(shù)據(jù)捕捉责蝠。是數(shù)據(jù)庫進行備份的一種方式矩欠,常用于大量數(shù)據(jù)的備份工作钮热。分為入侵式的和非入侵式的備份方法髓棋,入侵式的有基于觸發(fā)器備份喧锦、基于時間戳備份昨忆、基于快照備份丁频,非入侵式的備份方法是基于日志的備份。mysql 基于日志的CDC就是要開啟mysql binary log邑贴。
SIDDHI 關于CDC的配置參數(shù)
SIDDHI支持“l(fā)istening”和“polling”兩種模式
listening支持 'INSERT', 'UPDATE'和'DELETE'三種模式席里,可通過operation參數(shù)進行配置。只支持MYSQL數(shù)據(jù)庫
可使用的參數(shù)有:
url:數(shù)據(jù)庫連接字符串: mysql:jdbc:mysql://192.168.0.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8拢驾; oracle:jdbc:oracle:thin:@10.1.4.76:1521:ORCL
mode:CDC模式(listening)
username:數(shù)據(jù)庫用戶名奖磁,要有'SELECT', 'RELOAD', 'SHOW DATABASES', 'REPLICATION SLAVE'和'REPLICATION CLIENT' 權限
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'user'@'%';
password:數(shù)據(jù)庫用戶密碼
table.name:要監(jiān)聽的表名
operation:支持 'INSERT', 'UPDATE'和'DELETE'
pool.properties:數(shù)據(jù)庫連接的池參數(shù)可以指定為鍵值對。
connector.properties:在這里繁疤,您可以將Debezium連接器屬性指定為逗號分隔的字符串咖为。
database.server.id加入MySQL數(shù)據(jù)庫集群讀取bin日志時使用的ID。這應該是一個介于1到2^32之間的唯一整數(shù)稠腊。
database.server.name:標識數(shù)據(jù)庫服務器并為其提供名稱空間的邏輯名稱
polling 支持insert案疲、update ; 支持多種數(shù)據(jù)庫
mode:CDC模式(polling )
datasource.name:SIDDHI配置文件內配置的數(shù)據(jù)庫源
url:數(shù)據(jù)庫連接字符串: mysql:jdbc:mysql://192.168.0.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8; oracle:jdbc:oracle:thin:@10.1.4.76:1521:ORCL
username:數(shù)據(jù)庫用戶名麻养,要有'SELECT' 權限
password:數(shù)據(jù)庫用戶密碼
polling.column:要監(jiān)聽的字段
polling.interval:呆監(jiān)聽的記錄數(shù)褐啡,默認為1
table.name:要監(jiān)聽的表名
wait.on.missed.record:指示進程是否需要等待丟失/無序記錄。
當此標志設置為“true”時鳖昌,一旦識別出丟失的記錄备畦,流程將被保留低飒。缺少的recrod由polling.column值的順序標識。這只能用于數(shù)字字段懂盐,不建議用于時間值褥赊,因為它不是連續(xù)的。
僅當記錄可能無序寫入時(例如并發(fā)寫入器)莉恼,才應啟用此功能拌喉,因為這會降低性能。
missed.record.waiting.timeout:重試丟失/無序記錄的超時時間(以秒為單位)俐银。這應該與wait.on.missed.record參數(shù)一起使用尿背。如果未設置參數(shù),進程將無限期地等待丟失的記錄捶惜。
數(shù)據(jù)庫CDC配置
Mysql 5.7.X以后版本支持田藐,默認不支持CDC,需要配置指定參數(shù)吱七。8.X.X以后配置默認支持
show VARIABLES like '%log%':查看日志配置參數(shù)
server_id:數(shù)據(jù)庫ID汽久,單機模式下設置 server_id=1,集群模式下不同節(jié)點的server_id必須不同
log_bin:指定binlog文件名和儲存位置踊餐,可設置為log_bin=mysql-bin
binlog_format:binlog格式景醇。有3個值可以選擇:ROW:記錄哪條數(shù)據(jù)被修改和修改之后的數(shù)據(jù),會產生大量日志吝岭。STATEMENT:記錄修改數(shù)據(jù)的SQL三痰,日志量較小。MIXED:混合使用上述兩個模式苍碟。CDC要求必須配置為ROW。binlog_format=row
expire_logs_days:bin_log過期時間撮执,超過該時間的log會自動刪除微峰。這里設置為7天,expire_logs_days=7
binlog_do_db:binlog記錄哪些數(shù)據(jù)庫抒钱。如果需要配置多個庫蜓肆,如例子中配置多項。切勿使用逗號分隔谋币。單例數(shù)據(jù)庫可不設置仗扬,集群模式需要配置多條
binlog_do_db=db_a
binlog_do_db=db_b
Oracle,默認是開啟的蕾额。
SIDDHI驅動程序早芭,試了很多版本,建議使用siddhi-io-cdc-2.0.3.jar 诅蝶,踩了很多坑退个。程序默認有2.0.5募壕,2.0.10多多少少都有問題。
mysql數(shù)據(jù)源配置實例
- name: mysql
description: The datasource used for mysql
jndiConfig:
name: jdbc/mysql
useJndiReference: true
definition:
type: RDBMS
configuration:
jdbcUrl: 'jdbc:mysql://192.168.1.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8'
username: XXXXX
password: XXXXX
driverClassName: com.mysql.cj.jdbc.Driver
maxPoolSize: 20
idleTimeout: 60000
connectionTestQuery: SELECT 1 FROM DUAL
validationTimeout: 30000
isAutoCommit: false
oracle數(shù)據(jù)源配置實例
- name: test
description: The datasource used for test
jndiConfig:
name: jdbc/test
useJndiReference: true
definition:
type: RDBMS
configuration:
jdbcUrl: 'jdbc:oracle:thin:@192.168.1.10:1521/ORCL'
username: XXXXX
password: XXXXX
driverClassName: oracle.jdbc.driver.OracleDriver
maxPoolSize: 20
idleTimeout: 60000
connectionTestQuery: SELECT 1 FROM DUAL
validationTimeout: 30000
isAutoCommit: false
mysql程序開發(fā)實例
@App:name("test_mysql_cdc")
@App:description("Description of the plan")
-- @source(type = 'cdc' , url = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8',
-- username = 'XXXX', password = 'XXXX',
-- table.name = 'person', operation = 'insert',
-- @map(type='keyvalue', @attributes(id = 'ID', name = 'Name', age = 'Age')))
-- define stream inputStream (id int, name string, age int);
@source(type = 'cdc', mode='polling', polling.column = 'ID',
polling.interval = '1',
jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8',
username = 'admin', password = '123456',
table.name = 'person',
@map(type='keyvalue'))
define stream pollingStream (id string, name string, age string);
-- from inputStream#log()
-- select *
-- insert into testString;
from pollingStream#log()
select *
insert into testPolling;
oracle程序開發(fā)實例
@source(type = 'cdc',
datasource.name = 'test',
url = '',
mode = 'polling',
polling.column = 'name',
polling.interval = '1',
username = '',
password = '',
table.name = 'user',
@map(type = 'keyvalue',fail.on.missing.attribute="false" ))