Kafka連接器深度解讀之JDBC源連接器

復(fù)制自:https://liyuj.gitee.io/confluent/#介紹

在現(xiàn)實業(yè)務(wù)中桥滨,Kafka經(jīng)常會遇到的一個集成場景就是驻龟,從數(shù)據(jù)庫獲取數(shù)據(jù)革半,因為關(guān)系數(shù)據(jù)庫是一個非常豐富的事件源医增。數(shù)據(jù)庫中的現(xiàn)有數(shù)據(jù)以及對該數(shù)據(jù)的任何更改都可以流式傳輸?shù)終afka主題中谆构,在這里這些事件可用于驅(qū)動應(yīng)用虾攻,也可以流式傳輸?shù)狡渌鼣?shù)據(jù)存儲(比如搜索引擎或者緩存)用于分析等铡买。

實現(xiàn)這個需求有很多種做法,但是在本文中霎箍,會聚焦其中的一個解決方案奇钞,即Kafka連接器中的JDBC連接器,講述如何進行配置漂坏,以及一些問題排查的技巧景埃,至于更多的細節(jié)媒至,請參見Kafka的文檔。

介紹

Kafka連接器中的JDBC連接器包含在Confluent Platform中谷徙,也可以與Confluent Hub分開安裝拒啰。它可以作為源端從數(shù)據(jù)庫提取數(shù)據(jù)到Kafka,也可以作為接收端從一個Kafka主題中將數(shù)據(jù)推送到數(shù)據(jù)庫完慧。幾乎所有關(guān)系數(shù)據(jù)庫都提供JDBC驅(qū)動谋旦,包括Oracle、Microsoft SQL Server屈尼、DB2册着、MySQL和Postgres。


image.png

下面將從最簡單的Kafka連接器配置開始脾歧,然后進行構(gòu)建甲捏。本文中的示例是從MySQL數(shù)據(jù)庫中提取數(shù)據(jù),該數(shù)據(jù)庫有兩個模式鞭执,每個模式都有幾張表:

mysql> SELECT table_schema, table_name FROM INFORMATION_SCHEMA.tables WHERE TABLE_SCHEMA != 'information_schema';
+--------------+--------------+
| TABLE_SCHEMA | TABLE_NAME   |
+--------------+--------------+
| demo         | accounts     |
| demo         | customers    |
| demo         | transactions |
| security     | firewall     |
| security     | log_events   |
+--------------+--------------+

JDBC驅(qū)動

在進行配置之前司顿,要確保Kafka連接器可以實際連接到數(shù)據(jù)庫,即確保JDBC驅(qū)動可用蚕冬。如果使用的是SQLite或Postgres免猾,那么驅(qū)動已經(jīng)包含在內(nèi),就可以跳過此步驟囤热。對于所有其它數(shù)據(jù)庫猎提,需要將相關(guān)的JDBC驅(qū)動JAR文件放在和kafka-connect-jdbcJAR相同的文件夾中。此文件夾的標(biāo)準(zhǔn)位置為:

  1. Confluent CLI:下載的Confluent Platform文件夾中的share/java/kafka-connect-jdbc/旁蔼;
  2. Docker锨苏,DEB / RPM安裝:/usr/share/java/kafka-connect-jdbc/,關(guān)于如何將JDBC驅(qū)動添加到Kafka連接器的Docker容器棺聊,請參閱此處伞租;
  3. 如果kafka-connect-jdbcJAR位于其它位置,則可以使用plugin.path指向包含它的文件夾限佩,并確保JDBC驅(qū)動位于同一文件夾中葵诈。

還可以在啟動Kafka連接器時指定CLASSPATH,設(shè)置為可以找到JDBC驅(qū)動的位置祟同。一定要將其設(shè)置為JAR本身作喘,而不僅僅是包含它的文件夾,例如:

CLASSPATH=/u01/jdbc-drivers/mysql-connector-java-8.0.13.JAR ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

兩個事情要注意一下:

如果kafka-connect-jdbcJAR位于其它位置晕城,則Kafka連接器的plugin.path選項將無法直接指向JDBC驅(qū)動JAR文件 泞坦。根據(jù)文檔,每個JDBC驅(qū)動JAR必須與kafka-connect-jdbcJAR位于同一目錄砖顷;
如果正在運行多節(jié)點Kafka連接器集群贰锁,則需要在集群中的每個連接器工作節(jié)點上都正確安裝JDBC驅(qū)動JAR赃梧。
確認是否已加載JDBC驅(qū)動

Kafka連接器會加載與kafka-connect-jdbcJAR文件在同一文件夾中的所有JDBC驅(qū)動,還有在CLASSPATH上找到的任何JDBC驅(qū)動豌熄。如果要驗證一下授嘀,可以將連接器工作節(jié)點的日志級別調(diào)整DEBUG,然后會看到如下信息:

1.DEBUG Loading plugin urls:包含kafka-connect-jdbc-5.1.0.jar(或者對應(yīng)當(dāng)前正在運行的版本號)的一組JAR文件:

DEBUG Loading plugin urls: [file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/audience-annotations-0.5.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/common-utils-5.1.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/jline-0.9.94.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/jtds-1.3.1.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/kafka-connect-jdbc-5.1.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.13.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/netty-3.10.6.Final.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/postgresql-9.4-1206-jdbc41.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/slf4j-api-1.7.25.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/sqlite-jdbc-3.25.2.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/zkclient-0.10.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/zookeeper-3.4.13.jar] (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

在這個JAR列表中房轿,應(yīng)該有JDBC驅(qū)動JAR粤攒。在上面的輸出中所森,可以看到MySQL囱持、Postgres和SQLite的JAR。如果期望的JDBC驅(qū)動JAR不在焕济,可以將驅(qū)動放入kafka-connect-jdbcJAR所在的文件夾中纷妆。

2.INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector':在此之后,在記錄任何其它插件之前晴弃,可以看到JDBC驅(qū)動已注冊:

INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
DEBUG Registered java.sql.Driver: jTDS 1.3.1 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
DEBUG Registered java.sql.Driver: com.mysql.cj.jdbc.Driver@7bbbb6a8 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
DEBUG Registered java.sql.Driver: org.postgresql.Driver@ea9e141 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
DEBUG Registered java.sql.Driver: org.sqlite.JDBC@236134a1 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

確認JDBC驅(qū)動包含在已注冊的列表中掩幢。如果沒有,那么就是安裝不正確上鞠。

注意际邻,雖然可能會在日志的其它地方看到驅(qū)動的Registered java.sql.Driver信息,但如果要確認其對于JDBC連接器可用芍阎,那么它必須直接出現(xiàn)在INFO Added plugin 'io.confluent.connect.jdbc消息的后面世曾。

注意,雖然JDBC URL通常允許嵌入身份驗證信息谴咸,但這些內(nèi)容將以明文形式記錄在Kafka連接器日志中轮听。因此應(yīng)該使用單獨的connection.user和connection.password配置項,這樣在記錄時會被合理地處理岭佳。

指定要提取的表
JDBC驅(qū)動安裝完成之后血巍,就可以配置Kafka連接器從數(shù)據(jù)庫中提取數(shù)據(jù)了。下面是最小的配置珊随,不過它不一定是最有用的述寡,因為它是數(shù)據(jù)的批量導(dǎo)入,在本文后面會討論如何進行增量加載叶洞。

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-01-",
"mode":"bulk"
}
}'

使用此配置鲫凶,每個表(用戶有權(quán)訪問)將完全復(fù)制到Kafka,通過使用KSQL列出Kafka集群上的主題京办,我們可以看到:
ksql> LIST TOPICS;

Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups

mysql-01-accounts | false | 1 | 1 | 0 | 0
mysql-01-customers | false | 1 | 1 | 0 | 0
mysql-01-firewall | false | 1 | 1 | 0 | 0
mysql-01-log_events | false | 1 | 1 | 0 | 0
mysql-01-transactions | false | 1 | 1 | 0 | 0

注意mysql-01前綴掀序,表格內(nèi)容的完整副本將每五秒刷新一次,可以通過修改poll.interval.ms進行調(diào)整惭婿,例如每小時刷新一次:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-02-",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'

找個主題確認一下不恭,顯示完整的數(shù)據(jù)叶雹,看看是不是自己想要的:

ksql> PRINT 'mysql-02-accounts' FROM BEGINNING;
Format:AVRO
12/20/18 3:18:44 PM UTC, null, {"id": 1, "first_name": "Hamel", "last_name": "Bly", "username": "Hamel Bly", "company": "Erdman-Halvorson", "created_date": 17759}
12/20/18 3:18:44 PM UTC, null, {"id": 2, "first_name": "Scottie", "last_name": "Geerdts", "username": "Scottie Geerdts", "company": "Mante Group", "created_date": 17692}
12/20/18 3:18:44 PM UTC, null, {"id": 3, "first_name": "Giana", "last_name": "Bryce", "username": "Giana Bryce", "company": "Wiza Inc", "created_date": 17627}
12/20/18 3:18:44 PM UTC, null, {"id": 4, "first_name": "Allen", "last_name": "Rengger", "username": "Allen Rengger", "company": "Terry, Jacobson and Daugherty", "created_date": 17746}
12/20/18 3:18:44 PM UTC, null, {"id": 5, "first_name": "Reagen", "last_name": "Volkes", "username": "Reagen Volkes", "company": "Feeney and Sons", "created_date": 17798}

目前會展示所有可用的表,這可能不是實際的需求换吧,可能只希望包含特定模式的表折晦,這個可以使用catalog.pattern/schema.pattern(具體哪一個取決于數(shù)據(jù)庫)配置項進行控制:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-03-",
"mode":"bulk",
"poll.interval.ms" : 3600000,
"catalog.pattern" : "demo"
}
}'

這樣就只會從demo模式中取得3張表:

ksql> LIST TOPICS;

Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups

[…]
mysql-03-accounts | false | 1 | 1 | 0 | 0
mysql-03-customers | false | 1 | 1 | 0 | 0
mysql-03-transactions | false | 1 | 1 | 0 | 0
[…]

也可以使用table.whitelist(白名單)或table.blacklist(黑名單)來控制連接器提取的表,下面的示例顯式地列出了希望拉取到Kafka中的表清單:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_04",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-04-",
"mode":"bulk",
"poll.interval.ms" : 3600000,
"catalog.pattern" : "demo",
"table.whitelist" : "accounts"
}
}'

這時就只有一個表從數(shù)據(jù)庫流式傳輸?shù)終afka:

ksql> LIST TOPICS;

Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups

mysql-04-accounts | false | 1 | 1 | 0 | 0

因為只有一個表沾瓦,下面的配置:

"catalog.pattern" : "demo",
"table.whitelist" : "accounts",

等同于:

"table.whitelist" : "demo.accounts",

也可以在一個模式中指定多個表满着,比如:

"catalog.pattern" : "demo",
"table.whitelist" : "accounts, customers",

或者也可以跨越多個模式:

"table.whitelist" : "demo.accounts, security.firewall",

還可以使用其它的表過濾選項,比如table.types可以選擇表之外的對象贯莺,例如視圖风喇。

過濾表時要注意,因為如果最終沒有對象匹配該模式(或者連接到數(shù)據(jù)庫的已認證用戶沒有權(quán)限訪問)缕探,那么連接器將報錯:

INFO After filtering the tables are: (io.confluent.connect.jdbc.source.TableMonitorThread)

ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
java.lang.IllegalArgumentException: Number of groups must be positive

在通過table.whitelist/table.blacklist進行過濾之前魂莫,可以將日志級別調(diào)整為DEBUG,查看用戶可以訪問的表清單:

DEBUG Got the following tables: ["demo"."accounts", "demo"."customers"] (io.confluent.connect.jdbc.source.TableMonitorThread)

然后爹耗,連接器會根據(jù)提供的白名單/黑名單過濾此列表耙考,因此要確認指定的列表位于連接器可用的列表中,還要注意連接用戶要有權(quán)限訪問這些表潭兽,因此還要檢查數(shù)據(jù)庫端的GRANT語句倦始。

增量提取

到目前為止,已經(jīng)按計劃將整張表都拉取到Kafka山卦,這雖然對于轉(zhuǎn)存數(shù)據(jù)非常有用鞋邑,不過都是批量并且并不總是適合將源數(shù)據(jù)庫集成到Kafka流系統(tǒng)中。

JDBC連接器還有一個流式傳輸?shù)終afka的選項怒坯,它只會傳輸上次拉取后的數(shù)據(jù)變更炫狱,具體可以基于自增列(例如自增主鍵)和/或時間戳(例如最后更新時間戳)來執(zhí)行此操作。在模式設(shè)計中的常見做法是使用這些中的一個或兩個剔猿,例如视译,事務(wù)表ORDERS可能有:

  • ORDER_ID:一個唯一鍵(可能是主鍵),每個新訂單遞增归敬;
  • UPDATE_TS:每次數(shù)據(jù)變更時更新的時間戳列酷含。

可以使用mode參數(shù)配置該選項,比如使用timestamp

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_08",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-08-",
                "mode":"timestamp",
                "table.whitelist" : "demo.accounts",
                "timestamp.column.name": "UPDATE_TS",
                "validate.non.null": false
                }
        }'

下面會獲取表的全部數(shù)據(jù)汪茧,外加源數(shù)據(jù)后續(xù)的更新和插入:

image

注意:

  • 可以結(jié)合使用這些方法中的(時間戳/自增)或兩者(時間戳+自增)椅亚;
  • 要使用的時間戳和/或自增列必須在連接器處理的所有表上。如果不同的表具有不同名稱的時間戳/自增列舱污,則需要創(chuàng)建單獨的連接器配置呀舔;
  • 如果只使用自增列,則不會捕獲對數(shù)據(jù)的更新扩灯,除非每次更新時自增列也會增加(在主鍵的情況下幾乎不可能)媚赖;
  • 某些表可能沒有唯一的標(biāo)識霜瘪,或者有多個組合的列表示行的唯一標(biāo)識(聯(lián)合主鍵),不過JDBC連接器只支持單個標(biāo)識列惧磺;
  • 時間戳+自增列選項為識別新行和更新行提供了最大的覆蓋范圍颖对;
  • 許多RDBMS支持聲明更新時間戳列的DDL,該列會自動更新磨隘。例如:
    • MySQL:
CREATE TABLE foo (
        …
        UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

  • Postgres:
CREATE TABLE foo (
        …
        UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Courtesy of https://techblog.covermymeds.com/databases/on-update-timestamps-mysql-vs-postgres/
CREATE FUNCTION update_updated_at_column() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
  BEGIN
    NEW.update_ts = NOW();
    RETURN NEW;
  END;
$$;

CREATE TRIGGER t1_updated_at_modtime BEFORE UPDATE ON foo FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();

  • Oracle:
CREATE TABLE foo (
        …
        CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
);

CREATE OR REPLACE TRIGGER TRG_foo_UPD
BEFORE INSERT OR UPDATE ON foo
REFERENCING NEW AS NEW_ROW
  FOR EACH ROW
BEGIN
  SELECT SYSDATE
        INTO :NEW_ROW.UPDATE_TS
        FROM DUAL;
END;
/

#基于查詢的提取

有時可能想從RDBMS中提取數(shù)據(jù)缤底,但希望有比整個表更靈活的方式,原因可能包括:

  • 一個有許多列的寬表番捂,但是只希望有部分列被傳輸?shù)終afka主題上个唧;
  • 表中包含敏感信息,不希望這些信息傳輸?shù)終afka主題上(盡管也可以提取時在Kafka連接器中使用單消息轉(zhuǎn)換進行處理)白嘁;
  • 多個表之間存在依賴關(guān)系坑鱼,因此在傳輸?shù)終afka之前膘流,可能希望將其解析為一個單一的一致性視圖絮缅。

這可以使用JDBC連接器的query模式。在了解如何實現(xiàn)之前呼股,需要注意以下幾點:

  • 謹防管道的“過早優(yōu)化”耕魄,僅僅因為不需要源表中的某些列或行,而不是說在流式傳輸?shù)終afka時不應(yīng)包含它們彭谁;
  • 正如將在下面看到的吸奴,當(dāng)涉及增量攝取時,query模式可能不那么靈活缠局,因此從源中簡單地刪除列的另一種方法(無論是簡單地減少數(shù)量,還是因為敏感信息)都是在連接器本身中使用ReplaceField單消息轉(zhuǎn)換读处;
  • 隨著查詢越來越復(fù)雜(例如解析關(guān)聯(lián))唱矛,潛在的壓力和對源數(shù)據(jù)庫的影響會增加罚舱;
  • 在RDBMS(作為源頭)中關(guān)聯(lián)數(shù)據(jù)是解決關(guān)聯(lián)的一種方法管闷,另一種方法是將源表流式傳輸?shù)絾为毜腒afka主題,然后使用KSQL或Kafka Streams根據(jù)需求進行關(guān)聯(lián)(過濾和標(biāo)記數(shù)據(jù)也是如此)窃肠,KSQL是在Kafka中對數(shù)據(jù)進行后處理的絕佳方式,使管道盡可能簡單碧囊。

