Bottledwater同步PostgreSQL中的數(shù)據(jù)變化到Kafka消息隊列

當(dāng)我們遇到需要捕獲數(shù)據(jù)庫中數(shù)據(jù)變化的時候,總是會想到通過消息隊列來實現(xiàn)該需求全释,通過把數(shù)據(jù)變化發(fā)布到消息隊列,來完成系統(tǒng)上下游的解耦。關(guān)心這些數(shù)據(jù)變化的應(yīng)用可以從消息隊列上獲取這些數(shù)據(jù)幔托。

Bottledwater-pg是針對PostgreSQL數(shù)據(jù)庫的一種消息生產(chǎn)者,可以將PostgreSQL數(shù)據(jù)庫的數(shù)據(jù)寫入confluent Kafka,從而實時的分享給消息訂閱者重挑。支持PostgreSQL 9.4以及以上版本嗓化,支持全量快照,以及持續(xù)解析數(shù)據(jù)WAL日志中的增量數(shù)據(jù)并寫入Kafka谬哀。每一張數(shù)據(jù)庫表為一個topic刺覆。數(shù)據(jù)在使用decode從WAL取出后,使用Avro將數(shù)據(jù)格式化(通常格式化為JSON)再寫入Kafka史煎。

Bottledwater-pg有docker谦屑、源碼編譯、Ubuntu三種使用方式篇梭,本文以源碼編譯方式說明如何部署氢橙。

一. 環(huán)境說明

源端


PostgreSQL :postgresql-10.5

Kafka:kafka_2.11-2.3.0

Bottledwater-pg依賴以下軟件包:

avro-c > =1.8.0

jansson

libcurl

librdkafka > =0.9.1

Bottledwater-pg可選以下軟件包:

libsnappy

boost


另外編譯要求較高的cmake版本,操作系統(tǒng)自帶的cmake會出現(xiàn)編譯錯誤恬偷,本文使用:

cmake-3.8.0

二. 安裝前準備

2.1postgresql安裝

配置用戶和組

groupadd postgres

useradd postgres -g postgres

環(huán)境準備

yum install -y perl-ExtUtils-Embed readline-devel zlib-devel pam-devel libxml2-devel libxslt-devel openldap-devel python-devel gcc-c++ openssl-devel cmake gcc* readline-devel

權(quán)限配置

mkdir /opt/postgres

chown -R postgres:postgres /opt/postgres/

配置環(huán)境變量

vi /etc/profile

#在文件末尾將以下環(huán)境變量添加進去

export PATH=/opt/postgres/bin:$PATH

export PGHOME=/opt/postgres

export PGDATA=/opt/postgres/data/

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PGHOME/lib/

