ES-Spark連接ES后,ES Client節(jié)點(diǎn)流量打滿分析

問題描述

前段時(shí)間用es-spark讀取es數(shù)遇到了client節(jié)點(diǎn)流量打滿的現(xiàn)象。es-spark配置的es.nodes是es的域名续徽。由于其中一個(gè)client是master節(jié)點(diǎn),然后普通查詢變得特別慢亲澡,運(yùn)行20多分鐘后钦扭,主節(jié)點(diǎn)崩潰。

解決方法

臨時(shí)解決方案:降低es-spark的并發(fā)谷扣,并重啟主節(jié)點(diǎn)土全。

最終解決方案:設(shè)置es.nodes.wan.only為false捎琐,即不用域名訪問会涎。將es.nodes配置為client節(jié)點(diǎn)的IP。

原因分析

域名訪問時(shí)必須配置參數(shù)es.nodes.wan.only為true瑞凑,關(guān)于該參數(shù)的解釋如下:

Whether the connector is used against an Elasticsearch instance in a cloud/restricted environment over the WAN, such as Amazon Web Services. In this mode, the connector disables discovery and onlyconnects through the declared es.nodes during all operations, including reads and writes. Note that in this mode, performance is highly affected.

es.nodes.wan.only設(shè)置為true時(shí)即只通過client節(jié)點(diǎn)進(jìn)行讀取操作末秃,因此主節(jié)點(diǎn)負(fù)載會(huì)特別高,性能很差籽御。長(zhǎng)時(shí)間運(yùn)行后练慕,java gc回收一次要幾十秒惰匙,慢慢的OOM,系統(tǒng)崩潰铃将。

配置es.nodes為client節(jié)點(diǎn)的IP后项鬼,spark只通過data節(jié)點(diǎn)訪問ES:

es.nodes.data.only (default true)
Whether to use Elasticsearch data nodes only. When enabled, elasticsearch-hadoop will route all its requests (after nodes discovery, if enabled) through the data nodes within the cluster. The purpose of this configuration setting is to avoid overwhelming non-data nodes as these tend to be "smaller" nodes. This is enabled by default.

es.nodes.data.only 默認(rèn)為true,即spark所有的請(qǐng)求都會(huì)發(fā)到數(shù)據(jù)節(jié)點(diǎn)劲阎,不在通過client節(jié)點(diǎn)進(jìn)行請(qǐng)求的轉(zhuǎn)發(fā)绘盟,client節(jié)點(diǎn)只用來服務(wù)普通的查詢。

源碼角度分析

1悯仙、es-spark 讀

其架構(gòu)圖如下所示:

es_spark_read.png

我們知道spark能動(dòng)態(tài)的發(fā)現(xiàn)節(jié)點(diǎn),龄毡,但當(dāng)我們配置wan.only為true的時(shí)候,整個(gè)集群的節(jié)點(diǎn)IP中只有從域名中解析出來的IP:

private static List<String> qualifyNodes(String nodes, int defaultPort, boolean resolveHostNames)
 {
   List<String> list = StringUtils.tokenize(nodes);
   for (int i = 0; i < list.size(); i++)
   {
     String nodeIp = resolveHostNames ? resolveHostToIpIfNecessary((String)list.get(i)) : (String)list.get(i);
     list.set(i, qualifyNode(nodeIp, defaultPort));
   }
   return list;
 }

從源碼角度以scroll為例:

JavaEsSpark.esJsonRDD()-->JavaEsRDD.compute()-->JavaEsRDDIterator(繼承AbstractEsRDDIterator).reader$lzycompute()
在lzycompute方法中我們可以看到锡垄,執(zhí)行請(qǐng)求的是RestService:

private ScrollQuery reader$lzycompute()
 {
   synchronized (this)
   {
     if (!this.bitmap$0)
     {
       initialized_$eq(true);
       Settings settings = this.partition.settings();

       initReader(settings, log());

       RestService.PartitionReader readr = RestService.createReader(settings, this.partition, log());this.reader =
         readr.scrollQuery();this.bitmap$0 = true;
     }
     return this.reader;
   }
 }

在createReader方法中會(huì)判斷spark節(jié)點(diǎn)和當(dāng)前請(qǐng)求請(qǐng)求的shard是否是同一個(gè)節(jié)點(diǎn)沦零,如果是同一個(gè)節(jié)點(diǎn),則將該IP寫入Setting货岭,用本地節(jié)點(diǎn)IP進(jìn)行請(qǐng)求(執(zhí)行請(qǐng)求的時(shí)候路操,從setting中讀取該ip):

