Problem Description
A while ago, while using es-spark to read data from Elasticsearch, we hit a situation where the client nodes' network traffic was saturated. es.nodes in the es-spark configuration was set to the ES cluster's domain name. Since one of the client nodes was also the master node, ordinary queries became extremely slow, and after the job had run for about 20 minutes the master node crashed.
Solution
Temporary fix: lower es-spark's concurrency and restart the master node.
Final fix: set es.nodes.wan.only to false, i.e. stop accessing the cluster through the domain name, and configure es.nodes with the client nodes' IP addresses.
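A minimal sketch of the configuration change, assuming the job is set up through SparkConf (the domain name and IPs below are placeholders, not values from the incident):

import org.apache.spark.SparkConf;

public class EsNodesConfig {
    public static SparkConf corrected() {
        SparkConf conf = new SparkConf().setAppName("es-spark-job");
        // Problematic setup: a domain name for es.nodes forces
        // es.nodes.wan.only=true, which disables node discovery.
        // conf.set("es.nodes", "es.example.com");     // placeholder domain
        // conf.set("es.nodes.wan.only", "true");

        // Corrected setup: declare client-node IPs and keep wan.only off.
        conf.set("es.nodes", "10.0.0.1,10.0.0.2");     // placeholder client-node IPs
        conf.set("es.port", "9200");
        conf.set("es.nodes.wan.only", "false");        // the default, shown explicitly
        return conf;
    }
}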
Root Cause Analysis
When the cluster is accessed through a domain name, es.nodes.wan.only must be set to true. The documentation explains the parameter as follows:
Whether the connector is used against an Elasticsearch instance in a cloud/restricted environment over the WAN, such as Amazon Web Services. In this mode, the connector disables discovery and only connects through the declared es.nodes during all operations, including reads and writes. Note that in this mode, performance is highly affected.
With es.nodes.wan.only set to true, all reads go through the declared client nodes only, so the master node's load becomes extremely high and performance is poor. After running for a long time, each Java GC pass took tens of seconds, the node slowly ran out of memory (OOM), and the system crashed.
After configuring es.nodes with the client nodes' IPs, Spark accesses ES through the data nodes only:
es.nodes.data.only (default true)
Whether to use Elasticsearch data nodes only. When enabled, elasticsearch-hadoop will route all its requests (after nodes discovery, if enabled) through the data nodes within the cluster. The purpose of this configuration setting is to avoid overwhelming non-data nodes as these tend to be "smaller" nodes. This is enabled by default.
es.nodes.data.only defaults to true, meaning all of Spark's requests go straight to the data nodes and are no longer forwarded through the client nodes; the client nodes are left to serve ordinary queries only.
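As an illustration, a minimal read job under this corrected configuration might look like the following sketch; the index name and IPs are placeholders:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsSparkReadDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("es-spark-read-demo")
                .set("es.nodes", "10.0.0.1,10.0.0.2")  // placeholder client-node IPs
                .set("es.nodes.wan.only", "false");    // allow node discovery
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // With es.nodes.data.only left at its default (true), the scroll
        // requests below are routed to data nodes after discovery.
        long count = JavaEsSpark.esJsonRDD(jsc, "my-index/my-type").count();
        System.out.println("docs read: " + count);
        jsc.stop();
    }
}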
Source Code Analysis
1. The es-spark read path
Its architecture is shown in the figure below.
Spark can normally discover cluster nodes dynamically, but when wan.only is set to true, the only node IPs the connector knows about are the ones resolved from the domain name:
private static List<String> qualifyNodes(String nodes, int defaultPort, boolean resolveHostNames)
{
    List<String> list = StringUtils.tokenize(nodes);
    for (int i = 0; i < list.size(); i++)
    {
        // resolve each declared host to an IP (if asked to) and append the port
        String nodeIp = resolveHostNames ? resolveHostToIpIfNecessary((String)list.get(i)) : (String)list.get(i);
        list.set(i, qualifyNode(nodeIp, defaultPort));
    }
    return list;
}
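As a rough standalone illustration of what this amounts to (a hypothetical re-implementation, not the connector's code), each declared host is resolved to an IP and qualified with the default port, so a single domain name contributes only the IPs DNS returns for it:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class QualifyNodesSketch {
    // Hypothetical stand-in: resolve each comma-separated host, append the port
    static List<String> qualifyNodes(String nodes, int defaultPort) throws UnknownHostException {
        List<String> result = new ArrayList<>();
        for (String host : nodes.split(",")) {
            String ip = InetAddress.getByName(host.trim()).getHostAddress();
            result.add(ip + ":" + defaultPort);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(qualifyNodes("localhost", 9200)); // e.g. [127.0.0.1:9200]
    }
}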
Taking scroll as an example, the read call chain is:
JavaEsSpark.esJsonRDD() --> JavaEsRDD.compute() --> JavaEsRDDIterator.reader$lzycompute() (JavaEsRDDIterator extends AbstractEsRDDIterator)
In reader$lzycompute we can see that the request is executed by RestService:
private ScrollQuery reader$lzycompute()
{
    synchronized (this)
    {
        if (!this.bitmap$0)
        {
            initialized_$eq(true);
            Settings settings = this.partition.settings();
            initReader(settings, log());
            RestService.PartitionReader readr = RestService.createReader(settings, this.partition, log());
            this.reader = readr.scrollQuery();
            this.bitmap$0 = true;
        }
        return this.reader;
    }
}
createReader checks whether the Spark executor and the shard being read live on the same node; if so, that node's IP is pinned into the Settings and the request is served by the local node (the IP is read back from the settings when the request executes):
if ((!SettingsUtils.hasPinnedNode(settings)) && (partition.getLocations().length > 0))
{
    String pinAddress = checkLocality(partition.getLocations(), log);
    if (pinAddress != null)
    {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Partition reader instance [%s] assigned to [%s]", new Object[] { partition, pinAddress }));
        }
        // pin the co-located node's IP; it is read back at request time
        SettingsUtils.pinNode(settings, pinAddress);
    }
}
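checkLocality itself is not shown above; the idea is roughly the following (a hypothetical sketch, not the connector's implementation): compare each shard location against the local machine's address and return the match, if any:

import java.net.InetAddress;
import java.net.UnknownHostException;

public class LocalitySketch {
    // Hypothetical stand-in for checkLocality: return the first shard
    // location hosted on this machine, or null if none is local.
    static String checkLocality(String[] locations) {
        try {
            String localIp = InetAddress.getLocalHost().getHostAddress();
            for (String location : locations) {
                // locations look like "ip:port"; compare only the ip part
                if (location.split(":")[0].equals(localIp)) {
                    return location;
                }
            }
        } catch (UnknownHostException ignored) {
            // fall through and report no local shard
        }
        return null;
    }

    public static void main(String[] args) {
        // prints the matching location, or null if the shard is remote
        System.out.println(checkLocality(new String[] { "127.0.0.1:9200" }));
    }
}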
From there the chain runs PartitionReader.scrollQuery() --> SearchRequestBuilder.build() --> RestRepository.scanLimit() --> ScrollQuery.hasNext() --> RestRepository.scroll() --> RestClient.execute() --> NetworkClient.execute() --> Transport.execute().
The step that really matters is in NetworkClient: it shuffles all the data nodes it knows about and picks one to communicate with, as shown below:
public NetworkClient(Settings settings, TransportFactory transportFactory)
{
    this.settings = settings.copy();
    this.nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
    this.transportFactory = transportFactory;
    Collections.shuffle(this.nodes); // shuffle the node order
    if (SettingsUtils.hasPinnedNode(settings))
    {
        String pinnedNode = SettingsUtils.getPinnedNode(settings);
        if (log.isDebugEnabled()) {
            log.debug("Opening (pinned) network client to " + pinnedNode);
        }
        // move the pinned node to the front so it is tried first
        this.nodes.remove(pinnedNode);
        this.nodes.add(0, pinnedNode);
    }
    selectNextNode();
    Assert.notNull(this.currentTransport, "no node information provided");
}
private boolean selectNextNode()
{
    if (this.nextClient >= this.nodes.size()) {
        return false;
    }
    if (this.currentTransport != null) {
        this.stats.nodeRetries += 1;
    }
    closeTransport();
    this.currentNode = ((String)this.nodes.get(this.nextClient++));
    SettingsUtils.pinNode(this.settings, this.currentNode);
    this.currentTransport = this.transportFactory.create(this.settings, this.currentNode);
    return true;
}
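To make the shuffle-plus-pin ordering concrete, here is a small standalone sketch (plain Java with placeholder IPs, not connector code): after shuffling, the pinned node is moved to the front so it is tried first, with the shuffled rest as fallbacks:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NodeOrderSketch {
    public static void main(String[] args) {
        List<String> nodes = new ArrayList<>(Arrays.asList(
                "10.0.0.1:9200", "10.0.0.2:9200", "10.0.0.3:9200")); // placeholder IPs
        Collections.shuffle(nodes);           // spread load across the cluster
        String pinnedNode = "10.0.0.2:9200";  // e.g. the node hosting the target shard
        nodes.remove(pinnedNode);
        nodes.add(0, pinnedNode);             // pinned node is tried first
        System.out.println(nodes);            // [10.0.0.2:9200, <shuffled rest>]
    }
}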
2. The es-spark write path
Its architecture is shown in the figure below.
From the source code:
On the write path, if wan.only is true, the node receiving the writes is simply picked at random from the IPs resolved from the domain name.
if (settings.getNodesWANOnly()) {
    return randomNodeWrite(settings, currentInstance, resource, log);
}
Taking bulk as an example, the flow is:
EsSpark.doSaveToEs() --> EsRDDWriter.write() --> RestService.createWriter()
createWriter first picks a node, either at random or according to the current split:
int selectedNode = currentSplit < 0 ? new Random().nextInt(nodes.size()) : currentSplit % nodes.size();
SettingsUtils.pinNode(settings, (String)nodes.get(selectedNode));
The final adjustment happens in RestService.initSingleIndex: using the current split it locates the corresponding shard, obtains the IP of the node hosting that shard, and writes it into the settings (the IP is read back from the settings when the request executes).
if (currentInstance <= 0) {
    currentInstance = new Random().nextInt(targetShards.size()) + 1;
}
int bucket = currentInstance % targetShards.size();
ShardInfo chosenShard = (ShardInfo)orderedShards.get(bucket);
NodeInfo targetNode = (NodeInfo)targetShards.get(chosenShard);
// pin the IP of the node hosting the chosen shard; it is read back at request time
SettingsUtils.pinNode(settings, targetNode.getPublishAddress());
String node = SettingsUtils.getPinnedNode(settings);
repository = new RestRepository(settings);
From there it is RestRepository.tryFlush() --> RestClient.bulk() --> NetworkClient.execute() --> Transport.execute(), essentially the same flow as the read path, so we won't walk through it again.
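For completeness, a minimal bulk write through es-spark might look like the following sketch; the index name, documents, and IPs are made up:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsSparkWriteDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("es-spark-write-demo")
                .set("es.nodes", "10.0.0.1,10.0.0.2"); // placeholder client-node IPs
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> docs = jsc.parallelize(Arrays.asList(
                "{\"user\":\"alice\"}", "{\"user\":\"bob\"}"));
        // each partition pins the node hosting its target shard and bulks to it
        JavaEsSpark.saveJsonToEs(docs, "my-index/my-type");
        jsc.stop();
    }
}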
3灭将、shard-partition 對(duì)應(yīng)關(guān)系
es-spark寫的話就是就是一個(gè)partition對(duì)應(yīng)一個(gè)shard,這里從上述的es-spark寫代碼中可以看出后控,不再過多介紹庙曙。
es-spark讀的時(shí)候是按照shard的文檔數(shù)來分的:
partition=numberOfDoc(shard)/100000
100000是默認(rèn)的配置,這個(gè)可通過es.input.max.docs.per.partition配置浩淘。
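For instance, a hypothetical tuning that halves the per-partition cap (and therefore roughly doubles the number of read partitions) would look like:

import org.apache.spark.SparkConf;

public class PartitionTuning {
    public static void main(String[] args) {
        // assumed value for illustration; the default is 100000
        SparkConf conf = new SparkConf()
                .set("es.input.max.docs.per.partition", "50000");
        System.out.println(conf.get("es.input.max.docs.per.partition"));
    }
}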
Suppose a shard holds 230,000 docs at 100,000 docs per partition; it is then split into 3 partitions. The shard-to-partition layout for reads is shown in the figure below.
From the source code: on a 5.X cluster, the connector uses scroll slices to raise read parallelism:
if (version.onOrAfter(EsMajorVersion.V_5_X)) {
    partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards);
} else {
    partitions = findShardPartitions(settings, mapping, nodesMap, shards);
}
findSlicePartitions contains the calculation:
for (List<Map<String, Object>> group : shards)
{
    String index = null;
    int shardId = -1;
    List<String> locationList = new ArrayList<String>();
    for (Map<String, Object> replica : group)
    {
        ShardInfo shard = new ShardInfo(replica);
        index = shard.getIndex();
        shardId = shard.getName().intValue();
        if (nodes.containsKey(shard.getNode())) {
            locationList.add(((NodeInfo)nodes.get(shard.getNode())).getPublishAddress());
        }
    }
    String[] locations = (String[])locationList.toArray(new String[0]);
    StringBuilder indexAndType = new StringBuilder(index);
    if (StringUtils.hasLength(types))
    {
        indexAndType.append("/");
        indexAndType.append(types);
    }
    // count the docs in this shard and derive the number of slice partitions
    long numDocs = client.count(indexAndType.toString(), Integer.toString(shardId), query);
    int numPartitions = (int)Math.max(1L, numDocs / maxDocsPerPartition);
    for (int i = 0; i < numPartitions; i++)
    {
        PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
        partitions.add(new PartitionDefinition(settings, mapping, index, shardId, slice, locations));
    }
}
public int getMaxDocsPerPartition()
{
    return Integer.parseInt(getProperty("es.input.max.docs.per.partition", Integer.toString(100000)));
}