背景
之前說(shuō)過(guò)捌议,其實(shí)ES很多功能是用不到的。尤其是mapping有很大調(diào)優(yōu)空間嚼蚀。專家和新手調(diào)配的集群禁灼,性能是截然不同的。但是再怎么調(diào)優(yōu)轿曙,仍然需要走ES的很多流程弄捕,還是滿足不了一些場(chǎng)景。
一直想弄個(gè)簡(jiǎn)化版的ES导帝,砍掉花里胡哨的代碼守谓,壓縮處理鏈與存儲(chǔ)空間,做到極限的全文檢索數(shù)據(jù)庫(kù)您单。斋荞。。
這個(gè)想法要實(shí)現(xiàn)起來(lái)起碼要大改ES源碼了虐秦,不太現(xiàn)實(shí)平酿。
正好有個(gè)項(xiàng)目需求,先嘗試做到快速加載到ES悦陋,起碼50MB/s單節(jié)點(diǎn) 約50萬(wàn)條每秒(100字節(jié))
(想法參考了滴滴公司做的fast-loades 他們跑的是MR任務(wù))
需求
- 只需要核心ES功能 索引數(shù)據(jù)以及常用檢索
- 突破ES正常加載速度 30MB/s 左右
- 數(shù)據(jù)庫(kù) 分區(qū)功能 SQL [待定]
思路
- 繞過(guò)正常加載流程
HTTP流程蜈彼、JSON解析、請(qǐng)求轉(zhuǎn)發(fā)俺驶、mapping檢查幸逆、mvcc控制、translog等
- 走直接生成Lucene文件的方式
Kafka > Avro > Lucene > merge > add Indices > ES shard 目錄的流程
技術(shù)性驗(yàn)證
- 可行性在于ES可以有plugin機(jī)制 能定位到內(nèi)存中的shard對(duì)象 add Indices 做到實(shí)時(shí)導(dǎo)入目錄
- 對(duì)Lucene來(lái)說(shuō)add Indices不需要重新索引文檔(直接拷貝文件)
且這一步操作Lucene保證了是事務(wù)的 要么成功要么不成功
給重試機(jī)制提供了可能 - ES本質(zhì)上是Lucene文件 只要能定位到mapping對(duì)應(yīng)產(chǎn)生的Fields就能拋開ES單獨(dú)直接生產(chǎn)Lucene文件
- 前提是直接生成Lucene文件的速度幾倍于ES的速度上限 否則折騰這個(gè)沒(méi)必要
工程化難點(diǎn)和思路
-
怎么生成ES格式的indexwriter (因?yàn)镋S有很多mapping參數(shù) 映射到Lucene對(duì)象上比較復(fù)雜)
考慮整理一個(gè)工具包 給定ES mapping 生成indexwriter以及add Document
(滴滴是進(jìn)程啟動(dòng)了一個(gè)本地節(jié)點(diǎn)的ES實(shí)例 原生接口入ES暮现,雖然最簡(jiǎn)單 但是不是極致性能) 怎么在本節(jié)點(diǎn)直接生成Lucene添加到本地shard(kafka怎么指定節(jié)點(diǎn)消費(fèi) 且需要?jiǎng)討B(tài)改變)
節(jié)點(diǎn)驅(qū)動(dòng)->不太可能(不清楚會(huì)有哪些日期分區(qū) 表)
數(shù)據(jù)驅(qū)動(dòng)->節(jié)點(diǎn)消費(fèi)kafka生成數(shù)據(jù)后 推送到目標(biāo)節(jié)點(diǎn)还绘?索引周期性生成的話怎么全局控制
spark streaming OR 本地kafka直接消費(fèi)?
spark好處是有全局控制栖袋、可以定制metric統(tǒng)計(jì)拍顷、管理部署簡(jiǎn)單
kafka好處是更加穩(wěn)定高效
初步方案
- 目前只考慮支持常用類型 常用分詞器等
- IndexWriter的獲取可以考慮參考ES測(cè)試類的構(gòu)造方式。塘幅。
- 使用standalone方式的spark streaming(方便控制executor分布)driver維護(hù)產(chǎn)生了哪些索引(負(fù)責(zé)創(chuàng)建)
- spark消費(fèi)kafka生成本地Lucene 不考慮目標(biāo)shard所在
- 實(shí)現(xiàn)ES插件 周期檢查生成目錄 把已關(guān)閉的Lucene(達(dá)到大小或時(shí)間)按策略發(fā)往目標(biāo)節(jié)點(diǎn)的shard 這里應(yīng)該采取本地就近原則
- Lucene添加到shard 移除
TODO
- 這樣直接add Field 實(shí)際上大大減少了ES的功能點(diǎn)昔案。。會(huì)不會(huì)導(dǎo)致添加到ES的數(shù)據(jù)無(wú)法正常使用晌块?
- 例如ES每個(gè)字段根據(jù)mapping的不同會(huì)add 多個(gè)field(indexed stored docvalues sorted ...)
- 這里可能需要對(duì)Lucene底層較了解才行 (或者直接對(duì)比正常ES流程產(chǎn)生的shard文件 解讀)
- 測(cè)試生成導(dǎo)入文件后 常見檢索有無(wú)異常。帅霜。
- 進(jìn)一步優(yōu)化 : Lucene的field能不能更簡(jiǎn)潔了
測(cè)試
-
測(cè)試Lucene生成速度 parquet -> Lucene SY數(shù)據(jù)
[SATA硬盤x10 E5 CPU 40線程] 數(shù)據(jù)30個(gè)字段 平均300字節(jié)(CSV)
測(cè)試場(chǎng)景 | 并發(fā)線程數(shù) | 持續(xù)時(shí)長(zhǎng) | 總寫入條數(shù) | 平均速度 | 字節(jié)速度 | CPU占用 |
---|---|---|---|---|---|---|
單磁盤 | 1 | 120s | 300萬(wàn) | 25000/s | 7.5MB/s | 2% ~ 5% |
單磁盤 | 2 | 200s | 1000萬(wàn) | 50000/s | 15MB/s | 5% ~ 7% |
多磁盤 | 20 | 340s | 1億 | 290000/s | 87MB/s | 30% ~ 65% |
多磁盤 | 40 | 2000s | 8億 | 400000/s | 120MB/s | 80% ~ 100% |
-
測(cè)試生成的Lucene文件添加到ES
1)測(cè)試直接add會(huì)導(dǎo)致查詢錯(cuò)誤匆背。。NPE 原因:缺少很多ES內(nèi)部metadata field以及其他一些功能性field ...
- _field_names field
- _ignored field
- _id field
- _index field
- _meta field
- _routing field
- _source field
- _type field
又如 SeqNoFieldMapper 會(huì)自動(dòng)生成好幾個(gè)Field
- Mapper for the {@code _seq_no} field.
- We expect to use the seq# for sorting, during collision checking and for
- doing range searches. Therefore the {@code _seq_no} field is stored both
- as a numeric doc value and as numeric indexed field.
2)
結(jié)論
繞過(guò)ES本身接口進(jìn)行數(shù)據(jù)加載是可行的身冀。并且能達(dá)到3-4倍于ES的極限加載速度钝尸。
附 1 ES 的常用字段類型最簡(jiǎn)化mapping添加doc 產(chǎn)生的fields 以及屬性 括享。。珍促。
重要屬性設(shè)置
參數(shù) | 含義 | 可選項(xiàng) |
---|---|---|
analyzer | text類型的分詞器 建索引的時(shí)候 | 略 |
doc_values | 列存方式的數(shù)據(jù) 能快速遍歷字段的terms铃辖。用于排序 聚合等。text類型不能設(shè)置 | 默認(rèn)開啟猪叙,可設(shè)置false |
enabled | 設(shè)置字段是否啟用(通常是object字段 在索引過(guò)程可忽略)但是能在source取出 | 略 |
fielddata | 這是給text類型單獨(dú)的選項(xiàng) 用于排序 聚合(針對(duì)text類型不能doc_values)但是需要大量?jī)?nèi)存娇斩,默認(rèn)關(guān)閉 慎用。 | 略 |
format | date類型專用 設(shè)置數(shù)據(jù)解析格式 注意可以是多個(gè) | 略 |
ignore_above | 字符串專用 設(shè)置需要索引文本長(zhǎng)度(影響取詞范圍)但是_source仍可以拿到完整 | 略 |
ignore_malformed | 對(duì)數(shù)值 日期 GEO等類型 設(shè)置容忍數(shù)據(jù)格式錯(cuò)誤 出錯(cuò)時(shí)會(huì)忽略掉該字段的索引 | 略 |
index | 設(shè)置字段是否索引 (即可查 但是聚合等仍開啟穴翩,比enabled稍輕) | 略 |
index_options | text類型專用犬第。設(shè)置索引中存儲(chǔ)的內(nèi)容包含級(jí)別 (詞頻、位置芒帕、偏移)不需要評(píng)分歉嗓、短語(yǔ)檢索、高亮功能可適當(dāng)關(guān)閉 以省磁盤空間 | docs/freqs/positions(默認(rèn))/offsets |
meta | 字段可以擴(kuò)展部分信息 額外存儲(chǔ) | 略 |
fields | 通常用于字符串類型,設(shè)置同一個(gè)字段多種索引方式 如用不同的分詞器都建索引(用于不同功能) | 略 |
normalizer | 控制分詞器的 (分詞分為切詞 過(guò)濾 統(tǒng)一化 等過(guò)程) | 略 |
norms | 評(píng)分相關(guān) 不需要評(píng)分可以關(guān)閉 | 略 |
properties | 嵌套類型的子字段。(object/nested) | 略 |
search_analyzer | 設(shè)置檢索時(shí)使用的分詞器荡含。(這里舉了個(gè)自動(dòng)補(bǔ)全檢索的例子 可以關(guān)注下) | 略 |
similarity | 相關(guān)度的算法 | bm25/classic(指TF/IDF) / boolean |
store | 字段是否在field對(duì)象里存儲(chǔ)奖慌。注意默認(rèn)是不存儲(chǔ)的 因?yàn)橥ǔJ菑腳source 內(nèi)部字段取值 但是有些情況是希望只取部分列 不想拿整個(gè)大的_source | 略 |
term_vector | 默認(rèn)關(guān)閉。設(shè)置字段額外文件結(jié)構(gòu) 存儲(chǔ)文檔中各種詞的信息 如出現(xiàn)總頻次 這里與index_options 似乎有點(diǎn)重復(fù) 個(gè)人理解這里是給其他API分析用的 非檢索用的 | 最大級(jí)別:with_positions_offsets_payloads |
測(cè)試的mapping
StructField(mobileid,ByteType,true)
StructField(nettype,ByteType,true)
StructField(nattype,ByteType,true)
StructField(imsi,LongType,true)
StructField(imei,StringType,true)
StructField(mac,StringType,true)
StructField(account,StringType,true)
StructField(ua,StringType,true)
StructField(accounttype,ByteType,true)
StructField(logintype,ByteType,true)
StructField(linetimetype,ByteType,true)
StructField(srcipv4,StringType,true)
StructField(srcipv6,StringType,true)
StructField(srcport,IntegerType,true)
StructField(dstipv4,StringType,true)
StructField(dstipv6,StringType,true)
StructField(dstport,IntegerType,true)
StructField(protocoltype,IntegerType,true)
StructField(servicetypeid,ShortType,true)
StructField(virtualusername,StringType,true)
StructField(accesstime,TimestampType,true)
StructField(lac,StringType,true)
StructField(ci,StringType,true)
StructField(httprequesttype,IntegerType,true)
StructField(url,StringType,true)
StructField(accountflag,ByteType,true)
StructField(urlflag,ByteType,true)
StructField(priipv4,StringType,true)
StructField(startport,IntegerType,true)
StructField(endport,IntegerType,true)
StructField(url_s,StringType,true)
StructField(fileid,LongType,true)
其中URL分詞 其余全部索引
附 2 ES正常加載速度測(cè)試 腺逛。。。
附 3 核心代碼(即Lucene文件生成)
import org.apache.lucene.document.Field.Store
import org.apache.lucene.document.{IntPoint, LongPoint, StringField, TextField}
import org.apache.lucene.index.IndexableField
import org.apache.spark.sql.types._
trait Converters {
val name:String
def tofield(a:Any):IndexableField
}
object Converters {
//注意把判null前移 順便做 _source 字段的拼接
def toConverters(s:StructType):Array[Converters] = {
s.fields.map(convert).toArray
}
private def convert(f:StructField):Converters = {
//注:數(shù)值類型 Lucene似乎只有int long float double
f.dataType match {
case ByteType =>
new IntConverter(f.name)
case ShortType =>
new IntConverter(f.name)
case IntegerType =>
new IntConverter(f.name)
case LongType =>
new LongConverter(f.name)
case StringType if (f.name.equalsIgnoreCase("url")) =>
new TextConverter(f.name)
case StringType =>
new StringConverter(f.name)
case TimestampType =>
new TimeConverter(f.name)
case o=>
throw new Exception(s"not supported type yet ${f}")
}
}
}
class IntConverter(fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[Number].intValue()
new IntPoint(name,v)
}
}
class LongConverter(fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[Long]
new LongPoint(name,v)
}
}
class StringConverter(fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[String]
new StringField(name,v,Store.NO)
}
}
class TextConverter (fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[String]
new TextField(name,v,Store.NO)
}
}
//暫不清楚es怎么搞 當(dāng)成long處理
class TimeConverter(fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[java.sql.Timestamp]
new LongPoint(name,v.getTime)
}
}
def newDoc(r:Row,converters: Array[Converters]):(String,Document) = {
val id = IDCreater.newid()
val doc = new Document
val source = new JSONObject()
/**
val calls = r.toSeq.zip(converters)
calls.foreach{case(v,f)=>
if(v != null){
source.put(f.name,v.toString)
doc.add(f.tofield(v))
}
}
*/
for(i <- (0 until converters.length)){
val v = r.get(i)
val f = converters(i)
if(v != null){
source.put(f.name,v.toString)
doc.add(f.tofield(v))
}
}
val json = source.toJSONString
val seqfield = SequenceIDFields.emptySeqID
doc.add(seqfield.seqNo)
doc.add(seqfield.seqNoDocValue)
doc.add(seqfield.primaryTerm)
doc.add(new NumericDocValuesField("_version",1))
doc.add(new StringField("_id",Uid.encodeId(id),Store.YES))
doc.add(new StoredField("_source",new BytesArray(json).toBytesRef))
(json,doc)
}