使用Debezium、Postgres和Kafka進(jìn)行數(shù)據(jù)實時采集(CDC)
碼匠君
9小時前
1. 背景
一直在完善自己的微服務(wù)架構(gòu)偿乖,其中包含分布式工作流服務(wù)的建設(shè)冶伞,目前采用的是Camunda工作流引擎。使用Camunda工作流筑凫,就會涉及到工作流引擎的用戶體系如何與現(xiàn)有用戶體系集成的問題(Flowable、Activity也類似)〔⒋澹現(xiàn)有設(shè)計中巍实,工作流定位偏重于企業(yè)內(nèi)部流程的流轉(zhuǎn),因此系統(tǒng)中設(shè)計了單位哩牍、部門棚潦、人員以及人事歸屬與Camunda工作流用戶體系對應(yīng)。
功能設(shè)計完成膝昆,就面臨另外一個問題丸边,如何解決現(xiàn)有人事體系數(shù)據(jù)如何【`實時`】同步至Camunda工作流引擎中。如果現(xiàn)有體系數(shù)據(jù)與工作流數(shù)據(jù)在同一個庫中荚孵,相對比較好解決妹窖。而微服務(wù)架構(gòu)中,不同服務(wù)的數(shù)據(jù)通常存放在不同數(shù)據(jù)庫中收叶,那么就需要進(jìn)行數(shù)據(jù)的同步骄呼。采用的方式不同,可以取得的效果也相同判没。
最初考慮如下兩種方案蜓萄,但是都略感不足:
ETL:使用ETL工具進(jìn)行數(shù)據(jù)同步是典型的方式,可以選擇工具也比較多澄峰。開源的ETL工具增量同步問題解決的并不理想绕德,不使用增量同步數(shù)那么數(shù)據(jù)同步始終存在時間差;商業(yè)的ETL工具增量同步解決的比較好摊阀,但是龐大且昂貴。
消息隊列:消息隊列是多系統(tǒng)集成普遍采用的方式,可以很好地解決數(shù)據(jù)同步的實時問題胞此。但是數(shù)據(jù)同步的兩端都需要自己編寫代碼臣咖,一端寫生產(chǎn)代碼一端寫消費(fèi)代碼,生產(chǎn)端代碼還要捆綁現(xiàn)有體系數(shù)據(jù)所有操作漱牵,需要的編寫量比較大夺蛇。
查詢對比的大量的資料,最終選擇了Debezimu來解決以上問題以及未來更多數(shù)據(jù)同步的問題酣胀。
2. Debezium介紹
RedHat開源的Debezium是一個將多種數(shù)據(jù)源實時變更數(shù)據(jù)捕獲刁赦,形成數(shù)據(jù)流輸出的開源工具。
它是一種CDC(Change Data Capture)工具闻镶,工作原理類似大家所熟知的Canal, DataBus, Maxwell等甚脉,是通過抽取數(shù)據(jù)庫日志來獲取變更的。
官方介紹為:
Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong
Debezium是一個分布式平臺铆农,它將您現(xiàn)有的數(shù)據(jù)庫轉(zhuǎn)換為事件流牺氨,因此應(yīng)用程序可以看到數(shù)據(jù)庫中的每一個行級更改并立即做出響應(yīng)。Debezium構(gòu)建在Apache KAFKA之上墩剖,并提供Kafka連接兼容的連接器來監(jiān)視特定的數(shù)據(jù)庫管理系統(tǒng)猴凹。
Debezium現(xiàn)在已支持以下數(shù)據(jù)庫:
MySQL
MongoDB
PostgreSQL
Oracle
SQL Server
Db2
Cassandra
Vitess
與ETL不同,Debezimu只支持在生產(chǎn)端連接數(shù)據(jù)庫岭皂,消費(fèi)端不支持連接數(shù)據(jù)庫郊霎,而是需要自己編寫代碼接收Kafka消息數(shù)據(jù)。分析下來這種方式更加靈活爷绘,還可以很好利用現(xiàn)有微服務(wù)架構(gòu)中的Kafka书劝。
3. 快速搭建Debezimu測試環(huán)境。
目前揉阎,Debezium最新的Stable版本是1.6庄撮。 Debezium已經(jīng)把要用到的Component打包成了Docker的Image,因此毙籽,我們只需要安裝并啟動Docker后就可以按下面的步驟快速搭建測試環(huán)境了洞斯。
3.1 運(yùn)行Zookeeper
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.6
3.2 運(yùn)行Kafka
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.6
3.3 運(yùn)行PostgreSQL
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.6
上面代碼中使用的是:
debezium/example-postgres:1.6,查看Debezimu官方文檔以及其它示例都是這個坑赡。實際上Debezimu對postgresql 9~13都進(jìn)行了Docker封裝烙如,可以根據(jù)自己的需要在Docker Hub中選擇相應(yīng)的PostgreSQL版本。
debezium/postgres很小毅否,使用也比較方便亚铁,而且也進(jìn)行了必要的設(shè)置,無須再進(jìn)行額外的配置就可以直接使用螟加。
3.4 運(yùn)行Debezimu Connect
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.6
Debezium的container啟動時需要傳入如下環(huán)境變量:
GROUP_ID: 分組ID徘溢,若需要啟動多個Debezium的實例組成集群吞琐,那么它們的GROUP_ID必須被設(shè)置為一樣
CONFIG_STORAGE_TOPIC:下面需要調(diào)用Debezium提供的RestFUL API管理connector,connector的信息就是保存在CONFIG_STORAGE_TOPIC指定的kafka topic下然爆。
OFFSET_STORAGE_TOPIC: connector監(jiān)控數(shù)據(jù)流的offset站粟,若我們使用的是PostgreSQL Connector,那么OFFSET_STORAGE_TOPIC指定的topic中存的就是PostgreSQL的lsn曾雕。
3.5 創(chuàng)建Connector
經(jīng)過上面4個步驟后奴烙,Debezium的測試環(huán)境就搭建好了,現(xiàn)在需要調(diào)用Debezium提供的API創(chuàng)建connector剖张,使Debezium與數(shù)據(jù)庫之間建立關(guān)系切诀。我們把下面的payload POST到`http://<ip addr of debezium>:8083/connectors/`。
{
? "name": "fulfillment-connector",?
? "config": {
? ? "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
? ? "database.hostname": "192.168.99.100",
? ? "database.port": "5432",
? ? "database.user": "postgres",
? ? "database.password": "postgres",
? ? "database.dbname" : "postgres",
? ? "database.server.name": "fulfillment",
? ? "table.include.list": "public.inventory"
? }
}
"name":注冊到Kafka Connect服務(wù)的Connector名稱
"connector.class":PostgreSQL connector class名稱
"database.hostname":PostgreSQL 數(shù)據(jù)庫地址
"database.port":PostgreSQL 數(shù)據(jù)庫端口
"database.user":PostgreSQL 數(shù)據(jù)庫用戶名
"database.password":PostgreSQL數(shù)據(jù)密碼
"database.dbname":連接的PostgreSQL數(shù)據(jù)庫
"database.server.name":虛擬的數(shù)據(jù)庫Server名稱搔弄,可以根據(jù)實際需求定義幅虑,消費(fèi)Kafka數(shù)據(jù)時要使用該值
"table.include.list":監(jiān)聽的數(shù)據(jù)表列表,以","分割肯污。PostgreSQL要將表名寫全翘单,格式"<Schema-name>.<table-name>"。如果沒有特定的Schema蹦渣,那么就是默認(rèn)的`public`
下面為完成的curl命令:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "192.168.99.100", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "postgres",? ? "database.server.name": "fulfillment", "table.include.list": "public.inventory" }}'
上面是示例哄芜,因為使用的是Windows,個人覺得curl不方便柬唯,換用postman:
3.6 Docker Compose 配置
為了方便使用认臊,將以上Docker命令整合為Docker Compose配置,具體如下:
version: "3"
services:
? postgres:
? ? image: debezium/postgres:13
? ? container_name: postgres
? ? hostname: postgres
? ? environment:
? ? ? POSTGRES_USER: herodotus
? ? ? POSTGRES_PASSWORD: herodotus
? ? ports:
? ? ? - 5432:5432
? zookeeper:
? ? image: debezium/zookeeper:1.6
? ? container_name: zookeeper
? ? restart: always
? ? ports:
? ? ? - 2181:2181
? ? ? - 2888:2888
? ? ? - 3888:3888
? kafka:
? ? image: debezium/kafka:1.6
? ? container_name: kafka
? ? restart: always
? ? ports:
? ? ? - 9092:9092
? ? environment:
? ? ? ZOOKEEPER_CONNECT: zookeeper:2181
? ? ? BOOTSTRAP_SERVERS: kafka:9092
? ? depends_on:
? ? ? - zookeeper
? connect:
? ? image: debezium/connect:1.6
? ? container_name: connect
? ? restart: always
? ? ports:
? ? ? - 8083:8083
? ? environment:
? ? ? GROUP_ID: 1
? ? ? CONFIG_STORAGE_TOPIC: herodotus_connect_configs
? ? ? OFFSET_STORAGE_TOPIC: herodotus_connect_offsets
? ? ? STATUS_STORAGE_TOPIC: herodotus_connect_statuses
? ? ? BOOTSTRAP_SERVERS: kafka:9092
? ? depends_on:
? ? ? - kafka
4. 外部數(shù)據(jù)庫配置
上一章節(jié)锄奢,介紹了Debezimu測試環(huán)境的方式失晴,其中使用的debezium/postgres是已經(jīng)進(jìn)行過配置的,所以使用起來比較方便拘央。在實際使用過程中涂屁,很多時候是使用獨(dú)立搭建PostgreSQL,那么就需要對PostgreSQL進(jìn)行配置灰伟。
4.1 以Docker的方式運(yùn)行基礎(chǔ)組件
本章節(jié)主要介紹Debezimu與獨(dú)立的PostgreSQL數(shù)據(jù)庫連接拆又,因此除了PostgreSQL以外,Zookeeper栏账、Kafka帖族、Debezimu Connect仍舊使用Docker方式部署。具體部署的Docker Compose配置如下:
version: "3"
services:
? zookeeper:
? ? image: debezium/zookeeper:1.6
? ? container_name: zookeeper
? ? hostname: zookeeper
? ? environment:
? ? ? ZOOKEEPER_SERVER_ID: 1
? ? ports:
? ? ? - 2181:2181
? ? ? - 2888:2888
? ? ? - 3888:3888
? kafka:
? ? image: debezium/kafka:1.6
? ? container_name: kafka
? ? hostname: kafka
? ? ports:
? ? ? - 9092:9092
? ? environment:
? ? ? BROKER_ID: 1
? ? ? ZOOKEEPER_CONNECT: zookeeper:2181
? ? ? KAFKA_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://0.0.0.0:9092
? ? ? KAFKA_ADVERTISED_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://192.168.101.10:9092
? ? ? KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INNER:PLAINTEXT,LISTENER_OUTER:PLAINTEXT
? ? ? KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INNER
? ? ? KAFKA_ALLOW_PLAINTEXT_LISTENER: 'yes'
? ? ? KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
? ? depends_on:
? ? ? - zookeeper
? connect:
? ? image: debezium/connect:1.6
? ? container_name: connect
? ? hostname: connect
? ? ports:
? ? ? - 8083:8083
? ? environment:
? ? ? GROUP_ID: 1
? ? ? CONFIG_STORAGE_TOPIC: herodotus_connect_configs
? ? ? OFFSET_STORAGE_TOPIC: herodotus_connect_offsets
? ? ? STATUS_STORAGE_TOPIC: herodotus_connect_statuses
? ? ? BOOTSTRAP_SERVERS: kafka:9092
? ? depends_on:
? ? ? - kafka
其中Kafka Listener相關(guān)的配置挡爵,是為了解決Spring Kafka連接Kafka會出現(xiàn):`Connection to node -1 could not be established. Broker may not be available.`問題竖般。
4.2 修改PostgreSQL配置
Logical Decoding功能是PostgreSQL在9.4加入的,它是一種機(jī)制茶鹃,允許提取提交到事務(wù)日志的更改涣雕,并在輸出插件的幫助下以用戶友好的方式處理這些更改艰亮。輸出插件使客戶機(jī)能夠使用更改。
PostgreSQL connector 讀取和處理數(shù)據(jù)庫變化主要包含兩個部分:
Logical Decoding 輸出插件:根據(jù)選擇可能需要安裝輸出插件胞谭。運(yùn)行PostgreSQL服務(wù)之前垃杖,必須配置`replication slot `來啟用你所選擇的輸出插件,有以下幾個輸出插件供選擇:
decoderbufs: 是基于`Protobuf`的丈屹,目前由Debezimu社區(qū)維護(hù)
wal2json :是基于`JSON`的,目前由wal2json社區(qū)維護(hù)
pgoutput:在PostgreSQL 10及以上版本中是標(biāo)準(zhǔn)的Logical Decoding 輸出插件伶棒。是由PostgreSQL社區(qū)維護(hù)旺垒,由PostgreSQL自己用于Logical Replication。這個插件是內(nèi)置安裝的肤无,所以不需要額外安裝先蒋。
Java代碼(就是連接Kafka Connect的代碼):負(fù)責(zé)讀取由Logical Decoding 輸出插件產(chǎn)生的數(shù)據(jù)。
Logical Decoding 輸出插件不支持DDL變更宛渐,這意味著Connector不能把DDL變更事件發(fā)送給消費(fèi)者
Logical Decoding Replicaiton Slots支持?jǐn)?shù)據(jù)庫的`primary`服務(wù)器竞漾。因此如果是PostgreSQL服務(wù)的集群,Connector只能在`primary`服務(wù)器激活窥翩。如果`primary`服務(wù)器出現(xiàn)問題业岁,那么connector就會停掉。
4.2.1 修改PostgreSQL配置
在${PostgreSQL_HOME}/13/data目錄下寇蚊,找到postgresql.conf笔时。
修改以下配置:
wal_level=logical
max_wal_senders=1
max_replication_slots=1
wal_level:通知數(shù)據(jù)庫使用 logical decoding 讀取預(yù)寫日志
max_wal_senders: 通知數(shù)據(jù)庫獨(dú)立處理WAL變更的獨(dú)立進(jìn)程數(shù)量
max_replication_slots: 通知數(shù)據(jù)庫處理WAL變更流所允許最大replication slots數(shù)目
配置完成后記得重啟數(shù)據(jù)庫
4.2.2 設(shè)置數(shù)據(jù)庫權(quán)限
需要給PostgreSQL 用戶分配replication權(quán)限。定義一個PostgreSQL role仗岸,至少分配REPLICATION和LOGION兩項權(quán)限允耿,示例代碼如下:
CREATE ROLE <name> REPLICATION LOGIN;
具體操作可以參考以下腳本:
-- pg新建用戶
CREATE USER user WITH PASSWORD 'pwd';
-- 給用戶復(fù)制流權(quán)限
ALTER ROLE user replication;
-- 給用戶登錄數(shù)據(jù)庫權(quán)限
grant CONNECT ON DATABASE test to user;
-- 把當(dāng)前庫public下所有表查詢權(quán)限賦給用戶
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
4.3 創(chuàng)建Connector
把下面的payload POST到http://<ip addr of debezium>:8083/connectors/
{
? ? "name": "herodotus-connector",
? ? "config": {
? ? ? ? "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
? ? ? ? "database.hostname": "192.168.101.10",
? ? ? ? "database.port": "15432",
? ? ? ? "database.user": "athena",
? ? ? ? "database.password": "athena",
? ? ? ? "database.dbname" : "athena",
? ? ? ? "database.server.name": "herodotus",
? ? ? ? "slot.name": "herodotus_slot",
? ? ? ? "table.include.list": "public.sys_organization",
? ? ? ? "publication.name": "herodotus_public_connector",
? ? "publication.autocreate.mode": "filtered",
? ? "plugin.name": "pgoutput"
? ? }
}
postman 界面操作如下圖:
下面結(jié)合本例子中connector的配置信息對幾個重點(diǎn)屬性進(jìn)行進(jìn)一步說明:
Slot.name
按照上例Debezium會在PostgreSQL創(chuàng)建一個名為`herodotus_slot`的復(fù)制槽,本例中創(chuàng)建的connector需要通過該復(fù)制槽獲取數(shù)據(jù)變更的信息扒怖。
可以通過以下sql查看復(fù)制槽的信息:
select * from pg_replication_slots;
上圖中较锡,active_pid為14200,即進(jìn)程ID為14200的wal_sender進(jìn)程已經(jīng)在使用該復(fù)制槽與Debezium交互了
database.server.name和table.include.list
當(dāng)connector獲取到數(shù)據(jù)變更的信息后盗痒,會把該信息轉(zhuǎn)化為統(tǒng)一的數(shù)據(jù)格式蚂蕴,并發(fā)布到Kafka的topic中。Debezium規(guī)定一個表對應(yīng)一個topic积糯,topic的名字的格式為 <database.server.name>.<schema name>.<table name>掂墓,本例中的表的數(shù)據(jù)變更消息將保存到Kafka的topic
herodotus.public.sys_organization中。
可以通過以下代碼查看接收到的信息:
@KafkaListener(topics = {"herodotus.public.sys_organization"}, groupId = "herodotus.debezium")
public void received(String message) {
log.info("[Herodotus] |- Recived message from Debezium : [{}]", message);
}
5. 運(yùn)行測試
現(xiàn)在看成,可以基于以上環(huán)境的配置君编,進(jìn)行Debezium捕獲數(shù)據(jù)效果的測試〈ɑ牛可以進(jìn)入到Kafka容器中吃嘿,使用使用Kafka提供的kafka-console-consumer.sh查看Topic接收到的數(shù)據(jù)祠乃。具體命令如下:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.10:9092 --topic herodotus.public.sys_organization
5.1 Insert 測試
在數(shù)據(jù)庫sys_organization 表中插入一條數(shù)據(jù)
Kafka的消費(fèi)者命令行工具收到了來自Debezium發(fā)布的數(shù)據(jù)變更消息:
格式化后的消息體如下,其中schema字段在此先忽略兑燥,重點(diǎn)放payload.before亮瓷,payload.after及payload.op字段上:
{
"schema": {
...
},
"payload": {
"before": null,
"after": {
"organization_id": "4",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": 1,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": "AAAAA",
"parent_id": null,
"partition_code": null,
"short_name": null
},
"source": {
"version": "1.6.0.Final",
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626594964405,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63461608\",\"63461608\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2460,
"lsn": 63461896,
"xmin": null
},
"op": "c",
"ts_ms": 1626594964846,
"transaction": null
}
}
由于是insert操作,所以op為c (create)降瞳,before為null嘱支,after為我們插入的數(shù)據(jù)。
5.2 Update 測試
在數(shù)據(jù)庫sys_organization表中修改一條數(shù)據(jù)
Kafka的消費(fèi)者命令行工具收到了來自Debezium發(fā)布的數(shù)據(jù)變更消息:
格式化后的消息體如下:
{
"schema": {
...
},
"payload": {
"before": null,
"after": {
"organization_id": "4",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": 1,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": "BBBBB",
"parent_id": null,
"partition_code": null,
"short_name": null
},
"source": {
"version": "1.6.0.Final",
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626595173601,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63466888\",\"63466888\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2461,
"lsn": 63467176,
"xmin": null
},
"op": "u",
"ts_ms": 1626595173825,
"transaction": null
}
}
進(jìn)行更新產(chǎn)品信息的操作后挣饥,consumer將收到一條op為u (update)的信息除师,after為修改后的數(shù)據(jù)。
5.3 Delete測試
在數(shù)據(jù)庫sys_organization表中刪除一條數(shù)據(jù)
Kafka的消費(fèi)者命令行工具收到了來自Debezium發(fā)布的數(shù)據(jù)變更消息:
格式化后的消息體如下:
{
"schema": {
...
},
"payload": {
"before": {
"organization_id": "3",
"create_time": null,
"ranking": null,
"update_time": null,
"description": null,
"is_reserved": null,
"reversion": null,
"status": null,
"a4_biz_org_id": null,
"biz_org_code": null,
"biz_org_desc": null,
"biz_org_id": null,
"biz_org_name": null,
"biz_org_type": null,
"organization_name": null,
"parent_id": null,
"partition_code": null,
"short_name": null
},
"after": null,
"source": {
"version": "1.6.0.Final",
"connector": "postgresql",
"name": "herodotus",
"ts_ms": 1626594566933,
"snapshot": "false",
"db": "athena",
"sequence": "[\"63461120\",\"63461120\"]",
"schema": "public",
"table": "sys_organization",
"txId": 2458,
"lsn": 63461176,
"xmin": null
},
"op": "d",
"ts_ms": 1626594567136,
"transaction": null
}
}
進(jìn)行刪除產(chǎn)品信息的操作后扔枫,consumer將收到一條op為d (delete)的信息汛聚,before為刪除前的數(shù)據(jù),after為null短荐。
6.總結(jié)
通過Debezimu進(jìn)行數(shù)據(jù)同步倚舀,不僅解決了傳統(tǒng)ETL時效性不高的問題,還解決了基于消息隊列需要兩端編寫代碼的工程量忍宋,而且基于容器的方式更適合微服務(wù)架構(gòu)的使用痕貌,使用Kafka進(jìn)行消費(fèi)端的整合,使得整合方式更加靈活便捷讶踪、終端類型更加豐富芯侥。
示例代碼地址:
[Gitee](https://gitee.com/herodotus/eurynome-cloud)
[Github](https://github.com/herodotus-cloud/eurynome-cloud)
搜索