elasticsearch-spark用法

Hadoop允許Elasticsearch在Spark中以兩種方式使用:通過自2.1以來的原生RDD支持课舍,或者通過自2.0以來的Map/Reduce橋接器塌西。從5.0版本開始,elasticsearch-hadoop就支持Spark 2.0筝尾。目前spark支持的數(shù)據(jù)源有:
(1)文件系統(tǒng):LocalFS雨让、HDFS、Hive忿等、text栖忠、parquet、orc贸街、json庵寞、csv
(2)數(shù)據(jù)RDBMS:mysql、oracle薛匪、mssql
(3)NOSQL數(shù)據(jù)庫:HBase捐川、ES、Redis
(4)消息對象:Redis

elasticsearch相對hdfs來說逸尖,容易搭建古沥、并且有可視化kibana支持,非常方便spark的初學(xué)入門娇跟,本文主要講解用elasticsearch-spark的入門岩齿。

image.png

一、原生RDD支持

1.1 基礎(chǔ)配置

相關(guān)庫引入:

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-30_2.13</artifactId>
            <version>8.1.3</version>
        </dependency>

SparkConf配置苞俘,更多詳細(xì)的請點擊這里或者源碼ConfigurationOptions盹沈。

public static SparkConf getSparkConf() {
    SparkConf sparkConf = new SparkConf().setAppName("elasticsearch-spark-demo");
    sparkConf.set("es.nodes", "host")
            .set("es.port", "xxxxxx")
            .set("es.nodes.wan.only", "true")
            .set("es.net.http.auth.user", "elxxxxastic")
            .set("es.net.http.auth.pass", "xxxx")
            .setMaster("local[*]");
    return sparkConf;
}

1.2 讀取es數(shù)據(jù)

這里用的是kibana提供的sample data里面的索引kibana_sample_data_ecommerce,也可以替換成自己的索引吃谣。

public static void main(String[] args) {
    SparkConf conf = getSparkConf();
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {

        JavaPairRDD<String, Map<String, Object>> esRDD =
                JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce");
        esRDD.collect().forEach(System.out::println);
    }
}

esRDD同時也支持query語句esRDD(final JavaSparkContext jsc, final String resource, final String query)乞封,一般對es的查詢都需要根據(jù)時間篩選一下做裙,不過相對于es的官方sdk,并沒有那么友好的api肃晚,只能直接使用原生的dsl語句锚贱。

1.3 寫數(shù)據(jù)

支持序列化對象、json关串,并且能夠使用占位符動態(tài)索引寫入數(shù)據(jù)(使用較少)惋鸥,不過多介紹了。

public static void jsonWrite(){
    String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
    String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
    JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
    JavaEsSpark.saveJsonToEs(stringRDD, "spark-json");
}

比較常用的讀寫也就這些悍缠,更多可以看下官網(wǎng)相關(guān)介紹。

二耐量、Spark Streaming

spark的實時處理飞蚓,es5.0的時候開始支持,Spark Streaming中的DStream編程接口是RDD廊蜒,我們需要對RDD進(jìn)行處理趴拧,處理起來較為費勁且不美觀。

在spark streaming中山叮,如果我們需要修改流程序的代碼著榴,在修改代碼重新提交任務(wù)時,是不能從checkpoint中恢復(fù)數(shù)據(jù)的(程序就跑不起來)屁倔,是因為spark不認(rèn)識修改后的程序了脑又。

public class EsSparkStreaming extends EsBaseConfig {
    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkConf conf = getSparkConf();
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Seconds.apply(1));

        Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
        Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

        JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
        Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
        microbatches.add(javaRDD);
        JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

        JavaEsSparkStreaming.saveToEs(javaDStream, "spark-streaming");

        jssc.start();
    }
}

這里沒有執(zhí)行awaitTermination,執(zhí)行代碼后沒有卡住锐借,即可在es上查看

image.png

三问麸、Spark SQL

elasticsearch-hadoop也提供了spark sql的插件,換言之钞翔,elasticsearch變成了Spark SQL的原生數(shù)據(jù)源严卖,可以通過Spark SQL顯示調(diào)用,下面的例子將kibana_sample_data_ecommerce索引讀取布轿,然后轉(zhuǎn)化成dataset哮笆,在用sql來統(tǒng)計出當(dāng)前貨幣。

