使用kafka connect镀琉,將數(shù)據(jù)批量寫到hdfs完整過程

版權聲明:本文為博主原創(chuàng)文章,未經(jīng)博主允許不得轉載

本文是基于hadoop 2.7.1院水,以及kafka 0.11.0.0腊徙。kafka-connect是以單節(jié)點模式運行,即standalone檬某。


一. 首先撬腾,先對kafka和kafka connect做一個簡單的介紹

kafka:Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者規(guī)模的網(wǎng)站中的所有動作流數(shù)據(jù)恢恼。比較直觀的解釋就是其有一個生產(chǎn)者(producer)和一個消費者(consumer)民傻。可以將kafka想象成一個數(shù)據(jù)容器场斑,生產(chǎn)者負責發(fā)送數(shù)據(jù)到這個容器中漓踢,而消費者從容器中取出數(shù)據(jù),在將數(shù)據(jù)做處理漏隐,如存儲到hdfs喧半。

kafka connect:Kafka Connect是一種用于在Kafka和其他系統(tǒng)之間可擴展的、可靠的流式傳輸數(shù)據(jù)的工具青责。它使得能夠快速定義將大量數(shù)據(jù)集合移入和移出Kafka的連接器變得簡單挺据。即適合批量數(shù)據(jù)導入導出操作。


二. 下面將介紹如何用kafka connect將數(shù)據(jù)寫入到hdfs中脖隶。包括在這個過程中可能碰到的一些問題說明扁耐。

首先啟動kafka-connect:

bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties

這個命令后面兩個參數(shù),

  第一個是指定啟動的模式浩村,有分布式和單節(jié)點兩種,這里是單節(jié)點占哟。kafka自帶心墅,放于config目錄下。

  第二個參數(shù)指向描述connector的屬性的文件榨乎,可以有多個怎燥,這里只有一個connector用來寫入到hdfs。需要自己創(chuàng)建蜜暑。

接下來看看connector1.properties的內(nèi)容铐姚,

name="test" #該connector的名字

#將自己按connect接口規(guī)范編寫的代碼打包后放在kafka/libs目錄下,再根據(jù)項目結構引用對應

connector connector.class=hdfs.HdfsSinkConnector

#Task是導入導出的具體實現(xiàn),這里是指定多少個task來并行運行導入導出作業(yè)隐绵,由多線程實現(xiàn)之众。由于hdfs中一個文件每次只能又一個文件操作,所以這里只能是1

tasks.max=1

#指定從哪個topic讀取數(shù)據(jù)依许,這些其實是用來在connector或者task的代碼中讀取的棺禾。 topics=test #指定key以那種方式轉換,需和Producer發(fā)送方指定的序列化方式一致 key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.json.JsonConverter #同上

hdfs.url=hdfs://127.0.0.1:9000  #hdfs的url路徑峭跳,在Connector中會被讀取 hdfs.path=/test/file  #hdfs文件路徑膘婶,同樣Connector中被讀取

key.converter.schemas.enable=true  #稍后介紹,可以true也可以false蛀醉,影響傳輸格式 value.converter.schemas.enable=true  #稍后介紹悬襟,可以true也可以false

三. 接下來看代碼,connect主要是導入導出兩個概念拯刁,導入是source脊岳,導出時Sink。這里只使用Sink筛璧,不過Source和Sink的實現(xiàn)其實基本相同逸绎。

實現(xiàn)Sink其實不難,實現(xiàn)對應的接口夭谤,即SinkConnector和SinkTask兩個接口棺牧,再打包放到kafka/libs目錄下即可。其中SinkConnector只有一個朗儒,而Task可以有多

先是Connector

