使用Flink實(shí)現(xiàn)數(shù)據(jù)到ElasticSearch

使用Flink處理數(shù)據(jù)時(shí)象迎,可以基于Flink提供的批式處理(Batch Processing)和流式處理(Streaming Processing)API來(lái)實(shí)現(xiàn)汤徽,分別能夠滿(mǎn)足不同場(chǎng)景下應(yīng)用數(shù)據(jù)的處理。這兩種模式下辐董,輸入處理都被抽象為Source Operator雾棺,包含對(duì)應(yīng)輸入數(shù)據(jù)的處理邏輯;輸出處理都被抽象為Sink Operator层释,包含了對(duì)應(yīng)輸出數(shù)據(jù)的處理邏輯。這里快集,我們只關(guān)注輸出的Sink Operator實(shí)現(xiàn)贡羔。

Flink批式處理模式,運(yùn)行Flink Batch Job時(shí)作用在有界的輸入數(shù)據(jù)集上碍讨,所以Job運(yùn)行的時(shí)間是有時(shí)限的治力,一旦Job運(yùn)行完成蒙秒,對(duì)應(yīng)的整個(gè)數(shù)據(jù)處理應(yīng)用就已經(jīng)結(jié)束勃黍,比如,輸入是一個(gè)數(shù)據(jù)文件晕讲,或者一個(gè)Hive SQL查詢(xún)對(duì)應(yīng)的結(jié)果集覆获,等等马澈。在批式處理模式下處理數(shù)據(jù)的輸出時(shí),主要需要實(shí)現(xiàn)一個(gè)自定義的OutputFormat弄息,然后基于該OutputFormat來(lái)構(gòu)建一個(gè)Sink痊班,下面看下OutputFormat接口的定義,如下所示:

`@Public`

`public` `interface` `OutputFormat<IT>` `extends` `Serializable {`

`void` `configure(Configuration parameters);`

`void` `open(``int` `taskNumber,` `int` `numTasks)` `throws` `IOException;`

`void` `writeRecord(IT record)` `throws` `IOException;`

`void` `close()` `throws` `IOException;`

`}`

上面摹量,configure()方法用來(lái)配置一個(gè)OutputFormat的一些輸出參數(shù)涤伐;open()方法用來(lái)實(shí)現(xiàn)與外部存儲(chǔ)系統(tǒng)建立連接;writeRecord()方法用來(lái)實(shí)現(xiàn)對(duì)Flink Batch Job處理后缨称,將數(shù)據(jù)記錄輸出到外部存儲(chǔ)系統(tǒng)凝果。開(kāi)發(fā)Batch Job時(shí),通過(guò)調(diào)用DataSet的output()方法睦尽,參數(shù)值使用一個(gè)OutputFormat的具體實(shí)現(xiàn)即可器净。后面,我們會(huì)基于Elasticsearch來(lái)實(shí)現(xiàn)上面接口中的各個(gè)方法当凡。

Flink流式處理模式山害,運(yùn)行Flink Streaming Job時(shí)一般輸入的數(shù)據(jù)集為流數(shù)據(jù)集,也就是說(shuō)輸入數(shù)據(jù)元素會(huì)持續(xù)不斷地進(jìn)入到Streaming Job的處理過(guò)程中沿量,但你仍然可以使用一個(gè)HDFS數(shù)據(jù)文件作為Streaming Job的輸入浪慌,即使這樣,一個(gè)Flink Streaming Job啟動(dòng)運(yùn)行后便會(huì)永遠(yuǎn)運(yùn)行下去朴则,除非有意外故障或有計(jì)劃地操作使其終止眷射。在流式處理模式下處理數(shù)據(jù)的輸出時(shí),我們需要是實(shí)現(xiàn)一個(gè)SinkFunction佛掖,它指定了如下將流數(shù)據(jù)處理后的結(jié)果妖碉,輸出到指定的外部存儲(chǔ)系統(tǒng)中,下面看下SinkFunction的接口定義芥被,如下所示:

`@Public`

`public` `interface` `SinkFunction<IN>` `extends` `Function, Serializable {`

`@Deprecated`

`default` `void` `invoke(IN value)` `throws` `Exception {}`

`default` `void` `invoke(IN value, Context context)` `throws` `Exception {`

`invoke(value);`

`}`

`@Public`

`interface` `Context<T> {`

`long` `currentProcessingTime();`

`long` `currentWatermark();`

`Long timestamp();`

`}`

`}`

