Flink讀取Kafka消息同步到ElasticSearch

一. 本文所需安裝包版本

Jdk:1.8.0_131
Zookeeper:zookeeper-3.4.14
Kafka:kafka_2.12-2.3.0
ElasticSearch:elasticsearch-6.8.1
Kibana:kibana-6.8.1-windows-x86_64
Flink:flink-1.7.2

二 window安裝flink

去flink官網下載flink
https://flink.apache.org/downloads.html

進入flink安裝bin目錄
雙擊start-cluster.bat
瀏覽器輸入http://localhost:8081
出現以下頁面則Flink安裝成功

flink安裝成功.png

三.編寫Java程序同步代碼

package com.deepexi.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.http.HttpHost;
import org.apache.log4j.Logger;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.*;

/**
 * className:KafkaDemo
 * description:
 * author:
 * date:2019-07-08 14:05
 */
public class KafkaDemo {
    private static final Logger log = Logger.getLogger(KafkaDemo.class.getSimpleName());


    public static void main(String[] args) throws Exception {
        System.out.println("===============》 flink任務開始  ==============》");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置kafka連接參數
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test-group");
        //設置時間類型
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //設置檢查點時間間隔
        env.enableCheckpointing(5000);
        //創(chuàng)建kafak消費者川蒙,獲取kafak中的數據
        FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<String>("topic001",new SimpleStringSchema(), properties);
        DataStreamSource<String> kafkaData = env.addSource(kafkaConsumer010);
        DataStream<String> userData = kafkaData.map(new MapFunction<String, String>() {

            @Override
            public String map(String s) {
                log.info(">>>>>>接收topic報文:"+s+"<<<<<");
                return s;
            }
        });

        List<HttpHost> esHttphost = new ArrayList<>();
        esHttphost.add(new HttpHost("127.0.0.1", 9200, "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                esHttphost,
                new ElasticsearchSinkFunction<String>() {

                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        log.info("data:" + element);

                        return Requests.indexRequest()
                                .index("flink_index")
                                .type("flink_type")
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        esSinkBuilder.setBulkFlushMaxActions(1);
//        esSinkBuilder.setRestClientFactory(
//                restClientBuilder -> {
//                    restClientBuilder.setDefaultHeaders()
//                }
//        );
        esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

        userData.addSink(esSinkBuilder.build());
        env.execute("flink learning connectors kafka");
    }
}
package com.deepexi.flink;

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClientBuilder;

/**
 * className:RestClientFactoryImpl
 * description:
 * author:ChenYajun
 * date:2019-07-10 15:46
 */
public class RestClientFactoryImpl implements RestClientFactory {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type","application/json")};
        restClientBuilder.setDefaultHeaders(headers); //以數組的形式可以添加多個header
        restClientBuilder.setMaxRetryTimeoutMillis(90000);
    }
}

pom依賴

  <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.build.timestamp.format>yyyyMMddHHmmss</maven.build.timestamp.format>
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.11</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10_2.11 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.3.2</version>
            <scope> compile</scope>
        </dependency>

        <!-- flink-streaming的jar包蚜厉,2.11為scala版本號 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.3.2</version>
            <scope> compile</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.0</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.6.1</version>
            <scope> compile</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

四.IDEA打Flink運行jar包

  1. 打開工程Project Structure目錄


    IDEA打jar包1.png
  2. 選擇jar包


    IDEA打jar包2.png

3.選擇主函數


IDEA打jar包3.png

4.選擇依賴

IDEA打jar包4.png

5.build工程


IDEA打jar包5.png

六.window創(chuàng)建kafka topic

控制臺進入kafka安裝目錄的bin\windows目錄

#1.創(chuàng)建一個名為topic001的topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic topic001
#2.控制臺進入生產者producer
kafka-console-producer.bat --broker-list localhost:9092 --topic topic001

七.上傳jar包至flink

上傳jar包至Flink.png
上傳jar包至flink2.png

八.控制臺進入生產者并發(fā)送測試報文

producer發(fā)送測試報文.png

九.Kibana查看ElasticSearch同步報文

查看Flink同步消息至ElasticSearch,在Kibana查詢頁面查看報文


查看ES數據.png
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末碍讯,一起剝皮案震驚了整個濱河市命斧,隨后出現的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖震捣,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異贱勃,居然都是意外死亡刚操,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門伶椿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來辜伟,“玉大人,你說我怎么就攤上這事脊另〉冀疲” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵偎痛,是天一觀的道長旱捧。 經常有香客問我,道長踩麦,這世上最難降的妖魔是什么枚赡? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮谓谦,結果婚禮上贫橙,老公的妹妹穿的比我還像新娘。我一直安慰自己反粥,他們只是感情好卢肃,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布谓松。 她就那樣靜靜地躺著,像睡著了一般践剂。 火紅的嫁衣襯著肌膚如雪鬼譬。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天逊脯,我揣著相機與錄音优质,去河邊找鬼。 笑死军洼,一個胖子當著我的面吹牛巩螃,可吹牛的內容都是我干的。 我是一名探鬼主播匕争,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼避乏,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了甘桑?” 一聲冷哼從身側響起拍皮,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎跑杭,沒想到半個月后铆帽,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡德谅,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年爹橱,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片窄做。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡愧驱,死狀恐怖,靈堂內的尸體忽然破棺而出椭盏,到底是詐尸還是另有隱情组砚,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布庸汗,位于F島的核電站惫确,受9級特大地震影響,放射性物質發(fā)生泄漏蚯舱。R本人自食惡果不足惜改化,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望枉昏。 院中可真熱鬧陈肛,春花似錦、人聲如沸兄裂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至谈撒,卻和暖如春腥泥,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背啃匿。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工蛔外, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人溯乒。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓夹厌,卻偏偏與公主長得像,于是被迫代替她去往敵國和親裆悄。 傳聞我的和親對象是個殘疾皇子矛纹,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355