下面將展示如何將transactions表呕臂,再加上customers表中的數(shù)據(jù)流式傳輸?shù)終afka:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_09",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-09",
                "mode":"bulk",
                "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;",
                "poll.interval.ms" : 3600000
                }
        }'

可能注意到已切換回bulk模式歧蒋,可以使用主鍵或者時間戳其中一個增量選項谜洽,但要確保在SELECT子句中包含相應(yīng)的主鍵/時間戳列(例如txn_id):

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_10",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-10",
                "mode":"incrementing",
                "query":"SELECT txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id",
                "incrementing.column.name": "txn_id",
                "validate.non.null": false
                }
        }'

如果不包括該列(即使它存在于源表中)阐虚,那么連接器會報錯并顯示org.apache.kafka.connect.errors.DataException異常(#561)或java.lang.NullPointerException異常(#560)实束,這是因為連接器需要在返回的數(shù)據(jù)中獲取值咸灿,以便可以存儲相應(yīng)偏移量的最新值避矢。

如果使用query選項审胸,除非使用mode: bulk#566)砂沛,否則無法指定自己的WHERE子句,也就是說材蛛,在查詢中使用自己的謂詞和使用Kafka進行增量提取之間是互斥的卑吭。

一個還是多個連接器豆赏?

如果需要不同的參數(shù)設(shè)定掷邦,可以創(chuàng)建新的連接器,例如或杠,可能希望有不同的參數(shù):

  • 包含自增主鍵和/或時間戳的列的名稱向抢;
  • 輪詢表的頻率挟鸠;
  • 連接數(shù)據(jù)庫的用戶不同艘希。

簡單來說覆享,如果所有表參數(shù)都一樣淹真,則可以使用單個連接器。

#為什么沒有數(shù)據(jù)啸驯?

創(chuàng)建連接器之后罚斗,可能在目標(biāo)Kafka主題中看不到任何數(shù)據(jù)针姿。下面會一步步進行診斷:

1.查詢/connectors端點距淫,可確認連接器是否創(chuàng)建成功:

$ curl -s“http:// localhost:8083 / connectors”
[ “jdbc_source_mysql_10”]

1
2

應(yīng)該看到連接器列表榕暇,如果沒有彤枢,則需要按照之前的步驟進行創(chuàng)建缴啡,然后關(guān)注Kafka連接器返回的任何錯誤业栅。

2.檢查連接器及其任務(wù)的狀態(tài):

$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_10/status"|jq '.'
{
"name": "jdbc_source_mysql_10",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "kafka-connect:8083"
}
],
"type": "source"
}

正常應(yīng)該看到所有的連接器和任務(wù)的state都是RUNNING式镐,不過RUNNING不總是意味著正常娘汞。

3.如果連接器或任務(wù)的狀態(tài)是FAILED你弦,或者即使?fàn)顟B(tài)是RUNNING但是沒有按照預(yù)期行為運行禽作,那么可以轉(zhuǎn)到Kafka連接器工作節(jié)點的輸出(這里有相關(guān)的說明)旷偿,這里會顯示是否存在任何實際的問題萍程。以上面的連接器為例茫负,其狀態(tài)為RUNNING忍法,但是連接器工作節(jié)點日志中實際上全是重復(fù)的錯誤:

ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;', topicPrefix='mysql-10', incrementingColumn='t.id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask)
java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `t.id` > -1 ORDER BY `t.id` ASC' at line 1
 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)

4.在這里勉失,問題是什么并不明確戴质,需要調(diào)出連接器的配置來檢查指定的查詢是否正確:

$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_10/config"|jq '.'
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "incrementing",
  "incrementing.column.name": "t.id",
  "topic.prefix": "mysql-10",
  "connection.password": "asgard",
  "validate.non.null": "false",
  "connection.user": "connect_user",
  "query": "SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;",
  "name": "jdbc_source_mysql_10",
  "connection.url": "jdbc:mysql://mysql:3306/demo"
}

5.在MySQL中運行此查詢發(fā)現(xiàn)能正常執(zhí)行:

mysql> SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;
+------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------+
| id   | customer_id | amount | currency | txn_timestamp        | first_name | last_name | email                      | gender | comments                                             |
+------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------+
|    1 |           5 | -72.97 | RUB      | 2018-12-12T13:58:37Z | Modestia   | Coltart   | mcoltart4@scribd.com       | Female | Reverse-engineered non-volatile success

6.所以肯定是Kafka連接器在執(zhí)行時做了什么戈抄。鑒于錯誤消息引用t.id划鸽,這是在incrementing.column.name參數(shù)中指定的裸诽,可能問題與此有關(guān)丈冬。通過將Kafka連接器的日志級別調(diào)整為DEBUG埂蕊,可以看到執(zhí)行的完整SQL語句:

DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;', topicPrefix='mysql-10', incrementingColumn='t.id', timestampColumns=[]} prepared SQL query: SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id; WHERE `t.id` > ? ORDER BY `t.id` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)

7.看一下該prepared SQL query部分蓄氧,可能會發(fā)現(xiàn):

[…] FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id; WHERE `t.id` > ? ORDER BY `t.id` ASC

8.注意在JOIN子句的c.id后面有語句終止符(;),后面有WHERE子句堂氯。該WHERE子句由Kafka連接器附加祖灰,用于實現(xiàn)所要求的incrementing模式,但創(chuàng)建了一個無效的SQL語句叁扫; 9.然后在GitHub中查找與看到的錯誤相關(guān)的問題莫绣,因為有時它實際上是一個已知的問題对室,例如這個問題掩宜; 10.如果連接器存在并且是RUNNING牺汤,并且Kafka連接器工作節(jié)點日志中也沒有錯誤檐迟,還應(yīng)該檢查:

- 連接器的提取間隔是多少追迟?也許它完全按照配置運行敦间,并且源表中的數(shù)據(jù)已經(jīng)更改金闽,但就是沒有拉取到新數(shù)據(jù)。要檢查這一點挤庇,可以在Kafka連接器工作節(jié)點的輸出中查找`JdbcSourceTaskConfig`的值和`poll.interval.ms`的值嫡秕;
- 如果正在使用的是增量攝取昆咽,Kafka連接器關(guān)于偏移量是如何存儲的掷酗?如果刪除并重建相同名稱的連接器,則將保留前一個實例的偏移量浮声∮净樱考慮這樣的場景羡洁,創(chuàng)建完連接器之后筑煮,成功地將所有數(shù)據(jù)提取到源表中的給定主鍵或時間戳值真仲,然后刪除并重新創(chuàng)建了它秸应,新版本的連接器將獲得之前版本的偏移量桑谍,因此僅提取比先前處理的數(shù)據(jù)更新的數(shù)據(jù)锣披,具體可以通過查看保存在其中的`offset.storage.topic`值和相關(guān)表來驗證這一點雹仿。

#重置JDBC源連接器讀取數(shù)據(jù)的點

當(dāng)Kafka連接器以分布式模式運行時胧辽,它會在Kafka主題(通過offset.storage.topic配置)中存儲有關(guān)它在源系統(tǒng)中讀取的位置(稱為偏移量)的信息邑商,當(dāng)連接器任務(wù)重啟時,它可以從之前的位置繼續(xù)進行處理,具體可以在連接器工作節(jié)點日志中看到:

INFO Found offset {{protocol=1, table=demo.accounts}={timestamp_nanos=0, timestamp=1547030056000}, {table=accounts}=null} for partition {protocol=1, table=demo.accounts} (io.confluent.connect.jdbc.source.JdbcSourceTask)

每次連接器輪詢時芹务,都會使用這個偏移量枣抱,它會使用預(yù)編譯的SQL語句桅狠,并且使用Kafka連接器任務(wù)傳遞的值替換?占位符:

DEBUG TimestampIncrementingTableQuerier{table="demo"."accounts", query='null', topicPrefix='mysql-08-', incrementingColumn='', timestampColumns=[UPDATE_TS]} prepared SQL query: SELECT * FROM `demo`.`accounts` WHERE `demo`.`accounts`.`UPDATE_TS` > ? AND `demo`.`accounts`.`UPDATE_TS` < ? ORDER BY `demo`.`accounts`.`UPDATE_TS` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
DEBUG Executing prepared statement with timestamp value = 2019-01-09 10:34:16.000 end time = 2019-01-09 13:23:40.000 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)

這里中跌,第一個時間戳值就是存儲的偏移量漩符,第二個時間戳值是當(dāng)前時間戳。

雖然沒有文檔記載闷沥,但可以手動更改連接器使用的偏移量舆逃,因為是在JDBC源連接器的上下文中颖侄,所以可以跨多個源連接器類型,這意味著更改時間戳或主鍵展蒂,連接器會將后續(xù)記錄視為未處理的狀態(tài)锰悼。

首先要做的是確保Kafka連接器已經(jīng)刷新了周期性的偏移量账阻,可以在工作節(jié)點日志中看到何時執(zhí)行此操作:

INFO WorkerSourceTask{id=jdbc_source_mysql_08-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)

看下Kafka的主題,可以看到Kafka連接器創(chuàng)建的內(nèi)部主題,并且負責(zé)偏移量的主題也是其中之一臼婆,名字可能有所不同:

ksql> LIST TOPICS;

 Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
 docker-connect-configs | false      | 1          | 1                  | 0         | 0
 docker-connect-offsets | false      | 1          | 1                  | 0         | 0
 docker-connect-status  | false      | 5          | 1                  | 0         | 0

ksql> PRINT 'docker-connect-offsets' FROM BEGINNING;
Format:JSON
{"ROWTIME":1547038346644,"ROWKEY":"[\"jdbc_source_mysql_08\",{\"protocol\":\"1\",\"table\":\"demo.customers\"}]","timestamp_nanos":0,"timestamp":1547030057000}

當(dāng)Kafka連接器任務(wù)啟動時,它會讀取此主題并使用適當(dāng)主鍵的最新值颁独。要更改偏移量奖唯,只需插入一個新值即可丰捷。最簡單的方法是轉(zhuǎn)存當(dāng)前主題內(nèi)容,修改內(nèi)容并重新執(zhí)行停巷,因為一致性和簡單畔勤,可以考慮使用kafkacat

  • 轉(zhuǎn)存當(dāng)前的內(nèi)容:
$ kafkacat -b kafka:29092 -t docker-connect-offsets -C -K# -o-1
% Reached end of topic docker-connect-offsets [0] at offset 0
["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547030056000}
);

如果是多個連接器,可能復(fù)雜些缸榛,但是這里只有一個内颗,所以使用了-o-1標(biāo)志,它定義了返回的偏移量负懦。

  • 根據(jù)需要修改偏移量纸厉。在這里使用了mode=timestamp來監(jiān)測表中的變化。時間戳值是1547030056000躯枢,使用相關(guān)的時間戳轉(zhuǎn)換之類的工具锄蹂,可以很容易地轉(zhuǎn)換和操作,比如將其提前一小時(1547026456000)朝抖。接下來治宣,使用更新后的timestamp值準(zhǔn)備新消息:
["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547026456000}

  • 將新消息發(fā)給主題:
echo '["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547026456000}' | \
kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#

  • 如果要從頭開始重啟連接器,可以發(fā)送NULL消息值:
echo'[“jdbc_source_mysql_08”绊茧,{“protocol”:“1”,“table”:“demo.accounts”}]#'| \
kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#

  • 重啟連接器任務(wù):
curl -i -X POST -H "Accept:application/json" \
        -H "Content-Type:application/json" http://localhost:8083/connectors/jdbc_source_mysql_08/tasks/0/restart

  • 也可以只重啟Kafka連接器工作節(jié)點唯绍,重啟之后况芒,數(shù)據(jù)源中所有比新設(shè)置的偏移量更新的記錄耐版,都會被重新提取到Kafka主題中粪牲。

從指定的時間戳或者主鍵處開啟表的捕獲

當(dāng)使用時間戳或自增主鍵模式創(chuàng)建JDBC源連接器時,它會從主鍵為-1和/或時間戳為1970-01-01 00:00:00.00開始亭引,這意味著會獲得表的全部內(nèi)容焙蚓,然后在后續(xù)的輪詢中獲取任何插入/更新的數(shù)據(jù)。

但是如果不想要表的完整副本君丁,只是希望連接器從現(xiàn)在開始,該怎么辦呢印蔗?這在目前的Kafka連接器中還不支持,但可以使用前述的方法耙厚。不需要獲取現(xiàn)有的偏移量消息并對其進行定制岔霸,而是自己創(chuàng)建薛躬。消息的格式依賴于正在使用的連接器和表的名稱,一種做法是先創(chuàng)建連接器呆细,確定格式型宝,然后刪除連接器,另一種做法是使用具有相同源表名和結(jié)構(gòu)的環(huán)境,除非在該環(huán)境中沒有可供連接器提取的數(shù)據(jù)梨树,否則同樣也能得到所需的消息格式。

在創(chuàng)建連接器之前岖寞,使用適當(dāng)?shù)闹蹬渲闷屏恐黝}抡四。在這里,希望從demo.transactions表中提取自增主鍵大于42的所有行:

echo '["jdbc_source_mysql_20",{"protocol":"1","table":"demo.transactions"}]#{"incrementing":42}' | \
kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#

下面創(chuàng)建連接器:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_20",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-20-",
                "mode":"incrementing",
                "table.whitelist" : "demo.transactions",
                "incrementing.column.name": "txn_id",
                "validate.non.null": false
                }
        }'

在生成的Kafka連接器工作日志中慎璧,可以看到:

INFO Found offset {{protocol=1, table=demo.transactions}={incrementing=42}, {table=transactions}=null} for partition {protocol=1, table=demo.transactions} (io.confluent.connect.jdbc.source.JdbcSourceTask)
…
DEBUG Executing prepared statement with incrementing value = 42 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)

和預(yù)期一樣床嫌,Kafka主題中只注入了txn_id大于42的行:

ksql> PRINT 'mysql-20x-transactions' FROM BEGINNING;
Format:AVRO
1/9/19 1:44:07 PM UTC, null, {"txn_id": 43, "customer_id": 3, "amount": {"bytes": "ús"}, "currency": "CNY", "txn_timestamp": "2018-12-15T08:23:24Z"}
1/9/19 1:44:07 PM UTC, null, {"txn_id": 44, "customer_id": 5, "amount": {"bytes": "\f!"}, "currency": "CZK", "txn_timestamp": "2018-10-04T13:10:17Z"}
1/9/19 1:44:07 PM UTC, null, {"txn_id": 45, "customer_id": 3, "amount": {"bytes": "?ò"}, "currency": "USD", "txn_timestamp": "2018-04-03T03:40:49Z"}

#配置Kafka消息鍵

Kafka消息是鍵/值對,其中值是有效內(nèi)容胸私。在JDBC連接器的上下文中,值是要被提取的表行的內(nèi)容鳖谈。Kafka消息中的鍵對于分區(qū)和下游處理非常重要岁疼,其中任何關(guān)聯(lián)(比如KSQL)都將在數(shù)據(jù)中完成。

JDBC連接器默認不設(shè)置消息鍵缆娃,但是使用Kafka連接器的單消息轉(zhuǎn)換(SMT)機制可以輕松實現(xiàn)捷绒。假設(shè)想要提取accounts表并將其ID列用作消息鍵。只需簡單地將其添加到下面的配置中即可:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_06",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-06-",
                "poll.interval.ms" : 3600000,
                "table.whitelist" : "demo.accounts",
                "mode":"bulk",
                "transforms":"createKey,extractInt",
                "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
                "transforms.createKey.fields":"id",
                "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
                "transforms.extractInt.field":"id"
                }
        }'

這時如果使用諸如kafka-avro-console-consumer之類的工具檢查數(shù)據(jù)贯要,就會看到鍵(JSON內(nèi)容之前的最左列)與id值匹配:

kafka-avro-console-consumer \
      --bootstrap-server kafka:29092 \
      --property schema.registry.url=http://schema-registry:8081 \
      --topic mysql-06-accounts --from-beginning --property print.key=true

1       {"id":{"int":1},"first_name":{"string":"Hamel"},"last_name":{"string":"Bly"},"username":{"string":"Hamel Bly"},"company":{"string":"Erdman-Halvorson"},"created_date":{"int":17759}}
2       {"id":{"int":2},"first_name":{"string":"Scottie"},"last_name":{"string":"Geerdts"},"username":{"string":"Scottie Geerdts"},"company":{"string":"Mante Group"},"created_date":{"int":17692}}

如果要在數(shù)據(jù)中設(shè)置鍵以便與KSQL一起使用暖侨,則需要將其創(chuàng)建為字符串類型,因為KSQL目前不支持其它鍵類型崇渗,具體可以在連接器配置中添加如下內(nèi)容:

"key.converter": "org.apache.kafka.connect.storage.StringConverter"

然后就可以在KSQL中使用了:

ksql> CREATE STREAM ACCOUNTS WITH (KAFKA_TOPIC='mysql-06X-accounts', VALUE_FORMAT='AVRO');
ksql> SELECT ROWKEY, ID, FIRST_NAME + ' ' + LAST_NAME FROM ACCOUNTS;
1 | 1 | Hamel Bly
2 | 2 | Scottie Geerdts
3 | 3 | Giana Bryce

#更改主題名稱

JDBC連接器要求指定topic.prefix字逗,但如果不想要,或者想將主題名更改為其它模式宅广,SMT可以實現(xiàn)葫掉。

假設(shè)要刪除mysql-07-前綴,那么需要一點正則表達式的技巧:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_07",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-07-",
                "poll.interval.ms" : 3600000,
                "catalog.pattern" : "demo",
                "table.whitelist" : "accounts",
                "mode":"bulk",
                "transforms":"dropTopicPrefix",
                "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
                "transforms.dropTopicPrefix.regex":"mysql-07-(.*)",
                "transforms.dropTopicPrefix.replacement":"$1"
                }
        }'

這樣主題名就和表名一致了:

ksql> LIST TOPICS;

 Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
 accounts               | false      | 1          | 1                  | 0         | 0

#Bytes, Decimals, Numerics和自定義類型

這是話題比較深入跟狱。

  • numeric.mapping: best_fit如果源中包含NUMERIC/NUMBER類型的數(shù)據(jù)俭厚,則可能需要這個配置項;
  • 如果需要驶臊,可以在JDBC連接器中使用query選項挪挤,用于對源表中的數(shù)據(jù)進行轉(zhuǎn)換;
  • 如果字段以JDBCDECIMAL類型暴露关翎,則numeric.mapping無法處理:
    • MySQL將所有數(shù)值存儲為DECIMAL扛门;
    • SQL Server將DECIMALNUMERIC原生存儲,因此必須將DECIMAL字段轉(zhuǎn)換為NUMERIC笤休;
  • 在Oracle中尖飞,要在NUMBER字段中指定長度和標(biāo)度,例如NUMBER(5,0),不能是NUMBER政基;
  • NUMERICDECIMAL都被視為NUMBER贞铣,INT也是;

完成之后沮明,下面會做一個解釋:

Kafka連接器是一個可以將數(shù)據(jù)注入Kafka辕坝、與特定源技術(shù)無關(guān)的框架。無論是來自SQL Server荐健、DB2酱畅、MQTT、文本文件江场、REST還是Kafka連接器支持的任何其它數(shù)十種來源纺酸,它發(fā)送給Kafka的數(shù)據(jù)格式都為AvroJSON,這通常是一個透明的過程址否,只是在處理數(shù)值數(shù)據(jù)類型時有些特別餐蔬,比如DECIMALNUMBER等等佑附,以下面的MySQL查詢?yōu)槔?/p>

mysql> SELECT * FROM transactions LIMIT 1;
+--------+-------------+--------+----------+----------------------+
| txn_id | customer_id | amount | currency | txn_timestamp        |
+--------+-------------+--------+----------+----------------------+
|      1 |           5 | -72.97 | RUB      | 2018-12-12T13:58:37Z |
+--------+-------------+--------+----------+----------------------+

挺正常是吧樊诺?其實,amount列是DECIMAL(5,2)

mysql> describe transactions;
+---------------+--------------+------+-----+---------+-------+
| Field         | Type         | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| txn_id        | int(11)      | YES  |     | NULL    |       |
| customer_id   | int(11)      | YES  |     | NULL    |       |
| amount        | decimal(5,2) | YES  |     | NULL    |       |
| currency      | varchar(50)  | YES  |     | NULL    |       |
| txn_timestamp | varchar(50)  | YES  |     | NULL    |       |
+---------------+--------------+------+-----+---------+-------+
5 rows in set (0.00 sec)

但是當(dāng)使用JDBC連接器的默認設(shè)置提取到Kafka中時音同,最終會是這樣:

ksql> PRINT 'mysql-02-transactions' FROM BEGINNING;
Format:AVRO
1/4/19 5:38:45 PM UTC, null, {"txn_id": 1, "customer_id": 5, "amount": {"bytes": "?\u007F"}, "currency": "RUB", "txn_timestamp": "2018-12-12T13:58:37Z"}

DECIMAL變成了一個看似亂碼的bytes值词爬,連接器默認會使用自己的DECIMAL邏輯類型,該類型在Avro中被序列化為字節(jié)权均,可以通過查看Confluent模式注冊表中的相關(guān)條目來看到這一點:

$ curl -s "http://localhost:8081/subjects/mysql-02-transactions-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name == "amount")'
{
  "name": "amount",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

當(dāng)連接器使用AvroConverter消費時顿膨,這會正常處理并保存為DECIMAL(并且在Java中也可以反序列化為BigDecimal),但對于反序列化Avro的其它消費者螺句,它們只會得到字節(jié)虽惭。在使用啟用了模式的JSON時,也會看到這一點蛇尚,amount值會是Base64編碼的字節(jié)字符串:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "bytes",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "2"
        },
        "field": "amount"
      },
  },
  "payload": {
    "txn_id": 1000,
    "customer_id": 5,
    "amount": "Cv8="
  }
}

因此芽唇,不管使用的是JSON還是Avro,這都是numeric.mapping配置項的來源取劫。它默認設(shè)置為none(即使用連接器的DECIMAL類型)匆笤,但通常希望連接器將類型實際轉(zhuǎn)換為更兼容的類型,以適合數(shù)字的精度谱邪,更具體的說明炮捧,可以參見相關(guān)的文檔