通過(guò)上面接口可以看到欧宜,需要實(shí)現(xiàn)一個(gè)invoke()方法,實(shí)現(xiàn)該方法來(lái)將一個(gè)輸入的IN value輸出到外部存儲(chǔ)系統(tǒng)中拴魄。一般情況下冗茸,對(duì)一些主流的外部存儲(chǔ)系統(tǒng),F(xiàn)link實(shí)現(xiàn)了一下內(nèi)置(社區(qū)貢獻(xiàn))的SinkFunction匹中,我們只需要配置一下就可以直接使用夏漱。而且,對(duì)于Streaming Job來(lái)說(shuō)顶捷,實(shí)現(xiàn)的SinkFunction比較豐富一些挂绰,可以減少自己開(kāi)發(fā)的工作量。開(kāi)發(fā)Streaming Job時(shí)服赎,通過(guò)調(diào)用DataStream的addSink()方法葵蒂,參數(shù)是一個(gè)SinkFlink的具體實(shí)現(xiàn)交播。

下面,我們分別基于批式處理模式和批式處理模式践付,分別使用或?qū)崿F(xiàn)對(duì)應(yīng)組件將Streaming Job和Batch Job的處理結(jié)果輸出到Elasticsearch中:

基于Flink DataSteam API實(shí)現(xiàn)

在開(kāi)發(fā)基于Flink的應(yīng)用程序過(guò)程中秦士,發(fā)現(xiàn)Flink Streaming API對(duì)Elasticsearch的支持還是比較好的,比如永高,如果想要從Kafka消費(fèi)事件記錄隧土,經(jīng)過(guò)處理最終將數(shù)據(jù)記錄索引到Elasticsearch 5.x,可以直接在Maven的POM文件中添加如下依賴(lài)即可:

`<``dependency``>`

`<``groupId``>org.apache.flink</``groupId``>`

`<``artifactId``>flink-connector-elasticsearch5_2.11</``artifactId``>`

`<``version``>1.5.3</``version``>`

`</``dependency``>`

我們使用Flink Streaming API來(lái)實(shí)現(xiàn)將流式數(shù)據(jù)處理后命爬,寫(xiě)入到Elasticsearch中次洼。其中,輸入數(shù)據(jù)源是Kafka中的某個(gè)Topic遇骑;輸出處理結(jié)果到lasticsearch中卖毁,我們使用使用Transport API的方式來(lái)連接Elasticsearch,需要指定Transport地址和端口落萎。具體實(shí)現(xiàn)亥啦,對(duì)應(yīng)的Scala代碼,如下所示:

`def` `main(args``:` `Array[String])``:` `Unit` `=` `{`

`// parse input arguments`

`val` `params` `=` `ParameterTool.fromArgs(args)`

`if` `(params.getNumberOfParameters <` `9``) {`

`val` `cmd` `=` `getClass.getName`

`println(``"Missing parameters!\n"`

`+` `"Usage: "` `+ cmd`

`+` `" --input-topic <topic> "`

`+` `"--es-cluster-name <es cluster name> "`

`+` `"--es-transport-addresses <es address> "`

`+` `"--es-port <es port> "`

`+` `"--es-index <es index> "`

`+` `"--es-type <es type> "`

`+` `"--bootstrap.servers <kafka brokers> "`

`+` `"--zookeeper.connect <zk quorum> "`

`+` `"--group.id <some id> [--prefix <prefix>]"``)`

`return`

`}`

`val` `env` `=` `StreamExecutionEnvironment.getExecutionEnvironment`

`val` `kafkaConsumer` `=` `new` `FlinkKafkaConsumer``010``[String](`

`params.getRequired(``"input-topic"``),`

`new` `SimpleStringSchema(),`

`params.getProperties`

`)`

`val` `dataStream` `=` `env`

`.addSource(kafkaConsumer)`

`.filter(!``_``.isEmpty)`

`val` `esClusterName` `=` `params.getRequired(``"es-cluster-name"``)`

`val` `esAddresses` `=` `params.getRequired(``"es-transport-addresses"``)`

`val` `esPort` `=` `params.getInt(``"es-port"``,` `9300``)`

`val` `transportAddresses` `=` `new` `java.util.ArrayList[InetSocketAddress]`

`val` `config` `=` `new` `java.util.HashMap[String, String]`

`config.put(``"cluster.name"``, esClusterName)`

`// This instructs the sink to emit after every element, otherwise they would be buffered`

`config.put(``"bulk.flush.max.actions"``,` `"100"``)`

`esAddresses.split(``","``).foreach(address` `=``> {`

`transportAddresses.add(``new` `InetSocketAddress(InetAddress.getByName(address), esPort))`

`})`