if ((!SettingsUtils.hasPinnedNode(settings)) && (partition.getLocations().length > 0))
{
  String pinAddress = checkLocality(partition.getLocations(), log);
  if (pinAddress != null)
  {
    if (log.isDebugEnabled()) {
      log.debug(String.format("Partition reader instance [%s] assigned to [%s]:[%s]", new Object[] { partition, pinAddress }));
    }
    SettingsUtils.pinNode(settings, pinAddress);
  }
}

通過PartitionReader.scrollQuery()-->SearchRequestBuilder.build()-->RestRepository.scanLimit()-->ScrollQuery.hasNext()-->RestRepository.scroll()-->RestClient.execute()-->NetWorkClient.execute()-->Transport.execute()

其實(shí)我們看到的最終要的執(zhí)行是在NetWorkClient中,他會(huì)打亂所有的數(shù)據(jù)節(jié)點(diǎn)千贯,并從中選出一個(gè)節(jié)點(diǎn)用來通信寻拂,如下:

public NetworkClient(Settings settings, TransportFactory transportFactory)
{
  this.settings = settings.copy();
  this.nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
  this.transportFactory = transportFactory;

  Collections.shuffle(this.nodes);//打亂排序
  if (SettingsUtils.hasPinnedNode(settings))
  {
    String pinnedNode = SettingsUtils.getPinnedNode(settings);
    if (log.isDebugEnabled()) {
      log.debug("Opening (pinned) network client to " + pinnedNode);
    }
    this.nodes.remove(pinnedNode);
    this.nodes.add(0, pinnedNode);
  }
  selectNextNode();

  Assert.notNull(this.currentTransport, "no node information provided");
}


private boolean selectNextNode()
{
  if (this.nextClient >= this.nodes.size()) {
    return false;
  }
  if (this.currentTransport != null) {
    this.stats.nodeRetries += 1;
  }
  closeTransport();
  this.currentNode = ((String)this.nodes.get(this.nextClient++));
  SettingsUtils.pinNode(this.settings, this.currentNode);
  this.currentTransport = this.transportFactory.create(this.settings, this.currentNode);
  return true;
}

2、es-spark 寫

其架構(gòu)圖如下所示:

es_spark_write.png

從源碼角度來看:
寫請(qǐng)求的時(shí)候丈牢,如果wan.only配置為true祭钉,則節(jié)點(diǎn)IP就是從域名解析出的IP中隨機(jī)選擇一個(gè)進(jìn)行寫操作。

if (settings.getNodesWANOnly()) {
  return randomNodeWrite(settings, currentInstance, resource, log);
}

以bulk為例己沛,其操作過程如下:

EsSpark.doSaveToEs()-->EsRDDWriter.write()-->RestService.createWriter()

在createWriter中首先隨機(jī)或者按照split選擇一個(gè)節(jié)點(diǎn):

int selectedNode = currentSplit < 0 ? new Random().nextInt(nodes.size()) : currentSplit % nodes.size();
SettingsUtils.pinNode(settings, (String)nodes.get(selectedNode));

最終的改變是在RestService的initSingleIndex方法中慌核,通過根據(jù)當(dāng)前的split,找到對(duì)應(yīng)的shard申尼,然后獲取到shard所在的IP垮卓,寫入setting中(執(zhí)行請(qǐng)求的時(shí)候,從setting中讀取該ip)师幕。

if (currentInstance <= 0) {
   currentInstance = new Random().nextInt(targetShards.size()) + 1;
 }
 int bucket = currentInstance % targetShards.size();
 ShardInfo chosenShard = (ShardInfo)orderedShards.get(bucket);
 NodeInfo targetNode = (NodeInfo)targetShards.get(chosenShard);

 SettingsUtils.pinNode(settings, targetNode.getPublishAddress());
 String node = SettingsUtils.getPinnedNode(settings);
 repository = new RestRepository(settings);

接下來就是RestRepository.tryFlush()-->RestClient.bulk()-->NetWorkClient.execute()-->Transport.execute()粟按,這一套流程和讀差不多,這里就不再介紹霹粥。

3灭将、shard-partition 對(duì)應(yīng)關(guān)系

es-spark寫的話就是就是一個(gè)partition對(duì)應(yīng)一個(gè)shard,這里從上述的es-spark寫代碼中可以看出后控,不再過多介紹庙曙。

es-spark讀的時(shí)候是按照shard的文檔數(shù)來分的:

partition=numberOfDoc(shard)/100000

