Spark SQL讀寫 ES7.x 及問題總結

本文主要介紹 spark SQL 讀寫 ES,參數(shù)的配置以及問題總結。

ES官方提供了對spark的支持,可以直接通過spark讀寫es苏揣,具體可以參考ES Spark Support文檔

以下是pom依賴,具體版本可以根據(jù)自己的es和spark版本進行選擇:

      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>7.3.1</version>
      </dependency>

Spark SQL to ES

主要提供了兩種讀寫方式:

  • 一種是通過DataFrameReader/Writer傳入ES Source實現(xiàn)推姻;
  • 另一種是直接讀寫DataFrame實現(xiàn)平匈。

在實現(xiàn)前,還要列一些相關的配置:

參數(shù) 描述
es.nodes.wan.only true or false藏古,在此模式下增炭,連接器禁用發(fā)現(xiàn),并且所有操作通過聲明的es.nodes連接
es.nodes ES節(jié)點
es.port ES端口
es.index.auto.create true or false拧晕,是否自動創(chuàng)建index
es.resource 資源路徑
es.mapping.id es會為每個文檔分配一個全局id隙姿。如果不指定此參數(shù)將隨機生成;如果指定的話按指定的來
es.batch.size.bytes es批量API的批量寫入的大蟹乐ⅰ(以字節(jié)為單位)
es.batch.write.refresh 批量更新完成后是否調用索引刷新
es.read.field.as.array.include 讀es的時候孟辑,指定將哪些字段作為數(shù)組類型

列了一些常用的配置哎甲,更多配置查看ES Spark Configuration文檔

DataFrameReader 讀 ES

import org.elasticsearch.spark.sql._
val options = Map(
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.read.field.as.array.include" -> "arr1, arr2"
)
val df = spark
    .read
    .format("es")
    .options(options)
    .load("index1/info")
df.show()

DataFrameWriter 寫 ES

import org.elasticsearch.spark.sql._
val options = Map(
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200",
  "es.mapping.id" -> "id"
)

val sourceDF = spark.table("hive_table")
sourceDF
  .write
  .format("org.elasticsearch.spark.sql")
  .options(options)
  .mode(SaveMode.Append)
  .save("hive_table/docs")

讀DataFrame

jar包中提供了 esDF() 方法可以直接讀es數(shù)據(jù)為DataFrame蔫敲,以下是源碼截圖。


參數(shù)說明:

  • resource:資源路徑炭玫,例如index和tpye: hive_table/docs
  • cfg:一些es的配置奈嘿,和上面代碼中的options差不多
  • query:指定DSL查詢語句來過濾要讀的數(shù)據(jù),例如"?q=user_group_id:3"表示讀user_group_id為3的數(shù)據(jù)
val options = Map(
  "pushdown" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.nodes" -> "29.29.29.29:10008,29.29.29.29:10009",
  "es.port" -> "9200"
)

val df = spark.esDF("hive_table/docs", "?q=user_group_id:3", options)
df.show()

寫 DataFrame

jar包中提供了 saveToEs() 方法可以將DataFrame寫入ES吞加,以下是源碼截圖裙犹。

resource:資源路徑,例如index和tpye: hive_table/docs
cfg:一些es的配置衔憨,和上面代碼中的options差不多

示例:

val brandDF = sparkSession.sql(""" SELECT
              |   categoryname AS id
              | , concat_ws(',', collect_set(targetword)) AS targetWords
              | , get_utc_time() as `@timestamp`
              | FROM  t1
              | GROUP BY
              | categoryname
              """.stripMargin)

 // 手動指定ES _id值
 val map = Map("es.mapping.id" -> "id")
 EsSparkSQL.saveToEs(brandDF, "mkt_noresult_brand/mkt_noresult_brand", map)

Spark RDD to ES

SparkRDD方式寫 ES叶圃,以下是源碼截圖。


示例:

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
    val rdd = sparkSession.sparkContext.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(rdd, "mkt_noresult_brand/mkt_noresult_brand", map)

問題總結

手動指定ES _id值

EsSparkSQL.saveToEs 報錯org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [DataFrameFieldExtractor for field [[...]]] cannot extract value from entity

原因:"es.mapping.id"參數(shù)指定文檔的id践图,這個參數(shù)必須配置成DataFrame中已有的字段掺冠,不能隨意指定。配置成 val map = Map("es.mapping.id" -> "id"), 數(shù)據(jù)導入成功码党。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末德崭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子揖盘,更是在濱河造成了極大的恐慌眉厨,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件兽狭,死亡現(xiàn)場離奇詭異憾股,居然都是意外死亡鹿蜀,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門服球,熙熙樓的掌柜王于貴愁眉苦臉地迎上來耻姥,“玉大人,你說我怎么就攤上這事有咨∷龃兀” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵座享,是天一觀的道長婉商。 經(jīng)常有香客問我,道長渣叛,這世上最難降的妖魔是什么丈秩? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮淳衙,結果婚禮上蘑秽,老公的妹妹穿的比我還像新娘。我一直安慰自己箫攀,他們只是感情好肠牲,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著靴跛,像睡著了一般缀雳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上梢睛,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天肥印,我揣著相機與錄音,去河邊找鬼绝葡。 笑死深碱,一個胖子當著我的面吹牛,可吹牛的內容都是我干的藏畅。 我是一名探鬼主播敷硅,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼墓赴!你這毒婦竟也來了竞膳?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤诫硕,失蹤者是張志新(化名)和其女友劉穎坦辟,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體章办,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡锉走,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年滨彻,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片挪蹭。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡亭饵,死狀恐怖,靈堂內的尸體忽然破棺而出梁厉,到底是詐尸還是另有隱情辜羊,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布词顾,位于F島的核電站八秃,受9級特大地震影響,放射性物質發(fā)生泄漏肉盹。R本人自食惡果不足惜昔驱,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望上忍。 院中可真熱鬧骤肛,春花似錦、人聲如沸窍蓝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽它抱。三九已至秕豫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間观蓄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工祠墅, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留侮穿,地道東北人。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓毁嗦,卻偏偏與公主長得像亲茅,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子狗准,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內容