Writing Data from Flink to Elasticsearch

Preface

We know that Flink ships with many connectors. Today we will use the Elasticsearch Connector as a sink to write data into Elasticsearch (hereafter "ES").
Elasticsearch Connector

ES installation is skipped here; see articles online, or my earlier posts on the topic.

Adding the dependency

The compatibility between the Flink connector and ES versions is as follows:

Maven dependency                        Supported since    Elasticsearch version
flink-connector-elasticsearch_2.11      1.0.0              1.x
flink-connector-elasticsearch2_2.11     1.0.0              2.x
flink-connector-elasticsearch5_2.11     1.3.0              5.x
flink-connector-elasticsearch6_2.11     1.6.0              6 and later versions

Elasticsearch 5.x

因?yàn)橹坝眠^(guò)的es是5.6.X揍很,首先加入的maven依賴是5.x-flink-connector-elasticsearch5_2.11

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

However, running the program that writes to ES produced the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at com.hfjy.bigdata.flink.sink.FlinkSinkToES.main(FlinkSinkToES.java:81)
Caused by: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:81)
    at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:48)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

For this error I checked that ES had started normally, verified that the host in the code was correct, and debugged the program, but could not find the root cause. The code is as follows:

public class FlinkSinkToES {
    private static final Logger log = LoggerFactory.getLogger(FlinkSinkToES.class);

    private static final String READ_TOPIC = "student";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "student-group-1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                // this Kafka topic must match the topic used by the producer utility class below
                READ_TOPIC,
                new SimpleStringSchema(),
                props)).setParallelism(1);
//                .map(string -> JSON.parseObject(string, Student.class)); // parse the string into a Student object with Fastjson
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "elasticsearch");
// This instructs the sink to emit after every element, otherwise they would be buffered
        config.put("bulk.flush.max.actions", "1");
//        config.put("auth_user","elastic");
//        config.put("auth_password","changeme");

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
//        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

        student.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                json.put("data", element);
                log.info("data:" + element);
                return Requests.indexRequest()
                        .index("my-index-student-0211")
                        .type("my-type")
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }));

        env.execute("flink learning connectors kafka");
    }
}
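Before switching versions, one quick check is whether anything is listening on the transport port at all: the 5.x connector connects via the binary transport protocol on port 9300 (as in the code above), not the HTTP port 9200, and the cluster.name in the config map must match the server's. A minimal stdlib sketch of such a reachability check (host, port, and timeout are assumptions for a local setup):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // returns true if a TCP connection to host:port succeeds within timeoutMs
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 9300 is the ES transport port used by the 5.x connector; HTTP lives on 9200
        System.out.println("transport reachable: " + canConnect("127.0.0.1", 9300, 500));
    }
}
```

If 9300 is reachable but the sink still reports "not connected to any Elasticsearch nodes", a cluster.name mismatch is a common remaining suspect.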

Elasticsearch 6.x

Writing to ES 5.x kept failing with the error above and I could not find the cause, so I decided to switch to ES 6.

Add the Flink ES6 Maven dependency:

<!--es6-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

After installing ES 6, starting it failed with the following error:

?  elasticsearch-6.6.0 ./bin/elasticsearch
[2019-02-11T14:53:02,255][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [unknown] uncaught exception in thread [main]
org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: failed to obtain node locks, tried [[/data/es-6/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
    at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:163) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:150) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.6.0.jar:6.6.0]
    at org.elasticsearch.cli.Command.main(Command.java:90) ~[elasticsearch-cli-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:116) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:93) ~[elasticsearch-6.6.0.jar:6.6.0]
Caused by: java.lang.IllegalStateException: failed to obtain node locks, tried [[/data/es-6/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
    at org.elasticsearch.env.NodeEnvironment.<init>(NodeEnvironment.java:297) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.node.Node.<init>(Node.java:295) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.node.Node.<init>(Node.java:265) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Bootstrap$5.<init>(Bootstrap.java:212) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:212) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:333) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:159) ~[elasticsearch-6.6.0.jar:6.6.0]
    ... 6 more

A quick search showed that this error means an ES instance is already running: a second instance started against the same data path fails to obtain the node lock. Find the running ES process, kill it (PID 94853 in the output below), then start it again.
Reference: https://blog.csdn.net/qq_38977441/article/details/80406126

?  elasticsearch-6.6.0 ps -ef | grep elastic
  501 94853 93152   0  2:37PM ttys011    0:35.04 /Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home//bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.io.tmpdir=/var/folders/t6/v40m6tfx1x1b1_2lg2078tf80000gn/T/elasticsearch-465341793791663807 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=data -XX:ErrorFile=logs/hs_err_pid%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:logs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=64m -Des.path.home=/Users/zzy/Documents/zzy/software/elasticsearch-6.6.0 -Des.path.conf=/Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/config -Des.distribution.flavor=default -Des.distribution.type=tar -cp /Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/lib/* org.elasticsearch.bootstrap.Elasticsearch
  501 94869 94853   0  2:37PM ttys011    0:00.03 /Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/modules/x-pack-ml/platform/darwin-x86_64/bin/controller
  501 95090 93152   0  2:55PM ttys011    0:00.00 grep --color=auto elastic

After installing Kibana, starting it failed with the following error:

?  kibana-6.6.0-linux-x86_64 ./bin/kibana
./bin/kibana: line 24: /Users/zzy/Documents/zzy/software/kibana-6.6.0-linux-x86_64/bin/../node/bin/node: cannot execute binary file
./bin/kibana: line 24: /Users/zzy/Documents/zzy/software/kibana-6.6.0-linux-x86_64/bin/../node/bin/node: Undefined error: 0

This turned out to be a platform mismatch: I had downloaded the Linux build. After switching to the macOS build, Kibana started; its startup log follows:

?  kibana-6.6.0-darwin-x86_64 ./bin/kibana
  log   [08:02:11.812] [warning][plugin] Skipping non-plugin directory at /Users/zzy/Documents/zzy/software/kibana-6.6.0-darwin-x86_64/src/legacy/core_plugins/ems_util
  log   [08:02:12.877] [info][status][plugin:kibana@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:12.910] [info][status][plugin:elasticsearch@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.912] [info][status][plugin:xpack_main@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.916] [info][status][plugin:graph@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.931] [info][status][plugin:monitoring@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:12.934] [info][status][plugin:spaces@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.940] [warning][security] Generating a random key for xpack.security.encryptionKey. To prevent sessions from being invalidated on restart, please set xpack.security.encryptionKey in kibana.yml
  log   [08:02:12.944] [warning][security] Session cookies will be transmitted over insecure connections. This is not recommended.
  log   [08:02:12.949] [info][status][plugin:security@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.966] [info][status][plugin:searchprofiler@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.968] [info][status][plugin:ml@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.997] [info][status][plugin:tilemap@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.999] [info][status][plugin:watcher@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.007] [info][status][plugin:grokdebugger@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.011] [info][status][plugin:dashboard_mode@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.018] [info][status][plugin:logstash@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.022] [info][status][plugin:beats_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.038] [info][status][plugin:apm@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.173] [info][status][plugin:interpreter@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.180] [info][status][plugin:canvas@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.184] [info][status][plugin:license_management@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.188] [info][status][plugin:index_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.207] [info][status][plugin:console@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.210] [info][status][plugin:console_extensions@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.213] [info][status][plugin:notifications@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.215] [info][status][plugin:index_lifecycle_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.252] [info][status][plugin:infra@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.254] [info][status][plugin:rollup@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.263] [info][status][plugin:remote_clusters@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.268] [info][status][plugin:cross_cluster_replication@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.274] [info][status][plugin:upgrade_assistant@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.281] [info][status][plugin:metrics@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.444] [info][status][plugin:timelion@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:14.218] [warning][reporting] Generating a random key for xpack.reporting.encryptionKey. To prevent pending reports from failing on restart, please set xpack.reporting.encryptionKey in kibana.yml
  log   [08:02:14.223] [info][status][plugin:reporting@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:14.228] [info][status][plugin:elasticsearch@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.352] [info][license][xpack] Imported license information from Elasticsearch for the [data] cluster: mode: basic | status: active
  log   [08:02:14.355] [info][status][plugin:xpack_main@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.356] [info][status][plugin:graph@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.357] [info][status][plugin:searchprofiler@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.357] [info][status][plugin:ml@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.357] [info][status][plugin:tilemap@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.358] [info][status][plugin:watcher@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.358] [info][status][plugin:grokdebugger@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.358] [info][status][plugin:logstash@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.359] [info][status][plugin:beats_management@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.359] [info][status][plugin:index_management@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.359] [info][status][plugin:index_lifecycle_management@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.360] [info][status][plugin:rollup@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.360] [info][status][plugin:remote_clusters@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.360] [info][status][plugin:cross_cluster_replication@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.360] [info][status][plugin:reporting@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.361] [info][kibana-monitoring][monitoring-ui] Starting monitoring stats collection
  log   [08:02:14.369] [info][status][plugin:security@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.431] [info][license][xpack] Imported license information from Elasticsearch for the [monitoring] cluster: mode: basic | status: active
  log   [08:02:15.707] [info][migrations] Creating index .kibana_1.
  log   [08:02:16.113] [info][migrations] Pointing alias .kibana to .kibana_1.
  log   [08:02:16.158] [info][migrations] Finished in 451ms.
  log   [08:02:16.159] [info][listening] Server running at http://127.0.0.1:5602
  log   [08:02:16.361] [info][status][plugin:spaces@6.6.0] Status changed from yellow to green - Ready

Check the index information:

GET _cat/indices?v

health status index     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana_1 8nwhLfRcTmWOeNp5UTQGOQ   1   0          3            0     11.7kb         11.7kb

After running the Flink-to-ES6 program, the index-student index initially received no data; the following two lines had to be added:

esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

Note: RetryRejectedExecutionFailureHandler comes from the Flink package org.apache.flink.streaming.connectors.elasticsearch.util.

Note the difference from the official documentation, which configures the REST client with a lambda:

builder.setRestClientFactory(
  restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
);
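Both forms work because RestClientFactory has a single abstract method, so a lambda (as in the official docs) and a named implementation (as below) are interchangeable. A stdlib-only sketch of that pattern — the Configurer interface here is a hypothetical stand-in for RestClientFactory:

```java
import java.util.ArrayList;
import java.util.List;

public class FunctionalInterfaceDemo {
    // stand-in for RestClientFactory: one abstract method, so lambdas apply
    interface Configurer {
        void configure(List<String> settings);
    }

    static List<String> applyConfig(Configurer c) {
        List<String> settings = new ArrayList<>();
        c.configure(settings);
        return settings;
    }

    public static void main(String[] args) {
        // lambda form, like the official snippet
        List<String> a = applyConfig(s -> s.add("header: Content-Type=application/json"));
        // named-implementation form, like a RestClientFactory class
        List<String> b = applyConfig(new Configurer() {
            @Override
            public void configure(List<String> settings) {
                settings.add("header: Content-Type=application/json");
            }
        });
        System.out.println(a.equals(b)); // prints "true": both produce the same configuration
    }
}
```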

My RestClientFactory implementation uses a named class instead:

public class RestClientFactoryImpl implements RestClientFactory {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type","application/json")};
        restClientBuilder.setDefaultHeaders(headers); // multiple headers can be added via the array
        restClientBuilder.setMaxRetryTimeoutMillis(90000);
    }
}

Check the indices again via Kibana:

GET _cat/indices?v

health status index         uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   index-student VUUIiS2fQX2p5-JyzxEa7A   5   1        300            0    152.4kb        152.4kb
green  open   .kibana_1     8nwhLfRcTmWOeNp5UTQGOQ   1   0          3            0     11.9kb         11.9kb

The index-student index now exists, so Flink wrote to ES successfully. Query the index data:

GET /index-student/_search?pretty

{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 300,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "3cy_22gBPPMO6TTdKysy",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":123,"id":105,"name":"itzzy105","password":"password105"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "3sy_22gBPPMO6TTdKytx",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":130,"id":112,"name":"itzzy112","password":"password112"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "5sy_22gBPPMO6TTdKyt7",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":138,"id":120,"name":"itzzy120","password":"password120"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "7cy_22gBPPMO6TTdKyuA",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":142,"id":124,"name":"itzzy124","password":"password124"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "9sy_22gBPPMO6TTdKyuK",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":151,"id":133,"name":"itzzy133","password":"password133"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "-My_22gBPPMO6TTdKyuK",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":156,"id":138,"name":"itzzy138","password":"password138"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "-8y_22gBPPMO6TTdKyuP",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":155,"id":137,"name":"itzzy137","password":"password137"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "_sy_22gBPPMO6TTdKyuS",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":162,"id":144,"name":"itzzy144","password":"password144"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "AMy_22gBPPMO6TTdKyyS",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":159,"id":141,"name":"itzzy141","password":"password141"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "Acy_22gBPPMO6TTdKyyU",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":160,"id":142,"name":"itzzy142","password":"password142"}"""
        }
      }
    ]
  }
}
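Note that every _source above wraps the raw Kafka message under a single `data` field: the sink function puts the whole element string into a one-entry map rather than parsing it. A stdlib-only sketch of the document shape being indexed:

```java
import java.util.HashMap;
import java.util.Map;

public class DocumentShape {
    // mirrors createIndexRequest: the element string becomes the value of "data"
    static Map<String, String> toDocument(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        return json;
    }

    public static void main(String[] args) {
        String element = "{\"age\":123,\"id\":105,\"name\":\"itzzy105\",\"password\":\"password105\"}";
        // the indexed _source is {"data": "<raw message>"}, matching the search results above
        System.out.println(toDocument(element));
    }
}
```

To index the student fields as top-level fields instead, the commented-out Fastjson `.map(...)` step in the job would need to be enabled and the sink typed to Student.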

Here is the Flink-to-ES code:

public class FlinkSinkToES6 {

    private static final Logger log = LoggerFactory.getLogger(FlinkSinkToES6.class);

    private static final String READ_TOPIC = "student-1";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "student-group-1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                // this Kafka topic must match the topic used by the producer utility class below
                READ_TOPIC,
                new SimpleStringSchema(),
                props)).setParallelism(1);
//                .map(string -> JSON.parseObject(string, Student.class)); // parse the string into a Student object with Fastjson
        student.print();
        log.info("student:" + student);
        List<HttpHost> esHttphost = new ArrayList<>();
        esHttphost.add(new HttpHost("127.0.0.1", 9200, "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                esHttphost,
                new ElasticsearchSinkFunction<String>() {

                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        log.info("data:" + element);

                        return Requests.indexRequest()
                                .index("index-student")
                                .type("student")
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        esSinkBuilder.setBulkFlushMaxActions(1);
//        esSinkBuilder.setRestClientFactory(
//                restClientBuilder -> {
//                    restClientBuilder.setDefaultHeaders()
//                }
//        );
        esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

        student.addSink(esSinkBuilder.build());
        env.execute("flink learning connectors kafka");
    }
}

Kafka producer code:

public class KafkaUtils {
    private static final String broker_list = "localhost:9092";
    private static final String topic = "student-1";  // must be the same topic the Flink job consumes

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        KafkaProducer producer = new KafkaProducer<String, String>(props); // the old-style producer construction is deprecated
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

        try {
            for (int i = 1; i <= 100; i++) {
                Student student = new Student(i, "itzzy" + i, "password" + i, 18 + i);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(student));
                producer.send(record);
                System.out.println("sent: " + JSON.toJSONString(student));
            }
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace(); // don't swallow send errors silently
        }

        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {

    private int id;
    private String name;
    private String password;
    private int age;

}
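For reference, the payload the producer sends for a given id is exactly the string seen in the `data` fields of the search results earlier (Fastjson emits fields alphabetically by default, hence age/id/name/password). A hand-rolled stdlib stand-in for JSON.toJSONString(student), assuming that ordering:

```java
public class StudentJson {
    // stand-in for JSON.toJSONString(student); fastjson sorts fields alphabetically by default
    static String toJson(int id, String name, String password, int age) {
        return String.format("{\"age\":%d,\"id\":%d,\"name\":\"%s\",\"password\":\"%s\"}",
                age, id, name, password);
    }

    public static void main(String[] args) {
        int i = 105;
        // matches the _source "data" value for id 105 in the search output above
        System.out.println(toJson(i, "itzzy" + i, "password" + i, 18 + i));
    }
}
```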