100000是默認(rèn)的配置,這個(gè)可通過es.input.max.docs.per.partition配置浩淘。

假設(shè)一個(gè)shard有23w條doc捌朴,10w條一個(gè)partition吴攒,則分為3個(gè)partition。讀操作時(shí)shard-partition 的架構(gòu)圖如下所示:

partition_shard.png

從源碼角度來說砂蔽,如果是5.X版本洼怔,則用scrollSlice提高并發(fā)度。

if (version.onOrAfter(EsMajorVersion.V_5_X)) {
  partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards);
} else {
  partitions = findShardPartitions(settings, mapping, nodesMap, shards);
}

在findSlicePartitions中給出了計(jì)算公式:

for (List<Map<String, Object>> group : shards)
{
  String index = null;
  int shardId = -1;
  List<String> locationList = new ArrayList();
  for (Map<String, Object> replica : group)
  {
    ShardInfo shard = new ShardInfo(replica);
    index = shard.getIndex();
    shardId = shard.getName().intValue();
    if (nodes.containsKey(shard.getNode())) {
      locationList.add(((NodeInfo)nodes.get(shard.getNode())).getPublishAddress());
    }
  }
  String[] locations = (String[])locationList.toArray(new String[0]);
  StringBuilder indexAndType = new StringBuilder(index);
  if (StringUtils.hasLength(types))
  {
    indexAndType.append("/");
    indexAndType.append(types);
  }
  long numDocs = client.count(indexAndType.toString(), Integer.toString(shardId), query);
  int numPartitions = (int)Math.max(1L, numDocs / maxDocsPerPartition);
  for (int i = 0; i < numPartitions; i++)
  {
    PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
    partitions.add(new PartitionDefinition(settings, mapping, index, shardId, slice, locations));
  }
}

public int getMaxDocsPerPartition()
{
  return Integer.parseInt(getProperty("es.input.max.docs.per.partition", Integer.toString(100000)));
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末左驾,一起剝皮案震驚了整個(gè)濱河市茴厉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌什荣,老刑警劉巖矾缓,帶你破解...
    沈念sama閱讀 207,248評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異稻爬,居然都是意外死亡嗜闻,警方通過查閱死者的電腦和手機(jī)墓律,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門妥衣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人割笙,你說我怎么就攤上這事友瘤〈渲猓” “怎么了?”我有些...
    開封第一講書人閱讀 153,443評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵辫秧,是天一觀的道長(zhǎng)束倍。 經(jīng)常有香客問我,道長(zhǎng)盟戏,這世上最難降的妖魔是什么绪妹? 我笑而不...
    開封第一講書人閱讀 55,475評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮柿究,結(jié)果婚禮上邮旷,老公的妹妹穿的比我還像新娘。我一直安慰自己蝇摸,他們只是感情好婶肩,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評(píng)論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著貌夕,像睡著了一般律歼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蜂嗽,一...
    開封第一講書人閱讀 49,185評(píng)論 1 284
  • 那天苗膝,我揣著相機(jī)與錄音,去河邊找鬼植旧。 笑死辱揭,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的病附。 我是一名探鬼主播问窃,決...
    沈念sama閱讀 38,451評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼完沪!你這毒婦竟也來了域庇?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,112評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤覆积,失蹤者是張志新(化名)和其女友劉穎听皿,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宽档,經(jīng)...
    沈念sama閱讀 43,609評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡尉姨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了吗冤。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片又厉。...
    茶點(diǎn)故事閱讀 38,163評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖椎瘟,靈堂內(nèi)的尸體忽然破棺而出覆致,到底是詐尸還是另有隱情,我是刑警寧澤肺蔚,帶...
    沈念sama閱讀 33,803評(píng)論 4 323
  • 正文 年R本政府宣布煌妈,位于F島的核電站,受9級(jí)特大地震影響宣羊,放射性物質(zhì)發(fā)生泄漏声旺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評(píng)論 3 307
  • 文/蒙蒙 一段只、第九天 我趴在偏房一處隱蔽的房頂上張望腮猖。 院中可真熱鬧,春花似錦赞枕、人聲如沸澈缺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽姐赡。三九已至,卻和暖如春柠掂,著一層夾襖步出監(jiān)牢的瞬間项滑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評(píng)論 1 261
  • 我被黑心中介騙來泰國(guó)打工涯贞, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留枪狂,地道東北人危喉。 一個(gè)月前我還...
    沈念sama閱讀 45,636評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像州疾,于是被迫代替她去往敵國(guó)和親辜限。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容