Debezium SQL Server Source Connector+Kafka+Spark+MySQL 實時數(shù)據(jù)處理

寫在前面

<font color=#00cc66 size=4 face="黑體">前段時間在實時獲取SQLServer數(shù)據(jù)庫變化時候鱼喉,整個過程可謂是坎坷。然后就想在這里記錄一下川背。 </font>

本文的技術(shù)棧: Debezium SQL Server Source Connector+Kafka+Spark+MySQL

ps:后面應(yīng)該會將數(shù)據(jù)放到Kudu上速缆。

然后主要記錄一下窘奏,整個組件使用和組件對接過程中一些注意點和坑。

開始吧

在處理實時數(shù)據(jù)時,需要即時地獲得數(shù)據(jù)庫表中數(shù)據(jù)的變化阀湿,然后將數(shù)據(jù)變化發(fā)送到Kafka中赶熟。不同的數(shù)據(jù)庫有不同的組件進行處理。

常見的MySQL數(shù)據(jù)庫陷嘴,就有比較多的支持 canal 映砖,maxwell等,他們都是類似 MySQL binlog 增量訂閱&消費組件這種模式 罩旋。那么關(guān)于微軟的SQLServer數(shù)據(jù)庫啊央,好像整個開源社區(qū) 支持就沒有那么好了。

1.選擇Connector

Debezium的SQL Server連接器是一種源連接器涨醋,可以獲取SQL Server數(shù)據(jù)庫中現(xiàn)有數(shù)據(jù)的快照瓜饥,然后監(jiān)視和記錄對該數(shù)據(jù)的所有后續(xù)行級更改。每個表的所有事件都記錄在單獨的Kafka Topic中浴骂,應(yīng)用程序和服務(wù)可以輕松使用它們乓土。然后本連接器也是基于MSSQL的change data capture實現(xiàn)。

2.安裝Connector

我參照官方文檔安裝是沒有問題的溯警。

2.1 Installing Confluent Hub Client

Confluent Hub客戶端本地安裝為Confluent Platform的一部分趣苏,位于/ bin目錄中。

Linux

Download and unzip the Confluent Hub tarball.


[root@hadoop001 softs]# ll confluent-hub-client-latest.tar

-rw-r--r--. 1 root root 6909785 9月  24 10:02 confluent-hub-client-latest.tar

[root@hadoop001 softs]# tar confluent-hub-client-latest.tar -C ../app/conn/

[root@hadoop001 softs]# ll ../app/conn/

總用量 6748

drwxr-xr-x. 2 root root      27 9月  24 10:43 bin

-rw-r--r--. 1 root root 6909785 9月  24 10:02 confluent-hub-client-latest.tar

drwxr-xr-x. 3 root root      34 9月  24 10:05 etc

drwxr-xr-x. 2 root root      6 9月  24 10:08 kafka-mssql

drwxr-xr-x. 4 root root      29 9月  24 10:05 share

[root@hadoop001 softs]#

配置bin目錄到系統(tǒng)環(huán)境變量中


export CONN_HOME=/root/app/conn

export PATH=$CONN_HOME/bin:$PATH

確認是否安裝成功


[root@hadoop001 ~]# source /etc/profile

[root@hadoop001 ~]# confluent-hub

usage: confluent-hub <command> [ <args> ]

Commands are:

    help      Display help information

    install  install a component from either Confluent Hub or from a local file

See 'confluent-hub help <command>' for more information on a specific command.

[root@hadoop001 ~]#

2.2 Install the SQL Server Connector

    使用命令confluent-hub

[root@hadoop001 ~]# confluent-hub install debezium/debezium-connector-sqlserver:0.9.4

The component can be installed in any of the following Confluent Platform installations:

  1. / (installed rpm/deb package)

  2. /root/app/conn (where this tool is installed)

Choose one of these to continue the installation (1-2): 2

Do you want to install this into /root/app/conn/share/confluent-hub-components? (yN) n

