復(fù)制自https://liyuj.gitee.io/confluent/Kafka-ConvertersSerialization.html
Kafka連接器是Apache Kafka?的一部分甥厦,提供數(shù)據(jù)存儲與Kafka之間的流式集成巩螃。對于數(shù)據(jù)工程師來說坯辩,只需要使用JSON格式配置文件即可物遇。目前已經(jīng)有很多數(shù)據(jù)存儲的連接器,僅舉幾例來說朽砰,包括JDBC悲立,Elasticsearch,IBM MQ档冬,S3和BigQuery膘茎。
對于開發(fā)者,Kafka連接器有豐富的API捣郊,如有必要辽狈,可以開發(fā)自己的連接器。此外它還具有用于配置和管理連接器的REST API呛牲。
Kafka連接器本身是模塊化的刮萌,提供了非常強大的滿足集成需求的方法,部分關(guān)鍵組件包括:
連接器:定義了一組如何與數(shù)據(jù)存儲集成的JAR文件娘扩;
轉(zhuǎn)換器:處理數(shù)據(jù)的序列化和反序列化着茸;
變換:傳輸過程中的消息處理(可選)壮锻。
圍繞Kafka連接器,常見的錯誤或者誤解之一是數(shù)據(jù)的序列化涮阔,這是Kafka連接器通過轉(zhuǎn)換器進行處理的猜绣,下面會介紹它們的工作機制,并說明一些常見問題如何處理敬特。
#Kafka消息只是字節(jié)
Kafka消息是按照主題進行組織的掰邢。每條消息都是一個鍵/值對,不過Kafka就需要這些伟阔。當數(shù)據(jù)在Kafka中存儲時都只是字節(jié)辣之,這使得Kafka可以適用于各種場景,但這也意味著開發(fā)者有責(zé)任決定如何對數(shù)據(jù)進行序列化皱炉。
在配置Kafka連接器時怀估,標準步驟的關(guān)鍵之一是序列化格式,要確保主題的讀取方和寫入方使用相同的序列化格式合搅,否則會出現(xiàn)混亂和錯誤多搀!
常見的格式有很多,包括:
JSON;
Avro;
Protobuf;
字符串分割(如CSV)灾部。
每種格式都有優(yōu)點和缺點康铭。
#序列化格式的選擇
選擇序列化格式的一些原則包括:
模式:很多時候數(shù)據(jù)都會有一個模式∈嶂恚可能不喜歡這個事實麻削,但作為開發(fā)人員有責(zé)任保留和傳播此模式,因為模式提供了服務(wù)之間的契約春弥。某些消息格式(例如Avro和Protobuf)具有強大的模式支持呛哟,而其它消息格式支持較少(JSON)或根本沒有(分隔字符串);
生態(tài)系統(tǒng)兼容性:Avro是Confluent平臺的一等公民匿沛,得到了Confluent模式注冊表扫责、Kafka連接器、KSQL等的原生支持逃呼。而Protobuf則依賴于部分功能支持的社區(qū)貢獻鳖孤;
消息大小:JSON是純文本格式,消息大小依賴于Kafka本身的壓縮配置抡笼,而Avro和Protobuf都是二進制格式苏揣,因此消息較小推姻;
語言支持:Java體系對Avro有強大的支持平匈,但如果應(yīng)用不是基于Java的,那么可能會發(fā)現(xiàn)它不太容易處理。
#如果使用JSON格式寫入目標端增炭,需要在主題中使用JSON格式么忍燥?
不需要,不管是從源端讀取數(shù)據(jù)的格式隙姿,還是將數(shù)據(jù)寫入外部存儲梅垄,都不會影響Kafka中消息序列化的格式。
Kafka中的連接器負責(zé)從源端(例如數(shù)據(jù)庫)讀取數(shù)據(jù)输玷,并將其作為數(shù)據(jù)的內(nèi)部表示傳遞給轉(zhuǎn)換器,然后队丝,Kafka中的轉(zhuǎn)換器會將此源數(shù)據(jù)對象序列化到主題上。
當使用Kafka連接器作為接收端時欲鹏,正好相反炭玫,即轉(zhuǎn)換器將來自主題的數(shù)據(jù)反序列化為內(nèi)部表示,其會傳遞給連接器貌虾,然后使用指定方法寫入目標端。
這意味著可以在主題中比如以Avro格式保存數(shù)據(jù)裙犹,然后比如將其寫入HDFS時尽狠,再指定接收端連接器使用的格式。
#配置轉(zhuǎn)換器
Kafka連接器在工作節(jié)點級別使用默認的轉(zhuǎn)換器配置叶圃,也可以在每個連接器上覆蓋它袄膏。由于在整個流水線中使用相同的序列化格式通常是一個好的做法,所以通常只需在工作節(jié)點上配置轉(zhuǎn)換器掺冠,而無需在連接器中指定沉馆。但是如果從其它主題中提取數(shù)據(jù)而它們使用不同的序列化格式時,就要在連接器配置中指定它德崭,即使在連接器的配置中覆蓋它斥黑,執(zhí)行任務(wù)的還是那個轉(zhuǎn)換器。
正確的連接器永遠不會序列化/反序列化存儲在Kafka中的消息眉厨,而是讓配置的轉(zhuǎn)換器完成這項工作锌奴。
注意Kafka消息只是鍵/值字節(jié)對,因此需要使用key.converter和value.converter配置項為鍵和值指定轉(zhuǎn)換器憾股,某些情況下鹿蜀,可以為鍵和值指定不同的轉(zhuǎn)換器。
下面是使用String轉(zhuǎn)換器的示例服球,由于它只是一個字符串茴恰,數(shù)據(jù)沒有模式,因此用于value并不是那么有用:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
某些轉(zhuǎn)換器有其它配置項斩熊,對于Avro往枣,需要指定模式注冊表,對于JSON,需要指定是否希望Kafka連接器將模式嵌入JSON本身婉商。為轉(zhuǎn)換器指定配置項時似忧,要使用key.converter.或value.converter.前綴。例如要將Avro用于消息的內(nèi)容丈秩,需要指定以下配置項:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
常見的轉(zhuǎn)換器包括:
Avro:Confluent平臺的一部分
io.confluent.connect.avro.AvroConverter
String:Apache Kafka的一部分
org.apache.kafka.connect.storage.StringConverter
JSON:Apache Kafka的一部分
org.apache.kafka.connect.json.JsonConverter
ByteArray:Apache Kafka的一部分
org.apache.kafka.connect.converters.ByteArrayConverter
Protobuf:社區(qū)開源
com.blueapron.connect.protobuf.ProtobufConverter
#JSON和模式
雖然JSON默認不支持攜帶模式盯捌,但Kafka連接器確實支持嵌入模式的特定JSON格式。由于模式也包含在每個消息中蘑秽,因此生成的數(shù)據(jù)大小可能會變大饺著。
如果正在配置Kafka源連接器并且希望Kafka連接器在寫入Kafka的消息中包含模式,需要做如下的配置:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
最終向Kafka寫入的消息大致如下肠牲,schema以及payload為JSON中的頂級元素:
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"registertime"},{"type":"string","optional":false,"field":"userid"},{"type":"string","optional":false,"field":"regionid"},{"type":"string","optional":false,"field":"gender"}],"optional":false,"name":"ksql.users"},"payload":{"registertime":1493819497170,"userid":"User_1","regionid":"Region_5","gender":"MALE"}}
請注意消息的大小幼衰,以及由內(nèi)容與模式組成的消息的大小∽忽ǎ考慮到在每條消息中都重復(fù)這一點渡嚣,就會看到為什么像Avro這樣的格式很有意義,因為模式是單獨存儲的肥印,而消息只包含有效內(nèi)容(并進行過壓縮)识椰。
如果從一個Kafka主題中使用Kafka接收連接器消費JSON格式的數(shù)據(jù),則需要了解數(shù)據(jù)中是否包含模式深碱,如果包含腹鹉,則要與上面的格式相同,而不能是一些任意的格式敷硅,那么配置如下:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
不過功咒,如果使用的JSON數(shù)據(jù)沒有schema/payload結(jié)構(gòu),像下面這樣:
{"registertime":1489869013625,"userid":"User_1","regionid":"Region_2","gender":"OTHER"}
則必須通過配置通知Kafka連接器不要尋找模式绞蹦,如下:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
和以前一樣力奋,要記住轉(zhuǎn)換器配置項(此處schemas.enable)需要合適的前綴key.converter或value.converter。
#常見錯誤
如果Kafka連接器中轉(zhuǎn)換器配置不正確幽七,可能遇到以下一些常見錯誤刊侯。這些消息會出現(xiàn)在Kafka連接器配置的接收端中,因為這里是對存儲在Kafka中的消息進行反序列化的點锉走。轉(zhuǎn)換器問題通常不會在源端發(fā)生滨彻,因為源端已經(jīng)配置了序列化。其中每個都會導(dǎo)致連接器失敗挪蹭,開始的錯誤為:
ERROR WorkerSinkTask{id=sink-file-users-json-noschema-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
? at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator. execAndHandleError(RetryWithToleranceOperator.java:178)
? at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute (RetryWithToleranceOperator.java:104)
這個錯誤之后亭饵,會看到一個異常堆棧,其中描述了錯誤的原因梁厉。注意對于連接器中的任何嚴重錯誤辜羊,都會拋出上述錯誤踏兜,因此可能會看到與序列化無關(guān)的錯誤。要快速定位錯誤是由哪個錯誤配置導(dǎo)致的八秃,可參考下表:
問題:使用JsonConverter讀取非JSON格式數(shù)據(jù)
如果源端主題上有非JSON格式的數(shù)據(jù)碱妆,但是使用JsonConverter進行讀取,就會看到:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)
這可能是因為源端主題以Avro或其它格式序列化引起的昔驱。
解決方案:如果數(shù)據(jù)實際上是Avro格式疹尾,則需要按照如下方式修改Kafka連接器的接收端:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
或者,如果主題由Kafka連接器注入骤肛,也可以調(diào)整上游源端纳本,讓其輸出JSON格式數(shù)據(jù):
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
問題:使用AvroConverter讀取非Avro格式數(shù)據(jù)
這是最常見的錯誤,當嘗試使用AvroConverter從非Avro格式的主題讀取數(shù)據(jù)時腋颠,會發(fā)生這種情況繁成,還包括使用非Confluent模式注冊表的Avro序列化器寫入的數(shù)據(jù):
org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
解決方案:檢查源端主題的序列化格式,調(diào)整Kafka連接器接收端使用正確的轉(zhuǎn)換器淑玫,或?qū)⑸嫌胃袷叫薷臑锳vro(這樣最好)巾腕。如果上游主題由Kafka連接器注入,也可以按如下方式配置源端連接器的轉(zhuǎn)換器:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
問題:讀取沒有期望的schema/payload結(jié)構(gòu)的JSON數(shù)據(jù)
如前所述絮蒿,Kafka連接器支持包含有效內(nèi)容和模式的特殊JSON格式消息結(jié)構(gòu)祠墅,如果讀取的JSON數(shù)據(jù)不是這樣的結(jié)構(gòu),會有下面的錯誤:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
要知道歌径,對于schemas.enable=true唯一有效的JSON結(jié)構(gòu)是,schema和payload作為頂級元素(如上所示)亲茅。
正如錯誤消息本身所述回铛,如果只是簡單的JSON數(shù)據(jù),則應(yīng)將連接器的配置更改為:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
如果要在數(shù)據(jù)中包含模式克锣,要么切換到使用Avro(推薦)茵肃,要么配置上游的Kafka連接器以在消息中包含模式:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
#解決問題的提示
查看連接器工作節(jié)點的日志
要查看Kafka連接器的錯誤日志,需要定位到Kafka連接器工作節(jié)點的輸出袭祟。這個位置取決于Kafka連接器是如何啟動的验残,安裝Kafka連接器有好幾種方法,包括Docker巾乳、Confluent CLI您没、systemd和手動下載的壓縮包,然后工作節(jié)點的日志分別位于:
Docker:docker logs container_name胆绊;
Confluent CLI:confluent log connect氨鹏;
systemd:日志文件寫入/var/log/confluent/kafka-connect;
其它:默認情況下压状,Kafka連接器會將其輸出發(fā)送到stdout仆抵,因此可以在啟動Kafka連接器的終端會話中看到。
查看Kafka連接器的配置文件
要更改Kafka連接器工作節(jié)點的配置項(適用于所有運行的連接器),需要相應(yīng)地做出如下的修改:
Docker:配置環(huán)境變量镣丑,比如在Docker Compose中:
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
Confluent CLI:使用配置文件/etc/schema-registry/connect-avro-distributed.properties舔糖;
systemd(deb/rpm):使用配置文件/etc/kafka/connect-distributed.properties;
其它:啟動Kafka連接器時莺匠,可以指定工作節(jié)點的屬性文件金吗,例如:
$cdconfluent-5.0.0$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties
檢查Kafka主題
假設(shè)遇到了上面提到過的錯誤,并且想要解決為什么Kafka連接器的接收端無法從主題中讀取數(shù)據(jù)慨蛙。
這時需要檢查正在讀取的主題的數(shù)據(jù)辽聊,并確認它采用了期望的序列化格式。另外期贫,要記住所有消息都必須采用這種格式跟匆,所以不要只是假設(shè)因為現(xiàn)在以正確的格式向主題發(fā)送消息,所以不會出現(xiàn)問題通砍。Kafka連接器和其它消費者也會讀取該主題的已有消息玛臂。
下面將使用命令行來描述如何進行故障排除,但還有一些其它工具也可以做:
Confluent控制中心有通過可視化的方式查看主題內(nèi)容的功能封孙,包括自動確定序列化格式迹冤;
KSQL的PRINT命令會將主題的內(nèi)容打印到控制臺,包括自動確定序列化格式虎忌;
Confluent CLI工具有consume命令泡徙,其可被用于讀取字符串和Avro格式的數(shù)據(jù)。
如果認為是字符串/JSON格式數(shù)據(jù)
可以使用控制臺工具膜蠢,包括kafkacat和kafka-console-consumer堪藐,以kafkacat為例:
$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1{"registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}
使用jq命令,還可以驗證和格式化JSON:
$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1|jq'.'{"registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}
如果你看到了下面這樣的亂碼字符挑围,其很可能是二進制數(shù)據(jù)礁竞,比如通過Avro或Protobuf格式寫入就是這樣的:
$ kafkacat -b localhost:9092 -t users-avro -C -c1
????VUser_9Region_MALE
如果認為是Avro格式數(shù)據(jù)
需要使用專為讀取和反序列化Avro數(shù)據(jù)而設(shè)計的控制臺工具,這里會使用kafka-avro-console-consumer杉辙。先要確認指定了正確的模式注冊表URL:
$ kafka-avro-console-consumer --bootstrap-server localhost:9092\--property schema.registry.url=http://localhost:8081\--topic users-avro\--from-beginning --max-messages1{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}
和之前一樣模捂,如果要對其格式化,可以通過管道輸出結(jié)果給jq:
$ kafka-avro-console-consumer --bootstrap-server localhost:9092\--property schema.registry.url=http://localhost:8081\--topic users-avro\--from-beginning --max-messages1|\jq'.'{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}
#內(nèi)部轉(zhuǎn)換器
當運行在分布式模式時蜘矢,Kafka連接器使用Kafka本身來存儲有關(guān)其操作的元數(shù)據(jù)狂男,包括連接器配置,偏移量等品腹。
通過internal.key.converter/internal.value.converter配置項并淋,這些Kafka主題本身可以配置使用不同的轉(zhuǎn)換器。但是這些配置項只供內(nèi)部使用珍昨,實際上從Kafka 2.0版本開始就已被棄用县耽。不再需要修改這些句喷,如果還要修改這些配置項,從Kafka的2.0版本開始兔毙,將會收到警告唾琼。
#將模式應(yīng)用于沒有模式的消息
很多時候Kafka連接器會從已經(jīng)存在模式的地方引入數(shù)據(jù),這時只要保留該模式然后使用合適的序列化格式(例如Avro)澎剥,加上比如模式注冊表等提供的兼容性保證锡溯,該數(shù)據(jù)的所有下游用戶就都可以從可用的模式中受益。但是如果沒有明確的模式呢哑姚?
可能正使用FileSourceConnector從純文本文件中讀取數(shù)據(jù)(不建議用于生產(chǎn)祭饭,但通常用于PoC),或者可能正在使用REST連接器從REST端點提取數(shù)據(jù)叙量。由于這兩者以及其它的都沒有固有的模式倡蝙,因此需要進行聲明。
有時可能只是想從源端讀取字節(jié)然后將它們寫入一個主題上绞佩,但大多數(shù)情況下需要做正確的事情并應(yīng)用模式以便數(shù)據(jù)可以正確地處理寺鸥。作為數(shù)據(jù)提取的一部分處理一次,而不是將問題推送到每個消費者(可能是多個)品山,這是一個更好的做法胆建。
可以編寫自己的Kafka流式應(yīng)用以將模式應(yīng)用于Kafka主題中的數(shù)據(jù),但也可以使用KSQL肘交。這篇文章展示了如何對從REST端點提取的JSON數(shù)據(jù)執(zhí)行此操作笆载。下面會看一下將模式應(yīng)用于某些CSV數(shù)據(jù)的簡單示例,顯然是可以做到的涯呻。
假設(shè)有一個名為testdata-csv的Kafka主題凉驻,其中有一些CSV數(shù)據(jù),大致如下:
$ kafkacat -b localhost:9092 -t testdata-csv -C1,Rick Astley,Never Gonna Give You Up2,Johnny Cash,Ring of Fire
通過觀察魄懂,可以猜測它有三個字段,可能為:
ID闯第;
藝術(shù)家市栗;
歌曲。
如果將數(shù)據(jù)保留在這樣的主題中咳短,那么任何想要使用該數(shù)據(jù)的應(yīng)用程序(可能是Kafka連接器接收端填帽、定制的Kafka應(yīng)用或者其它),都需要每次猜測這個模式咙好〈垭纾或者更糟糕的是,每個消費端應(yīng)用的開發(fā)者都需要不斷向數(shù)據(jù)提供方確認模式及其任何變更勾效。正如Kafka解耦系統(tǒng)一樣嘹悼,這種模式依賴性迫使團隊之間存在硬性耦合叛甫,這并不是一件好事。
因此要做的只是使用KSQL將模式應(yīng)用于數(shù)據(jù)杨伙,并填充一個新的派生主題其监,其中保存模式。在KSQL中限匣,可以像下面這樣查看主題數(shù)據(jù):
ksql> PRINT 'testdata-csv' FROM BEGINNING;
Format:STRING
11/6/18 2:41:23 PM UTC , NULL , 1,Rick Astley,Never Gonna Give You Up
11/6/18 2:41:23 PM UTC , NULL , 2,Johnny Cash,Ring of Fire
這里的前兩個字段(11/6/18 2:41:23 PM UTC和NULL)分別是Kafka消息的時間戳和鍵抖苦,而其余字段來自CSV文件。下面用KSQL注冊這個主題并聲明模式:
ksql> CREATE STREAM TESTDATA_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR) \
WITH (KAFKA_TOPIC='testdata-csv', VALUE_FORMAT='DELIMITED');
Message
----------------
Stream created
----------------
通過KSQL可以查看現(xiàn)在有一個數(shù)據(jù)流模式:
ksql> DESCRIBE TESTDATA_CSV;
Name? ? ? ? ? ? ? ? : TESTDATA_CSV
Field? | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY? | VARCHAR(STRING) (system)
ID? ? ? | INTEGER
ARTIST? | VARCHAR(STRING)
SONG? ? | VARCHAR(STRING)
-------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
通過查詢KSQL流確認數(shù)據(jù)是否符合預(yù)期米死。注意對于已有的Kafka主題锌历,此時只是作為Kafka的消費者,尚未更改或復(fù)制任何數(shù)據(jù)峦筒。
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT ID, ARTIST, SONG FROM TESTDATA_CSV;
1 | Rick Astley | Never Gonna Give You Up
2 | Johnny Cash | Ring of Fire
最后究西,創(chuàng)建一個新的Kafka主題,由具有模式的重新序列化的數(shù)據(jù)填充勘天。KSQL查詢是連續(xù)的怔揩,因此除了將任何已有的數(shù)據(jù)從源端主題發(fā)送到目標端主題之外,KSQL還將向主題發(fā)送任何未來的數(shù)據(jù)脯丝。
ksql> CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM TESTDATA_CSV;
Message
----------------------------
Stream created and running
----------------------------
這時使用Avro格式的控制臺消費者對數(shù)據(jù)進行驗證:
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --property schema.registry.url=http://localhost:8081 \
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --topic TESTDATA \
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --from-beginning | \
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? jq '.'
{
? "ID": {
? ? "int": 1
},
? "ARTIST": {
? ? "string": "Rick Astley"
},
? "SONG": {
? ? "string": "Never Gonna Give You Up"
? }
}
[…]
甚至可以在模式注冊表中查看已經(jīng)注冊的模式:
$ curl -s http://localhost:8081/subjects/TESTDATA-value/versions/latest|jq '.schema|fromjson'
{
? "type": "record",
? "name": "KsqlDataSourceSchema",
? "namespace": "io.confluent.ksql.avro_schemas",
? "fields": [
? ? {
? ? ? "name": "ID",
? ? ? "type": [
? ? ? ? "null",
? ? ? ? "int"
? ? ? ],
? ? ? "default": null
? ? },
? ? {
? ? ? "name": "ARTIST",
? ? ? "type": [
? ? ? ? "null",
? ? ? ? "string"
? ? ? ],
? ? ? "default": null
? ? },
? ? {
? ? ? "name": "SONG",
? ? ? "type": [
? ? ? ? "null",
? ? ? ? "string"
? ? ? ],
? ? ? "default": null
? ? }
? ]
}
任何寫入原始主題(testdata-csv)的新消息都由KSQL自動處理商膊,并寫入Avro格式的名為TESTDATA的新主題。現(xiàn)在宠进,任何想要使用此數(shù)據(jù)的應(yīng)用或團隊都可以簡單地處理TESTDATA主題晕拆,并利用聲明模式的Avro序列化數(shù)據(jù)。還可以使用此技術(shù)更改主題中的分區(qū)數(shù)材蹬,分區(qū)鍵和復(fù)制因子实幕。
#結(jié)論
Kafka連接器是一個非常簡單但功能強大的工具,可用于將其它系統(tǒng)與Kafka集成堤器,最常見的誤解是Kafka連接器提供的轉(zhuǎn)換器昆庇。之前已經(jīng)介紹過Kafka消息只是鍵/值對,了解應(yīng)該使用哪個序列化機制然后在Kafka連接器中對其進行標準化非常重要闸溃。