Use case: TB-scale historical data import (HDFS to Elasticsearch)
1. Add the Maven dependencies
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>5.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
Note: ① With ES 5.1.1, adding the generic elasticsearch-hadoop artifact instead causes errors
② the es-spark version must match the ES version
2. Write the Spark program
package com.hualala.bi
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
//implicit conversion that adds saveToEs to RDDs
import org.elasticsearch.spark._
object esSparkApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local").setAppName("es-spark-test")
val inputPath = args(0)
val nodes = args(1)
//configure ES parameters, including document id and routing
conf.set("es.nodes", nodes)
conf.set("es.index.auto.create", "true")
conf.set("es.mapping.id", "id")
conf.set("es.mapping.routing", "rout")
conf.set("es.input.json", "yes")
val sc = new SparkContext(conf)
val dataRdd = sc.textFile(inputPath)
//transform each record, producing the id and routing fields
val billRDD = dataRdd.map(...)
billRDD.saveToEs("{index}/{type}")
sc.stop()
}
}
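The `map(...)` step above is elided in the original. Here is a minimal sketch of what it might look like, assuming tab-separated input lines with hypothetical fields `billId`, `shopId`, and `amount` (none of these names come from the original). Because `es.input.json` is set to `yes`, each record must already be a JSON string containing the `id` and `rout` fields named by `es.mapping.id` and `es.mapping.routing`:

```scala
// Hypothetical mapping step: the real field layout depends on your HDFS data.
// Builds one JSON string per input line; the "id" and "rout" keys match the
// es.mapping.id / es.mapping.routing settings configured on the SparkConf.
def toEsJson(line: String): String = {
  val fields = line.split("\t", -1)
  val (billId, shopId, amount) = (fields(0), fields(1), fields(2))
  s"""{"id":"$billId","rout":"$shopId","amount":$amount}"""
}

// In the driver this would be wired in as:
// val billRDD = dataRdd.map(toEsJson)
```

Routing on something like a shop id keeps all documents of one shop on the same shard, which is the usual reason for setting `es.mapping.routing` in bulk imports.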
3. ES optimization settings
① Disable dynamic mapping
curl -XPUT 'http://<es-host>:9200/{index}/{type}/_mapping' -d '{"dynamic": false}'
Note: with dynamic mapping disabled, the id and rout fields are still stored in _source but are not indexed
② Tune the GC algorithm
The default is CMS; switch to G1. On large heaps, long CMS stop-the-world pauses can cause nodes to drop out of the cluster.
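In ES 5.x the JVM flags live in config/jvm.options. A sketch of the change, assuming the stock 5.x defaults (the pause-time target is an illustrative value, not from the original):

```
## in config/jvm.options: comment out the CMS defaults
# -XX:+UseConcMarkSweepGC
# -XX:CMSInitiatingOccupancyFraction=75
# -XX:+UseCMSInitiatingOccupancyOnly

## and enable G1 instead
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
```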
③ Increase refresh_interval and the translog flush threshold size, and switch translog durability from synchronous to asynchronous
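These can all be applied per index through the settings API; the setting names are real ES 5.x settings, but the values below are illustrative, not from the original:

```
PUT {index}/_settings
{
  "index.refresh_interval": "120s",
  "index.translog.flush_threshold_size": "1gb",
  "index.translog.durability": "async",
  "index.translog.sync_interval": "60s"
}
```

With `durability: async` the translog is fsynced on `sync_interval` instead of on every request, trading a small window of possible data loss for much higher bulk-write throughput.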
④ Increase the discovery.zen related settings
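In elasticsearch.yml, raising the zen fault-detection timeouts makes the cluster more tolerant of long GC pauses during heavy indexing, so nodes are not ejected prematurely. The setting names exist in ES 5.x; the values are illustrative assumptions:

```
discovery.zen.ping_timeout: 30s
discovery.zen.fd.ping_interval: 10s
discovery.zen.fd.ping_timeout: 60s
discovery.zen.fd.ping_retries: 5
```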
⑤ A single job should not write to too many indices (100+); design the indices sensibly