Specify installation directory: /root/app/conn/share/java/confluent-hub-client

Component's license:

Apache 2.0

https://github.com/debezium/debezium/blob/master/LICENSE.txt

I agree to the software license agreement (yN) y

You are about to install 'debezium-connector-sqlserver' from Debezium Community, as published on Confluent Hub.

Do you want to continue? (yN) y

注意:Specify installation directory:這個安裝目錄最好是你剛才的confluent-hub 目錄下的 /share/java/confluent-hub-client 這個目錄下梯轻。其余的基本操作就好食磕。

3.配置Connector

首先需要對Connector進行配置,配置文件位于 $KAFKA_HOME/config/connect-distributed.properties:


# These are defaults. This file just demonstrates how to override some settings.

# kafka集群地址喳挑,我這里是單節(jié)點多Broker模式

bootstrap.servers=haoop001:9093,hadoop001:9094,hadoop001:9095

# Connector集群的名稱彬伦,同一集群內(nèi)的Connector需要保持此group.id一致

group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will

# need to configure these based on the format they want their data in when loaded from or stored into Kafka

# 存儲到kafka的數(shù)據(jù)格式

key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will

# 內(nèi)部轉(zhuǎn)換器的格式,針對offsets伊诵、config和status单绑,一般不需要修改

internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated.

# 用于保存offsets的topic,應(yīng)該有多個partitions曹宴,并且擁有副本(replication)搂橙,主要根據(jù)你的集群實際情況來

# Kafka Connect會自動創(chuàng)建這個topic,但是你可以根據(jù)需要自行創(chuàng)建

offset.storage.topic=connect-offsets-2

offset.storage.replication.factor=3

offset.storage.partitions=1

# 保存connector和task的配置笛坦,應(yīng)該只有1個partition区转,并且有3個副本

config.storage.topic=connect-configs-2

config.storage.replication.factor=3

# 用于保存狀態(tài),可以擁有多個partition和replication

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.

status.storage.topic=connect-status-2

status.storage.replication.factor=3

status.storage.partitions=1

offset.storage.file.filename=/root/data/kafka-logs/offset-storage-file

# Flush much faster than normal, which is useful for testing/debugging

offset.flush.interval.ms=10000

# REST端口號

rest.port=18083

# 保存connectors的路徑

#plugin.path=/root/app/kafka_2.11-0.10.1.1/connectors

plugin.path=/root/app/conn/share/java/confluent-hub-client

4.創(chuàng)建kafka Topic

我這里是單節(jié)點多Broker模式的Kafka,那么創(chuàng)建Topic可以如下:


kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-offsets-2 --replication-factor 3 --partitions 1

kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-configs-2 --replication-factor 3 --partitions 1

kafka-topics.sh --zookeeper hadoop001:2181 --create --topic connect-status-2 --replication-factor 3 --partitions 1

查看狀態(tài) <很重要>