此選項目前不支持DECIMAL類型惦银,因此這里是在Postgres中具有NUMERIC類型的相同原理的示例:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
              "name": "jdbc_source_postgres_12",
              "config": {
                      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                      "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                      "connection.user": "connect_user",
                      "connection.password": "asgard",
                      "topic.prefix": "postgres-12-",
                      "numeric.mapping": "best_fit",
                      "table.whitelist" : "demo.transactions",
                      "mode":"bulk",
                      "poll.interval.ms" : 3600000
                      }
              }'

結(jié)果如下所示:

ksql> PRINT 'postgres-12-transactions' FROM BEGINNING;
Format:AVRO
1/7/19 6:27:16 PM UTC, null, {"txn_id": 1, "customer_id": 5, "amount": -72.97, "currency": "RUB", "txn_timestamp": "2018-12-12T13:58:37Z"}

可以在這里看到有關(guān)此內(nèi)容的更多詳細信息咆课,以及Postgres末誓、Oracle和MS SQL Server中的示例。

#處理多個表

如果需要從多個表中提取數(shù)據(jù)书蚪,則可以通過并行處理來減少總提取時間喇澡,這在Kafka的JDBC連接器有兩種方法:

  1. 定義多個連接器,每個連接器都處理單獨的表殊校;
  2. 定義單個連接器晴玖,但增加任務(wù)數(shù)。每個Kafka連接器的工作由一個或多個任務(wù)來執(zhí)行为流,每個連接器默認只有一個任務(wù)呕屎,這意味著從數(shù)據(jù)庫中提取數(shù)據(jù)是單進程處理的。

前者具有更高的管理開銷敬察,但確實提供了每個表自定義設(shè)置的靈活性秀睛。如果可以使用相同的連接器配置提取所有表,則增加單個連接器中的任務(wù)數(shù)是一種好方法莲祸。

當(dāng)增加從數(shù)據(jù)庫中提取數(shù)據(jù)的并發(fā)性時琅催,要從整體上考慮。因為運行一百個并發(fā)任務(wù)雖然可能會更快虫给,但數(shù)百個與數(shù)據(jù)庫的連接可能會對數(shù)據(jù)庫產(chǎn)生負面影響。

以下是同一連接器的兩個示例侠碧。兩者都將從數(shù)據(jù)庫中提取所有表抹估,總共6個。在第一個連接器中弄兜,未指定最大任務(wù)數(shù)药蜻,因此為默認值1。在第二個中替饿,指定了最多運行三個任務(wù)("tasks.max":3):

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_01",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-01-",
                "mode":"bulk"
                }
        }'

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_11",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://mysql:3306/demo",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "mysql-11-",
                "mode":"bulk",
                "tasks.max":3
                }
        }'

當(dāng)查詢連接器的Kafka連接器RESTAPI時语泽,可以看到每個連接器正在運行的任務(wù)數(shù)以及它們已分配的表。第一個連接器有一個任務(wù)負責(zé)所有6張表:

$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_01/tasks"|jq '.'
[
  {
    "id": {
      "connector": "jdbc_source_mysql_01",
      "task": 0
    },
    "config": {
      "tables": "`demo`.`NUM_TEST`,`demo`.`accounts`,`demo`.`customers`,`demo`.`transactions`,`security`.`firewall`,`security`.`log_events`",
      …
    }
  }
]

第二個連接器有3個任務(wù)视卢,每個任務(wù)分配2張表:

$ curl -s“http:// localhost:8083 / connectors / jdbc_source_mysql_11 / tasks”| jq'踱卵。'
[
  {
    “ID”: {
      “connector”:“jdbc_source_mysql_11”,“任務(wù)”:0
    }据过,
    “config”:{
      “tables”:“`demo` .NUM_TEST`惋砂,`demo` .accounts`”,
      ...
    }
  }绳锅,
  {
    “ID”: {
      “connector”:“jdbc_source_mysql_11”西饵,“任務(wù)”:1
    },
    “config”:{
      “tables”:“`demo``customers`鳞芙,`demo` .transactions`”眷柔,
      ...
    }
  }期虾,
  {
    “ID”: {
      “connector”:“jdbc_source_mysql_11”,“任務(wù)”:2
    }驯嘱,
    “config”:{
      “tables”:“`security``firewall`镶苞,`security``log_events`”,
      ...
    }
  }
]
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末宙拉,一起剝皮案震驚了整個濱河市宾尚,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌谢澈,老刑警劉巖煌贴,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異锥忿,居然都是意外死亡牛郑,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門敬鬓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來淹朋,“玉大人,你說我怎么就攤上這事钉答〈∩郑” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵数尿,是天一觀的道長仑性。 經(jīng)常有香客問我,道長右蹦,這世上最難降的妖魔是什么诊杆? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮何陆,結(jié)果婚禮上晨汹,老公的妹妹穿的比我還像新娘。我一直安慰自己贷盲,他們只是感情好淘这,可當(dāng)我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著晃洒,像睡著了一般慨灭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上球及,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天氧骤,我揣著相機與錄音,去河邊找鬼吃引。 笑死筹陵,一個胖子當(dāng)著我的面吹牛刽锤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播朦佩,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼并思,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了语稠?” 一聲冷哼從身側(cè)響起宋彼,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎仙畦,沒想到半個月后输涕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡慨畸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年莱坎,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片寸士。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡檐什,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出弱卡,到底是詐尸還是另有隱情乃正,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布婶博,位于F島的核電站烫葬,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏凡蜻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一垢箕、第九天 我趴在偏房一處隱蔽的房頂上張望划栓。 院中可真熱鬧,春花似錦条获、人聲如沸忠荞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽委煤。三九已至,卻和暖如春修档,著一層夾襖步出監(jiān)牢的瞬間碧绞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工吱窝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留讥邻,地道東北人迫靖。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像兴使,于是被迫代替她去往敵國和親系宜。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容