export PATH=$PGHOME/bin:$PATH:$HOME/bin```

2.1.1安裝過程

安裝數(shù)據(jù)庫

cd 下載好的壓縮包存放路徑

#解壓文件

tar -zxvf postgresql-10.5.tar.gz

cd postgresql-10.5

#參數(shù)根據(jù)自己需求配置

./configure --prefix=/opt/postgres/ --with-python --with-libxml --with-libxslt make make install

****悍手,安裝過程內(nèi)容太長就不截圖了,從屏顯的信息最后看到PostgreSQL installation complete. 就說明安裝好了喉磁,如果報錯大多數(shù)都是安裝包問題或者依賴沒下載谓苟,看下錯誤信息基本都能解決****

初始化數(shù)據(jù)庫

su postgres

#初始化數(shù)據(jù)庫的參數(shù)也是根據(jù)自己需要添加 ,可以通過--help查看

/opt/postgres/bin/initdb -D $PGDATA -E UTF8

#如果出現(xiàn)以下message就說明初始化成功了

********************************************************************* creating directory /opt/postgres/data ... ok creating subdirectories ... ok selecting default max_connections ... 100 selecting default shared_buffers ... 128MB selecting dynamic shared memory implementation ... posix creating configuration files ... ok running bootstrap script ... ok performing post-bootstrap initialization ... ok syncing data to disk ... ok WARNING: enabling "trust" authentication for local connections You can change this by editing pg_hba.conf or using the option -A, or --auth-local and --auth-host, the next time you run initdb. Success. You can now start the database server using: /opt/postgres/bin/pg_ctl -D /opt/postgres/data -l logfile start ********************************************************************

#啟動數(shù)據(jù)庫服務(wù) #數(shù)據(jù)文件和日志文件的路徑根據(jù)自己需求指定协怒。

/opt/postgres/bin/pg_ctl -D $PGDATA -l /opt/postgres/server.log start

到此postgres服務(wù)就安裝完畢了涝焙。

2.1.2使用數(shù)據(jù)庫

進入數(shù)據(jù)庫

su postgres

#進入數(shù)據(jù)庫

[postgres@localhost postgres]

$ psql Type "help" for help.

postgres=#

修改數(shù)據(jù)庫配置,允許其他服務(wù)器連接

#postgres安裝好以后需要修改2個配置文件才能允許別的服務(wù)器訪問孕暇。

cd /opt/postgres/data

vi postgresql.conf

#找到listen_addresses和port參數(shù)仑撞,修改如下,也可根據(jù)自己需求修改

listen_addresses = '*'

port = 5432

#根據(jù)自己的網(wǎng)段設(shè)置下放行的ip規(guī)則

vi pg_hba.conf 
# IPv4 local connections:

host   all    all    192.168.0.0/16    md5

配置好以后就可以使用postgres數(shù)據(jù)庫了妖滔。

2.2編譯安裝cmake

(root用戶操作)

# cd /opt # tar zxvf cmake-3.8.0.tar.gz

# cd cmake-3.8.0

# ./bootstrap

# make

# make install

# cmake -version

cmake version 3.8.0

CMake suite maintained and supported by Kitware ([kitware.com/cmake](http://kitware.com/cmake)).

2.3編譯安裝jansson

(root用戶操作)

# cd /opt

# tar jxvf jansson-2.9.tar.bz2

# cd jansson-2.9

# ./configure

# make

# make install

# ls /usr/local/lib/pkgconfig jansson.pc

#export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

2.4編譯安裝avro

(root用戶操作)

# cd /opt

# yum install -y xz-*

# yum install -y zlib-devel.x86_64

# tar zxvf avro-src-1.8.1.tar.gz

# cd avro-src-1.8.1/lang/c

# mkdir build # cd build

# cmake .. -DCMAKE_INSTALL_PREFIX=/opt/avro -DCMAKE_BUILD_TYPE=Release -DTHREADSAFE=true

# make

# make test

# make install

導(dǎo)入庫文件

# vi /etc/[ld.so](http://ld.so/).conf /opt/avro/lib # ldconfig

配置臨時環(huán)境變量

# export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH

# export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

2.5安裝libcurl

(root用戶操作)

# yum install -y libcurl-devel.x86_64

2.6編譯安裝librdkafka

(root用戶操作)

# cd /opt # unzip librdkafka-master.zip

# cd librdkafka-master

# ./configure

# make

# make install

# ls /usr/local/lib/pkgconfig

jansson.pc   rdkafka.pc    rdkafka++.pc

2.7添加引用庫

(root用戶操作)

# vi /etc/[ld.so](http://ld.so/).conf.d/bottledwater.conf

/opt/avro/lib

/usr/local/lib

/opt/postgres/lib #初始化數(shù)據(jù)庫位置的引用庫

# ldconfig

2.8編譯安裝bottledwater-pg

(root用戶操作)

# chown -R postgres:postgres /opt/

(postgres用戶操作)

配置環(huán)境變量

$ vi ~/.bash_profile

# .bash_profile

# Get the aliases and functions if [ -f ~/.bashrc ]; then . ~/.bashrc fi

# User specific environment and startup programs export

PG_HOME=/opt/postgres #初始化數(shù)據(jù)庫位置

export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH

export PKG_CONFIG_PATH=/opt/avro/lib/[pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH](http://pkgconfig/usr/local/lib/pkgconfig:%24PKG_CONFIG_PATH)

PATH=$PG_HOME/bin:$PATH:$HOME/bin

export PATH

準備安裝包

$ unzip bottledwater-pg-master.zip

$ cd bottledwater-pg-master

這里不知道是操作系統(tǒng)環(huán)境的問題還是開源軟件本身的問題隧哮,需要修改源碼包里的2處Makefile才能通過編譯。


     PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)

修改kafka/Makefile

    PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)

    LDFLAGS=-L/usr/lib64 $(CURL_LDFLAGS) $(PG_LDFLAGS) $(KAFKA_LDFLAGS) $(AVRO_LDFLAGS) $(JSON_LDFLAGS)

編譯并安裝bottledwater-pg

$ make

$ make install

安裝完成后會自動在PostgreSQL數(shù)據(jù)庫擴展包目錄下生成擴展庫文件和擴展庫控制文件座舍。

$ ls /opt/postgres/lib/bottledwater*

/opt/postgres/lib/[bottledwater.so](http://bottledwater.so/)

$ ls /opt/postgres/share/extension/bottledwater*

/opt/postgres/share/extension/bottledwater--0.1.sql

/opt/postgres/share/extension/bottledwater.control

2.9 kafka安裝

Step 1: 下載代碼

下載kafka_2.11-2.3.0版本并且解壓它沮翔。

tar -xzf kafka_2.11-2.3.0.tgz

cd kafka_2.11-2.3.0

Step 2: 啟動服務(wù)

運行kafka需要使用Zookeeper,所以你需要先啟動Zookeeper曲秉,如果你沒有Zookeeper采蚀,你可以使用kafka自帶打包和配置好的Zookeeper。

bin/zookeeper-server-start.sh config/zookeeper.properties

[2019-10-23 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... 現(xiàn)在啟動kafka服務(wù)

bin/kafka-server-start.sh config/server.properties &

[2019-10-23 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...

2.9數(shù)據(jù)庫配置

試驗環(huán)境中PostgreSQL已有數(shù)據(jù)庫msc

msc庫中有一張表:

CREATE TABLE COMPANY (

ID INT PRIMARY KEY NOT NULL,

NAME TEXT NOT NULL,

AGE INT NOT NULL,

ADDRESS CHAR (50),

SALARY REAL,

JOIN_DATE DATE

);

修改數(shù)據(jù)庫配置文件

$ vi /opt/postgres/data/postgresql.conf

max_worker_processes = 8 # 至少為8

wal_level = logical # 至少為logical,可以更高

max_wal_senders = 8 # 至少為8

wal_keep_segments = 256 # 至少為256

max_replication_slots = 4 # 至少為4

修改數(shù)據(jù)庫白名單配置文件

  *這部分權(quán)限可能過大承二,可以精簡

$ vi /opt/postgres/data//pg_hba.conf

local replication all trust

host replication all 127.0.0.1/32 trust

host replication all 0.0.0.0/0 md5

host all postgres 10.19.100.23/32 trust

重啟數(shù)據(jù)庫并對要監(jiān)控的庫創(chuàng)建bottledwater擴展

$ pg_ctl restart 
$ psql -U postgres -d mas -c "create extension bottledwater;"

CREATE EXTENSION

三 測試同步

$cd /opt/bottledwater-pg-master

假設(shè)一切都在localhost的默認端口上運行榆鼠,則可以按以下方式啟動bottledwater:

$./kafka/bottledwater --postgres=postgres://localhost

第一次運行時,它將創(chuàng)建一個名為的復(fù)制插槽bottledwater亥鸠,為數(shù)據(jù)庫創(chuàng)建一致的快照妆够,然后將其發(fā)送到Kafka识啦。(您可以使用--slot 命令行標志來更改復(fù)制插槽的名稱。)快照完成后神妹,它將切換為使用復(fù)制流颓哮。

當(dāng)您不再希望運行瓶裝水時,必須刪除其復(fù)制插槽(否則最終將用完磁盤空間鸵荠,因為開放的復(fù)制插槽會阻止WAL進行垃圾回收)题翻。您可以通過psql 再次打開并運行以下命令來執(zhí)行此操作:

select pg_drop_replication_slot('bottledwater');

第二次運行則發(fā)送到kafka中

$./kafka/bottledwater --postgres=postgres://postgres@39.108.83.108:5432/msc --broker=39.108.83.108:9092 -f json

[INFO] Writing messages to Kafka in JSON format [INFO] Created replication slot "bottledwater", capturing consistent snapshot "00000718-1". INFO: bottledwater_export: Table mas.mastest is keyed by index mastest_pkey [INFO] Snapshot complete, streaming changes from 0/1749F58.

向源端數(shù)據(jù)庫寫入數(shù)據(jù)

INSERT INTO COMPANY (ID,NAME,AGE,ADDRESS,SALARY,JOIN_DATE)

VALUES(6,'Paul',32,'Cal6',20000.00,'2001-08-13');

DELETE from COMPANY where ID= 1

UPDATE COMPANY set NAME='joker' WHERE ID= 6

目的端查看消息事件

查看topics列表

# bin/kafka-topics.sh --list --zookeeper localhost:2181

company

#./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic company --from-beginning --property print.key=true

{"id": {"int": 1}} {"id": {"int": 1}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 7, "day": 13}}}

{"id": {"int": 2}} {"id": {"int": 2}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 7, "day": 13}}}

{"id": {"int": 3}} {"id": {"int": 3}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 7, "day": 13}}}

null

null

{"id": {"int": 4}} {"id": {"int": 4}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 7, "day": 13}}}

{"id": {"int": 5}} {"id": {"int": 5}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "California5 "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 8, "day": 13}}}

{"id": {"int": 6}} {"id": {"int": 6}, "name": {"string": "Paul"}, "age": {"int": 32}, "address": {"string": "Cal6 "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 8, "day": 13}}}

{"id": {"int": 1}} null

{"id": {"int": 6}} {"id": {"int": 6}, "name": {"string": "zyc"}, "age": {"int": 32}, "address": {"string": "Cal6 "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 8, "day": 13}}}

{"id": {"int": 6}} {"id": {"int": 6}, "name": {"string": "joker"}, "age": {"int": 32}, "address": {"string": "Cal6 "}, "salary": {"float": 20000.0}, "join_date": {"Date": {"year": 2001, "month": 8, "day": 13}}}

說明:

1. 目的端Kafka配置文件中,Listener的配置不能用默認值localhost

2. 每一張PostgreSQL中的表都會被做為一個topic腰鬼,每個topic是自動創(chuàng)建的不需要人工干預(yù),即使后期新建的表也是如此塑荒。

3. 消息事件以“主鍵列 + 變更后的所有列”作為消息內(nèi)容

4. 主從復(fù)制模式下熄赡,slave不能啟動bottledwater,因為備庫不產(chǎn)生wal日志

四bottledwater命令行選項

bottledwater客戶端接受的各種命令行選項的參考,并帶有指向相關(guān)文檔區(qū)域的鏈接齿税。如果這與的輸出不一致bottledwater --help彼硫,則--help是正確的(請?zhí)峤徽埱笳埱笠愿麓艘茫。?
-d凌箕,--postgres=[postgres://user:pass@host:port/dbname](postgres://user:pass@hostport)(必需):PostgreSQL服務(wù)器的連接字符串或URI拧篮。

-s,--slot=slotname (默認值:bottledwater):[復(fù)制插槽的](https://github.com/confluentinc/bottledwater-pg#configuration)名稱牵舱。該插槽是在首次使用時自動創(chuàng)建的串绩。

-b,--broker=host1[:port1],host2[:port2]... (默認值:localhost:9092):Kafka代理主機/端口的列表芜壁,以逗號分隔礁凡。

-r,--schema-registry=[http://hostname:port](http://hostnameport/)(默認值:[http:// localhost:8081](http://localhost:8081/)):注冊Avro模式的服務(wù)的URL慧妄。(僅用于 --output-format=avro顷牌。時省略--output-format=json。)

-f塞淹,--output-format=[avro|json] (默認值:avro):如何編碼用于寫入Kafka的消息窟蓝。請參見[輸出格式的](https://github.com/confluentinc/bottledwater-pg#output-formats)討論。

-u饱普,--allow-unkeyed:允許導(dǎo)出沒有主鍵的表运挫。[默認情況下不允許](https://github.com/confluentinc/bottledwater-pg#consuming-data)這樣做,因為更新和刪除需要主鍵來標識其行费彼。

-p滑臊,--topic-prefix=prefix:字符串,以加在所有[主題名稱之前](https://github.com/confluentinc/bottledwater-pg#topic-names)箍铲。例如雇卷,使用 --topic-prefix=postgres,來自表“ users”的更新將被寫入主題“ postgres.users”。

-e关划,--on-error=[log|exit] (默認值:exit):如果出現(xiàn)暫時性錯誤(例如無法發(fā)布到Kafka)小染,該怎么辦。請參閱[錯誤處理的](https://github.com/confluentinc/bottledwater-pg#error-handling)討論贮折。

-x裤翩,--skip-snapshot:跳過采取[一致的快照](https://github.com/confluentinc/bottledwater-pg#configuration)的現(xiàn)有數(shù)據(jù)庫的內(nèi)容和剛開始流的任何新的更新。(忽略復(fù)制插槽是否已存在调榄。)

-C踊赠,--kafka-config property=value:kafka制片設(shè)置全局配置屬性(見[librdkafka文檔](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md))。

-T每庆,--topic-config property=value:kafka生產(chǎn)者將主題配置屬性(見[librdkafka文檔](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md))筐带。

--config-help:打印Kafka配置屬性列表。

-h缤灵,--help:打印此幫助文本伦籍。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市腮出,隨后出現(xiàn)的幾起案子帖鸦,更是在濱河造成了極大的恐慌,老刑警劉巖胚嘲,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件作儿,死亡現(xiàn)場離奇詭異,居然都是意外死亡慢逾,警方通過查閱死者的電腦和手機立倍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來侣滩,“玉大人口注,你說我怎么就攤上這事【椋” “怎么了寝志?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長策添。 經(jīng)常有香客問我材部,道長,這世上最難降的妖魔是什么唯竹? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任乐导,我火速辦了婚禮,結(jié)果婚禮上浸颓,老公的妹妹穿的比我還像新娘物臂。我一直安慰自己旺拉,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布棵磷。 她就那樣靜靜地躺著蛾狗,像睡著了一般。 火紅的嫁衣襯著肌膚如雪仪媒。 梳的紋絲不亂的頭發(fā)上沉桌,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天,我揣著相機與錄音算吩,去河邊找鬼留凭。 笑死,一個胖子當(dāng)著我的面吹牛偎巢,可吹牛的內(nèi)容都是我干的冰抢。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼艘狭,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了翠订?” 一聲冷哼從身側(cè)響起巢音,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎尽超,沒想到半個月后官撼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡似谁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年傲绣,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片巩踏。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡秃诵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出塞琼,到底是詐尸還是另有隱情菠净,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布彪杉,位于F島的核電站毅往,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏派近。R本人自食惡果不足惜攀唯,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一榕栏、第九天 我趴在偏房一處隱蔽的房頂上張望噪窘。 院中可真熱鬧,春花似錦询张、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至溪食,卻和暖如春囊卜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背错沃。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工栅组, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人枢析。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓玉掸,卻偏偏與公主長得像,于是被迫代替她去往敵國和親醒叁。 傳聞我的和親對象是個殘疾皇子司浪,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內(nèi)容