1. Package versions used in this article
JDK: 1.8.0_131
ZooKeeper: zookeeper-3.4.14
Kafka: kafka_2.12-2.3.0
ElasticSearch: elasticsearch-6.8.1
Kibana: kibana-6.8.1-windows-x86_64
Flink: flink-1.7.2
2. Installing Flink on Windows
Download Flink from the official site:
https://flink.apache.org/downloads.html
Go into the bin directory of the Flink installation
Double-click start-cluster.bat
Open http://localhost:8081 in a browser
If the following page appears, Flink is installed successfully
[Image: Flink installation succeeded]
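If you prefer the console, the running cluster can also be checked through Flink's REST API on the same port (a quick sanity check, assuming curl is available; /overview returns a small JSON summary of taskmanagers, slots, and jobs):
curl http://localhost:8081/overview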
3. Writing the Java sync program
package com.deepexi.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.http.HttpHost;
import org.apache.log4j.Logger;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.*;
/**
* className:KafkaDemo
* description:
* author:
* date:2019-07-08 14:05
*/
public class KafkaDemo {
private static final Logger log = Logger.getLogger(KafkaDemo.class.getSimpleName());
public static void main(String[] args) throws Exception {
System.out.println("===============> Flink job starting ==============>");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//set Kafka connection properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
//zookeeper.connect is ignored by the 0.10 consumer API; kept as in the original setup
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test-group");
//use event time as the stream time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//enable checkpointing every 5000 ms
env.enableCheckpointing(5000);
//create a Kafka consumer and read data from Kafka
FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<String>("topic001", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaData = env.addSource(kafkaConsumer010);
DataStream<String> userData = kafkaData.map(new MapFunction<String, String>() {
@Override
public String map(String s) {
log.info(">>>>>> received topic message: " + s + " <<<<<<");
return s;
}
});
List<HttpHost> esHttphost = new ArrayList<>();
esHttphost.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
esHttphost,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);
log.info("data:" + element);
return Requests.indexRequest()
.index("flink_index")
.type("flink_type")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
//flush after every element so test messages show up in ES immediately (batch in production)
esSinkBuilder.setBulkFlushMaxActions(1);
// the factory can also be supplied as a lambda, e.g.:
// esSinkBuilder.setRestClientFactory(restClientBuilder ->
//         restClientBuilder.setDefaultHeaders(new org.apache.http.message.BasicHeader[]{
//                 new org.apache.http.message.BasicHeader("Content-Type", "application/json")}));
esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
userData.addSink(esSinkBuilder.build());
env.execute("flink learning connectors kafka");
}
}
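The job above stores each message as one opaque string under a single data field. Since fastjson is already declared in the pom below, the payload could instead be parsed and indexed field by field. A minimal sketch, assuming the Kafka messages are JSON objects (this helper class is illustrative, not part of the original job):
package com.deepexi.flink;
import com.alibaba.fastjson.JSON;
import java.util.Map;
public class PayloadParser {
    //parses one Kafka message into the Map accepted by Requests.indexRequest().source(...);
    //JSONObject implements Map<String, Object>, so it can be returned directly,
    //and a com.alibaba.fastjson.JSONException is thrown on malformed input
    public static Map<String, Object> toSource(String element) {
        return JSON.parseObject(element);
    }
}
In createIndexRequest, .source(json) would then become .source(PayloadParser.toSource(element)).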
package com.deepexi.flink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClientBuilder;
/**
* className:RestClientFactoryImpl
* description:
* author:ChenYajun
* date:2019-07-10 15:46
*/
public class RestClientFactoryImpl implements RestClientFactory {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type","application/json")};
restClientBuilder.setDefaultHeaders(headers); //headers is an array, so multiple default headers can be set at once
restClientBuilder.setMaxRetryTimeoutMillis(90000);
}
}
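If the ElasticSearch cluster sits behind authentication, extra default headers can be supplied the same way. A hedged variant (the class name and the credentials are placeholders, not part of the original setup):
package com.deepexi.flink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClientBuilder;
public class SecuredRestClientFactoryImpl implements RestClientFactory {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        Header[] headers = new BasicHeader[]{
                new BasicHeader("Content-Type", "application/json"),
                //Base64 of "user:password" -- replace with real credentials
                new BasicHeader("Authorization", "Basic dXNlcjpwYXNzd29yZA==")
        };
        restClientBuilder.setDefaultHeaders(headers);
    }
}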
pom dependencies
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.build.timestamp.format>yyyyMMddHHmmss</maven.build.timestamp.format>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<!-- Flink artifacts are kept at the Flink version from section 1 (1.7.2); the elasticsearch6 connector does not exist for older Flink releases -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<!-- flink-streaming jar; 2.11 is the Scala version -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.28</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
4. Building the runnable Flink jar in IDEA
1. Open the project's Project Structure dialog
[Image: IDEA jar packaging, step 1]
2. Choose "JAR"
[Image: IDEA jar packaging, step 2]
3. Choose the main class
[Image: IDEA jar packaging, step 3]
4. Choose the dependencies
[Image: IDEA jar packaging, step 4]
5. Build the project
[Image: IDEA jar packaging, step 5]
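If you would rather build from Maven than from IDEA, the maven-shade-plugin can produce an equivalent runnable fat jar. A minimal sketch to add under <plugins> in the pom above (the plugin version is an assumption; the main class is the one from section 3):
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.deepexi.flink.KafkaDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
Running mvn package then leaves the runnable jar in the target directory.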
5. Creating a Kafka topic on Windows
From a console, go into the bin\windows directory of the Kafka installation
#1. Create a topic named topic001
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic001
#2. Start a console producer
kafka-console-producer.bat --broker-list localhost:9092 --topic topic001
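A console consumer can optionally be started in a second window to confirm that the topic actually receives what the producer sends:
#3. (Optional) consume topic001 from the beginning
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic001 --from-beginning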
6. Uploading the jar to Flink
[Image: uploading the jar to Flink, step 1]
[Image: uploading the jar to Flink, step 2]
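Instead of the web UI, the job can also be submitted with the Flink CLI from the same bin directory that holds start-cluster.bat (the jar path below is a placeholder):
flink.bat run -c com.deepexi.flink.KafkaDemo C:\path\to\your-job.jar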
7. Sending test messages from the console producer
[Image: producer sending test messages]
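Each line typed into the producer console becomes one Kafka record. For example, an arbitrary test payload:
{"id": 1, "msg": "hello flink"}
With the original job this whole line is stored as one string under the data field; with the parsing sketch from section 3 it would be indexed as two separate fields.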
8. Viewing the synced messages in ElasticSearch via Kibana
To confirm that Flink has synced the messages into ElasticSearch, look them up on Kibana's query page
[Image: viewing the data in ES]
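In Kibana's Dev Tools console, the documents written by the job can be pulled back with a match_all query against the index the sink creates:
GET flink_index/_search
{
  "query": { "match_all": {} }
}
Each hit should contain the data field holding one of the messages typed into the producer.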