Flink寫入數(shù)據(jù)到ElasticSearch
前言
我們知道flink自帶了很多連接器Connector,罕模,今天我們就用Elasticsearch Connector作為sink將數(shù)據(jù)寫入到Elasticsearch(以下簡(jiǎn)稱es)竿报。
Elasticsearch Connector
es安裝略赎瞎,可以參考網(wǎng)上文章或者我之前寫過(guò)的文章
添加依賴
可以看到flink和es依賴關(guān)系如下:
Maven依賴 | 支持自 | Elasticsearch版本 |
---|---|---|
flink-connector-elasticsearch_2.11 | 1.0.0 | 1.x |
flink-connector-elasticsearch2_2.11 | 1.0.0 | 2.x |
flink-connector-elasticsearch5_2.11 | 1.3.0 | 5.x |
flink-connector-elasticsearch6_2.11 | 1.6.0 | 6 and later versions |
Elasticsearch5.x
因?yàn)橹坝眠^(guò)的es是5.6.X揍很,首先加入的maven依賴是5.x-flink-connector-elasticsearch5_2.11
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch5_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
但是運(yùn)行寫入es的程序報(bào)如下錯(cuò)誤:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at com.hfjy.bigdata.flink.sink.FlinkSinkToES.main(FlinkSinkToES.java:81)
Caused by: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:81)
at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:48)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
對(duì)于上面的報(bào)錯(cuò)檢查了es啟動(dòng)是否正常雷则,檢查了代碼里host是否正確辆雾,也debug了代碼,但是還未找到具體的原因月劈,代碼如下:
public class FlinkSinkToES {
private static final Logger log = LoggerFactory.getLogger(FlinkSinkToES.class);
private static final String READ_TOPIC = "student";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "student-group-1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
//這個(gè) kafka topic 需要和上面的工具類的 topic 一致
READ_TOPIC,
new SimpleStringSchema(),
props)).setParallelism(1);
// .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 對(duì)象
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "elasticsearch");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
// config.put("auth_user","elastic");
// config.put("auth_password","changeme");
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
// transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));
student.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
log.info("data:" + element);
return Requests.indexRequest()
.index("my-index-student-0211")
.type("my-type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
env.execute("flink learning connectors kafka");
}
}
Elasticsearch6.x
寫入es5.x報(bào)上面的錯(cuò)度迂,但是還未找到具體的原因藤乙,所以決定換做es6
添加flink-es6的maven依賴
<!--es6-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
安裝es6之后啟動(dòng)報(bào)了如下錯(cuò)誤:
? elasticsearch-6.6.0 ./bin/elasticsearch
[2019-02-11T14:53:02,255][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [unknown] uncaught exception in thread [main]
org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: failed to obtain node locks, tried [[/data/es-6/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:163) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:150) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.6.0.jar:6.6.0]
at org.elasticsearch.cli.Command.main(Command.java:90) ~[elasticsearch-cli-6.6.0.jar:6.6.0]
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:116) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:93) ~[elasticsearch-6.6.0.jar:6.6.0]
Caused by: java.lang.IllegalStateException: failed to obtain node locks, tried [[/data/es-6/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
at org.elasticsearch.env.NodeEnvironment.<init>(NodeEnvironment.java:297) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.node.Node.<init>(Node.java:295) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.node.Node.<init>(Node.java:265) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.bootstrap.Bootstrap$5.<init>(Bootstrap.java:212) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:212) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:333) ~[elasticsearch-6.6.0.jar:6.6.0]
at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:159) ~[elasticsearch-6.6.0.jar:6.6.0]
... 6 more
查了下應(yīng)該是已經(jīng)啟動(dòng)了es,如果再次啟動(dòng)es會(huì)報(bào)上面這個(gè)錯(cuò)誤惭墓,找到es的進(jìn)程kill掉坛梁,重新啟動(dòng)
參考: [https://blog.csdn.net/qq_38977441/article/details/80406126]
? elasticsearch-6.6.0 ps -ef | grep elastic
501 94853 93152 0 2:37PM ttys011 0:35.04 /Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home//bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.io.tmpdir=/var/folders/t6/v40m6tfx1x1b1_2lg2078tf80000gn/T/elasticsearch-465341793791663807 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=data -XX:ErrorFile=logs/hs_err_pid%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:logs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=64m -Des.path.home=/Users/zzy/Documents/zzy/software/elasticsearch-6.6.0 -Des.path.conf=/Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/config -Des.distribution.flavor=default -Des.distribution.type=tar -cp /Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/lib/* org.elasticsearch.bootstrap.Elasticsearch
501 94869 94853 0 2:37PM ttys011 0:00.03 /Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/modules/x-pack-ml/platform/darwin-x86_64/bin/controller
501 95090 93152 0 2:55PM ttys011 0:00.00 grep --color=auto elastic
安裝kibana,啟動(dòng)報(bào)如下錯(cuò)誤:
? kibana-6.6.0-linux-x86_64 ./bin/kibana
./bin/kibana: line 24: /Users/zzy/Documents/zzy/software/kibana-6.6.0-linux-x86_64/bin/../node/bin/node: cannot execute binary file
./bin/kibana: line 24: /Users/zzy/Documents/zzy/software/kibana-6.6.0-linux-x86_64/bin/../node/bin/node: Undefined error: 0
可能是版本問(wèn)題腊凶,安裝的linux的划咐,換做mac后,再次啟動(dòng)kibana钧萍,啟動(dòng)日志如下:
? kibana-6.6.0-darwin-x86_64 ./bin/kibana
log [08:02:11.812] [warning][plugin] Skipping non-plugin directory at /Users/zzy/Documents/zzy/software/kibana-6.6.0-darwin-x86_64/src/legacy/core_plugins/ems_util
log [08:02:12.877] [info][status][plugin:kibana@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:12.910] [info][status][plugin:elasticsearch@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:12.912] [info][status][plugin:xpack_main@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:12.916] [info][status][plugin:graph@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:12.931] [info][status][plugin:monitoring@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:12.934] [info][status][plugin:spaces@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:12.940] [warning][security] Generating a random key for xpack.security.encryptionKey. To prevent sessions from being invalidated on restart, please set xpack.security.encryptionKey in kibana.yml
log [08:02:12.944] [warning][security] Session cookies will be transmitted over insecure connections. This is not recommended.
log [08:02:12.949] [info][status][plugin:security@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:12.966] [info][status][plugin:searchprofiler@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:12.968] [info][status][plugin:ml@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:12.997] [info][status][plugin:tilemap@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:12.999] [info][status][plugin:watcher@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.007] [info][status][plugin:grokdebugger@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.011] [info][status][plugin:dashboard_mode@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.018] [info][status][plugin:logstash@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.022] [info][status][plugin:beats_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.038] [info][status][plugin:apm@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.173] [info][status][plugin:interpreter@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.180] [info][status][plugin:canvas@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.184] [info][status][plugin:license_management@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.188] [info][status][plugin:index_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.207] [info][status][plugin:console@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.210] [info][status][plugin:console_extensions@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.213] [info][status][plugin:notifications@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.215] [info][status][plugin:index_lifecycle_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.252] [info][status][plugin:infra@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.254] [info][status][plugin:rollup@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.263] [info][status][plugin:remote_clusters@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.268] [info][status][plugin:cross_cluster_replication@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:13.274] [info][status][plugin:upgrade_assistant@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.281] [info][status][plugin:metrics@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:13.444] [info][status][plugin:timelion@6.6.0] Status changed from uninitialized to green - Ready
log [08:02:14.218] [warning][reporting] Generating a random key for xpack.reporting.encryptionKey. To prevent pending reports from failing on restart, please set xpack.reporting.encryptionKey in kibana.yml
log [08:02:14.223] [info][status][plugin:reporting@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [08:02:14.228] [info][status][plugin:elasticsearch@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.352] [info][license][xpack] Imported license information from Elasticsearch for the [data] cluster: mode: basic | status: active
log [08:02:14.355] [info][status][plugin:xpack_main@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.356] [info][status][plugin:graph@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.357] [info][status][plugin:searchprofiler@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.357] [info][status][plugin:ml@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.357] [info][status][plugin:tilemap@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.358] [info][status][plugin:watcher@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.358] [info][status][plugin:grokdebugger@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.358] [info][status][plugin:logstash@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.359] [info][status][plugin:beats_management@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.359] [info][status][plugin:index_management@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.359] [info][status][plugin:index_lifecycle_management@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.360] [info][status][plugin:rollup@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.360] [info][status][plugin:remote_clusters@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.360] [info][status][plugin:cross_cluster_replication@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.360] [info][status][plugin:reporting@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.361] [info][kibana-monitoring][monitoring-ui] Starting monitoring stats collection
log [08:02:14.369] [info][status][plugin:security@6.6.0] Status changed from yellow to green - Ready
log [08:02:14.431] [info][license][xpack] Imported license information from Elasticsearch for the [monitoring] cluster: mode: basic | status: active
log [08:02:15.707] [info][migrations] Creating index .kibana_1.
log [08:02:16.113] [info][migrations] Pointing alias .kibana to .kibana_1.
log [08:02:16.158] [info][migrations] Finished in 451ms.
log [08:02:16.159] [info][listening] Server running at http://127.0.0.1:5602
log [08:02:16.361] [info][status][plugin:spaces@6.6.0] Status changed from yellow to green - Ready
查看索引信息
GET _cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .kibana_1 8nwhLfRcTmWOeNp5UTQGOQ 1 0 3 0 11.7kb 11.7kb
執(zhí)行flink寫es6的程序后,開(kāi)始是沒(méi)有索引index-student的數(shù)據(jù)的褐缠,需要加上如下代碼:
esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
-- RetryRejectedExecutionFailureHandler 來(lái)自flink的包package org.apache.flink.streaming.connectors.elasticsearch.util;
注意和官網(wǎng)的區(qū)別
builder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setDefaultHeaders(...)
restClientBuilder.setMaxRetryTimeoutMillis(...)
restClientBuilder.setPathPrefix(...)
restClientBuilder.setHttpClientConfigCallback(...)
}
);
public class RestClientFactoryImpl implements RestClientFactory {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type","application/json")};
restClientBuilder.setDefaultHeaders(headers); //以數(shù)組的形式可以添加多個(gè)header
restClientBuilder.setMaxRetryTimeoutMillis(90000);
}
}
再次通過(guò)kibana查看索引
GET _cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open index-student VUUIiS2fQX2p5-JyzxEa7A 5 1 300 0 152.4kb 152.4kb
green open .kibana_1 8nwhLfRcTmWOeNp5UTQGOQ 1 0 3 0 11.9kb 11.9kb
看到是有index-student這個(gè)索引的,說(shuō)明flink寫入es成功风瘦,查看索引數(shù)據(jù)
GET /index-student/_search?pretty
{
"took" : 6,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 300,
"max_score" : 1.0,
"hits" : [
{
"_index" : "index-student",
"_type" : "student",
"_id" : "3cy_22gBPPMO6TTdKysy",
"_score" : 1.0,
"_source" : {
"data" : """{"age":123,"id":105,"name":"itzzy105","password":"password105"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "3sy_22gBPPMO6TTdKytx",
"_score" : 1.0,
"_source" : {
"data" : """{"age":130,"id":112,"name":"itzzy112","password":"password112"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "5sy_22gBPPMO6TTdKyt7",
"_score" : 1.0,
"_source" : {
"data" : """{"age":138,"id":120,"name":"itzzy120","password":"password120"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "7cy_22gBPPMO6TTdKyuA",
"_score" : 1.0,
"_source" : {
"data" : """{"age":142,"id":124,"name":"itzzy124","password":"password124"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "9sy_22gBPPMO6TTdKyuK",
"_score" : 1.0,
"_source" : {
"data" : """{"age":151,"id":133,"name":"itzzy133","password":"password133"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "-My_22gBPPMO6TTdKyuK",
"_score" : 1.0,
"_source" : {
"data" : """{"age":156,"id":138,"name":"itzzy138","password":"password138"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "-8y_22gBPPMO6TTdKyuP",
"_score" : 1.0,
"_source" : {
"data" : """{"age":155,"id":137,"name":"itzzy137","password":"password137"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "_sy_22gBPPMO6TTdKyuS",
"_score" : 1.0,
"_source" : {
"data" : """{"age":162,"id":144,"name":"itzzy144","password":"password144"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "AMy_22gBPPMO6TTdKyyS",
"_score" : 1.0,
"_source" : {
"data" : """{"age":159,"id":141,"name":"itzzy141","password":"password141"}"""
}
},
{
"_index" : "index-student",
"_type" : "student",
"_id" : "Acy_22gBPPMO6TTdKyyU",
"_score" : 1.0,
"_source" : {
"data" : """{"age":160,"id":142,"name":"itzzy142","password":"password142"}"""
}
}
]
}
}
附上flink寫es的代碼
public class FlinkSinkToES6 {
private static final Logger log = LoggerFactory.getLogger(FlinkSinkToES6.class);
private static final String READ_TOPIC = "student-1";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "student-group-1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
//這個(gè) kafka topic 需要和上面的工具類的 topic 一致
READ_TOPIC,
new SimpleStringSchema(),
props)).setParallelism(1);
// .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 對(duì)象
student.print();
log.info("student:" + student);
List<HttpHost> esHttphost = new ArrayList<>();
esHttphost.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
esHttphost,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
log.info("data:" + element);
return Requests.indexRequest()
.index("index-student")
.type("student")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
esSinkBuilder.setBulkFlushMaxActions(1);
// esSinkBuilder.setRestClientFactory(
// restClientBuilder -> {
// restClientBuilder.setDefaultHeaders()
// }
// );
esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
student.addSink(esSinkBuilder.build());
env.execute("flink learning connectors kafka");
}
}
kafka生產(chǎn)者代碼
public class KafkaUtils {
private static final String broker_list = "localhost:9092";
private static final String topic = "student-1"; //kafka topic 需要和 flink 程序用同一個(gè) topic
public static void writeToKafka() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", broker_list);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// KafkaProducer producer = new KafkaProducer<String, String>(props);//老版本producer已廢棄
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
try {
for (int i = 1; i <= 100; i++) {
Student student = new Student(i, "itzzy" + i, "password" + i, 18 + i);
ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
producer.send(record);
System.out.println("發(fā)送數(shù)據(jù): " + JSON.toJSONString(student));
}
Thread.sleep(3000);
}catch (Exception e){
}
producer.flush();
}
public static void main(String[] args) throws InterruptedException {
writeToKafka();
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
private int id;
private String name;
private String password;
private int age;
}
參考: