Foreword
A while back I went through quite an ordeal trying to capture SQL Server database changes in real time, so I want to write the process down here.
The stack used in this post: Debezium SQL Server Source Connector + Kafka + Spark + MySQL
ps: later the data will probably land in Kudu instead.
The focus is on how the components are used, and the gotchas and pitfalls I hit while wiring them together.
Let's get started
When processing real-time data, you need to pick up changes to database tables as they happen and push those changes into Kafka. Different databases need different components for this.
For the common MySQL database there is plenty of support, such as canal and maxwell, which all follow the "MySQL binlog incremental subscribe & consume" model. For Microsoft's SQL Server, however, open-source support is nowhere near as good.
1. Choosing a Connector
Debezium's SQL Server connector is a source connector: it can take a snapshot of the existing data in a SQL Server database and then monitor and record every subsequent row-level change to that data. All events for each table are written to a separate Kafka topic (named serverName.schemaName.tableName, as we will see in section 7), where applications and services can easily consume them. The connector is built on top of SQL Server's change data capture (CDC) feature.
2. Installing the Connector
Installing it by following the official documentation worked fine for me.
2.1 Installing Confluent Hub Client
The Confluent Hub client is installed locally as part of Confluent Platform, under its /bin directory.
Linux
Download and unzip the Confluent Hub tarball.
[root@hadoop001 softs]# ll confluent-hub-client-latest.tar
-rw-r--r--. 1 root root 6909785 Sep 24 10:02 confluent-hub-client-latest.tar
[root@hadoop001 softs]# tar -xf confluent-hub-client-latest.tar -C ../app/conn/
[root@hadoop001 softs]# ll ../app/conn/
total 6748
drwxr-xr-x. 2 root root 27 Sep 24 10:43 bin
-rw-r--r--. 1 root root 6909785 Sep 24 10:02 confluent-hub-client-latest.tar
drwxr-xr-x. 3 root root 34 Sep 24 10:05 etc
drwxr-xr-x. 2 root root 6 Sep 24 10:08 kafka-mssql
drwxr-xr-x. 4 root root 29 Sep 24 10:05 share
[root@hadoop001 softs]#
Add the bin directory to the system environment variables:
export CONN_HOME=/root/app/conn
export PATH=$CONN_HOME/bin:$PATH
Confirm the installation succeeded:
[root@hadoop001 ~]# source /etc/profile
[root@hadoop001 ~]# confluent-hub
usage: confluent-hub <command> [ <args> ]
Commands are:
help Display help information
install install a component from either Confluent Hub or from a local file
See 'confluent-hub help <command>' for more information on a specific command.
[root@hadoop001 ~]#
2.2 Install the SQL Server Connector
Use the confluent-hub command:
[root@hadoop001 ~]# confluent-hub install debezium/debezium-connector-sqlserver:0.9.4
The component can be installed in any of the following Confluent Platform installations:
1. / (installed rpm/deb package)
2. /root/app/conn (where this tool is installed)
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /root/app/conn/share/confluent-hub-components? (yN) n
Specify installation directory: /root/app/conn/share/java/confluent-hub-client
Component's license:
Apache 2.0
https://github.com/debezium/debezium/blob/master/LICENSE.txt
I agree to the software license agreement (yN) y
You are about to install 'debezium-connector-sqlserver' from Debezium Community, as published on Confluent Hub.
Do you want to continue? (yN) y
Note: for the "Specify installation directory" prompt, it is best to use the share/java/confluent-hub-client directory under the confluent-hub installation you just set up. The remaining prompts are straightforward.
3. Configuring the Connector
First configure the Kafka Connect worker; the configuration file is $KAFKA_HOME/config/connect-distributed.properties:
# These are defaults. This file just demonstrates how to override some settings.
# Kafka cluster addresses; mine is a single node running multiple brokers
bootstrap.servers=hadoop001:9093,hadoop001:9094,hadoop001:9095
# Name of the Connect cluster; all workers in the same cluster must share this group.id
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# Format of the data stored in Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# never need to change these (they are the internal converters for offsets, config, and status data)
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Topic to use for storing offsets. This topic should have many partitions and be replicated.
# Size the partitions and replication according to your actual cluster
# Kafka Connect can create this topic automatically, but you can also create it yourself
offset.storage.topic=connect-offsets-2
offset.storage.replication.factor=3
offset.storage.partitions=1
# Topic that stores connector and task configurations; it should have exactly 1 partition and, here, 3 replicas
config.storage.topic=connect-configs-2
config.storage.replication.factor=3
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status-2
status.storage.replication.factor=3
status.storage.partitions=1
# Only used in standalone mode; the distributed worker ignores this setting
offset.storage.file.filename=/root/data/kafka-logs/offset-storage-file
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# REST port for the Connect worker
rest.port=18083
# Path where connector plugins are installed
#plugin.path=/root/app/kafka_2.11-0.10.1.1/connectors
plugin.path=/root/app/conn/share/java/confluent-hub-client
4. Creating the Kafka Topics
My Kafka is a single node running multiple brokers, so the topics can be created like this:
kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-offsets-2 --replication-factor 3 --partitions 1
kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-configs-2 --replication-factor 3 --partitions 1
kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-status-2 --replication-factor 3 --partitions 1
Check the status <very important>
[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-offsets-2
Topic:connect-offsets-2 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: connect-offsets-2 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-configs-2
Topic:connect-configs-2 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: connect-configs-2 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-status-2
Topic:connect-status-2 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: connect-status-2 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
[root@hadoop001 ~]#
5. Enabling SQL Server Change Data Capture (CDC)
Change data capture records insert, update, and delete activity applied to SQL Server tables and provides the details of those changes in an easy-to-consume relational format. The change tables it uses mirror the column structure of the tracked source tables and also contain the metadata needed to understand what changed. CDC exposes information about DML changes to tables and databases, so you do not need expensive approaches such as user triggers, timestamp columns, or join queries.
The change history tables keep growing as the business runs, so by default change data is retained in the local database for 3 days (the retention can be read from the retention column of msdb.dbo.cdc_jobs, and you can change it by updating that table); a SQL Server Agent job cleans up expired data every night at 2 a.m. It is therefore recommended to move the change data into a data warehouse on a regular basis.
The following commands cover most of what you need:
-- Check whether CDC is enabled on each database
GO
SELECT [name], database_id, is_cdc_enabled
FROM sys.databases
GO
-- Enable CDC on a database
USE test1
GO
EXEC sys.sp_cdc_enable_db
GO
-- Disable CDC on a database
USE test1
go
exec sys.sp_cdc_disable_db
go
-- Check which tables have CDC enabled
USE test1
GO
SELECT [name], is_tracked_by_cdc
FROM sys.tables
GO
-- Enable CDC on a table (the database must have CDC enabled first)
USE test1
GO
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'user',
@capture_instance='user',
@role_name = NULL
GO
-- Disable CDC on a table
USE test1
GO
EXEC sys.sp_cdc_disable_table
@source_schema = 'dbo',
@source_name = 'user',
@capture_instance='user'
GO
-- If you don't remember which tables have capture enabled, this returns the change data capture configuration for all tables
EXECUTE sys.sp_cdc_help_change_data_capture;
GO
-- Show which columns are captured for a given capture instance (i.e. table):
EXEC sys.sp_cdc_get_captured_columns
@capture_instance = 'user'
-- Look up the job configuration; the retention column is the number of minutes change data is kept
SELECT * FROM msdb.dbo.cdc_jobs
-- Change the retention period (in minutes); 1440 minutes = 1 day
EXECUTE sys.sp_cdc_change_job
@job_type = N'cleanup',
@retention=1440
GO
-- Stop the capture job
exec sys.sp_cdc_stop_job N'capture'
go
-- Start the capture job
exec sys.sp_cdc_start_job N'capture'
go
6. Running the Connector
How do you run it? Like this:
[root@hadoop001 bin]# pwd
/root/app/kafka_2.11-1.1.1/bin
[root@hadoop001 bin]# ./connect-distributed.sh
USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties
[root@hadoop001 bin]#
[root@hadoop001 bin]# ./connect-distributed.sh ../config/connect-distributed.properties
... (a large amount of log output follows here)
Verify:
[root@hadoop001 ~]# netstat -tanp |grep 18083
tcp6 0 0 :::18083 :::* LISTEN 29436/java
[root@hadoop001 ~]#
6.1 Get the Worker's information
ps: you may need to install jq first (yum -y install jq); alternatively, just open the URL in a browser
[root@hadoop001 ~]# curl -s hadoop001:18083 | jq
{
"version": "1.1.1",
"commit": "8e07427ffb493498",
"kafka_cluster_id": "dmUSlNNLQ9OyJiK-bUc6Tw"
}
[root@hadoop001 ~]#
6.2 List the connector plugins installed on the Worker
[root@hadoop001 ~]# curl -s hadoop001:18083/connector-plugins | jq
[
{
"class": "io.debezium.connector.sqlserver.SqlServerConnector",
"type": "source",
"version": "0.9.5.Final"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "1.1.1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "1.1.1"
}
]
[root@hadoop001 ~]#
You can see io.debezium.connector.sqlserver.SqlServerConnector, which is the connector we just installed.
6.3 List the currently running connectors (tasks)
[root@hadoop001 ~]# curl -s hadoop001:18083/connectors | jq
[]
[root@hadoop001 ~]#
6.4 Submit the Connector configuration <key step>
Submitting a user configuration starts a Connector task, and that task performs the actual work.
The configuration is a JSON document, submitted through the same REST API:
curl -s -X POST -H "Content-Type: application/json" --data '{
"name": "connector-mssql-online-1",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "test1",
"database.hostname" : "hadoop001",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "xxx",
"database.dbname" : "test1",
"database.history.kafka.bootstrap.servers" : "hadoop001:9093",
"database.history.kafka.topic": "test1.t201909262.bak"
}
}' http://hadoop001:18083/connectors
Immediately check the connector's status and make sure it is RUNNING:
[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1/status | jq
{
"name": "connector-mssql-online-1",
"connector": {
"state": "RUNNING",
"worker_id": "xxx:18083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "xxx:18083"
}
],
"type": "source"
}
[root@hadoop001 ~]#
Now look at the Kafka topics:
[root@hadoop001 ~]# kafka-topics.sh --list --zookeeper hadoop001:2181
__consumer_offsets
connect-configs-2
connect-offsets-2
connect-status-2
# Automatically created topic that records table-structure changes; its name is whatever you set database.history.kafka.topic to in the connector config
test1.t201909262.bak
[root@hadoop001 ~]#
List the running connectors (tasks) again:
[root@hadoop001 ~]# curl -s hadoop001:18083/connectors | jq
[
"connector-mssql-online-1"
]
[root@hadoop001 ~]#
6.5 View the connector's information
[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1 | jq
{
"name": "connector-mssql-online-1",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.user": "sa",
"database.dbname": "test1",
"tasks.max": "1",
"database.hostname": "hadoop001",
"database.password": "xxx",
"database.history.kafka.bootstrap.servers": "hadoop001:9093",
"database.history.kafka.topic": "test1.t201909262.bak",
"name": "connector-mssql-online-1",
"database.server.name": "test1",
"database.port": "1433"
},
"tasks": [
{
"connector": "connector-mssql-online-1",
"task": 0
}
],
"type": "source"
}
[root@hadoop001 ~]#
6.6 View the tasks running under the connector
[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1/tasks | jq
[
{
"id": {
"connector": "connector-mssql-online-1",
"task": 0
},
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.user": "sa",
"database.dbname": "test1",
"task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",
"tasks.max": "1",
"database.hostname": "hadoop001",
"database.password": "xxx",
"database.history.kafka.bootstrap.servers": "hadoop001:9093",
"database.history.kafka.topic": "test1.t201909262.bak",
"name": "connector-mssql-online-1",
"database.server.name": "test1",
"database.port": "1433"
}
}
]
[root@hadoop001 ~]#
A task's configuration is inherited from its connector's configuration.
6.7 Pause / resume / delete a Connector
# curl -s -X PUT hadoop001:18083/connectors/connector-mssql-online-1/pause
# curl -s -X PUT hadoop001:18083/connectors/connector-mssql-online-1/resume
# curl -s -X DELETE hadoop001:18083/connectors/connector-mssql-online-1
7. Reading the change data from Kafka
# Table-structure changes; the topic name is whatever you set for database.history.kafka.topic
kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.t201909262.bak --from-beginning
# Data changes; the topic name follows serverName.schemaName.tableName: test1.dbo.t201909262
kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262 --from-beginning
In this case that is:
kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262 --from-beginning
kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262
8. Running DML statements against the table
Insert a row:
The Kafka console consumer prints the change event almost immediately.
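For reference, with value.converter.schemas.enable=false the message produced for an insert is roughly the following Debezium change event (trimmed, with illustrative values for a table that has id and name columns; the exact fields depend on the connector version):
{
  "before": null,
  "after": { "id": 1, "name": "test" },
  "source": {
    "version": "0.9.5.Final",
    "connector": "sqlserver",
    "name": "test1",
    "db": "test1",
    "schema": "dbo",
    "table": "t201909262",
    ...
  },
  "op": "c",
  "ts_ms": 1569485123456
}
"op" is "c" for inserts, "u" for updates, "d" for deletes and "r" for snapshot reads; "before" and "after" carry the row image on each side of the change.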
Spark consumes Kafka in 10-second batches and inserts the new rows into MySQL.
I'll take the time to write up the detailed processing logic in a later post; a minimal sketch of the idea follows.
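Since the post defers the actual Spark code, this is only a minimal sketch of such a job, assuming Spark Streaming with the spark-streaming-kafka-0-10 integration and json4s for JSON parsing, plus a hypothetical MySQL table t201909262(id INT, name VARCHAR) and placeholder connection settings; it handles only insert events and ignores offset management and exactly-once concerns:

import java.sql.DriverManager

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object MssqlCdcToMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mssql-cdc-to-mysql")
    // 10-second micro-batches, matching the batch interval mentioned above
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop001:9093,hadoop001:9094,hadoop001:9095",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "cdc-to-mysql",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Debezium writes the table's changes to serverName.schemaName.tableName
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](Seq("test1.dbo.t201909262"), kafkaParams))

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // One JDBC connection per partition; URL/credentials/table are placeholders
        val conn = DriverManager.getConnection("jdbc:mysql://hadoop001:3306/test", "root", "xxx")
        val stmt = conn.prepareStatement("INSERT INTO t201909262 (id, name) VALUES (?, ?)")
        implicit val formats: Formats = DefaultFormats
        records.foreach { record =>
          // Deletes are followed by a tombstone (null value), so guard against it
          if (record.value() != null) {
            // value.converter.schemas.enable=false, so the body is the Debezium payload itself
            val event = parse(record.value())
            if ((event \ "op").extract[String] == "c") {
              val after = event \ "after"
              stmt.setInt(1, (after \ "id").extract[Int])
              stmt.setString(2, (after \ "name").extract[String])
              stmt.executeUpdate()
            }
          }
        }
        stmt.close()
        conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The same idea works with Structured Streaming and a 10-second trigger; handling updates and deletes would mean branching on "op" being "u" or "d" and issuing UPDATE/DELETE statements instead.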
Updates and deletes work just as well, so I won't demonstrate them here.
If you have any questions, feel free to leave a comment and discuss.
More posts: https://blog.csdn.net/liuge36