? ? ? ?本篇文章描述了開發(fā)人員如何為Kafka Connect編寫新的connector。
核心概念與API
Connectors和Tasks
? ? ? ?Connector的實現(xiàn)類本身不執(zhí)行數(shù)據(jù)復(fù)制:它們的配置描述了要復(fù)制的數(shù)據(jù)集穆咐,connector負(fù)責(zé)將該作業(yè)分解成一組task颤诀,這些task可以分發(fā)給Kafka Connect worker字旭。Connector的實現(xiàn)類可以監(jiān)視外部系統(tǒng)的數(shù)據(jù)更改和請求task的重新配置。
? ? ? ?有了要復(fù)制的數(shù)據(jù)的分配信息崖叫,每個task需要將其數(shù)據(jù)子集復(fù)制到Kafka或從Kafka復(fù)制到別處遗淳。connector復(fù)制的數(shù)據(jù)要表示為分區(qū)的流,類似于Kafka topic心傀,其中每個分區(qū)是帶有偏移量的有序記錄序列屈暗。每個task處理其中的一些partition。有時這種劃分是很清晰的:一組log文件中的每個文件都被視為一個分區(qū)脂男,每一行被視為一條記錄养叛,偏移量就是這條記錄在文件中的位置。其他情況下宰翅,映射到這個模型可能需要更多工作:JDBC connector可以將每個表映射到一個分區(qū)弃甥,但是偏移量不是那么清晰。一種可能的映射是使用時間戳列生成查詢汁讼,以增量地返回新數(shù)據(jù)淆攻,最后查詢的時間戳用作偏移量。
Partitions和Recoeds
? ? ? ?每個分區(qū)都是key-value記錄的有序序列掉缺。key和value都可以具有復(fù)雜結(jié)構(gòu)卜录,由org.apache.kafka.connect中的數(shù)據(jù)結(jié)構(gòu)表示。許多基本類型以及arrays眶明、structs和嵌套數(shù)據(jù)結(jié)構(gòu)都是支持的艰毒。對于結(jié)構(gòu)化數(shù)據(jù),應(yīng)該使用Struct類搜囱。
? ? ? ?為了跟蹤分區(qū)中記錄的結(jié)構(gòu)和兼容性丑瞧,可以在每個記錄中包含schema。因為schema通常是基于數(shù)據(jù)源動態(tài)生成的蜀肘,包含一個SchemaBuilder類會使得構(gòu)建schema非常容易绊汹。
? ? ? ?這種運行時數(shù)據(jù)格式不采用任何特定的序列化格式,其轉(zhuǎn)換由Converter的實現(xiàn)指定扮宠,它將org.apache.kafka.connect.data運行時格式和序列化數(shù)據(jù)表示為byte[]西乖。Connector開發(fā)人員不需要關(guān)心轉(zhuǎn)換細(xì)節(jié)。
? ? ? ?除了key和value之外坛增,record還有partition ID和offset获雕。框架使用它們定期提交已處理數(shù)據(jù)的偏移量收捣。在失敗的情況下届案,可以從最后提交的偏移量恢復(fù)處理,避免不必要的重復(fù)事件罢艾。
動態(tài)connectors
? ? ? ?并不是所有連接器都有靜態(tài)的partition楣颠,因此connector還負(fù)責(zé)監(jiān)視外部系統(tǒng)中的任何更改尽纽。例如,在JDBCSourceConnector中童漩,Connector將一組table中的每個table都分配一個task弄贿。當(dāng)一個新表創(chuàng)建時,Connector可以發(fā)現(xiàn)它并通過更新配置為其分配一個新的task睁冬。
開發(fā)一個簡單的Connector
? ? ? ?開發(fā)一個connector只需要實現(xiàn)兩個接口挎春,Connector和Task看疙。Kafka Connect源代碼中包含了一個簡單的connector示例豆拨,可以從文件中讀寫行。SourceConnector/SourceTask類實現(xiàn)從文件中讀取行的source connector能庆,SinkConnector/SinkTask實現(xiàn)將每個記錄寫入文件的sink connector施禾。
Connector Example
? ? ? ?首先創(chuàng)建從SourceConnector繼承的類,并添加兩個字段來存儲已解析的配置信息(要從中讀取的文件名和要向其發(fā)送數(shù)據(jù)的主題):
public class FileStreamSourceConnector extends SourceConnector {
private String filename;
private String topic;
? ? ? ?最簡單的方法是getTaskClass()搁胆,它定義了應(yīng)該在worker進程中實例化的類弥搞,以實際讀取數(shù)據(jù):
@Override
public Class<? extends Task> getTaskClass() {
return FileStreamSourceTask.class;
}
? ? ? ?我們將在下面定義FileStreamSourceTask類。接下來渠旁,添加一些標(biāo)準(zhǔn)的生命周期方法start()和stop():
@Override
public void start(Map<String, String> props) {
// The complete version includes error handling as well.
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
}
@Override
public void stop() {
// Nothing to do since no background monitoring is required
}
? ? ? ?最后攀例,實現(xiàn)的真正核心是在taskConfigs()中。在這種情況下顾腊,我們只處理一個文件粤铭,所以即使我們可能被允許按照maxTasks參數(shù)生成更多的任務(wù),我們返回一個只有一個條目的列表:
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input partition makes sense.
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
? ? ? ?即使有多個task杂靶,這個方法的實現(xiàn)通常也非常簡單梆惯。它只需要確定輸入task的數(shù)量,這需要與抽取數(shù)據(jù)的遠(yuǎn)程服務(wù)進行通信吗垮,將它們進行分配垛吗。由于在task間分配工作的一些模式非常常見,ConnectorUtils中提供了一些使用的工具來簡化這些情況烁登。
? ? ? ?注意怯屉,這個簡單示例不包含動態(tài)輸入。有關(guān)如何觸發(fā)對task配置的更新饵沧,請參見下一節(jié)討論锨络。
Task示例-Source Task
? ? ? ?接下來,我們來描述SourceTask的實現(xiàn)捷泞。這個類很小足删,但是對于這篇指導(dǎo)來說太長了。我們只提供部分的實現(xiàn)代碼锁右。
? ? ? ?與connector一樣失受,我們需要創(chuàng)建從適當(dāng)?shù)腡ask類繼承的類讶泰。它也有一些標(biāo)準(zhǔn)的生命周期方法:
public class FileStreamSourceTask extends SourceTask {
private String filename;
private InputStream stream;
private String topic;
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
stream = openOrThrowError(filename);
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
}
@Override
public synchronized void stop() {
stream.close()
}
? ? ? ?這些是稍微簡化的版本,但是顯示這些方法應(yīng)該相對簡單拂到,它們執(zhí)行的唯一工作是分配或釋放資源痪署。關(guān)于此實現(xiàn),有兩點需要注意兄旬。首先狼犯,start()方法還沒有處理從上一個偏移量恢復(fù),這將在后面的部分中討論领铐。其次悯森,stop()方法是同步的。這是必要的绪撵,因為SourceTasks被賦予了一個專用線程瓢姻,它們可以無限期地阻塞該線程,因此需要使用來自工作程序中另一個線程的調(diào)用來停止它們音诈。
? ? ? ?接下來幻碱,我們實現(xiàn)任務(wù)的主要功能:poll()方法,它從輸入系統(tǒng)獲取記錄并返回一個List<SourceRecord>:
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
ArrayList<SourceRecord> records = new ArrayList<>();
while (streamValid(stream) && records.isEmpty()) {
LineAndOffset line = readToNextLine(stream);
if (line != null) {
Map sourcePartition = Collections.singletonMap("filename", filename);
Map sourceOffset = Collections.singletonMap("position", streamOffset);
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
} else {
Thread.sleep(1);
}
}
return records;
} catch (IOException e) {
// Underlying stream was killed, probably as a result of calling stop. Allow to return
// null, and driving thread will handle any shutdown if necessary.
}
return null;
}
? ? ? ?同樣细溅,我們忽略了一些細(xì)節(jié)褥傍,但是我們可以看到一些重要的步驟:poll()方法將被重復(fù)調(diào)用,對于每個調(diào)用喇聊,它將循環(huán)嘗試從文件中讀取記錄恍风。對于讀取的每一行,它還跟蹤文件偏移量承疲。它使用這些信息來創(chuàng)建一個輸出SourceRecord,其包含四條信息:源partition(只有一個,單文件讀取),源offset(在文件中的位置),輸出topic名稱和輸出值(包含schema邻耕,指示其為一個String)。SourceRecord構(gòu)造函數(shù)的其他變體還可以包括特定的輸出分區(qū)和鍵燕鸽。
? ? ? ?注意兄世,此實現(xiàn)使用普通的Java InputStream接口,如果數(shù)據(jù)不可用啊研,則可能休眠御滩。這是可以接受的,因為Kafka Connect為每個任務(wù)提供了一個專用線程党远。雖然任務(wù)實現(xiàn)必須符合基本的poll()接口削解,但它們在如何實現(xiàn)方面有很大的靈活性。在這種情況下沟娱,基于nio的實現(xiàn)會更有效氛驮,但是這種簡單的方法可以工作,實現(xiàn)起來很快济似,并且與舊版本的Java兼容矫废。
? ? ? ?雖然在示例中沒有使用盏缤,但SourceTask還提供了兩個api來在源系統(tǒng)中提交偏移量:commit()和commitRecord()。這些api是為具有消息確認(rèn)機制的源系統(tǒng)提供的蓖扑。覆蓋這些方法允許源連接器在將消息寫入Kafka之后唉铜,在源系統(tǒng)中確認(rèn)消息,無論是批量的還是單獨的律杠。
? ? ? ?commit() API將偏移量存儲在源系統(tǒng)中潭流,直到poll()返回偏移量為止。此API的實現(xiàn)應(yīng)該阻塞柜去,直到提交完成灰嫉。commitRecord() API在將每個源記錄寫入Kafka之后,在源系統(tǒng)中為每個源記錄保存偏移量诡蜓。由于Kafka Connect將自動記錄偏移量熬甫,因此不需要SourceTask來實現(xiàn)它們。在連接器確實需要確認(rèn)源系統(tǒng)中的消息的情況下蔓罚,通常只需要其中一個api。
Sink Tasks
? ? ? ?SourceTask和SinkTask有非常不同的接口瞻颂,因為SourceTask使用pull接口豺谈,而SinkTask使用push接口。兩者都有相同的生命周期方法贡这,但是SinkTask接口是完全不同的:
public abstract class SinkTask implements Task {
... [ lifecycle methods omitted ] ...
public void initialize(SinkTaskContext context) {
this.context = context;
}
public abstract void put(Collection<SinkRecord> records);
public abstract void flush(Map<TopicPartition, Long> offsets);
public void open(Collection<TopicPartition> partitions) {}
public void close(Collection<TopicPartition> partitions) {}
}
? ? ? ?put()方法應(yīng)該包含大部分實現(xiàn)茬末、接受一組SinkRecords、執(zhí)行任何所需的轉(zhuǎn)換并將它們存儲在目標(biāo)系統(tǒng)中.此流程不需要在返回之前確保數(shù)據(jù)已完全寫入目標(biāo)系統(tǒng)盖矫。事實上丽惭,在許多情況下,一些內(nèi)部緩沖是有用的辈双,這樣可以一次發(fā)送整批記錄(很像Kafka的生產(chǎn)者)责掏,從而減少向下游數(shù)據(jù)存儲插入事件的開銷。SinkRecords基本上包含與源記錄相同的信息:Kafka主題湃望、分區(qū)和偏移量以及事件鍵和值换衬。
? ? ? ?flush()方法在偏移量提交過程中使用,它允許任務(wù)從失敗中恢復(fù)证芭,并從安全點恢復(fù)瞳浦,這樣就不會錯過任何事件。該方法應(yīng)該將任何未完成的數(shù)據(jù)推送到目標(biāo)系統(tǒng)废士,然后阻塞叫潦,直到確認(rèn)寫入。偏移量參數(shù)通彻傧酰可以忽略矗蕊,但在實現(xiàn)希望在目標(biāo)存儲中存儲偏移量信息以提供精確的一次交付的某些情況下四敞,該參數(shù)非常有用。例如拔妥,HDFS連接器可以這樣做忿危,并使用原子移動操作來確保flush()操作以原子方式將數(shù)據(jù)和偏移量提交到HDFS中的最終位置。
? ? ? ?在內(nèi)部没龙,SinkTask使用Kafka消費者來輪詢數(shù)據(jù)铺厨。connector的task中使用的consumer實例屬于同一consumer組。任務(wù)重新配置或失敗將觸發(fā)使用者組的重新平衡硬纤。在重新平衡期間解滓,主題分區(qū)將被重新分配到新的任務(wù)集。有關(guān)kafka消費者再平衡的更多解釋筝家,請參見消費者部分洼裤。
? ? ? ?請注意,由于consumer是單線程的溪王,應(yīng)該確保put()或flush()花費的時間不會超過使用者會話超時時間腮鞍。否則,consumer會被踢出組莹菱,從而觸發(fā)分區(qū)的rebalance移国,組織所有其他task在rebalance完成之前取得進展。
? ? ? ?為了確保在重新平衡期間正確地釋放和分配資源道伟,SinkTask提供了兩個額外的方法:close()和open()迹缀,它們與驅(qū)動SinkTask的KafkaConsumer的底層再平衡回調(diào)相關(guān)聯(lián)。
? ? ? ?close()方法用于關(guān)閉分配給SinkTask的分區(qū)的writers蜜徽。此方法將在使用者重新平衡操作啟動之前和SinkTask停止獲取數(shù)據(jù)之后調(diào)用祝懂。在關(guān)閉之后,Connect將不會向任務(wù)寫入任何記錄拘鞋,直到打開了一組新的分區(qū)砚蓬。close()方法可以在重新平衡開始之前訪問分配給SinkTask的所有主題分區(qū)。通常掐禁,我們建議關(guān)閉所有主題分區(qū)的writers怜械,并確保所有主題分區(qū)的狀態(tài)都得到了正確維護。然而傅事,你可以在實現(xiàn)中關(guān)閉topic partition的子集的writer缕允。
? ? ? ?open()方法用于在使用者重新平衡時為新分配的分區(qū)創(chuàng)建writers。此方法將在分區(qū)重新分配完成后以及SinkTask開始獲取數(shù)據(jù)之前調(diào)用蹭越。
? ? ? ?注意障本,從close()或open()引發(fā)的任何錯誤都將導(dǎo)致任務(wù)停止,報告失敗狀態(tài),并關(guān)閉相應(yīng)的consumer實例驾霜。此consumer關(guān)閉觸發(fā)重新平衡案训,此任務(wù)的主題分區(qū)將被重新分配到此connector的其他任務(wù)。
從之前的偏移量恢復(fù)
? ? ? ?SourceTask實現(xiàn)包含每條record的分區(qū)ID(輸入文件名)和偏移量(文件中的位置)粪糙∏况框架會定期的提交偏移量,這樣,在失敗的情況下,任務(wù)可以恢復(fù)和減少事件的數(shù)量再加工蓉冈。這個提交過程由框架完全自動化城舞,但是只有connector知道如何在輸入中查找要從該位置恢復(fù)的正確位置。
? ? ? ?要在啟動時正確恢復(fù)寞酿,任務(wù)可以使用傳遞到其initialize()方法中的SourceContext來訪問偏移量數(shù)據(jù)家夺。在initialize()中,我們將添加更多的代碼來讀取偏移量(如果它存在)伐弹,并尋找到那個位置:
stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Long lastRecordedOffset = (Long) offset.get("position");
if (lastRecordedOffset != null)
seekToOffset(stream, lastRecordedOffset);
}
? ? ? ?關(guān)于此實現(xiàn)拉馋,有兩點需要注意。首先惨好,這個連接器的偏移量是long煌茴,這是一種基本類型。然而昧狮,包括映射和列表在內(nèi)的更復(fù)雜的結(jié)構(gòu)也可以用作偏移量景馁。其次,返回的數(shù)據(jù)是“無模式的”逗鸣。這是必要的,因為不能保證序列化偏移量的底層轉(zhuǎn)換器能夠跟蹤模式绰精。這使得可靠的偏移量解析對于連接器開發(fā)人員來說更加具有挑戰(zhàn)性撒璧,但是使得序列化格式的選擇更加靈活。
動態(tài)輸入/輸出分區(qū)
? ? ? ?Kafka Connect旨在定義批量數(shù)據(jù)復(fù)制作業(yè)笨使,例如復(fù)制整個數(shù)據(jù)庫卿樱,而不是創(chuàng)建多個作業(yè)來單獨復(fù)制每個表。這種設(shè)計的一個結(jié)果是連接器的輸入或輸出分區(qū)集可能隨時間而變化硫椰。
? ? ? ?源連接器需要監(jiān)視源系統(tǒng)中的更改繁调,例如數(shù)據(jù)庫中的表添加/刪除。當(dāng)它們接收到更改時靶草,它們應(yīng)該通過ConnectorContext對象通知框架需要重新配置蹄胰。例如,在SourceConnector中:
if (inputsChanged()) {
this.context.requestTaskReconfiguration();
}
? ? ? ?框架將迅速請求新的配置信息并更新任務(wù)奕翔,允許它們在重新配置之前優(yōu)雅地提交進度裕寨。注意,在SourceConnector中,這種監(jiān)視目前由connector實現(xiàn)來完成宾袜。如果執(zhí)行此監(jiān)視需要一個額外的線程捻艳,connector必須自己分配它。
? ? ? ?理想情況下庆猫,用于監(jiān)視變更的代碼應(yīng)該被connector隔離认轨,task不需要擔(dān)心它們。然而月培,變更也會影響到task嘁字,最常見的情況是當(dāng)其中一個輸入partition在輸入系統(tǒng)中被銷毀時,例如节视,一個表從數(shù)據(jù)庫中刪除拳锚。如果task在Connector之前遇到了問題,則task需要處理隨后的錯誤寻行。幸運的是霍掺,這通常可以通過捕獲和處理適當(dāng)?shù)漠惓硖幚怼?br>
? ? ? ?SinkConnector通常只需要處理添加的partition拌蜘,這些partition可能會轉(zhuǎn)換為其輸出中的新條目杆烁。Kafka Connect框架管理Kafka輸入的任何更改。SinkTask管理新的輸入partition简卧,這可能需要再下游系統(tǒng)中創(chuàng)建新的資源兔魂,例如數(shù)據(jù)庫中的新表。這種情況下举娩,最棘手的情況是多個SinkTask在看到新的輸入partition時析校,同時嘗試創(chuàng)建新資源,并發(fā)生沖突铜涉。
處理Schema
? ? ? ?FileStream連接器是很好的例子智玻,因為它們很簡單,但是它們也有一些結(jié)構(gòu)簡單的數(shù)據(jù)——每行只是一個字符串芙代。幾乎所有連接器都需要具有更復(fù)雜數(shù)據(jù)格式的模式吊奢。
? ? ? ?要創(chuàng)建更復(fù)雜的數(shù)據(jù),您需要使用org.apache.kafka.connect API纹烹。除了基本類型之外页滚,大多數(shù)結(jié)構(gòu)化記錄還需要與兩個類交互:Schema和Struct。
? ? ? ?API文檔提供了完整的參考铺呵,下面是一個創(chuàng)建Schema和Struct的簡單示例:
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.build();
Struct struct = new Struct(schema)
.put("name", "Barbara Liskov")
.put("age", 75)
.build();
? ? ? ?如果您正在實現(xiàn)source connector裹驰,則需要決定何時以及如何創(chuàng)建schema。如果可能的話陪蜻,應(yīng)該避免重新計算它們邦马。例如,如果connector保證有固定的schema,那么靜態(tài)地創(chuàng)建它并重用單個實例滋将。
? ? ? ?然而邻悬,許多connector將具有動態(tài)模式。其中一個例子是數(shù)據(jù)庫連接器随闽。即使只考慮單個表父丰,在連接器的生命周期中也不會為單個表固定模式,因為用戶可能會執(zhí)行ALTER table命令掘宪。連接器必須能夠檢測這些更改蛾扇,并通過創(chuàng)建更新的模式作出適當(dāng)?shù)捻憫?yīng)。
? ? ? ?接收器連接器通常更簡單魏滚,因為它們正在使用數(shù)據(jù)镀首,因此不需要創(chuàng)建模式。但是鼠次,他們應(yīng)該同樣小心地驗證接收到的模式是否具有預(yù)期的格式更哄。當(dāng)模式不匹配時——通常表示上游生產(chǎn)者正在生成無法正確轉(zhuǎn)換到目標(biāo)系統(tǒng)的無效數(shù)據(jù)——接收器連接器應(yīng)該拋出異常,將此錯誤指示給Kafka Connect框架腥寇。
? ? ? ?在使用Confluent平臺中包含的AvroConverter時成翩,模式在Confluent的schema registry下注冊,因此任何新模式都必須滿足目標(biāo)主題的兼容性要求赦役。
Schema進化
? ? ? ?Kafka Connect在SchemaProjector中提供了schema的兼容模式以及不兼容時如何拋出異常麻敌。SchemaProjector的使用很簡單。下面的示例展示了如何將sourceStruct從版本2的源模式投射到版本3的目標(biāo)模式掂摔,這將添加一個具有默認(rèn)值的字段术羔。由于這兩個模式是兼容的,我們看到targetStruct有兩個字段乙漓,field2填充123聂示,這是該字段的默認(rèn)值。
Schema source = SchemaBuilder.struct()
.version(2)
.field("field", Schema.INT32_SCHEMA)
.build();
Struct sourceStruct = new Struct(source);
sourceStruct.put("field", 1);
Schema target = SchemaBuilder.struct()
.version(3)
.field("field", Schema.INT32_SCHEMA)
.field("field2", SchemaBuilder.int32().defaultValue(123).build())
.build();
Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
? ? ? ?該實用程序?qū)τ谛枰幚砟J窖莼途S護模式兼容性的連接器非常有用簇秒。例如,如果我們希望HDFS連接器保持向后兼容性秀鞭,因為每個文件只能有一個模式趋观,那么我們需要在將消息寫入HDFS之前,將具有舊模式的消息投影到連接器看到的最新模式锋边。這確保寫入HDFS的最新文件將具有可用于查詢整個數(shù)據(jù)的最新模式皱坛,從而保持向后兼容性。