publicclassHdfsSinkConnectorextends SinkConnector {

? ? //這兩項為配置hdfs的urlh和路徑的配置項颊乘,需要在connector1.properties中指定publicstaticfinalString HDFS_URL = "hdfs.url";

? ? publicstaticfinalString HDFS_PATH = "hdfs.path";

? ? privatestaticfinalConfigDef CONFIG_DEF =new ConfigDef()

? ? ? ? ? ? .define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url")

? ? ? ? ? ? .define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path");

? ? private String hdfsUrl;

? ? private String hdfsPath;

? ? @Override

? ? public String version() {

? ? ? ? returnAppInfoParser.getVersion();? ? }

//start方法會再初始的時候執(zhí)行一次,這里主要用于配置? ? @Override

publicvoidstart(Map props) {

? ? ? ? hdfsUrl = props.get(HDFS_URL);

? ? ? ? hdfsPath = props.get(HDFS_PATH);

? ? }

  //這里指定了Task的類

? ? @Override

? ? publicClass taskClass() {

? ? ? ? returnHdfsSinkTask.class;

? ? }

  //用于配置Task的config醉锄,這些都是會在Task中用到

? ? @Override

? ? publicList> taskConfigs(int maxTasks) {

? ? ? ? ArrayList> configs =newArrayList<>();

? ? ? ? for(inti = 0; i < maxTasks; i++) {

? ? ? ? ? ? Map config =newHashMap<>();

? ? ? ? ? ? if(hdfsUrl !=null)

? ? ? ? ? ? ? ? config.put(HDFS_URL, hdfsUrl);

? ? ? ? ? ? if(hdfsPath !=null)

? ? ? ? ? ? ? ? config.put(HDFS_PATH, hdfsPath);

? ? ? ? ? ? configs.add(config);

? ? ? ? }

? ? ? ? return configs;

? ? }

  //關閉時的操作乏悄,一般是關閉資源。

? ? @Override

? ? publicvoid stop() {

? ? ? ? // Nothing to do since FileStreamSinkConnector has no background monitoring.? ? }

? ? @Override

? ? public ConfigDef config() {

? ? ? ? return CONFIG_DEF;

? ? }

}

接下來是Task

publicclassHdfsSinkTaskextends SinkTask {

? ? privatestaticfinalLogger log = LoggerFactory.getLogger(HdfsSinkTask.class);

? ? private String filename;

? ? publicstatic String hdfsUrl;

? ? publicstatic String hdfsPath;

? ? private Configuration conf;

? ? private FSDataOutputStream os;

? ? private FileSystem hdfs;

? ? public HdfsSinkTask(){

? ? }

? ? @Override

? ? public String version() {

? ? ? ? returnnew HdfsSinkConnector().version();

? ? }

  //Task開始會執(zhí)行的代碼恳不,可能有多個Task檩小,所以每個Task都會執(zhí)行一次

? ? @Override

? ? publicvoidstart(Map props) {

? ? ? ? hdfsUrl = props.get(HdfsSinkConnector.HDFS_URL);

? ? ? ? hdfsPath = props.get(HdfsSinkConnector.HDFS_PATH);

? ? ? ? System.out.println("----------------------------------- start--------------------------------");

? ? ? ? conf =new Configuration();conf.set("fs.defaultFS", hdfsUrl);

? ? ? ? //這兩個是與hdfs append相關的設置conf.setBoolean("dfs.support.append",true);

? ? ? ? conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");

? ? ? ? try{

? ? ? ? ? ? hdfs = FileSystem.get(conf);//? ? ? ? ? ? connector.hdfs = new Path(HDFSPATH).getFileSystem(conf);os = hdfs.append(new Path(hdfsPath));

? ? ? ? }catch (IOException e){

? ? ? ? ? ? System.out.println(e.toString());

? ? ? ? }

? ? }

  //核心操作,put就是將數(shù)據(jù)從kafka中取出烟勋,存放到其他地方去

? ? @Override

? ? publicvoidput(Collection sinkRecords) {

? ? ? ? for (SinkRecord record : sinkRecords) {

? ? ? ? ? ? log.trace("Writing line to {}: {}", logFilename(), record.value());

? ? ? ? ? ? try{

? ? ? ? ? ? ? ? System.out.println("write info------------------------" + record.value().toString() + "-----------------");

? ? ? ? ? ? ? ? os.write((record.value().toString()).getBytes("UTF-8"));

? ? ? ? ? ? ? ? os.hsync();

? ? ? ? ? ? }catch(Exception e){

? ? ? ? ? ? ? ? System.out.print(e.toString());

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? @Override

? ? publicvoidflush(Map offsets) {

? ? ? ? try{

? ? ? ? ? ? os.hsync();

? ? ? ? }catch(Exception e){? ? ? ? ? ? System.out.print(e.toString());? ? ? ? }? ? }

//同樣是結束時候所執(zhí)行的代碼规求,這里用于關閉hdfs資源? ? @Override

publicvoid stop() {

? ? ? ? try {

? ? ? ? ? ? os.close();

? ? ? ? }catch(IOException e){

? ? ? ? ? ? System.out.println(e.toString());

? ? ? ? }

? ? }

? ? private String logFilename() {

? ? ? ? returnfilename ==null? "stdout" : filename;

? ? }

}

這里重點提一下,因為在connector1.propertise中設置了key.converter=org.apache.kafka.connect.converters.ByteArrayConverter卵惦,所以不能用命令行形式的

producer發(fā)送數(shù)據(jù)阻肿,而是要用程序的方式,并且在producer總也要設置key的序列化形式為org.apache.kafka.common.serialization.ByteArraySerializer沮尿。

編碼完成丛塌,先用idea以開發(fā)程序與依賴包分離的形式打包成jar包,然后將程序對應的jar包(一般就是“項目名.jar”)放到kafka/libs目錄下面,這樣就能被找到赴邻。

四. 接下來對這個過程的問題做一個匯總印衔。

1.在connector1.properties中的key.converter.schemas.enable=false和value.converter.schemas.enable=false的問題。

這個選項默認在connect-standalone.properties中是true的乍楚,這個時候發(fā)送給topic的Json格式是需要使用avro格式当编,具體情況可以百度,這里給出一個樣例徒溪。

{

? ? "schema": {

? ? ? ? "type": "struct",

? ? ? ? "fields": [{

? ? ? ? ? ? "type": "int32",

? ? ? ? ? ? "optional": true,

? ? ? ? ? ? "field": "c1"

? ? ? ? }, {

? ? ? ? ? ? "type": "string",

? ? ? ? ? ? "optional": true,

? ? ? ? ? ? "field": "c2"

? ? ? ? }, {

? ? ? ? ? ? "type": "int64",

? ? ? ? ? ? "optional": false,

? ? ? ? ? ? "name": "org.apache.kafka.connect.data.Timestamp",

? ? ? ? ? ? "version": 1,

? ? ? ? ? ? "field": "create_ts"

? ? ? ? }, {

? ? ? ? ? ? "type": "int64",

? ? ? ? ? ? "optional": false,

? ? ? ? ? ? "name": "org.apache.kafka.connect.data.Timestamp",

? ? ? ? ? ? "version": 1,

? ? ? ? ? ? "field": "update_ts"

? ? ? ? }],

? ? ? ? "optional": false,

? ? ? ? "name": "foobar"

? ? },

? ? "payload": {

? ? ? ? "c1": 10000,

? ? ? ? "c2": "bar",

? ? ? ? "create_ts": 1501834166000,

? ? ? ? "update_ts": 1501834166000

? ? }

}?

主要就是schema和payload這兩個忿偷,不按照這個格式會報錯如下

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

? at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

如果想發(fā)送普通的json格式而不是avro格式的話,很簡單key.converter.schemas.enable和value.converter.schemas.enable設置為false就行臊泌。這樣就能發(fā)送普通的json格式數(shù)據(jù)鲤桥。

2.在啟動的過程中出現(xiàn)各種各樣的java.lang.ClassNotFoundException。

在啟動connector的時候渠概,一開始總是會報各個各樣的ClassNotFoundException茶凳,不是這個包就是那個包,查找問題一直說要么缺少包要么是包沖突播揪。這個是什么原因呢贮喧?

其實歸根結底還是依賴沖突的問題,因為kafka程序自定義的類加載器加載類的目錄是在kafka/libs中猪狈,而寫到hdfs需要hadoop的包箱沦。

我一開始的做法是將hadoop下的包路徑添加到CLASSPATH中,這樣子問題就來了雇庙,因為kafka和hadoop的依賴包是有沖突的谓形,比如hadoop是guava-11.0.2.jar,而kafka是guava-20.0.jar疆前,兩個jar包版本不同寒跳,而我們是在kafka程序中調用hdfs,所以當jar包沖突時應該優(yōu)先調用kafka的竹椒。但是注意kafka用的是程序自定義的類加載器童太,其優(yōu)先級是低于CLASSPATH路徑下的類的,就是說加載類時會優(yōu)先加載CLASSPATH下的類胸完。這樣子就有問題了书释。

我的解決方案時將kafka和hadoop加載的jar包路徑都添加到CLASSPATH中,并且kafka的路徑寫在hadoop前面舶吗,這樣就可以啟動connector成功征冷。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末择膝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌腹侣,老刑警劉巖叔收,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異傲隶,居然都是意外死亡饺律,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門跺株,熙熙樓的掌柜王于貴愁眉苦臉地迎上來复濒,“玉大人,你說我怎么就攤上這事乒省∏删保” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵袖扛,是天一觀的道長砸泛。 經(jīng)常有香客問我,道長蛆封,這世上最難降的妖魔是什么唇礁? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮惨篱,結果婚禮上盏筐,老公的妹妹穿的比我還像新娘。我一直安慰自己妒蛇,他們只是感情好机断,可當我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著绣夺,像睡著了一般吏奸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上陶耍,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天奋蔚,我揣著相機與錄音,去河邊找鬼烈钞。 笑死泊碑,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的毯欣。 我是一名探鬼主播馒过,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼酗钞!你這毒婦竟也來了腹忽?” 一聲冷哼從身側響起来累,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎窘奏,沒想到半個月后嘹锁,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡着裹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年领猾,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片骇扇。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡摔竿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出少孝,到底是詐尸還是另有隱情拯坟,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布韭山,位于F島的核電站郁季,受9級特大地震影響,放射性物質發(fā)生泄漏钱磅。R本人自食惡果不足惜梦裂,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望盖淡。 院中可真熱鬧年柠,春花似錦、人聲如沸褪迟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽味赃。三九已至掀抹,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間心俗,已是汗流浹背傲武。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留城榛,地道東北人揪利。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像狠持,于是被迫代替她去往敵國和親疟位。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理喘垂,服務發(fā)現(xiàn)甜刻,斷路器敢订,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,810評論 4 54
  • =========================================================...
    lavor閱讀 3,484評論 0 5
  • Kafka官網(wǎng):http://kafka.apache.org/入門1.1 介紹Kafka? 是一個分布式流處理系...
    it_zzy閱讀 3,877評論 3 53
  • 秋雨涼入心意亂情何處 飽受它非議只待寒雪中
    高興旺閱讀 118評論 0 0