概述
Kafka Connect 是一款可擴展并且可靠地在 Apache Kafka 和其他系統(tǒng)之間進行數(shù)據(jù)傳輸?shù)墓ぞ摺?可以很簡單的快速定義 connectors 將大量數(shù)據(jù)從 Kafka 移入和移出. Kafka Connect 可以攝取數(shù)據(jù)庫數(shù)據(jù)或者收集應用程序的 metrics 存儲到 Kafka topics姿鸿,使得數(shù)據(jù)可以用于低延遲的流處理渣磷。 一個導出的 job 可以將來自 Kafka topic 的數(shù)據(jù)傳輸?shù)蕉壌鎯啄螅糜谙到y(tǒng)查詢或者批量進行離線分析。
Kafka Connect 功能包括:
- Kafka connectors 通用框架: - Kafka Connect 將其他數(shù)據(jù)系統(tǒng)和Kafka集成標準化,簡化了 connector 的開發(fā),部署和管理
- 分布式和單機模式 - 可以擴展成一個集中式的管理服務五芝,也可以單機方便的開發(fā),測試和生產(chǎn)環(huán)境小型的部署氓英。
- REST 接口 - 通過易于使用的REST API提交和管理connectors到您的Kafka Connect集群
- offset 自動管理 - 只需要connectors 的一些信息严沥,Kafka Connect 可以自動管理offset 提交的過程缺猛,因此開發(fā)人員無需擔心開發(fā)中offset提交出錯的這部分垃帅。
- 分布式的并且可擴展 - Kafka Connect 構建在現(xiàn)有的 group 管理協(xié)議上延届。Kafka Connect 集群可以擴展添加更多的workers。
- 整合流處理/批處理 - 利用 Kafka 已有的功能挺智,Kafka Connect 是一個橋接stream 和批處理系統(tǒng)理想的方式鳍咱。
搭建kafka connect分布式集群
Kafka Connect 當前支持兩種執(zhí)行方式: 單機 (單個進程) 和 分布式.
分布式模式下會自動進行負載均衡恬砂,允許動態(tài)的擴縮容疙赠,并提供對 active task先誉,以及這個任務對應的配置和offset提交記錄的容錯凉夯。
#分布式
bin/connect-distributed.sh config/connect-distributed.properties
connect-distributed.properties文件配置參數(shù)可以查看官方文檔
配置connector
Connector 配置是簡單的key-value 映射的格式匿值。在分布式模式中骨宠,它們將被包含在創(chuàng)建(或修改)connector 的請求的JSON格式串中纵装。
rest api
由于Kafka Connect 旨在作為服務運行髓窜,它還提供了一個用于管理 connectors 的REST API扇苞。默認情況下,此服務在端口8083上運行寄纵。以下是當前支持的功能:
- GET /connectors - 返回一個活動的連接器的列表
- POST /connectors - 創(chuàng)建一個新的連接器鳖敷; 請求主體應為JSON對象,其中包含name字段和帶有連接器配置參數(shù)的對象config字段
- GET /connectors/{name} - 獲取有關特定連接器的信息
- GET /connectors/{name}/config
- PUT /connectors/{name}/config - 更新連接器的參數(shù)
- GET /connectors/{name}/status - 獲取連接器的當前狀態(tài)程拭,包括連接器是否正在運行定踱,發(fā)生故障,已暫停等恃鞋,將其分配給哪個工作器崖媚,如果連接器發(fā)生故障,則顯示錯誤信息恤浪,以及其所有任務的狀態(tài)
- GET /connectors/{name}/tasks
- GET /connectors/{name}/tasks/{taskid}/status
- PUT /connectors/{name}/pause - 暫停連接器及其任務畅哑,這將停止消息處理,直到恢復連接器為止
- PUT /connectors/{name}/resume - 恢復已暫停的連接器(如果連接器未暫停水由,則不執(zhí)行任何操作)
- POST /connectors/{name}/restart - 重新啟動連接器(通常是因為它失敗了)
- POST /connectors/{name}/tasks/{taskId}/restart
- DELETE /connectors/{name} - 刪除連接器荠呐,暫停所有任務并刪除其配置
Kafka Connect還提供用于獲取有關 connector plugin sss信息的REST API:
- GET /connector-plugins- 返回安裝在Kafka Connect集群中的連接器插件的列表。
- PUT /connector-plugins/{connector-type}/config/validate - 根據(jù)配置定義驗證提供的配置值砂客。
自定義connector
要在Kafka和另一個系統(tǒng)之間復制數(shù)據(jù)泥张,用戶會為想要 pull 數(shù)據(jù)或者 push 數(shù)據(jù)的系統(tǒng)創(chuàng)建一個connector。 connector 有兩類:SourceConnectors 從其他系統(tǒng)導入數(shù)據(jù)(e.g.JDBCSourceConnector 會將關系型數(shù)據(jù)庫導入到Kafka中)和SinkConnectors導出數(shù)據(jù)(e.g. HDFSSinkConnector會將Kafka topic 的內(nèi)容導出到 HDFS 文件)
Connectors 自身不執(zhí)行任何數(shù)據(jù)復制:Connector的配置描述要復制的數(shù)據(jù)鞭盟,并且Connector 負責負責將 job 分解為可分發(fā)給 worker 的一組 Tasks圾结。這些Tasks也分為兩類: SourceTask 和 SinkTask。
通過分配齿诉,每個Task 必須將數(shù)據(jù)的一部分子集復制到Kafka或者從Kafka復制筝野。在 Kafka Connect中晌姚,應該始終可以將這些分配的數(shù)據(jù)框架化為一組輸入和輸出流,這些流由具有一致結構的記錄組成歇竟。
開發(fā)一個 connector 只需要實現(xiàn)兩個接口, Connector 和 Task接口. 一個簡單的例子的源碼在Kafkafile package中挥唠。 connector 用于單機模式,并擁有 SourceConnector 和SourceTask實現(xiàn)來讀取一個文件的每行記錄焕议,并將其作為記錄發(fā)發(fā)送宝磨,SinkConnector的SinkTask將記錄寫入到文件。