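When we need to capture data changes in a database, a message queue is usually the first thing that comes to mind. Publishing the changes to a queue decouples upstream and downstream systems, and any application that cares about those changes can read them from the queue.
Bottledwater-pg is a message producer for PostgreSQL. It writes PostgreSQL data to Confluent Kafka so that the data can be shared with subscribers in real time. It supports PostgreSQL 9.4 and later, takes a full snapshot, and then continuously parses incremental changes from the WAL and writes them to Kafka. Each database table maps to one topic. After the changes are extracted from the WAL through logical decoding, they are encoded with Avro by default, or as JSON (the format used in this article), before being written to Kafka.
Bottledwater-pg can be used in three ways: Docker, building from source, and Ubuntu packages. This article describes deployment by building from source.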
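1. Environment
Source side
PostgreSQL: postgresql-10.5
Kafka: kafka_2.11-2.3.0
Bottledwater-pg depends on the following packages:
avro-c >= 1.8.0
jansson
libcurl
librdkafka >= 0.9.1
Bottledwater-pg can optionally use the following packages:
libsnappy
boost
The build also requires a fairly recent cmake; the cmake shipped with the operating system causes build errors. This article uses:
cmake-3.8.0
2. Pre-installation preparation
2.1 Installing PostgreSQL
Create the user and group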
groupadd postgres
useradd postgres -g postgres
Install build dependencies
yum install -y perl-ExtUtils-Embed readline-devel zlib-devel pam-devel libxml2-devel libxslt-devel openldap-devel python-devel gcc-c++ openssl-devel cmake gcc* readline-devel
Set directory permissions
mkdir /opt/postgres
chown -R postgres:postgres /opt/postgres/
Configure environment variables
vi /etc/profile
# Append the following environment variables to the end of the file
export PATH=/opt/postgres/bin:$PATH
export PGHOME=/opt/postgres
export PGDATA=/opt/postgres/data/
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PGHOME/lib/
export PATH=$PGHOME/bin:$PATH:$HOME/bin
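2.1.1 Installation
Install the database
cd <directory containing the downloaded archive>
# Extract the archive
tar -zxvf postgresql-10.5.tar.gz
cd postgresql-10.5
# Adjust the configure options to your needs
./configure --prefix=/opt/postgres/ --with-python --with-libxml --with-libxslt
make
make install
The build output is too long to reproduce here. If the last lines include "PostgreSQL installation complete.", the installation succeeded. Most failures come from a bad source archive or a missing dependency, and the error message is usually enough to resolve them.
Initialize the database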
su postgres
# The initdb options can also be adjusted as needed; see --help
/opt/postgres/bin/initdb -D $PGDATA -E UTF8
# If you see the following messages, initialization succeeded
*********************************************************************
creating directory /opt/postgres/data ... ok
creating subdirectories ... ok
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting dynamic shared memory implementation ... posix
creating configuration files ... ok
running bootstrap script ... ok
performing post-bootstrap initialization ... ok
syncing data to disk ... ok
WARNING: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.
Success. You can now start the database server using:
/opt/postgres/bin/pg_ctl -D /opt/postgres/data -l logfile start
********************************************************************
# Start the database service
# Set the data directory and log file paths as needed
/opt/postgres/bin/pg_ctl -D $PGDATA -l /opt/postgres/server.log start
The PostgreSQL service is now installed.
2.1.2 Using the database
Connect to the database
su postgres
# Connect to the database
[postgres@localhost postgres]$ psql
Type "help" for help.
postgres=#
Modify the database configuration to allow connections from other servers
# After installation, two configuration files must be changed before other servers can connect.
cd /opt/postgres/data
vi postgresql.conf
# Find the listen_addresses and port parameters and set them as follows (adjust as needed)
listen_addresses = '*'
port = 5432
# Add an allow rule for your own network range
vi pg_hba.conf
# IPv4 local connections:
host all all 192.168.0.0/16 md5
Once these are configured and the server has been restarted, the PostgreSQL database is ready to use.
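To get a feel for which client addresses the pg_hba.conf rule above would admit, the CIDR match can be sketched with Python's standard ipaddress module. The function name and the sample client addresses are illustrative assumptions. PostgreSQL performs the real check itself, and a matching client still has to pass md5 password authentication.

```python
import ipaddress

# CIDR block from the rule in this article: host all all 192.168.0.0/16 md5
RULE_CIDR = "192.168.0.0/16"

def rule_matches(client_ip, cidr):
    """Return True if client_ip falls inside the CIDR block of a host rule."""
    return ipaddress.ip_address(client_ip) in ipaddress.ip_network(cidr)

if __name__ == "__main__":
    # Hypothetical client addresses, used only for illustration
    for ip in ("192.168.1.20", "10.19.100.23"):
        print(ip, rule_matches(ip, RULE_CIDR))
```

A client outside the block is not covered by this line and needs its own pg_hba.conf entry.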
2.2 Building and installing cmake
(run as root)
# cd /opt
# tar zxvf cmake-3.8.0.tar.gz
# cd cmake-3.8.0
# ./bootstrap
# make
# make install
# cmake -version
cmake version 3.8.0
CMake suite maintained and supported by Kitware ([kitware.com/cmake](http://kitware.com/cmake)).
2.3 Building and installing jansson
(run as root)
# cd /opt
# tar jxvf jansson-2.9.tar.bz2
# cd jansson-2.9
# ./configure
# make
# make install
# ls /usr/local/lib/pkgconfig
jansson.pc
# export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH
2.4 Building and installing avro
(run as root)
# cd /opt
# yum install -y xz-*
# yum install -y zlib-devel.x86_64
# tar zxvf avro-src-1.8.1.tar.gz
# cd avro-src-1.8.1/lang/c
# mkdir build
# cd build
# cmake .. -DCMAKE_INSTALL_PREFIX=/opt/avro -DCMAKE_BUILD_TYPE=Release -DTHREADSAFE=true
# make
# make test
# make install
Register the library path
# vi /etc/ld.so.conf
/opt/avro/lib
# ldconfig
Set temporary environment variables
# export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH
# export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH
2.5 Installing libcurl
(run as root)
# yum install -y libcurl-devel.x86_64
2.6 Building and installing librdkafka
(run as root)
# cd /opt
# unzip librdkafka-master.zip
# cd librdkafka-master
# ./configure
# make
# make install
# ls /usr/local/lib/pkgconfig
jansson.pc rdkafka.pc rdkafka++.pc
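Because the later Bottled Water build locates these libraries through pkg-config, it can help to confirm that each .pc file is reachable through PKG_CONFIG_PATH before compiling. The following is a minimal sketch; the function name is hypothetical, and the module name avro-c (for the Avro C library) is an assumption not shown in the listings above. It only inspects the directories in PKG_CONFIG_PATH, not pkg-config's built-in default directories.

```python
import os

def missing_pc_files(pkg_config_path, modules):
    """Return the modules with no <module>.pc file in any PKG_CONFIG_PATH directory."""
    dirs = [d for d in pkg_config_path.split(os.pathsep) if d]
    return [
        m for m in modules
        if not any(os.path.isfile(os.path.join(d, m + ".pc")) for d in dirs)
    ]

if __name__ == "__main__":
    # "avro-c" is an assumed module name; jansson and rdkafka appear in the listing above
    wanted = ["avro-c", "jansson", "rdkafka"]
    print(missing_pc_files(os.environ.get("PKG_CONFIG_PATH", ""), wanted))
```

An empty list means every module was found; any name that is printed points to a directory missing from PKG_CONFIG_PATH.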
2.7 Adding library paths
(run as root)
# vi /etc/ld.so.conf.d/bottledwater.conf
/opt/avro/lib
/usr/local/lib
/opt/postgres/lib # libraries under the PostgreSQL installation directory
# ldconfig
2.8 Building and installing bottledwater-pg
(run as root)
# chown -R postgres:postgres /opt/
(run as the postgres user)
Configure environment variables
$ vi ~/.bash_profile
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
    . ~/.bashrc
fi
# User specific environment and startup programs
export PG_HOME=/opt/postgres # PostgreSQL installation directory
export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH
export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH
PATH=$PG_HOME/bin:$PATH:$HOME/bin
export PATH
Prepare the source package
$ unzip bottledwater-pg-master.zip
$ cd bottledwater-pg-master
For reasons that may lie in either the operating system environment or the project itself, two Makefiles in the source package had to be modified before the build would succeed.
PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)
Modify kafka/Makefile
PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)
LDFLAGS=-L/usr/lib64 $(CURL_LDFLAGS) $(PG_LDFLAGS) $(KAFKA_LDFLAGS) $(AVRO_LDFLAGS) $(JSON_LDFLAGS)
Build and install bottledwater-pg
$ make
$ make install
After installation, the extension library and the extension control files are placed automatically in the PostgreSQL extension directories.
$ ls /opt/postgres/lib/bottledwater*
/opt/postgres/lib/bottledwater.so
$ ls /opt/postgres/share/extension/bottledwater*
/opt/postgres/share/extension/bottledwater--0.1.sql
/opt/postgres/share/extension/bottledwater.control
2.9 Installing Kafka
Step 1: Download the code
Download kafka_2.11-2.3.0 and extract it.
tar -xzf kafka_2.11-2.3.0.tgz
cd kafka_2.11-2.3.0
Step 2: Start the services
Kafka needs ZooKeeper, so start ZooKeeper first. If you do not already have one, you can use the preconfigured ZooKeeper bundled with Kafka.
bin/zookeeper-server-start.sh config/zookeeper.properties
[2019-10-23 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
Now start the Kafka service:
bin/kafka-server-start.sh config/server.properties &
[2019-10-23 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
2.10 Database configuration
In the test environment, PostgreSQL already has a database named msc.
The msc database contains one table:
CREATE TABLE COMPANY (
ID INT PRIMARY KEY NOT NULL,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR (50),
SALARY REAL,
JOIN_DATE DATE
);
Edit the database configuration file
$ vi /opt/postgres/data/postgresql.conf
max_worker_processes = 8 # at least 8
wal_level = logical # must be logical
max_wal_senders = 8 # at least 8
wal_keep_segments = 256 # at least 256
max_replication_slots = 4 # at least 4
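The minimums listed above can be checked mechanically before restarting the server. The following is a minimal sketch, assuming the configuration is available as plain text. The function names are hypothetical and the parser is deliberately simple: it does not handle include directives or a # inside a quoted value, and it treats an absent setting as failing even though PostgreSQL would fall back to a built-in default.

```python
import re

# Minimum values stated in this article for running Bottled Water
REQUIRED_MIN = {
    "max_worker_processes": 8,
    "max_wal_senders": 8,
    "wal_keep_segments": 256,
    "max_replication_slots": 4,
}

def parse_conf(text):
    """Parse 'name = value' lines from postgresql.conf text, dropping comments."""
    settings = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        m = re.match(r"^([A-Za-z_][A-Za-z0-9_.]*)\s*=?\s*(.+)$", line)
        if m:
            settings[m.group(1)] = m.group(2).strip().strip("'")
    return settings

def check_conf(text):
    """Return a list of problems; an empty list means every check passed."""
    settings = parse_conf(text)
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be logical")
    for name, minimum in REQUIRED_MIN.items():
        value = settings.get(name)
        if value is None or not value.isdigit() or int(value) < minimum:
            problems.append(f"{name} must be at least {minimum}")
    return problems

if __name__ == "__main__":
    with open("/opt/postgres/data/postgresql.conf") as f:
        print(check_conf(f.read()) or "all checks passed")
```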
Edit the client authentication file (pg_hba.conf)
* These rules may be more permissive than necessary and can be tightened.
$ vi /opt/postgres/data/pg_hba.conf
local replication all trust
host replication all 127.0.0.1/32 trust
host replication all 0.0.0.0/0 md5
host all postgres 10.19.100.23/32 trust
Restart the database and create the bottledwater extension in the database to be monitored
$ pg_ctl restart
$ psql -U postgres -d msc -c "create extension bottledwater;"
CREATE EXTENSION
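3. Testing synchronization
$ cd /opt/bottledwater-pg-master
Assuming everything is running on the default ports on localhost, you can start bottledwater as follows:
$ ./kafka/bottledwater --postgres=postgres://localhost
The first time it runs, it creates a replication slot named bottledwater, takes a consistent snapshot of the database, and sends it to Kafka. (You can use the --slot command-line flag to change the name of the replication slot.) Once the snapshot is complete, it switches to consuming the replication stream.
When you no longer want to run Bottled Water, you must drop its replication slot. Otherwise you will eventually run out of disk space, because an open replication slot prevents WAL from being garbage-collected. You can do this by opening psql again and running: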
select pg_drop_replication_slot('bottledwater');
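How much WAL an idle slot is holding back can be estimated from two LSN values: the server's current WAL position and the slot's restart_lsn from the pg_replication_slots view. The following is a minimal sketch of that arithmetic. The function names are hypothetical and the sample LSNs are illustrative only. On PostgreSQL 10 the same subtraction can be done server-side with pg_wal_lsn_diff and pg_current_wal_lsn.

```python
def lsn_to_int(lsn):
    """Convert a PostgreSQL LSN such as '0/1749F58' to an absolute byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)

def retained_wal_bytes(current_lsn, restart_lsn):
    """Bytes of WAL between a slot's restart_lsn and the current WAL position."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)

if __name__ == "__main__":
    # Illustrative values: a hypothetical current position and a slot restart_lsn
    held = retained_wal_bytes("0/2000000", "0/1749F58")
    print(f"{held / (1024 * 1024):.1f} MiB of WAL retained by the slot")
```

If this figure keeps growing while bottledwater is not running, the slot is the likely cause and should be dropped.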
The second run sends the data to Kafka:
$ ./kafka/bottledwater --postgres=postgres://postgres@39.108.83.108:5432/msc --broker=39.108.83.108:9092 -f json
[INFO] Writing messages to Kafka in JSON format
[INFO] Created replication slot "bottledwater", capturing consistent snapshot "00000718-1".
INFO: bottledwater_export: Table mas.mastest is keyed by index mastest_pkey
[INFO] Snapshot complete, streaming changes from 0/1749F58.
Write data to the source database
INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY,JOIN_DATE)
VALUES(6,'Paul',32,'Cal6',20000.00,'2001-08-13');
DELETE FROM COMPANY WHERE ID = 1;
UPDATE COMPANY SET NAME = 'joker' WHERE ID = 6;
View the message events on the target side
List the topics
# bin/kafka-topics.sh --list --zookeeper localhost:2181
company
# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic company --from-beginning --property print.key=true
{"id": {"int": 1}} {"id": {"int": 1}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 7, "day": 13}}}
{"id": {"int": 2}} {"id": {"int": 2}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 7, "day": 13}}}
{"id": {"int": 3}} {"id": {"int": 3}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 7, "day": 13}}}
null
null
{"id": {"int": 4}} {"id": {"int": 4}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 7, "day": 13}}}
{"id": {"int": 5}} {"id": {"int": 5}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California5 "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 8, "day": 13}}}
{"id": {"int": 6}} {"id": {"int": 6}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "Cal6 "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 8, "day": 13}}}
{"id": {"int": 1}} null
{"id": {"int": 6}} {"id": {"int": 6}, "name": {"string": "zyc"}, "age": {"int": 32}, "address": {"string": "Cal6 "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 8, "day": 13}}}
{"id": {"int": 6}} {"id": {"int": 6}, "name": {"string": "joker"}, "age": {"int": 32}, "address": {"string": "Cal6 "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 8, "day": 13}}}
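Notes:
1. In the Kafka configuration file on the target side, the listener setting must not be left at its default value of localhost.
2. Each PostgreSQL table becomes one topic. Topics are created automatically with no manual intervention, including for tables created later.
3. Each message event consists of the primary key columns as the key and all columns after the change as the value.
4. In a primary/standby replication setup, bottledwater cannot be started against the standby, because the standby only replays WAL from the primary and does not support logical decoding in this PostgreSQL version.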
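Note 3 and the console output above suggest how a consumer can rebuild the current table contents: the key identifies the row, a JSON value carries the latest row image, and a null value marks a delete. The following is a minimal sketch under those assumptions. The function names are hypothetical, the key and value are passed in as separate strings, and the unwrapping rule (any single-entry object is treated as a type wrapper such as {"int": 1}) is a simplification that would mis-handle a genuine one-field nested record.

```python
import json

def unwrap(value):
    """Strip a single-entry type wrapper, e.g. {"int": 1} -> 1; leave other values alone."""
    if isinstance(value, dict) and len(value) == 1:
        return next(iter(value.values()))
    return value

def decode(record_json):
    """Decode one key or value string into a plain dict, or None for a null payload."""
    record = json.loads(record_json)
    if record is None:
        return None
    return {column: unwrap(field) for column, field in record.items()}

def apply_event(table, key_json, value_json):
    """Apply one change event to an in-memory copy of the table, keyed by primary key."""
    key = tuple(sorted(decode(key_json).items()))
    row = decode(value_json)
    if row is None:
        table.pop(key, None)  # null value: the row was deleted
    else:
        table[key] = row      # insert or update: keep the latest row image

if __name__ == "__main__":
    table = {}
    apply_event(table, '{"id": {"int": 6}}',
                '{"id": {"int": 6}, "name": {"string": "Paul"}}')
    apply_event(table, '{"id": {"int": 6}}',
                '{"id": {"int": 6}, "name": {"string": "joker"}}')
    print(table)
```

Replaying a topic from the beginning through apply_event leaves one entry per surviving primary key, which is the same result a compacted topic is meant to preserve.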
4. bottledwater command-line options
This is a reference for the command-line options accepted by the bottledwater client, with links to the relevant areas of the documentation. If it disagrees with the output of bottledwater --help, then --help is correct (please file a pull request to update this reference!).
-d, --postgres=postgres://user:pass@host:port/dbname (required): Connection string or URI of the PostgreSQL server.
-s, --slot=slotname (default: bottledwater): Name of the [replication slot](https://github.com/confluentinc/bottledwater-pg#configuration). The slot is created automatically on first use.
-b, --broker=host1[:port1],host2[:port2]... (default: localhost:9092): Comma-separated list of Kafka broker hosts/ports.
-r, --schema-registry=http://hostname:port (default: http://localhost:8081): URL of the service where Avro schemas are registered. (Used only for --output-format=avro; omit when --output-format=json.)
-f, --output-format=[avro|json] (default: avro): How to encode the messages written to Kafka. See the discussion of [output formats](https://github.com/confluentinc/bottledwater-pg#output-formats).
-u, --allow-unkeyed: Allow export of tables that have no primary key. This is [disallowed by default](https://github.com/confluentinc/bottledwater-pg#consuming-data), because updates and deletes need a primary key to identify their row.
-p, --topic-prefix=prefix: String to prepend to all [topic names](https://github.com/confluentinc/bottledwater-pg#topic-names). For example, with --topic-prefix=postgres, updates from table "users" are written to topic "postgres.users".
-e, --on-error=[log|exit] (default: exit): What to do on a transient error, such as failure to publish to Kafka. See the discussion of [error handling](https://github.com/confluentinc/bottledwater-pg#error-handling).
-x, --skip-snapshot: Skip taking a [consistent snapshot](https://github.com/confluentinc/bottledwater-pg#configuration) of the existing database contents and just start streaming any new updates. (Ignored if the replication slot already exists.)
-C, --kafka-config property=value: Set a global configuration property for the Kafka producer (see the [librdkafka docs](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)).
-T, --topic-config property=value: Set a topic configuration property for the Kafka producer (see the [librdkafka docs](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)).
--config-help: Print the list of Kafka configuration properties.
-h, --help: Print this help text.
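When bottledwater is launched from a script or a process supervisor, the options above can be assembled into an argument list programmatically. The following is a minimal sketch that uses only flags listed in this section. The function name, its parameters, and the default binary path are illustrative assumptions.

```python
import shlex

def bottledwater_argv(postgres_uri, brokers=None, output_format=None,
                      slot=None, topic_prefix=None, allow_unkeyed=False,
                      binary="./kafka/bottledwater"):
    """Build the argument list for a bottledwater invocation from the listed options."""
    argv = [binary, f"--postgres={postgres_uri}"]
    if brokers:
        argv.append("--broker=" + ",".join(brokers))
    if output_format:
        if output_format not in ("avro", "json"):
            raise ValueError("output_format must be 'avro' or 'json'")
        argv.append(f"--output-format={output_format}")
    if slot:
        argv.append(f"--slot={slot}")
    if topic_prefix:
        argv.append(f"--topic-prefix={topic_prefix}")
    if allow_unkeyed:
        argv.append("--allow-unkeyed")
    return argv

if __name__ == "__main__":
    argv = bottledwater_argv(
        "postgres://postgres@39.108.83.108:5432/msc",
        brokers=["39.108.83.108:9092"],
        output_format="json",
    )
    # Print a shell-safe command line; pass argv to subprocess.run to launch it
    print(" ".join(shlex.quote(a) for a in argv))
```

Keeping the arguments as a list avoids shell quoting problems when the connection URI contains a password or other special characters.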