本文主要介紹 spark SQL 讀寫 ES,參數(shù)的配置以及問題總結。
ES官方提供了對spark的支持,可以直接通過spark讀寫es苏揣,具體可以參考ES Spark Support文檔
以下是pom依賴,具體版本可以根據(jù)自己的es和spark版本進行選擇:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>7.3.1</version>
</dependency>
Spark SQL to ES
主要提供了兩種讀寫方式:
- 一種是通過DataFrameReader/Writer傳入ES Source實現(xiàn)推姻;
- 另一種是直接讀寫DataFrame實現(xiàn)平匈。
在實現(xiàn)前,還要列一些相關的配置:
參數(shù) | 描述 |
---|---|
es.nodes.wan.only | true or false藏古,在此模式下增炭,連接器禁用發(fā)現(xiàn),并且所有操作通過聲明的es.nodes連接 |
es.nodes | ES節(jié)點 |
es.port | ES端口 |
es.index.auto.create | true or false拧晕,是否自動創(chuàng)建index |
es.resource | 資源路徑 |
es.mapping.id | es會為每個文檔分配一個全局id隙姿。如果不指定此參數(shù)將隨機生成;如果指定的話按指定的來 |
es.batch.size.bytes | es批量API的批量寫入的大蟹乐ⅰ(以字節(jié)為單位) |
es.batch.write.refresh | 批量更新完成后是否調用索引刷新 |
es.read.field.as.array.include | 讀es的時候孟辑,指定將哪些字段作為數(shù)組類型 |
列了一些常用的配置哎甲,更多配置查看ES Spark Configuration文檔
DataFrameReader 讀 ES
import org.elasticsearch.spark.sql._
val options = Map(
"es.nodes.wan.only" -> "true",
"es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
"es.port" -> "9200",
"es.read.field.as.array.include" -> "arr1, arr2"
)
val df = spark
.read
.format("es")
.options(options)
.load("index1/info")
df.show()
DataFrameWriter 寫 ES
import org.elasticsearch.spark.sql._
val options = Map(
"es.index.auto.create" -> "true",
"es.nodes.wan.only" -> "true",
"es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
"es.port" -> "9200",
"es.mapping.id" -> "id"
)
val sourceDF = spark.table("hive_table")
sourceDF
.write
.format("org.elasticsearch.spark.sql")
.options(options)
.mode(SaveMode.Append)
.save("hive_table/docs")
讀DataFrame
jar包中提供了 esDF() 方法可以直接讀es數(shù)據(jù)為DataFrame蔫敲,以下是源碼截圖。
參數(shù)說明:
- resource:資源路徑炭玫,例如index和tpye: hive_table/docs
- cfg:一些es的配置奈嘿,和上面代碼中的options差不多
- query:指定DSL查詢語句來過濾要讀的數(shù)據(jù),例如"?q=user_group_id:3"表示讀user_group_id為3的數(shù)據(jù)
val options = Map(
"pushdown" -> "true",
"es.nodes.wan.only" -> "true",
"es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
"es.port" -> "9200"
)
val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options)
df.show()
寫 DataFrame
jar包中提供了 saveToEs() 方法可以將DataFrame寫入ES吞加,以下是源碼截圖裙犹。
resource:資源路徑,例如index和tpye: hive_table/docs
cfg:一些es的配置衔憨,和上面代碼中的options差不多
示例:
val brandDF = sparkSession.sql(""" SELECT
| categoryname AS id
| , concat_ws(',', collect_set(targetword)) AS targetWords
| , get_utc_time() as `@timestamp`
| FROM t1
| GROUP BY
| categoryname
""".stripMargin)
// 手動指定ES _id值
val map = Map("es.mapping.id" -> "id")
EsSparkSQL.saveToEs(brandDF, "mkt_noresult_brand/mkt_noresult_brand", map)
Spark RDD to ES
SparkRDD方式寫 ES叶圃,以下是源碼截圖。
示例:
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
val rdd = sparkSession.sparkContext.makeRDD(Seq(numbers, airports))
EsSpark.saveToEs(rdd, "mkt_noresult_brand/mkt_noresult_brand", map)
問題總結
手動指定ES _id值
EsSparkSQL.saveToEs 報錯org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [DataFrameFieldExtractor for field [[...]]] cannot extract value from entity
原因:"es.mapping.id"參數(shù)指定文檔的id践图,這個參數(shù)必須配置成DataFrame中已有的字段掺冠,不能隨意指定。配置成 val map = Map("es.mapping.id" -> "id")
, 數(shù)據(jù)導入成功码党。