Hadoop允許Elasticsearch在Spark中以兩種方式使用:通過自2.1以來的原生RDD支持课舍,或者通過自2.0以來的Map/Reduce橋接器塌西。從5.0版本開始,elasticsearch-hadoop就支持Spark 2.0筝尾。目前spark支持的數(shù)據(jù)源有:
(1)文件系統(tǒng):LocalFS雨让、HDFS、Hive忿等、text栖忠、parquet、orc贸街、json庵寞、csv
(2)數(shù)據(jù)RDBMS:mysql、oracle薛匪、mssql
(3)NOSQL數(shù)據(jù)庫:HBase捐川、ES、Redis
(4)消息對象:Redis
elasticsearch相對hdfs來說逸尖,容易搭建古沥、并且有可視化kibana支持,非常方便spark的初學(xué)入門娇跟,本文主要講解用elasticsearch-spark的入門岩齿。
一、原生RDD支持
1.1 基礎(chǔ)配置
相關(guān)庫引入:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-30_2.13</artifactId>
<version>8.1.3</version>
</dependency>
SparkConf配置苞俘,更多詳細(xì)的請點擊這里或者源碼ConfigurationOptions盹沈。
public static SparkConf getSparkConf() {
SparkConf sparkConf = new SparkConf().setAppName("elasticsearch-spark-demo");
sparkConf.set("es.nodes", "host")
.set("es.port", "xxxxxx")
.set("es.nodes.wan.only", "true")
.set("es.net.http.auth.user", "elxxxxastic")
.set("es.net.http.auth.pass", "xxxx")
.setMaster("local[*]");
return sparkConf;
}
1.2 讀取es數(shù)據(jù)
這里用的是kibana提供的sample data里面的索引kibana_sample_data_ecommerce,也可以替換成自己的索引吃谣。
public static void main(String[] args) {
SparkConf conf = getSparkConf();
try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
JavaPairRDD<String, Map<String, Object>> esRDD =
JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce");
esRDD.collect().forEach(System.out::println);
}
}
esRDD同時也支持query語句esRDD(final JavaSparkContext jsc, final String resource, final String query)乞封,一般對es的查詢都需要根據(jù)時間篩選一下做裙,不過相對于es的官方sdk,并沒有那么友好的api肃晚,只能直接使用原生的dsl語句锚贱。
1.3 寫數(shù)據(jù)
支持序列化對象、json关串,并且能夠使用占位符動態(tài)索引寫入數(shù)據(jù)(使用較少)惋鸥,不過多介紹了。
public static void jsonWrite(){
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
JavaEsSpark.saveJsonToEs(stringRDD, "spark-json");
}
比較常用的讀寫也就這些悍缠,更多可以看下官網(wǎng)相關(guān)介紹。
二耐量、Spark Streaming
spark的實時處理飞蚓,es5.0的時候開始支持,Spark Streaming中的DStream編程接口是RDD廊蜒,我們需要對RDD進(jìn)行處理趴拧,處理起來較為費勁且不美觀。
在spark streaming中山叮,如果我們需要修改流程序的代碼著榴,在修改代碼重新提交任務(wù)時,是不能從checkpoint中恢復(fù)數(shù)據(jù)的(程序就跑不起來)屁倔,是因為spark不認(rèn)識修改后的程序了脑又。
public class EsSparkStreaming extends EsBaseConfig {
public static void main(String[] args) throws StreamingQueryException, TimeoutException {
SparkConf conf = getSparkConf();
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Seconds.apply(1));
Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
microbatches.add(javaRDD);
JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);
JavaEsSparkStreaming.saveToEs(javaDStream, "spark-streaming");
jssc.start();
}
}
這里沒有執(zhí)行awaitTermination,執(zhí)行代碼后沒有卡住锐借,即可在es上查看
三问麸、Spark SQL
elasticsearch-hadoop也提供了spark sql的插件,換言之钞翔,elasticsearch變成了Spark SQL的原生數(shù)據(jù)源严卖,可以通過Spark SQL顯示調(diào)用,下面的例子將kibana_sample_data_ecommerce索引讀取布轿,然后轉(zhuǎn)化成dataset哮笆,在用sql來統(tǒng)計出當(dāng)前貨幣。
public class EsToMysqlDemo extends EsBaseConfig {
public static void main(String[] args) {
SparkConf conf = getSparkConf();
try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
SparkSession sparkSession = SparkSession.builder()
.config(conf)
.getOrCreate();
JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce").values();
JavaRDD<Row> map = esRDD.map(v -> {
String currency = v.get("currency").toString();
String customerFullName = v.get("customer_full_name").toString();
String productsSku = v.getOrDefault("products", "").toString();
return RowFactory.create(currency, customerFullName, productsSku);
});
Dataset<Row> dataset = sparkSession.createDataFrame(map, StructType.fromDDL("currency string,customer_full_name string,products string"));
dataset.show(2);
Dataset<Row> count = dataset.select("currency").groupBy("currency").count();
count.show(2);
}
}
}
第一個show展示了當(dāng)前的dataset汰扭,第二個show展示group by之后的結(jié)果稠肘。
四、Spark Structure Streaming
Structured Streaming使用DataFrame萝毛、DataSet的編程接口启具,處理數(shù)據(jù)時可以使用Spark SQL中提供的方法,數(shù)據(jù)的轉(zhuǎn)換和輸出會變得更加簡單珊泳。
在structured streaming中鲁冯,對于指定的代碼修改操作拷沸,是不影響修改后從checkpoint中恢復(fù)數(shù)據(jù)的。具體可參見文檔薯演。下面這個例子是從控制臺中讀取數(shù)據(jù)撞芍,然后根據(jù)","切割,把第一個賦值給name跨扮,然后寫入到es的spark-structured-streaming索引中去序无,啟動程序前需要在控制臺執(zhí)行下命令:nc -lk 9999。
@Data
public static class PersonBean {
private String name;
private String surname;
}
public static void main(String[] args) throws StreamingQueryException {
SparkConf sparkConf = getSparkConf();
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();
Dataset<PersonBean> people = lines.as(Encoders.STRING())
.map((MapFunction<String, PersonBean>) value -> {
String[] split = value.split(",");
PersonBean personBean = new PersonBean();
personBean.setName(split[0]);
return personBean;
}, Encoders.bean(PersonBean.class));
StreamingQuery es = people.writeStream().option("checkpointLocation", "./location")
.format("es").start("spark-structured-streaming");
es.awaitTermination();
}
checkpointLocation是用來設(shè)置檢查點衡创,里面會存儲一些commits帝嗡、offsets、sinks璃氢、metadata的信息哟玷。
執(zhí)行完nc -lk 9999后,在控制臺隨便輸入一也,即可在es中查看響應(yīng)的結(jié)果巢寡。
相關(guān)源代碼: