前言
之前寫(xiě)過(guò)一篇文章吧趣,如何提高ElasticSearch 索引速度。除了對(duì)ES本身的優(yōu)化以外,我現(xiàn)在大體思路是盡量將邏輯外移到Spark上,Spark的分布式計(jì)算能力強(qiáng),cpu密集型的很適合淑掌。這篇文章涉及的調(diào)整也是對(duì)SparkES 多維分析引擎設(shè)計(jì) 中提及的一個(gè)重要概念“shard to partition ,partition to shard ” 的實(shí)現(xiàn)。不過(guò)目前只涉及到構(gòu)建索引那塊蝶念。
問(wèn)題描述
當(dāng)你bulk數(shù)據(jù)到集群抛腕,按照ElasticSearch Bulk 源碼解析所描述的:
接著通過(guò)executeBulk方法進(jìn)入原來(lái)的流程。在該方法中媒殉,對(duì)bulkRequest.requests 進(jìn)行了兩次for循環(huán)担敌。
第一次判定如果是IndexRequest就調(diào)用IndexRequest.process方法,主要是為了解析出timestamp,routing,id,parent 等字段廷蓉。
第二次是為了對(duì)數(shù)據(jù)進(jìn)行分揀柄错。大致是為了形成這么一種結(jié)構(gòu):
第二次就是對(duì)提交的數(shù)據(jù)進(jìn)行分揀,然后根據(jù)route/_id 等值找到每個(gè)數(shù)據(jù)所屬的Shard苦酱,最后將數(shù)據(jù)發(fā)送到對(duì)應(yīng)Shard所在的Node節(jié)點(diǎn)上。
然而這導(dǎo)致了兩個(gè)問(wèn)題:
- ES Node之間會(huì)形成N*N個(gè)連接给猾,消耗掉過(guò)多的bulk線程
- 出現(xiàn)了很多并不需要的網(wǎng)絡(luò)IO
所以我們希望能夠避免這種情況疫萤。
Spark Partition to ES Shard
我們希望能夠?qū)⒎謷倪壿嫹诺絊park端,保證Spark 的Partition 和ES的Shard 一一對(duì)應(yīng)敢伸,并且實(shí)現(xiàn)特定的Partitoner 保證數(shù)據(jù)到達(dá)ES都會(huì)被對(duì)應(yīng)的Shard所在的節(jié)點(diǎn)直接消費(fèi)扯饶,而不會(huì)再被轉(zhuǎn)發(fā)到其他節(jié)點(diǎn)。
經(jīng)過(guò)我的實(shí)際測(cè)試池颈,做了該調(diào)整后尾序,寫(xiě)入QPS有兩倍以上的提升
理論基礎(chǔ)
這里的理論基礎(chǔ)自然是es-hadoop項(xiàng)目。
類(lèi)的調(diào)用路徑關(guān)系為:
EsSpark ->
EsRDDWriter ->
RestService ->
RestRepository ->
RestClient ->
NetworkClient ->
CommonsHttpTransport
簡(jiǎn)單介紹下他們的作用:
- EsSpark, 讀取ES和存儲(chǔ)ES的入口躯砰。通過(guò)隱式轉(zhuǎn)換每币,會(huì)顯得更Spark.
- EsRDDWriter ,調(diào)用RestService創(chuàng)建PartitionWriter,對(duì)ES進(jìn)行數(shù)據(jù)寫(xiě)入
- RestService,負(fù)責(zé)創(chuàng)建 RestRepository琢歇,PartitionWriter
- RestRepository兰怠,bulk高層抽象,底層利用NetworkClient做真實(shí)的http請(qǐng)求李茫,另外也維護(hù)Buffer相關(guān)的揭保,典型比如積攢了多少條,多少M(fèi)之后進(jìn)行flush等魄宏。
- NetworkClient 對(duì) CommonsHttpTransport的封裝秸侣,主要添加了一些節(jié)點(diǎn)校驗(yàn)功能。
- CommonsHttpTransport 你可以認(rèn)為是對(duì)HttpClient的一個(gè)封裝
原來(lái)我以為需要對(duì)es-hadoop項(xiàng)目的源碼進(jìn)行修改才能實(shí)現(xiàn)前面提到的邏輯。事實(shí)上基于es-hadoop很容易實(shí)現(xiàn)上面提到的需求味榛。
我們現(xiàn)在解釋下為什么不需要修改源碼椭坚。
在RestService類(lèi)里,構(gòu)建RestRepository的時(shí)候励负,會(huì)判定是多索引還是單索引藕溅。對(duì)應(yīng)代碼如下:
RestRepository repository = (iformat.hasPattern() ?
initMultiIndices(settings, currentSplit, resource, log) :
initSingleIndex(settings, currentSplit, resource, log));
這里我們只解析單索引部分代碼,在對(duì)應(yīng)的initSingleIndex方法里有如下代碼:
int bucket = currentInstance % targetShards.size();
Shard chosenShard = orderedShards.get(bucket);
Node targetNode = targetShards.get(chosenShard);
先簡(jiǎn)要說(shuō)明下幾個(gè)參數(shù)變量继榆。
- targetShards 是索引所有的主分片到對(duì)應(yīng)Node節(jié)點(diǎn)的映射巾表。
- orderedShards 則是根據(jù)shardId 順序排序Shard集合
- currentInstance 是partitionId
因?yàn)槲覀円呀?jīng)通過(guò)partitioner 將partitionId 轉(zhuǎn)化為shardId,
,也就是partitionId X 里的數(shù)據(jù)略吨,都是屬于shardId 為X集币。 也就是說(shuō)currentInstance == partitionId == shardId。
下面是我們推導(dǎo)出來(lái)的關(guān)系:
- currentInstance < targetShards.size()
- bucket == currentInstance == partitionId == shardId
- targetNode 持有ShardId=currentInstance 的Primary Shard
所以這段代碼實(shí)際完成了partitionId 到 targetNode的映射關(guān)系翠忠。
ESShardPartitioner 實(shí)現(xiàn)
涉及到這塊的主要有 es-hadoop 的mr以及 spark模塊鞠苟。在mr模塊里包含了ES的分片規(guī)則實(shí)現(xiàn)。 spark 模塊則包含ESShardPartitioner類(lèi)秽之。
代碼如下:
package org.elasticsearch.spark
import ....
class ESShardPartitioner(settings:String) extends Partitioner {
protected val log = LogFactory.getLog(this.getClass())
protected var _numPartitions = -1
override def numPartitions: Int = {
val newSettings = new PropertiesSettings().load(settings)
val repository = new RestRepository(newSettings)
val targetShards = repository.getWriteTargetPrimaryShards(newSettings.getNodesClientOnly())
repository.close()
_numPartitions = targetShards.size()
_numPartitions
}
override def getPartition(key: Any): Int = {
val shardId = ShardAlg.shard(key.toString(), _numPartitions)
shardId
}
}
public class ShardAlg {
public static int shard(String id, int shardNum) {
int hash = Murmur3HashFunction.hash(id);
return mod(hash, shardNum);
}
public static int mod(int v, int m) {
int r = v % m;
if (r < 0) {
r += m;
}
return r;
}
}
使用方式如下:
......partitionBy(new ESShardPartitioner(settings)).foreachPartition { iter =>
try {
val newSettings = new PropertiesSettings().load(settings)
//創(chuàng)建EsRDDWriter
val writer = EsRDDCreator.createWriter(newSettings.save())
writer.write(TaskContext.get(), iter.map(f => f._2))
}
不過(guò)這種方式也是有一點(diǎn)問(wèn)題当娱,經(jīng)過(guò)partition 后,Spark Partition Num==ES Primary Shard Num考榨,這樣會(huì)使得Spark寫(xiě)入并發(fā)性會(huì)受到影響跨细。
這個(gè)和Spark Streaming 里KafkaRDD 的partition數(shù)受限于Kafka Partition Num 非常類(lèi)似。我之前也對(duì)這個(gè)做了擴(kuò)展河质,是的多個(gè)Spark Partition 可以映射到同一個(gè)Kafka Partition.
所以這里有第二套方案:
- 修改ESShardPartitioner冀惭,可以讓多個(gè)分區(qū)對(duì)應(yīng)一個(gè)Shard,并且通過(guò)一個(gè)Map維護(hù)這個(gè)關(guān)系
- 每個(gè)分區(qū)通過(guò)EsRDDWriter指定shardId進(jìn)行寫(xiě)入。
第二點(diǎn)可能需要修改es-hadoop源碼了掀鹅,不過(guò)修改也很簡(jiǎn)單散休,通過(guò)settings傳遞shardId,然后在RestService.initSingleIndex添加如下代碼:
if(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID) != null){
targetNode = targetShards.get(orderedShards.get(Integer.parseInt(settings.getProperty(ConfigurationOptions.ES_BULK_SHARDID))));
}
在創(chuàng)建EsRDDWriter時(shí)拷貝settings的副本并且加入對(duì)應(yīng)的ConfigurationOptions.ES_BULK_SHARDID.
使用時(shí)類(lèi)似下面這個(gè)例子:
//val settings = new SparkSettings(conf).save()
.partitionBy(new ESShardPartitioner(settings)).mapPartitionsWithIndex { (partitionIndex, iter) =>
try {
val writer = EsSpark.createEsRDDWriter[Map[String,String]](settings, resource)
//shardToPartitions個(gè) Spark partition 對(duì)應(yīng)一個(gè)ES Shard
val shardId = ESShardPartitioner.shardIdFromPartitionId(partionId, shardToPartitions)
//強(qiáng)制該分片寫(xiě)入到特定的Shard里
val stats = writer.writeToSpecificPrimaryShard(TaskContext.get(), shardId, iter.map(f => f._2))
List(NewStats(stats.bulkTotalTime, stats.docsSent)).iterator
} catch {
這樣可以把一份數(shù)據(jù)切成多分,并發(fā)寫(xiě)入ES的某個(gè)Shard.
總結(jié)
將ES的計(jì)算外移到Spark在這個(gè)場(chǎng)景中還是比較容易的乐尊。下次我還會(huì)專(zhuān)門(mén)寫(xiě)篇文章戚丸,剖析es-hadoop的實(shí)現(xiàn),以及一些關(guān)鍵參數(shù)科吭,尤其是一些類(lèi)的使用昏滴。方便我們對(duì)es-hadoop實(shí)現(xiàn)定制化修改。