[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-offsets-2

Topic:connect-offsets-2 PartitionCount:1    ReplicationFactor:3 Configs:

Topic: connect-offsets-2    Partition: 0    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2

[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-configs-2

Topic:connect-configs-2 PartitionCount:1    ReplicationFactor:3 Configs:

Topic: connect-configs-2    Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3

[root@hadoop001 ~]# kafka-topics.sh --describe --zookeeper hadoop001:2181 --topic connect-status-2

Topic:connect-status-2  PartitionCount:1    ReplicationFactor:3 Configs:

Topic: connect-status-2 Partition: 0    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2

[root@hadoop001 ~]#

5.開啟SqlServer Change Data Capture(CDC)更改數(shù)據(jù)捕獲

變更數(shù)據(jù)捕獲用于捕獲應(yīng)用到 SQL Server 表中的插入版扩、更新和刪除活動废离,并以易于使用的關(guān)系格式提供這些變更的詳細信息。變更數(shù)據(jù)捕獲所使用的更改表中包含鏡像所跟蹤源表列結(jié)構(gòu)的列资厉,同時還包含了解所發(fā)生的變更所需的元數(shù)據(jù)厅缺。變更數(shù)據(jù)捕獲提供有關(guān)對表和數(shù)據(jù)庫所做的 DML 更改的信息。通過使用變更數(shù)據(jù)捕獲,您無需使用費用高昂的方法湘捎,如用戶觸發(fā)器诀豁、時間戳列和聯(lián)接查詢等。

數(shù)據(jù)變更歷史表會隨著業(yè)務(wù)的持續(xù)窥妇,變得很大舷胜,所以默認情況下,變更數(shù)據(jù)歷史會在本地數(shù)據(jù)庫保留3天(可以通過視圖msdb.dbo.cdc_jobs的字段retention來查詢活翩,當(dāng)然也可以更改對應(yīng)的表來修改保留時間)烹骨,每天會通過SqlServer后臺代理任務(wù),每天晚上2點定時刪除材泄。所以推薦定期的將變更數(shù)據(jù)轉(zhuǎn)移到數(shù)據(jù)倉庫中沮焕。

以下命令基本就夠用了


--查看數(shù)據(jù)庫是否起用CDC

  GO

  SELECT [name], database_id, is_cdc_enabled

  FROM sys.databases     

  GO

--數(shù)據(jù)庫起用CDC

USE test1

GO

EXEC sys.sp_cdc_enable_db

GO

--關(guān)閉數(shù)據(jù)庫CDC

USE test1

go

exec sys.sp_cdc_disable_db

go

--查看表是否啟用CDC

USE test1

GO

SELECT [name], is_tracked_by_cdc

FROM sys.tables

GO

--啟用表的CDC,前提是數(shù)據(jù)庫啟用

USE Demo01

GO

EXEC sys.sp_cdc_enable_table

@source_schema = 'dbo',

@source_name  = 'user',

@capture_instance='user',

@role_name    = NULL

GO

--關(guān)閉表上的CDC功能

USE test1

GO

EXEC sys.sp_cdc_disable_table

@source_schema = 'dbo',

@source_name  = 'user',

@capture_instance='user'

GO

--可能不記得或者不知道開啟了什么表的捕獲,返回所有表的變更捕獲配置信息

EXECUTE sys.sp_cdc_help_change_data_capture;

GO

--查看對某個實例(即表)的哪些列做了捕獲監(jiān)控:

EXEC sys.sp_cdc_get_captured_columns

@capture_instance = 'user'

--查找配置信息 -retention 變更數(shù)據(jù)保留的分鐘數(shù)

SELECT * FROM test1.dbo.cdc_jobs

--更改數(shù)據(jù)保留時間為分鐘

EXECUTE sys.sp_cdc_change_job

@job_type = N'cleanup',

@retention=1440

GO

--停止捕獲作業(yè)

exec sys.sp_cdc_stop_job N'capture'

go

--啟動捕獲作業(yè)

exec sys.sp_cdc_start_job N'capture'

go

6.運行Connector

怎么運行呢?參照


[root@hadoop001 bin]# pwd

/root/app/kafka_2.11-1.1.1/bin

[root@hadoop001 bin]# ./connect-distributed.sh

USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties

[root@hadoop001 bin]#

[root@hadoop001 bin]# ./connect-distributed.sh ../config/connect-distributed.properties

... 這里就會有大量日志輸出

驗證:


[root@hadoop001 ~]# netstat -tanp |grep 18083

tcp6      0      0 :::18083                :::*                    LISTEN      29436/java         

[root@hadoop001 ~]#

6.1 獲取Worker的信息

ps:可能你需要安裝jq這個軟件: yum -y install jq ,當(dāng)然可以在瀏覽器上打開


[root@hadoop001 ~]# curl -s hadoop001:18083 | jq

{

  "version": "1.1.1",

  "commit": "8e07427ffb493498",

  "kafka_cluster_id": "dmUSlNNLQ9OyJiK-bUc6Tw"

}

[root@hadoop001 ~]#

6.2 獲取Worker上已經(jīng)安裝的Connector


[root@hadoop001 ~]# curl -s hadoop001:18083/connector-plugins | jq

[

  {

    "class": "io.debezium.connector.sqlserver.SqlServerConnector",

    "type": "source",

    "version": "0.9.5.Final"

  },

  {

    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",

    "type": "sink",

    "version": "1.1.1"

  },

  {

    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",

    "type": "source",

    "version": "1.1.1"

  }

]

[root@hadoop001 ~]#

可以看見io.debezium.connector.sqlserver.SqlServerConnector 這個是我們自己剛才安裝的連接器

6.3 列出當(dāng)前運行的connector(task)


[root@hadoop001 ~]#  curl -s hadoop001:18083/connectors | jq

[]

[root@hadoop001 ~]#

6.4 提交Connector用戶配置 《重點》

當(dāng)提交用戶配置時拉宗,就會啟動一個Connector Task峦树,

Connector Task執(zhí)行實際的作業(yè)。

用戶配置是一個Json文件旦事,同樣通過REST API提交:


curl -s -X POST -H "Content-Type: application/json" --data '{

"name": "connector-mssql-online-1",

"config": {

    "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",

    "tasks.max" : "1",

    "database.server.name" : "test1",

    "database.hostname" : "hadoop001",

    "database.port" : "1433",

    "database.user" : "sa",

    "database.password" : "xxx",

    "database.dbname" : "test1",

    "database.history.kafka.bootstrap.servers" : "hadoop001:9093",

    "database.history.kafka.topic": "test1.t201909262.bak"

    }

}' http://hadoop001:18083/connectors

馬上查看connector當(dāng)前狀態(tài),確保狀態(tài)是RUNNING


[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1/status | jq

{

  "name": "connector-mssql-online-1",

  "connector": {

    "state": "RUNNING",

    "worker_id": "xxx:18083"

  },

  "tasks": [

    {

      "state": "RUNNING",

      "id": 0,

      "worker_id": "xxx:18083"

    }

  ],

  "type": "source"

}

[root@hadoop001 ~]#

此時查看Kafka Topic


[root@hadoop001 ~]#  kafka-topics.sh --list --zookeeper hadoop001:2181

__consumer_offsets

connect-configs-2

connect-offsets-2

connect-status-2

#自動生成的Topic魁巩, 記錄表結(jié)構(gòu)的變化,生成規(guī)則:你的connect中自定義的

test1.t201909262.bak

[root@hadoop001 ~]#

再次列出運行的connector(task)


[root@hadoop001 ~]#  curl -s hadoop001:18083/connectors | jq

[

  "connector-mssql-online-1"

]

[root@hadoop001 ~]#

6.5 查看connector的信息


[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1 | jq

{

  "name": "connector-mssql-online-1",

  "config": {

    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",

    "database.user": "sa",

    "database.dbname": "test1",

    "tasks.max": "1",

    "database.hostname": "hadoop001",

    "database.password": "xxx",

    "database.history.kafka.bootstrap.servers": "hadoop001:9093",

    "database.history.kafka.topic": "test1.t201909262.bak",

    "name": "connector-mssql-online-1",

    "database.server.name": "test1",

    "database.port": "1433"

  },

  "tasks": [

    {

      "connector": "connector-mssql-online-1",

      "task": 0

    }

  ],

  "type": "source"

}

[root@hadoop001 ~]#

6.6 查看connector下運行的task信息


[root@hadoop001 ~]# curl -s hadoop001:18083/connectors/connector-mssql-online-1/tasks | jq

[

  {

    "id": {

      "connector": "connector-mssql-online-1",

      "task": 0

    },

    "config": {

      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",

      "database.user": "sa",

      "database.dbname": "test1",

      "task.class": "io.debezium.connector.sqlserver.SqlServerConnectorTask",

      "tasks.max": "1",

      "database.hostname": "hadoop001",

      "database.password": "xxx",

      "database.history.kafka.bootstrap.servers": "hadoop001:9093",

      "database.history.kafka.topic": "test1.t201909262.bak",

      "name": "connector-mssql-online-1",

      "database.server.name": "test1",

      "database.port": "1433"

    }

  }

]

[root@hadoop001 ~]#

task的配置信息繼承自connector的配置

6.7 暫停/重啟/刪除 Connector


# curl -s -X PUT hadoop001:18083/connectors/connector-mssql-online-1/pause

# curl -s -X PUT hadoop001:18083/connectors/connector-mssql-online-1/resume

# curl -s -X DELETE hadoop001:18083/connectors/connector-mssql-online-1

7.從Kafka中讀取變動數(shù)據(jù)


# 記錄表結(jié)構(gòu)的變化姐浮,生成規(guī)則:你的connect中自定義的

kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.t201909262.bak --from-beginning

# 記錄數(shù)據(jù)的變化谷遂,生成規(guī)則:test1.dbo.t201909262

kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262 --from-beginning

這里就是:


kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262 --from-beginning

kafka-console-consumer.sh --bootstrap-server hadoop001:9093 --topic test1.dbo.t201909262

8. 對表進行 DML語句 操作

新增數(shù)據(jù):

然后kafka控制臺也就會馬上打出日志

在這里插入圖片描述

spark 對接kafka 10s一個批次

在這里插入圖片描述

然后就會將這個新增的數(shù)據(jù)插入到MySQL中去

具體的處理邏輯后面再花時間來記錄一下

修改和刪除也是OK的,就不演示了

有任何問題卖鲤,歡迎留言一起交流~~

更多好文:https://blog.csdn.net/liuge36

*參考文章:

https://docs.confluent.io/current/connect/debezium-connect-sqlserver/index.html#sqlserver-source-connector

https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/track-data-changes-sql-server?view=sql-server-2017

https://blog.csdn.net/qq_19518987/article/details/89329464

http://www.tracefact.net/tech/087.html*

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末肾扰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子扫尖,更是在濱河造成了極大的恐慌白对,老刑警劉巖掠廓,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件换怖,死亡現(xiàn)場離奇詭異,居然都是意外死亡蟀瞧,警方通過查閱死者的電腦和手機沉颂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來悦污,“玉大人铸屉,你說我怎么就攤上這事∏卸耍” “怎么了彻坛?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我昌屉,道長钙蒙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任间驮,我火速辦了婚禮躬厌,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘竞帽。我一直安慰自己扛施,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布屹篓。 她就那樣靜靜地躺著疙渣,像睡著了一般。 火紅的嫁衣襯著肌膚如雪堆巧。 梳的紋絲不亂的頭發(fā)上昌阿,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天,我揣著相機與錄音恳邀,去河邊找鬼懦冰。 笑死,一個胖子當(dāng)著我的面吹牛谣沸,可吹牛的內(nèi)容都是我干的刷钢。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼乳附,長吁一口氣:“原來是場噩夢啊……” “哼内地!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起赋除,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤阱缓,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后举农,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體荆针,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年颁糟,在試婚紗的時候發(fā)現(xiàn)自己被綠了航背。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡棱貌,死狀恐怖玖媚,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情婚脱,我是刑警寧澤今魔,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布勺像,位于F島的核電站,受9級特大地震影響错森,放射性物質(zhì)發(fā)生泄漏咏删。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一问词、第九天 我趴在偏房一處隱蔽的房頂上張望督函。 院中可真熱鬧,春花似錦激挪、人聲如沸辰狡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宛篇。三九已至,卻和暖如春薄湿,著一層夾襖步出監(jiān)牢的瞬間叫倍,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工豺瘤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留吆倦,地道東北人。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓坐求,卻偏偏與公主長得像蚕泽,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子桥嗤,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容