`val` `esIndex` `=` `params.getRequired(``"es-index"``)`

`val` `esType` `=` `params.getRequired(``"es-type"``)`

`val` `sink` `=` `new` `ElasticsearchSink(config, transportAddresses,` `new` `ElasticsearchSinkFunction[String] {`

`def` `createIndexRequest(element``:` `String)``:` `IndexRequest` `=` `{`

`return` `Requests.indexRequest()`

`.index(esIndex)`

`.```type```(esType)`

`.source(element)`

`}`
`override` `def` `process(t``:` `String, runtimeContext``:` `RuntimeContext, requestIndexer``:` `RequestIndexer)``:` `Unit` `=` `{`

`requestIndexer.add(createIndexRequest(t))`

`}`

`})`

`dataStream.addSink(sink)`

`val` `jobName` `=` `getClass.getSimpleName`

`env.execute(jobName)`

`}`

上面有關(guān)數(shù)據(jù)索引到Elasticsearch的處理中练链, 最核心的就是創(chuàng)建一個(gè)ElasticsearchSink翔脱,然后通過(guò)DataStream的API調(diào)用addSink()添加一個(gè)Sink,實(shí)際是一個(gè)SinkFunction的實(shí)現(xiàn)媒鼓,可以參考Flink對(duì)應(yīng)DataStream類(lèi)的addSink()方法代碼届吁,如下所示:

`def` `addSink(sinkFunction``:` `SinkFunction[T])``:` `DataStreamSink[T]` `=`

`stream.addSink(sinkFunction)`

基于Flink DataSet API實(shí)現(xiàn)

目前,F(xiàn)link還沒(méi)有在Batch處理模式下實(shí)現(xiàn)對(duì)應(yīng)Elasticsearch對(duì)應(yīng)的Connector绿鸣,需要自己根據(jù)需要實(shí)現(xiàn)疚沐,所以我們基于Flink已經(jīng)存在的Streaming處理模式下已經(jīng)實(shí)現(xiàn)的Elasticsearch Connector對(duì)應(yīng)的代碼,經(jīng)過(guò)部分修改潮模,可以直接拿來(lái)在Batch處理模式下亮蛔,將數(shù)據(jù)記錄批量索引到Elasticsearch中

我們基于Flink 1.6.1版本,以及Elasticsearch 6.3.2版本擎厢,并且使用Elasticsearch推薦的High Level REST API來(lái)實(shí)現(xiàn)(為了復(fù)用Flink 1.6.1中對(duì)應(yīng)的Streaming處理模式下的Elasticsearch 6 Connector實(shí)現(xiàn)代碼究流,我們選擇使用該REST Client),需要在Maven的POM文件中添加如下依賴(lài):

`<dependency>`

`<groupId>org.elasticsearch</groupId>`

`<artifactId>elasticsearch</artifactId>`

`<version>6.3.2</version>`

`</dependency>`

`<dependency>`

`<groupId>org.elasticsearch.client</groupId>`

`<artifactId>elasticsearch-rest-high-level-client</artifactId>`

`<version>6.3.2</version>`

`</dependency>`

我們實(shí)現(xiàn)的各個(gè)類(lèi)的類(lèi)圖及其關(guān)系动遭,如下圖所示:

如果熟悉Flink Streaming處理模式下Elasticsearch對(duì)應(yīng)的Connector實(shí)現(xiàn)芬探,可以看到上面的很多類(lèi)都在org.apache.flink.streaming.connectors.elasticsearch包里面存在,其中包括批量向Elasticsearch中索引數(shù)據(jù)(內(nèi)部實(shí)現(xiàn)了使用BulkProcessor)厘惦。上圖中引入的ElasticsearchApiCallBridge偷仿,目的是能夠?qū)崿F(xiàn)對(duì)Elasticsearch不同版本的支持,只需要根據(jù)Elasticsearch不同版本中不同Client實(shí)現(xiàn),進(jìn)行一些適配炎疆,上層抽象保持不變。

如果需要在Batch處理模式下批量索引數(shù)據(jù)到Elasticsearch国裳,可以直接使用ElasticsearchOutputFormat即可實(shí)現(xiàn)形入。但是創(chuàng)建ElasticsearchOutputFormat,需要幾個(gè)參數(shù):

`private` `ElasticsearchOutputFormat(`

`Map<String, String> bulkRequestsConfig,`

`List<HttpHost> httpHosts,`

`ElasticsearchSinkFunction<T> elasticsearchSinkFunction,`

`DocWriteRequestFailureHandler failureHandler,`

`RestClientFactory restClientFactory) {`

