Kafka連接器深度解讀之轉(zhuǎn)換器和序列化

復(fù)制自https://liyuj.gitee.io/confluent/Kafka-ConvertersSerialization.html

Kafka連接器是Apache Kafka?的一部分甥厦,提供數(shù)據(jù)存儲與Kafka之間的流式集成巩螃。對于數(shù)據(jù)工程師來說坯辩,只需要使用JSON格式配置文件即可物遇。目前已經(jīng)有很多數(shù)據(jù)存儲的連接器,僅舉幾例來說朽砰,包括JDBC悲立,ElasticsearchIBM MQ档冬,S3BigQuery膘茎。

對于開發(fā)者,Kafka連接器有豐富的API捣郊,如有必要辽狈,可以開發(fā)自己的連接器。此外它還具有用于配置和管理連接器的REST API呛牲。

Kafka連接器本身是模塊化的刮萌,提供了非常強大的滿足集成需求的方法,部分關(guān)鍵組件包括:

連接器:定義了一組如何與數(shù)據(jù)存儲集成的JAR文件娘扩;

轉(zhuǎn)換器:處理數(shù)據(jù)的序列化和反序列化着茸;

變換:傳輸過程中的消息處理(可選)壮锻。

圍繞Kafka連接器,常見的錯誤或者誤解之一是數(shù)據(jù)的序列化涮阔,這是Kafka連接器通過轉(zhuǎn)換器進行處理的猜绣,下面會介紹它們的工作機制,并說明一些常見問題如何處理敬特。

#Kafka消息只是字節(jié)

Kafka消息是按照主題進行組織的掰邢。每條消息都是一個鍵/值對,不過Kafka就需要這些伟阔。當數(shù)據(jù)在Kafka中存儲時都只是字節(jié)辣之,這使得Kafka可以適用于各種場景,但這也意味著開發(fā)者有責(zé)任決定如何對數(shù)據(jù)進行序列化皱炉。

在配置Kafka連接器時怀估,標準步驟的關(guān)鍵之一是序列化格式,要確保主題的讀取方和寫入方使用相同的序列化格式合搅,否則會出現(xiàn)混亂和錯誤多搀!

常見的格式有很多,包括:

JSON;

Avro;

Protobuf;

字符串分割(如CSV)灾部。

每種格式都有優(yōu)點和缺點康铭。

#序列化格式的選擇

選擇序列化格式的一些原則包括:

模式:很多時候數(shù)據(jù)都會有一個模式∈嶂恚可能不喜歡這個事實麻削,但作為開發(fā)人員有責(zé)任保留和傳播此模式,因為模式提供了服務(wù)之間的契約春弥。某些消息格式(例如Avro和Protobuf)具有強大的模式支持呛哟,而其它消息格式支持較少(JSON)或根本沒有(分隔字符串);

生態(tài)系統(tǒng)兼容性:Avro是Confluent平臺的一等公民匿沛,得到了Confluent模式注冊表扫责、Kafka連接器、KSQL等的原生支持逃呼。而Protobuf則依賴于部分功能支持的社區(qū)貢獻鳖孤;

消息大小:JSON是純文本格式,消息大小依賴于Kafka本身的壓縮配置抡笼,而Avro和Protobuf都是二進制格式苏揣,因此消息較小推姻;

語言支持:Java體系對Avro有強大的支持平匈,但如果應(yīng)用不是基于Java的,那么可能會發(fā)現(xiàn)它不太容易處理。

#如果使用JSON格式寫入目標端增炭,需要在主題中使用JSON格式么忍燥?

不需要,不管是從源端讀取數(shù)據(jù)的格式隙姿,還是將數(shù)據(jù)寫入外部存儲梅垄,都不會影響Kafka中消息序列化的格式。

Kafka中的連接器負責(zé)從源端(例如數(shù)據(jù)庫)讀取數(shù)據(jù)输玷,并將其作為數(shù)據(jù)的內(nèi)部表示傳遞給轉(zhuǎn)換器,然后队丝,Kafka中的轉(zhuǎn)換器會將此源數(shù)據(jù)對象序列化到主題上。

當使用Kafka連接器作為接收端時欲鹏,正好相反炭玫,即轉(zhuǎn)換器將來自主題的數(shù)據(jù)反序列化為內(nèi)部表示,其會傳遞給連接器貌虾,然后使用指定方法寫入目標端。

這意味著可以在主題中比如以Avro格式保存數(shù)據(jù)裙犹,然后比如將其寫入HDFS時尽狠,再指定接收端連接器使用的格式

#配置轉(zhuǎn)換器

Kafka連接器在工作節(jié)點級別使用默認的轉(zhuǎn)換器配置叶圃,也可以在每個連接器上覆蓋它袄膏。由于在整個流水線中使用相同的序列化格式通常是一個好的做法,所以通常只需在工作節(jié)點上配置轉(zhuǎn)換器掺冠,而無需在連接器中指定沉馆。但是如果從其它主題中提取數(shù)據(jù)而它們使用不同的序列化格式時,就要在連接器配置中指定它德崭,即使在連接器的配置中覆蓋它斥黑,執(zhí)行任務(wù)的還是那個轉(zhuǎn)換器。

正確的連接器永遠不會序列化/反序列化存儲在Kafka中的消息眉厨,而是讓配置的轉(zhuǎn)換器完成這項工作锌奴。

注意Kafka消息只是鍵/值字節(jié)對,因此需要使用key.converter和value.converter配置項為鍵和值指定轉(zhuǎn)換器憾股,某些情況下鹿蜀,可以為鍵和值指定不同的轉(zhuǎn)換器。

下面是使用String轉(zhuǎn)換器的示例服球,由于它只是一個字符串茴恰,數(shù)據(jù)沒有模式,因此用于value并不是那么有用:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

某些轉(zhuǎn)換器有其它配置項斩熊,對于Avro往枣,需要指定模式注冊表,對于JSON,需要指定是否希望Kafka連接器將模式嵌入JSON本身婉商。為轉(zhuǎn)換器指定配置項時似忧,要使用key.converter.或value.converter.前綴。例如要將Avro用于消息的內(nèi)容丈秩,需要指定以下配置項:

"value.converter": "io.confluent.connect.avro.AvroConverter",

"value.converter.schema.registry.url": "http://schema-registry:8081",

常見的轉(zhuǎn)換器包括:

Avro:Confluent平臺的一部分

io.confluent.connect.avro.AvroConverter

String:Apache Kafka的一部分

org.apache.kafka.connect.storage.StringConverter

JSON:Apache Kafka的一部分

org.apache.kafka.connect.json.JsonConverter

ByteArray:Apache Kafka的一部分

org.apache.kafka.connect.converters.ByteArrayConverter

Protobuf:社區(qū)開源

com.blueapron.connect.protobuf.ProtobufConverter

#JSON和模式

雖然JSON默認不支持攜帶模式盯捌,但Kafka連接器確實支持嵌入模式的特定JSON格式。由于模式也包含在每個消息中蘑秽,因此生成的數(shù)據(jù)大小可能會變大饺著。

如果正在配置Kafka源連接器并且希望Kafka連接器在寫入Kafka的消息中包含模式,需要做如下的配置:

value.converter=org.apache.kafka.connect.json.JsonConverter

value.converter.schemas.enable=true

最終向Kafka寫入的消息大致如下肠牲,schema以及payload為JSON中的頂級元素:

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"registertime"},{"type":"string","optional":false,"field":"userid"},{"type":"string","optional":false,"field":"regionid"},{"type":"string","optional":false,"field":"gender"}],"optional":false,"name":"ksql.users"},"payload":{"registertime":1493819497170,"userid":"User_1","regionid":"Region_5","gender":"MALE"}}

請注意消息的大小幼衰,以及由內(nèi)容與模式組成的消息的大小∽忽ǎ考慮到在每條消息中都重復(fù)這一點渡嚣,就會看到為什么像Avro這樣的格式很有意義,因為模式是單獨存儲的肥印,而消息只包含有效內(nèi)容(并進行過壓縮)识椰。

如果從一個Kafka主題中使用Kafka接收連接器消費JSON格式的數(shù)據(jù),則需要了解數(shù)據(jù)中是否包含模式深碱,如果包含腹鹉,則要與上面的格式相同,而不能是一些任意的格式敷硅,那么配置如下:

value.converter=org.apache.kafka.connect.json.JsonConverter

value.converter.schemas.enable=true

不過功咒,如果使用的JSON數(shù)據(jù)沒有schema/payload結(jié)構(gòu),像下面這樣:

{"registertime":1489869013625,"userid":"User_1","regionid":"Region_2","gender":"OTHER"}

則必須通過配置通知Kafka連接器不要尋找模式绞蹦,如下:

value.converter=org.apache.kafka.connect.json.JsonConverter

value.converter.schemas.enable=false

和以前一樣力奋,要記住轉(zhuǎn)換器配置項(此處schemas.enable)需要合適的前綴key.converter或value.converter。

#常見錯誤

如果Kafka連接器中轉(zhuǎn)換器配置不正確幽七,可能遇到以下一些常見錯誤刊侯。這些消息會出現(xiàn)在Kafka連接器配置的接收端中,因為這里是對存儲在Kafka中的消息進行反序列化的點锉走。轉(zhuǎn)換器問題通常不會在源端發(fā)生滨彻,因為源端已經(jīng)配置了序列化。其中每個都會導(dǎo)致連接器失敗挪蹭,開始的錯誤為:

ERROR WorkerSinkTask{id=sink-file-users-json-noschema-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

? at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator. execAndHandleError(RetryWithToleranceOperator.java:178)

? at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute (RetryWithToleranceOperator.java:104)

這個錯誤之后亭饵,會看到一個異常堆棧,其中描述了錯誤的原因梁厉。注意對于連接器中的任何嚴重錯誤辜羊,都會拋出上述錯誤踏兜,因此可能會看到與序列化無關(guān)的錯誤。要快速定位錯誤是由哪個錯誤配置導(dǎo)致的八秃,可參考下表:

問題:使用JsonConverter讀取非JSON格式數(shù)據(jù)

如果源端主題上有非JSON格式的數(shù)據(jù)碱妆,但是使用JsonConverter進行讀取,就會看到:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)

這可能是因為源端主題以Avro或其它格式序列化引起的昔驱。

解決方案:如果數(shù)據(jù)實際上是Avro格式疹尾,則需要按照如下方式修改Kafka連接器的接收端:

"value.converter": "io.confluent.connect.avro.AvroConverter",

"value.converter.schema.registry.url": "http://schema-registry:8081",

或者,如果主題由Kafka連接器注入骤肛,也可以調(diào)整上游源端纳本,讓其輸出JSON格式數(shù)據(jù):

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

"value.converter.schemas.enable": "false",

問題:使用AvroConverter讀取非Avro格式數(shù)據(jù)

這是最常見的錯誤,當嘗試使用AvroConverter從非Avro格式的主題讀取數(shù)據(jù)時腋颠,會發(fā)生這種情況繁成,還包括使用非Confluent模式注冊表的Avro序列化器寫入的數(shù)據(jù):

org.apache.kafka.connect.errors.DataException: my-topic-name

at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

解決方案:檢查源端主題的序列化格式,調(diào)整Kafka連接器接收端使用正確的轉(zhuǎn)換器淑玫,或?qū)⑸嫌胃袷叫薷臑锳vro(這樣最好)巾腕。如果上游主題由Kafka連接器注入,也可以按如下方式配置源端連接器的轉(zhuǎn)換器:

"value.converter": "io.confluent.connect.avro.AvroConverter",

"value.converter.schema.registry.url": "http://schema-registry:8081",

問題:讀取沒有期望的schema/payload結(jié)構(gòu)的JSON數(shù)據(jù)

如前所述絮蒿,Kafka連接器支持包含有效內(nèi)容和模式的特殊JSON格式消息結(jié)構(gòu)祠墅,如果讀取的JSON數(shù)據(jù)不是這樣的結(jié)構(gòu),會有下面的錯誤:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

要知道歌径,對于schemas.enable=true唯一有效的JSON結(jié)構(gòu)是,schema和payload作為頂級元素(如上所示)亲茅。

正如錯誤消息本身所述回铛,如果只是簡單的JSON數(shù)據(jù),則應(yīng)將連接器的配置更改為:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

"value.converter.schemas.enable": "false",

如果要在數(shù)據(jù)中包含模式克锣,要么切換到使用Avro(推薦)茵肃,要么配置上游的Kafka連接器以在消息中包含模式:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",

"value.converter.schemas.enable": "true",

#解決問題的提示

查看連接器工作節(jié)點的日志

要查看Kafka連接器的錯誤日志,需要定位到Kafka連接器工作節(jié)點的輸出袭祟。這個位置取決于Kafka連接器是如何啟動的验残,安裝Kafka連接器有好幾種方法,包括Docker巾乳、Confluent CLI您没、systemd和手動下載的壓縮包,然后工作節(jié)點的日志分別位于:

Docker:docker logs container_name胆绊;

Confluent CLI:confluent log connect氨鹏;

systemd:日志文件寫入/var/log/confluent/kafka-connect;

其它:默認情況下压状,Kafka連接器會將其輸出發(fā)送到stdout仆抵,因此可以在啟動Kafka連接器的終端會話中看到。

查看Kafka連接器的配置文件

要更改Kafka連接器工作節(jié)點的配置項(適用于所有運行的連接器),需要相應(yīng)地做出如下的修改:

Docker:配置環(huán)境變量镣丑,比如在Docker Compose中:

CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter

CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter

CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

Confluent CLI:使用配置文件/etc/schema-registry/connect-avro-distributed.properties舔糖;

systemd(deb/rpm):使用配置文件/etc/kafka/connect-distributed.properties;

其它:啟動Kafka連接器時莺匠,可以指定工作節(jié)點的屬性文件金吗,例如:

$cdconfluent-5.0.0$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

檢查Kafka主題

假設(shè)遇到了上面提到過的錯誤,并且想要解決為什么Kafka連接器的接收端無法從主題中讀取數(shù)據(jù)慨蛙。

這時需要檢查正在讀取的主題的數(shù)據(jù)辽聊,并確認它采用了期望的序列化格式。另外期贫,要記住所有消息都必須采用這種格式跟匆,所以不要只是假設(shè)因為現(xiàn)在以正確的格式向主題發(fā)送消息,所以不會出現(xiàn)問題通砍。Kafka連接器和其它消費者也會讀取該主題的已有消息玛臂。

下面將使用命令行來描述如何進行故障排除,但還有一些其它工具也可以做:

Confluent控制中心有通過可視化的方式查看主題內(nèi)容的功能封孙,包括自動確定序列化格式迹冤;

KSQL的PRINT命令會將主題的內(nèi)容打印到控制臺,包括自動確定序列化格式虎忌;

Confluent CLI工具有consume命令泡徙,其可被用于讀取字符串和Avro格式的數(shù)據(jù)。

如果認為是字符串/JSON格式數(shù)據(jù)

可以使用控制臺工具膜蠢,包括kafkacat和kafka-console-consumer堪藐,以kafkacat為例:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1{"registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}

使用jq命令,還可以驗證和格式化JSON:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1|jq'.'{"registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}

如果你看到了下面這樣的亂碼字符挑围,其很可能是二進制數(shù)據(jù)礁竞,比如通過Avro或Protobuf格式寫入就是這樣的:

$ kafkacat -b localhost:9092 -t users-avro -C -c1

????VUser_9Region_MALE

如果認為是Avro格式數(shù)據(jù)

需要使用專為讀取和反序列化Avro數(shù)據(jù)而設(shè)計的控制臺工具,這里會使用kafka-avro-console-consumer杉辙。先要確認指定了正確的模式注冊表URL:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092\--property schema.registry.url=http://localhost:8081\--topic users-avro\--from-beginning --max-messages1{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}

和之前一樣模捂,如果要對其格式化,可以通過管道輸出結(jié)果給jq:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092\--property schema.registry.url=http://localhost:8081\--topic users-avro\--from-beginning --max-messages1|\jq'.'{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}

#內(nèi)部轉(zhuǎn)換器

當運行在分布式模式時蜘矢,Kafka連接器使用Kafka本身來存儲有關(guān)其操作的元數(shù)據(jù)狂男,包括連接器配置,偏移量等品腹。

通過internal.key.converter/internal.value.converter配置項并淋,這些Kafka主題本身可以配置使用不同的轉(zhuǎn)換器。但是這些配置項只供內(nèi)部使用珍昨,實際上從Kafka 2.0版本開始就已被棄用县耽。不再需要修改這些句喷,如果還要修改這些配置項,從Kafka的2.0版本開始兔毙,將會收到警告唾琼。

#將模式應(yīng)用于沒有模式的消息

很多時候Kafka連接器會從已經(jīng)存在模式的地方引入數(shù)據(jù),這時只要保留該模式然后使用合適的序列化格式(例如Avro)澎剥,加上比如模式注冊表等提供的兼容性保證锡溯,該數(shù)據(jù)的所有下游用戶就都可以從可用的模式中受益。但是如果沒有明確的模式呢哑姚?

可能正使用FileSourceConnector從純文本文件中讀取數(shù)據(jù)(不建議用于生產(chǎn)祭饭,但通常用于PoC),或者可能正在使用REST連接器從REST端點提取數(shù)據(jù)叙量。由于這兩者以及其它的都沒有固有的模式倡蝙,因此需要進行聲明。

有時可能只是想從源端讀取字節(jié)然后將它們寫入一個主題上绞佩,但大多數(shù)情況下需要做正確的事情并應(yīng)用模式以便數(shù)據(jù)可以正確地處理寺鸥。作為數(shù)據(jù)提取的一部分處理一次,而不是將問題推送到每個消費者(可能是多個)品山,這是一個更好的做法胆建。

可以編寫自己的Kafka流式應(yīng)用以將模式應(yīng)用于Kafka主題中的數(shù)據(jù),但也可以使用KSQL肘交。這篇文章展示了如何對從REST端點提取的JSON數(shù)據(jù)執(zhí)行此操作笆载。下面會看一下將模式應(yīng)用于某些CSV數(shù)據(jù)的簡單示例,顯然是可以做到的涯呻。

假設(shè)有一個名為testdata-csv的Kafka主題凉驻,其中有一些CSV數(shù)據(jù),大致如下:

$ kafkacat -b localhost:9092 -t testdata-csv -C1,Rick Astley,Never Gonna Give You Up2,Johnny Cash,Ring of Fire

通過觀察魄懂,可以猜測它有三個字段,可能為:

ID闯第;

藝術(shù)家市栗;

歌曲。

如果將數(shù)據(jù)保留在這樣的主題中咳短,那么任何想要使用該數(shù)據(jù)的應(yīng)用程序(可能是Kafka連接器接收端填帽、定制的Kafka應(yīng)用或者其它),都需要每次猜測這個模式咙好〈垭纾或者更糟糕的是,每個消費端應(yīng)用的開發(fā)者都需要不斷向數(shù)據(jù)提供方確認模式及其任何變更勾效。正如Kafka解耦系統(tǒng)一樣嘹悼,這種模式依賴性迫使團隊之間存在硬性耦合叛甫,這并不是一件好事。

因此要做的只是使用KSQL將模式應(yīng)用于數(shù)據(jù)杨伙,并填充一個新的派生主題其监,其中保存模式。在KSQL中限匣,可以像下面這樣查看主題數(shù)據(jù):

ksql> PRINT 'testdata-csv' FROM BEGINNING;

Format:STRING

11/6/18 2:41:23 PM UTC , NULL , 1,Rick Astley,Never Gonna Give You Up

11/6/18 2:41:23 PM UTC , NULL , 2,Johnny Cash,Ring of Fire

這里的前兩個字段(11/6/18 2:41:23 PM UTC和NULL)分別是Kafka消息的時間戳和鍵抖苦,而其余字段來自CSV文件。下面用KSQL注冊這個主題并聲明模式:

ksql> CREATE STREAM TESTDATA_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR) \

WITH (KAFKA_TOPIC='testdata-csv', VALUE_FORMAT='DELIMITED');

Message

----------------

Stream created

----------------

通過KSQL可以查看現(xiàn)在有一個數(shù)據(jù)流模式:

ksql> DESCRIBE TESTDATA_CSV;

Name? ? ? ? ? ? ? ? : TESTDATA_CSV

Field? | Type

-------------------------------------

ROWTIME | BIGINT (system)

ROWKEY? | VARCHAR(STRING) (system)

ID? ? ? | INTEGER

ARTIST? | VARCHAR(STRING)

SONG? ? | VARCHAR(STRING)

-------------------------------------

For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

通過查詢KSQL流確認數(shù)據(jù)是否符合預(yù)期米死。注意對于已有的Kafka主題锌历,此時只是作為Kafka的消費者,尚未更改或復(fù)制任何數(shù)據(jù)峦筒。

ksql> SET 'auto.offset.reset' = 'earliest';

Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

ksql> SELECT ID, ARTIST, SONG FROM TESTDATA_CSV;

1 | Rick Astley | Never Gonna Give You Up

2 | Johnny Cash | Ring of Fire

最后究西,創(chuàng)建一個新的Kafka主題,由具有模式的重新序列化的數(shù)據(jù)填充勘天。KSQL查詢是連續(xù)的怔揩,因此除了將任何已有的數(shù)據(jù)從源端主題發(fā)送到目標端主題之外,KSQL還將向主題發(fā)送任何未來的數(shù)據(jù)脯丝。

ksql> CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM TESTDATA_CSV;

Message

----------------------------

Stream created and running

----------------------------

這時使用Avro格式的控制臺消費者對數(shù)據(jù)進行驗證:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --property schema.registry.url=http://localhost:8081 \

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --topic TESTDATA \

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? --from-beginning | \

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? jq '.'

{

? "ID": {

? ? "int": 1

},

? "ARTIST": {

? ? "string": "Rick Astley"

},

? "SONG": {

? ? "string": "Never Gonna Give You Up"

? }

}

[…]

甚至可以在模式注冊表中查看已經(jīng)注冊的模式:

$ curl -s http://localhost:8081/subjects/TESTDATA-value/versions/latest|jq '.schema|fromjson'

{

? "type": "record",

? "name": "KsqlDataSourceSchema",

? "namespace": "io.confluent.ksql.avro_schemas",

? "fields": [

? ? {

? ? ? "name": "ID",

? ? ? "type": [

? ? ? ? "null",

? ? ? ? "int"

? ? ? ],

? ? ? "default": null

? ? },

? ? {

? ? ? "name": "ARTIST",

? ? ? "type": [

? ? ? ? "null",

? ? ? ? "string"

? ? ? ],

? ? ? "default": null

? ? },

? ? {

? ? ? "name": "SONG",

? ? ? "type": [

? ? ? ? "null",

? ? ? ? "string"

? ? ? ],

? ? ? "default": null

? ? }

? ]

}

任何寫入原始主題(testdata-csv)的新消息都由KSQL自動處理商膊,并寫入Avro格式的名為TESTDATA的新主題。現(xiàn)在宠进,任何想要使用此數(shù)據(jù)的應(yīng)用或團隊都可以簡單地處理TESTDATA主題晕拆,并利用聲明模式的Avro序列化數(shù)據(jù)。還可以使用此技術(shù)更改主題中的分區(qū)數(shù)材蹬,分區(qū)鍵和復(fù)制因子实幕。

#結(jié)論

Kafka連接器是一個非常簡單但功能強大的工具,可用于將其它系統(tǒng)與Kafka集成堤器,最常見的誤解是Kafka連接器提供的轉(zhuǎn)換器昆庇。之前已經(jīng)介紹過Kafka消息只是鍵/值對,了解應(yīng)該使用哪個序列化機制然后在Kafka連接器中對其進行標準化非常重要闸溃。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末整吆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子辉川,更是在濱河造成了極大的恐慌表蝙,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乓旗,死亡現(xiàn)場離奇詭異府蛇,居然都是意外死亡,警方通過查閱死者的電腦和手機屿愚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進店門汇跨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來务荆,“玉大人,你說我怎么就攤上這事扰法∮己” “怎么了?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵塞颁,是天一觀的道長浦箱。 經(jīng)常有香客問我诵棵,道長迫横,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任耕捞,我火速辦了婚禮伴网,結(jié)果婚禮上蓬推,老公的妹妹穿的比我還像新娘。我一直安慰自己澡腾,他們只是感情好沸伏,可當我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著动分,像睡著了一般毅糟。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上澜公,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天姆另,我揣著相機與錄音,去河邊找鬼坟乾。 笑死迹辐,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的甚侣。 我是一名探鬼主播明吩,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼殷费!你這毒婦竟也來了印荔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤宗兼,失蹤者是張志新(化名)和其女友劉穎躏鱼,沒想到半個月后氮采,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體殷绍,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年鹊漠,在試婚紗的時候發(fā)現(xiàn)自己被綠了主到。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片茶行。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖登钥,靈堂內(nèi)的尸體忽然破棺而出畔师,到底是詐尸還是另有隱情,我是刑警寧澤牧牢,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布看锉,位于F島的核電站,受9級特大地震影響塔鳍,放射性物質(zhì)發(fā)生泄漏伯铣。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一轮纫、第九天 我趴在偏房一處隱蔽的房頂上張望腔寡。 院中可真熱鬧,春花似錦掌唾、人聲如沸放前。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凭语。三九已至,卻和暖如春情连,著一層夾襖步出監(jiān)牢的瞬間叽粹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工却舀, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留虫几,地道東北人。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓挽拔,卻偏偏與公主長得像辆脸,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子螃诅,可洞房花燭夜當晚...
    茶點故事閱讀 44,592評論 2 353