使用Flink處理數(shù)據(jù)時(shí)象迎,可以基于Flink提供的批式處理(Batch Processing)和流式處理(Streaming Processing)API來(lái)實(shí)現(xiàn)汤徽,分別能夠滿(mǎn)足不同場(chǎng)景下應(yīng)用數(shù)據(jù)的處理。這兩種模式下辐董,輸入處理都被抽象為Source Operator雾棺,包含對(duì)應(yīng)輸入數(shù)據(jù)的處理邏輯;輸出處理都被抽象為Sink Operator层释,包含了對(duì)應(yīng)輸出數(shù)據(jù)的處理邏輯。這里快集,我們只關(guān)注輸出的Sink Operator實(shí)現(xiàn)贡羔。
Flink批式處理模式,運(yùn)行Flink Batch Job時(shí)作用在有界的輸入數(shù)據(jù)集上碍讨,所以Job運(yùn)行的時(shí)間是有時(shí)限的治力,一旦Job運(yùn)行完成蒙秒,對(duì)應(yīng)的整個(gè)數(shù)據(jù)處理應(yīng)用就已經(jīng)結(jié)束勃黍,比如,輸入是一個(gè)數(shù)據(jù)文件晕讲,或者一個(gè)Hive SQL查詢(xún)對(duì)應(yīng)的結(jié)果集覆获,等等马澈。在批式處理模式下處理數(shù)據(jù)的輸出時(shí),主要需要實(shí)現(xiàn)一個(gè)自定義的OutputFormat弄息,然后基于該OutputFormat來(lái)構(gòu)建一個(gè)Sink痊班,下面看下OutputFormat接口的定義,如下所示:
`@Public`
`public` `interface` `OutputFormat<IT>` `extends` `Serializable {`
`void` `configure(Configuration parameters);`
`void` `open(``int` `taskNumber,` `int` `numTasks)` `throws` `IOException;`
`void` `writeRecord(IT record)` `throws` `IOException;`
`void` `close()` `throws` `IOException;`
`}`
上面摹量,configure()方法用來(lái)配置一個(gè)OutputFormat的一些輸出參數(shù)涤伐;open()方法用來(lái)實(shí)現(xiàn)與外部存儲(chǔ)系統(tǒng)建立連接;writeRecord()方法用來(lái)實(shí)現(xiàn)對(duì)Flink Batch Job處理后缨称,將數(shù)據(jù)記錄輸出到外部存儲(chǔ)系統(tǒng)凝果。開(kāi)發(fā)Batch Job時(shí),通過(guò)調(diào)用DataSet的output()方法睦尽,參數(shù)值使用一個(gè)OutputFormat的具體實(shí)現(xiàn)即可器净。后面,我們會(huì)基于Elasticsearch來(lái)實(shí)現(xiàn)上面接口中的各個(gè)方法当凡。
Flink流式處理模式山害,運(yùn)行Flink Streaming Job時(shí)一般輸入的數(shù)據(jù)集為流數(shù)據(jù)集,也就是說(shuō)輸入數(shù)據(jù)元素會(huì)持續(xù)不斷地進(jìn)入到Streaming Job的處理過(guò)程中沿量,但你仍然可以使用一個(gè)HDFS數(shù)據(jù)文件作為Streaming Job的輸入浪慌,即使這樣,一個(gè)Flink Streaming Job啟動(dòng)運(yùn)行后便會(huì)永遠(yuǎn)運(yùn)行下去朴则,除非有意外故障或有計(jì)劃地操作使其終止眷射。在流式處理模式下處理數(shù)據(jù)的輸出時(shí),我們需要是實(shí)現(xiàn)一個(gè)SinkFunction佛掖,它指定了如下將流數(shù)據(jù)處理后的結(jié)果妖碉,輸出到指定的外部存儲(chǔ)系統(tǒng)中,下面看下SinkFunction的接口定義芥被,如下所示:
`@Public`
`public` `interface` `SinkFunction<IN>` `extends` `Function, Serializable {`
`@Deprecated`
`default` `void` `invoke(IN value)` `throws` `Exception {}`
`default` `void` `invoke(IN value, Context context)` `throws` `Exception {`
`invoke(value);`
`}`
`@Public`
`interface` `Context<T> {`
`long` `currentProcessingTime();`
`long` `currentWatermark();`
`Long timestamp();`
`}`
`}`
通過(guò)上面接口可以看到欧宜,需要實(shí)現(xiàn)一個(gè)invoke()方法,實(shí)現(xiàn)該方法來(lái)將一個(gè)輸入的IN value輸出到外部存儲(chǔ)系統(tǒng)中拴魄。一般情況下冗茸,對(duì)一些主流的外部存儲(chǔ)系統(tǒng),F(xiàn)link實(shí)現(xiàn)了一下內(nèi)置(社區(qū)貢獻(xiàn))的SinkFunction匹中,我們只需要配置一下就可以直接使用夏漱。而且,對(duì)于Streaming Job來(lái)說(shuō)顶捷,實(shí)現(xiàn)的SinkFunction比較豐富一些挂绰,可以減少自己開(kāi)發(fā)的工作量。開(kāi)發(fā)Streaming Job時(shí)服赎,通過(guò)調(diào)用DataStream的addSink()方法葵蒂,參數(shù)是一個(gè)SinkFlink的具體實(shí)現(xiàn)交播。
下面,我們分別基于批式處理模式和批式處理模式践付,分別使用或?qū)崿F(xiàn)對(duì)應(yīng)組件將Streaming Job和Batch Job的處理結(jié)果輸出到Elasticsearch中:
基于Flink DataSteam API實(shí)現(xiàn)
在開(kāi)發(fā)基于Flink的應(yīng)用程序過(guò)程中秦士,發(fā)現(xiàn)Flink Streaming API對(duì)Elasticsearch的支持還是比較好的,比如永高,如果想要從Kafka消費(fèi)事件記錄隧土,經(jīng)過(guò)處理最終將數(shù)據(jù)記錄索引到Elasticsearch 5.x,可以直接在Maven的POM文件中添加如下依賴(lài)即可:
`<``dependency``>`
`<``groupId``>org.apache.flink</``groupId``>`
`<``artifactId``>flink-connector-elasticsearch5_2.11</``artifactId``>`
`<``version``>1.5.3</``version``>`
`</``dependency``>`
我們使用Flink Streaming API來(lái)實(shí)現(xiàn)將流式數(shù)據(jù)處理后命爬,寫(xiě)入到Elasticsearch中次洼。其中,輸入數(shù)據(jù)源是Kafka中的某個(gè)Topic遇骑;輸出處理結(jié)果到lasticsearch中卖毁,我們使用使用Transport API的方式來(lái)連接Elasticsearch,需要指定Transport地址和端口落萎。具體實(shí)現(xiàn)亥啦,對(duì)應(yīng)的Scala代碼,如下所示:
`def` `main(args``:` `Array[String])``:` `Unit` `=` `{`
`// parse input arguments`
`val` `params` `=` `ParameterTool.fromArgs(args)`
`if` `(params.getNumberOfParameters <` `9``) {`
`val` `cmd` `=` `getClass.getName`
`println(``"Missing parameters!\n"`
`+` `"Usage: "` `+ cmd`
`+` `" --input-topic <topic> "`
`+` `"--es-cluster-name <es cluster name> "`
`+` `"--es-transport-addresses <es address> "`
`+` `"--es-port <es port> "`
`+` `"--es-index <es index> "`
`+` `"--es-type <es type> "`
`+` `"--bootstrap.servers <kafka brokers> "`
`+` `"--zookeeper.connect <zk quorum> "`
`+` `"--group.id <some id> [--prefix <prefix>]"``)`
`return`
`}`
`val` `env` `=` `StreamExecutionEnvironment.getExecutionEnvironment`
`val` `kafkaConsumer` `=` `new` `FlinkKafkaConsumer``010``[String](`
`params.getRequired(``"input-topic"``),`
`new` `SimpleStringSchema(),`
`params.getProperties`
`)`
`val` `dataStream` `=` `env`
`.addSource(kafkaConsumer)`
`.filter(!``_``.isEmpty)`
`val` `esClusterName` `=` `params.getRequired(``"es-cluster-name"``)`
`val` `esAddresses` `=` `params.getRequired(``"es-transport-addresses"``)`
`val` `esPort` `=` `params.getInt(``"es-port"``,` `9300``)`
`val` `transportAddresses` `=` `new` `java.util.ArrayList[InetSocketAddress]`
`val` `config` `=` `new` `java.util.HashMap[String, String]`
`config.put(``"cluster.name"``, esClusterName)`
`// This instructs the sink to emit after every element, otherwise they would be buffered`
`config.put(``"bulk.flush.max.actions"``,` `"100"``)`
`esAddresses.split(``","``).foreach(address` `=``> {`
`transportAddresses.add(``new` `InetSocketAddress(InetAddress.getByName(address), esPort))`
`})`
`val` `esIndex` `=` `params.getRequired(``"es-index"``)`
`val` `esType` `=` `params.getRequired(``"es-type"``)`
`val` `sink` `=` `new` `ElasticsearchSink(config, transportAddresses,` `new` `ElasticsearchSinkFunction[String] {`
`def` `createIndexRequest(element``:` `String)``:` `IndexRequest` `=` `{`
`return` `Requests.indexRequest()`
`.index(esIndex)`
`.```type```(esType)`
`.source(element)`
`}`
`override` `def` `process(t``:` `String, runtimeContext``:` `RuntimeContext, requestIndexer``:` `RequestIndexer)``:` `Unit` `=` `{`
`requestIndexer.add(createIndexRequest(t))`
`}`
`})`
`dataStream.addSink(sink)`
`val` `jobName` `=` `getClass.getSimpleName`
`env.execute(jobName)`
`}`
上面有關(guān)數(shù)據(jù)索引到Elasticsearch的處理中练链, 最核心的就是創(chuàng)建一個(gè)ElasticsearchSink翔脱,然后通過(guò)DataStream的API調(diào)用addSink()添加一個(gè)Sink,實(shí)際是一個(gè)SinkFunction的實(shí)現(xiàn)媒鼓,可以參考Flink對(duì)應(yīng)DataStream類(lèi)的addSink()方法代碼届吁,如下所示:
`def` `addSink(sinkFunction``:` `SinkFunction[T])``:` `DataStreamSink[T]` `=`
`stream.addSink(sinkFunction)`
基于Flink DataSet API實(shí)現(xiàn)
目前,F(xiàn)link還沒(méi)有在Batch處理模式下實(shí)現(xiàn)對(duì)應(yīng)Elasticsearch對(duì)應(yīng)的Connector绿鸣,需要自己根據(jù)需要實(shí)現(xiàn)疚沐,所以我們基于Flink已經(jīng)存在的Streaming處理模式下已經(jīng)實(shí)現(xiàn)的Elasticsearch Connector對(duì)應(yīng)的代碼,經(jīng)過(guò)部分修改潮模,可以直接拿來(lái)在Batch處理模式下亮蛔,將數(shù)據(jù)記錄批量索引到Elasticsearch中
我們基于Flink 1.6.1版本,以及Elasticsearch 6.3.2版本擎厢,并且使用Elasticsearch推薦的High Level REST API來(lái)實(shí)現(xiàn)(為了復(fù)用Flink 1.6.1中對(duì)應(yīng)的Streaming處理模式下的Elasticsearch 6 Connector實(shí)現(xiàn)代碼究流,我們選擇使用該REST Client),需要在Maven的POM文件中添加如下依賴(lài):
`<dependency>`
`<groupId>org.elasticsearch</groupId>`
`<artifactId>elasticsearch</artifactId>`
`<version>6.3.2</version>`
`</dependency>`
`<dependency>`
`<groupId>org.elasticsearch.client</groupId>`
`<artifactId>elasticsearch-rest-high-level-client</artifactId>`
`<version>6.3.2</version>`
`</dependency>`
我們實(shí)現(xiàn)的各個(gè)類(lèi)的類(lèi)圖及其關(guān)系动遭,如下圖所示:
如果熟悉Flink Streaming處理模式下Elasticsearch對(duì)應(yīng)的Connector實(shí)現(xiàn)芬探,可以看到上面的很多類(lèi)都在org.apache.flink.streaming.connectors.elasticsearch包里面存在,其中包括批量向Elasticsearch中索引數(shù)據(jù)(內(nèi)部實(shí)現(xiàn)了使用BulkProcessor)厘惦。上圖中引入的ElasticsearchApiCallBridge偷仿,目的是能夠?qū)崿F(xiàn)對(duì)Elasticsearch不同版本的支持,只需要根據(jù)Elasticsearch不同版本中不同Client實(shí)現(xiàn),進(jìn)行一些適配炎疆,上層抽象保持不變。
如果需要在Batch處理模式下批量索引數(shù)據(jù)到Elasticsearch国裳,可以直接使用ElasticsearchOutputFormat即可實(shí)現(xiàn)形入。但是創(chuàng)建ElasticsearchOutputFormat,需要幾個(gè)參數(shù):
`private` `ElasticsearchOutputFormat(`
`Map<String, String> bulkRequestsConfig,`
`List<HttpHost> httpHosts,`
`ElasticsearchSinkFunction<T> elasticsearchSinkFunction,`
`DocWriteRequestFailureHandler failureHandler,`
`RestClientFactory restClientFactory) {`
`super``(``new` `Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);`
`}`
當(dāng)然缝左,我們可以通過(guò)代碼中提供的Builder來(lái)非常方便的創(chuàng)建一個(gè)ElasticsearchOutputFormat亿遂。下面,我們看下我們Flink Batch Job實(shí)現(xiàn)邏輯渺杉。
- 實(shí)現(xiàn)ElasticsearchSinkFunction
我們需要實(shí)現(xiàn)ElasticsearchSinkFunction接口蛇数,實(shí)現(xiàn)一個(gè)能夠索引數(shù)據(jù)到Elasticsearch中的功能,代碼如下所示:
`final` `ElasticsearchSinkFunction<String> elasticsearchSinkFunction =` `new` `ElasticsearchSinkFunction<String>() {`
`@Override`
`public` `void` `process(String element, RuntimeContext ctx, RequestIndexer indexer) {`
`indexer.add(createIndexRequest(element, parameterTool));`
`}`
`private` `IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {`
`LOG.info(``"Create index req: "` `+ element);`
`JSONObject o = JSONObject.parseObject(element);`
`return` `Requests.indexRequest()`
`.index(parameterTool.getRequired(``"es-index"``))`
`.type(parameterTool.getRequired(``"es-type"``))`
`.source(o);`
`}`
上面代碼是越,主要是把一個(gè)將要輸出的數(shù)據(jù)記錄耳舅,通過(guò)RequestIndexer來(lái)實(shí)現(xiàn)索引到Elasticsearch中。
- 讀取Elasticsearch配置參數(shù)
配置連接Elasticsearch的參數(shù)倚评。從程序輸入的ParameterTool中讀取Elasticsearch相關(guān)的配置:
- 創(chuàng)建ElasticsearchOutputFormat
創(chuàng)建一個(gè)我們實(shí)現(xiàn)的ElasticsearchOutputFormat浦徊,代碼片段如下所示:
上面很多配置項(xiàng)指定了向Elasticsearch中進(jìn)行批量寫(xiě)入的行為,在ElasticsearchOutputFormat內(nèi)部會(huì)進(jìn)行設(shè)置并創(chuàng)建Elasticsearch6BulkProcessorIndexer天梧,優(yōu)化索引數(shù)據(jù)處理的性能盔性。
- 實(shí)現(xiàn)Batch Job主控制流程
最后我們就可以構(gòu)建我們的Flink Batch應(yīng)用程序了,代碼如下所示:
我們輸入的HDFS文件中呢岗,是一些已經(jīng)加工好的JSON格式記錄行冕香,這里為了簡(jiǎn)單,直接將原始JSON字符串索引到Elasticsearch中后豫,而沒(méi)有進(jìn)行更多其他的處理操作悉尾。