`super``(``new` `Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),  bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);`

`}`

當(dāng)然缝左,我們可以通過(guò)代碼中提供的Builder來(lái)非常方便的創(chuàng)建一個(gè)ElasticsearchOutputFormat亿遂。下面,我們看下我們Flink Batch Job實(shí)現(xiàn)邏輯渺杉。

  • 實(shí)現(xiàn)ElasticsearchSinkFunction

我們需要實(shí)現(xiàn)ElasticsearchSinkFunction接口蛇数,實(shí)現(xiàn)一個(gè)能夠索引數(shù)據(jù)到Elasticsearch中的功能,代碼如下所示:

`final` `ElasticsearchSinkFunction<String> elasticsearchSinkFunction =` `new` `ElasticsearchSinkFunction<String>() {`

`@Override`

`public` `void` `process(String element, RuntimeContext ctx, RequestIndexer indexer) {`

`indexer.add(createIndexRequest(element, parameterTool));`

`}`

`private` `IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {`

`LOG.info(``"Create index req: "` `+ element);`

`JSONObject o = JSONObject.parseObject(element);`

`return` `Requests.indexRequest()`

`.index(parameterTool.getRequired(``"es-index"``))`

`.type(parameterTool.getRequired(``"es-type"``))`

`.source(o);`

`}`

上面代碼是越,主要是把一個(gè)將要輸出的數(shù)據(jù)記錄耳舅,通過(guò)RequestIndexer來(lái)實(shí)現(xiàn)索引到Elasticsearch中。

  • 讀取Elasticsearch配置參數(shù)

配置連接Elasticsearch的參數(shù)倚评。從程序輸入的ParameterTool中讀取Elasticsearch相關(guān)的配置:

  • 創(chuàng)建ElasticsearchOutputFormat

創(chuàng)建一個(gè)我們實(shí)現(xiàn)的ElasticsearchOutputFormat浦徊,代碼片段如下所示:

上面很多配置項(xiàng)指定了向Elasticsearch中進(jìn)行批量寫(xiě)入的行為,在ElasticsearchOutputFormat內(nèi)部會(huì)進(jìn)行設(shè)置并創(chuàng)建Elasticsearch6BulkProcessorIndexer天梧,優(yōu)化索引數(shù)據(jù)處理的性能盔性。

  • 實(shí)現(xiàn)Batch Job主控制流程

最后我們就可以構(gòu)建我們的Flink Batch應(yīng)用程序了,代碼如下所示:

我們輸入的HDFS文件中呢岗,是一些已經(jīng)加工好的JSON格式記錄行冕香,這里為了簡(jiǎn)單,直接將原始JSON字符串索引到Elasticsearch中后豫,而沒(méi)有進(jìn)行更多其他的處理操作悉尾。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市挫酿,隨后出現(xiàn)的幾起案子焕襟,更是在濱河造成了極大的恐慌,老刑警劉巖饭豹,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鸵赖,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡拄衰,警方通過(guò)查閱死者的電腦和手機(jī)它褪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)翘悉,“玉大人茫打,你說(shuō)我怎么就攤上這事。” “怎么了老赤?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵轮洋,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我抬旺,道長(zhǎng)弊予,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任开财,我火速辦了婚禮汉柒,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘责鳍。我一直安慰自己碾褂,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布历葛。 她就那樣靜靜地躺著正塌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪恤溶。 梳的紋絲不亂的頭發(fā)上传货,一...
    開(kāi)封第一講書(shū)人閱讀 49,784評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音宏娄,去河邊找鬼问裕。 笑死,一個(gè)胖子當(dāng)著我的面吹牛孵坚,可吹牛的內(nèi)容都是我干的粮宛。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼卖宠,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼巍杈!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起扛伍,我...
    開(kāi)封第一講書(shū)人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤筷畦,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后刺洒,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體鳖宾,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年逆航,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鼎文。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡因俐,死狀恐怖拇惋,靈堂內(nèi)的尸體忽然破棺而出周偎,到底是詐尸還是另有隱情,我是刑警寧澤撑帖,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布蓉坎,位于F島的核電站,受9級(jí)特大地震影響胡嘿,放射性物質(zhì)發(fā)生泄漏蛉艾。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一灶平、第九天 我趴在偏房一處隱蔽的房頂上張望伺通。 院中可真熱鬧箍土,春花似錦逢享、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至沟堡,卻和暖如春侧但,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背航罗。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工禀横, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人粥血。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓柏锄,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親复亏。 傳聞我的和親對(duì)象是個(gè)殘疾皇子趾娃,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容