JDBC Source Connector
Quickstart
數據庫環(huán)境準備
CREATE DATABASE connector;
USE connector;
CREATE TABLE `from_source` (
`fdsid` int(11) NOT NULL AUTO_INCREMENT,
`dsid` int(11) DEFAULT NULL,
`from` int(11) DEFAULT NULL,
`stype` int(11) DEFAULT NULL,
PRIMARY KEY (`fdsid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;
INSERT INTO from_source VALUES(1, 2, 1, 1);
INSERT INTO from_source VALUES(2, 2, 1, 2);
INSERT INTO from_source VALUES(3, 2, 1, 5);
INSERT INTO from_source VALUES(4, 2, 1, 6);
MySQL JDBC 驅動準備
測試環(huán)境使用的mysql版本信息如下:
mysql-info.png
在mysql官網上選擇合適的驅動下載进倍,測試中下載的是mysql-connector-java-5.1.42.tar.gz背捌。
將此驅動拷貝到$CONFLUENT_PATH/share/java/kafka-connect-jdbc目錄下毡庆,使用解壓命令解壓
tar -xzvf mysql-connector-java-5.1.42.tar.gz
cd mysql-connector-java-5.1.42
cp mysql-connector-java-5.1.42-bin.jar ../
最終的目的就是將mysql-connector-java-5.1.42-bin.jar放在$CONFLUENT_PATH/share/java/kafka-connect-jdbc目錄下毅否,這樣confluecnt connector在啟動是就可以找到mysql的jdbc驅動了
配置文件準備
單機環(huán)境下運行connector的命令如下:
//INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:61)
bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/mysql-source.properties
其中:connect-avro-standalone.properties可使用默認配置螟加;
mysql-source.properties的內容如下:
# tasks to create:
name=test-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# a table called 'users' will be written to the topic 'test-mysql-jdbc-users'.
connection.url=jdbc:mysql://172.24.8.114:3306/connector?user=$USER&password=$PASSWORD
mode=incrementing
incrementing.column.name=fdsid
topic.prefix=test-mysql-jdbc-
其中捆探,$USER,$PASSWORD是訪問mysql數據庫的用戶名和地址黍图。
參考:
JDBC驅動下載地址
MySQL Connector配置參考
從MySQL導入數據到Kafka中
啟動Connector
bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/mysql-source.properties
驗證產生了相應的topic
bin/kafka-topics --zookeeper localhost:2181 --list
topic列表中會包含:test-mysql-jdbc-from_source
驗證topic中的數據
bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-mysql-jdbc-from_source --from-beginning
此時,可以獲取topic:test-mysql-jdbc-from_source中的所有數據搔弄。
驗證可增量從MySQL中導入數據
向from_source表中添加一個數據
insert into from_source values(7, 2, 1, 1)
相應的消費者會接收到插入到from_source表中的數據顾犹。
總參考:
JDBC Source Connector 官網