public class EsToMysqlDemo extends EsBaseConfig {
    public static void main(String[] args) {
        SparkConf conf = getSparkConf();
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            SparkSession sparkSession = SparkSession.builder()
                    .config(conf)
                    .getOrCreate();
            JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce").values();
            JavaRDD<Row> map = esRDD.map(v -> {
                String currency = v.get("currency").toString();
                String customerFullName = v.get("customer_full_name").toString();
                String productsSku = v.getOrDefault("products", "").toString();

                return RowFactory.create(currency, customerFullName, productsSku);
            });
            Dataset<Row> dataset = sparkSession.createDataFrame(map, StructType.fromDDL("currency string,customer_full_name string,products string"));
            dataset.show(2);

            Dataset<Row> count = dataset.select("currency").groupBy("currency").count();
            count.show(2);


        }
    }
}

第一個show展示了當(dāng)前的dataset汰扭,第二個show展示group by之后的結(jié)果稠肘。

image.png

四、Spark Structure Streaming

Structured Streaming使用DataFrame萝毛、DataSet的編程接口启具,處理數(shù)據(jù)時可以使用Spark SQL中提供的方法,數(shù)據(jù)的轉(zhuǎn)換和輸出會變得更加簡單珊泳。

在structured streaming中鲁冯,對于指定的代碼修改操作拷沸,是不影響修改后從checkpoint中恢復(fù)數(shù)據(jù)的。具體可參見文檔薯演。下面這個例子是從控制臺中讀取數(shù)據(jù)撞芍,然后根據(jù)","切割,把第一個賦值給name跨扮,然后寫入到es的spark-structured-streaming索引中去序无,啟動程序前需要在控制臺執(zhí)行下命令:nc -lk 9999。

@Data
public static class PersonBean {
    private String name;
    private String surname;
}

public static void main(String[] args) throws StreamingQueryException {
    SparkConf sparkConf = getSparkConf();
    SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();


    Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();

    Dataset<PersonBean> people = lines.as(Encoders.STRING())
            .map((MapFunction<String, PersonBean>) value -> {
                String[] split = value.split(",");
                PersonBean personBean = new PersonBean();
                personBean.setName(split[0]);
                return personBean;
            }, Encoders.bean(PersonBean.class));

    StreamingQuery es = people.writeStream().option("checkpointLocation", "./location")
            .format("es").start("spark-structured-streaming");
    es.awaitTermination();
}

checkpointLocation是用來設(shè)置檢查點衡创,里面會存儲一些commits帝嗡、offsets、sinks璃氢、metadata的信息哟玷。

image.png

執(zhí)行完nc -lk 9999后,在控制臺隨便輸入一也,即可在es中查看響應(yīng)的結(jié)果巢寡。

image.png

相關(guān)源代碼:

spark-java-demo

參考:

1.Apache Spark support

2.elasticsearch-hadoop

3.使用SparkSQL操作Elasticsearch - Spark入門教程

4.Spark——Spark Streaming 對比 Structured Streaming

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市椰苟,隨后出現(xiàn)的幾起案子抑月,更是在濱河造成了極大的恐慌,老刑警劉巖舆蝴,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谦絮,死亡現(xiàn)場離奇詭異,居然都是意外死亡洁仗,警方通過查閱死者的電腦和手機挨稿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來京痢,“玉大人奶甘,你說我怎么就攤上這事〖酪” “怎么了臭家?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長方淤。 經(jīng)常有香客問我钉赁,道長,這世上最難降的妖魔是什么携茂? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任你踩,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘带膜。我一直安慰自己吩谦,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布膝藕。 她就那樣靜靜地躺著式廷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪芭挽。 梳的紋絲不亂的頭發(fā)上滑废,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天,我揣著相機與錄音袜爪,去河邊找鬼蠕趁。 笑死,一個胖子當(dāng)著我的面吹牛辛馆,可吹牛的內(nèi)容都是我干的俺陋。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼怀各,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了术浪?” 一聲冷哼從身側(cè)響起瓢对,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胰苏,沒想到半個月后硕蛹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡硕并,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年法焰,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片倔毙。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡埃仪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出陕赃,到底是詐尸還是另有隱情卵蛉,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布么库,位于F島的核電站傻丝,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏诉儒。R本人自食惡果不足惜葡缰,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧泛释,春花似錦滤愕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至韭畸,卻和暖如春宇智,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背胰丁。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工随橘, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人锦庸。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓机蔗,卻偏偏與公主長得像,于是被迫代替她去往敵國和親甘萧。 傳聞我的和親對象是個殘疾皇子萝嘁,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容