For a given search request, ES selects one copy of each relevant shard to execute the query on.
For example, suppose the index twitter has 3 primary shards, each with 2 replicas, so 9 shard copies in total. A search request against it is served by 3 shard copies, each of which may be a primary or a replica; in other words, a search hits exactly one copy out of each shard's set of copies.
The data in a primary shard and in its replicas is, in theory, identical, and a query uses only one copy per shard, so adding replicas does not make an individual search faster through extra parallelism. What multiple copies do allow is picking, under the current cluster state, the copy that can respond the fastest, which can shorten search response time.
Replica selection
Replicas can be selected in the following ways:
- With the preference parameter, which targets specific shard copies
- Awareness-based selection (rack/zone awareness)
- Adaptive replica selection (ARS)
- Round-robin rotation over the shard copies (the default)
All of these are dispatched from OperationRouting#preferenceActiveShardIterator:
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,
                                                    DiscoveryNodes nodes, @Nullable String preference,
                                                    @Nullable ResponseCollectorService collectorService,
                                                    @Nullable Map<String, Long> nodeCounts) {
    // no preference given: use awareness attributes, ARS, or the default rotation
    if (preference == null || preference.isEmpty()) {
        if (awarenessAttributes.isEmpty()) {
            if (useAdaptiveReplicaSelection) {
                return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
            } else {
                return indexShard.activeInitializingShardsRandomIt();
            }
        } else {
            return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
        }
    }
    // preferences starting with '_' are built-in keywords (_shards, _local, _only_nodes, ...)
    if (preference.charAt(0) == '_') {
        Preference preferenceType = Preference.parse(preference);
        if (preferenceType == Preference.SHARDS) {
            // starts with _shards, so execute on specific ones
            int index = preference.indexOf('|');

            String shards;
            if (index == -1) {
                shards = preference.substring(Preference.SHARDS.type().length() + 1);
            } else {
                shards = preference.substring(Preference.SHARDS.type().length() + 1, index);
            }
            String[] ids = Strings.splitStringByCommaToArray(shards);
            boolean found = false;
            for (String id : ids) {
                if (Integer.parseInt(id) == indexShard.shardId().id()) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                return null;
            }
            // no more preference
            if (index == -1 || index == preference.length() - 1) {
                if (awarenessAttributes.isEmpty()) {
                    if (useAdaptiveReplicaSelection) {
                        return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
                    } else {
                        return indexShard.activeInitializingShardsRandomIt();
                    }
                } else {
                    return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
                }
            } else {
                // update the preference and continue
                preference = preference.substring(index + 1);
            }
        }
        // dispatch on the (remaining) preference keyword
        preferenceType = Preference.parse(preference);
        switch (preferenceType) {
            case PREFER_NODES:
                final Set<String> nodesIds =
                        Arrays.stream(
                                preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
                        ).collect(Collectors.toSet());
                return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
            case LOCAL:
                return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
            case PRIMARY:
                deprecationLogger.deprecated("[_primary] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                return indexShard.primaryActiveInitializingShardIt();
            case REPLICA:
                deprecationLogger.deprecated("[_replica] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                return indexShard.replicaActiveInitializingShardIt();
            case PRIMARY_FIRST:
                deprecationLogger.deprecated("[_primary_first] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                return indexShard.primaryFirstActiveInitializingShardsIt();
            case REPLICA_FIRST:
                deprecationLogger.deprecated("[_replica_first] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                return indexShard.replicaFirstActiveInitializingShardsIt();
            case ONLY_LOCAL:
                return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
            case ONLY_NODES:
                String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
                return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
            default:
                throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
        }
    }
    // if not, then use it as the index
    int routingHash = Murmur3HashFunction.hash(preference);
    if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
        // The AllocationService lists shards in a fixed order based on nodes
        // so earlier versions of this class would have a tendency to
        // select the same node across different shardIds.
        // Better overall balancing can be achieved if each shardId opts
        // for a different element in the list by also incorporating the
        // shard ID into the hash of the user-supplied preference key.
        routingHash = 31 * routingHash + indexShard.shardId.hashCode();
    }
    if (awarenessAttributes.isEmpty()) {
        return indexShard.activeInitializingShardsIt(routingHash);
    } else {
        return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
    }
}
Selecting a replica with the preference parameter
Preference | Description |
---|---|
_primary | The operation is executed only on primary shards. |
_primary_first | The query runs on primary shards where possible; if a primary is unavailable (e.g. it has failed), it falls back to a replica. |
_replica | The operation is executed only on replica shards. |
_replica_first | The query runs on replica shards where possible; if no replica is available, it falls back to the primary. |
_local | The query prefers shards allocated to the local (coordinating) node; shards that node does not hold are queried on other nodes. |
_only_local | The query is executed only on shards allocated to the local node. |
_prefer_nodes:abc,xyz | Prefer executing the operation on the given nodes (here, the node ids 'abc' or 'xyz'). |
_shards:2,3 | Restrict the operation to the given shards (here, 2 and 3). This preference can be combined with other preferences, but _shards must come first, e.g. _shards:2,3\|_primary. |
_only_nodes:node1,node2 | Execute only on the nodes with the given ids (separated by ","); if those nodes hold only some of the index's shards, only those shard copies are searched. |
custom (any string) | Any value that does not start with _. If two searches supply the same custom string as their preference and the underlying cluster state does not change, the same ordering of shard copies is used for both. Even so, this does not guarantee that exactly the same shards are used every time: the cluster state, and with it the selected shards, can change for many reasons, including shard relocations and shard failures, and a node may reject a search, causing a fallback to an alternative node. In practice, though, the ordering of shard copies tends to stay stable for long periods. Good candidates for a custom preference value are things like a web session id or a user name. |
Notes
- A custom preference value must not start with an underscore ("_"). Its purpose is to make repeated searches present their results in the same order.
- The _only_local preference guarantees that only shard copies on the local node are used, which is sometimes useful for troubleshooting. None of the other options fully guarantee that any particular shard copies are used, and on an index that is changing this can mean that repeated searches executed against different shard copies, in different refresh states, may return different results.
- _primary, _primary_first, _replica and _replica_first are deprecated and their use is discouraged. They do not help avoid the inconsistent results that come from using shard copies in different refresh states, and Elasticsearch uses synchronous replication, so the primary does not generally hold newer data than its replicas. The _primary_first and _replica_first preferences silently fall back to non-preferred copies if the preferred ones cannot be searched. The _primary and _replica preferences silently change their preferred shard copy when a replica is promoted to primary, which can happen at any time. The _primary preference adds unnecessary extra load to the primary shard. The cache-related benefits of these options can also be obtained with _only_nodes, _prefer_nodes or a custom string value.
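As a usage illustration, this is roughly how a preference is attached to a search with the Java high-level REST client; the field, query value and session id below are made-up example values, and the same effect can be achieved with a plain ?preference=... query parameter on the search URL:

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class PreferenceExample {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            SearchRequest request = new SearchRequest("twitter");
            // custom preference: the same string keeps the same shard-copy ordering
            // as long as the cluster state does not change (e.g. a web session id)
            request.preference("session-xyz");
            // built-in preferences are passed the same way, e.g.:
            // request.preference("_shards:2,3|_local");
            request.source(new SearchSourceBuilder()
                    .query(QueryBuilders.matchQuery("user", "kimchy")));
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            System.out.println(response.getHits().getTotalHits());
        }
    }
}
```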
Shard allocation awareness (rack awareness)
If several virtual machines run on the same physical host, or several ES nodes sit in the same rack or the same data center, those nodes can fail together when that host, rack or data center has a problem.
If ES is aware of the physical layout of the hardware, it can make sure that a primary shard and its replica shards are allocated to different physical hosts, racks or data centers, minimizing the risk of losing all copies to a single host, rack or data-center failure.
Shard allocation awareness is how we describe this hardware layout to ES.
For example, with several racks, each node must be told which rack it is on when it starts. We can give it a rack_id, e.g. ./bin/elasticsearch -Enode.attr.rack_id=rack_one; the rack id can also be set in elasticsearch.yml:
cluster.routing.allocation.awareness.attributes: rack_id
node.attr.rack_id=rack_one
Of these two settings, the first declares which node attribute the cluster uses for awareness, and the second sets this node's actual rack id using that attribute.
If two nodes are started in the same rack and an index with 5 primary shards and 5 replica shards is created, the shards are spread across those two nodes.
If two more nodes are then started with a different rack id, ES relocates shards onto the new nodes to ensure that, where possible, a primary shard and its replica never share a rack. If rack 2 then fails, however, ES will still allocate all shards onto rack 1 in order to recover the cluster.
Prefer-local-shard mechanism: when a search or get request is executed with shard awareness enabled, ES prefers local shards, i.e. shards in the same awareness group, so the request is served from the same rack or data center rather than crossing racks or data centers where possible.
Multiple awareness attributes can be specified, for example a rack id and a zone:
cluster.routing.allocation.awareness.attributes: rack_id,zone
Forced awareness
Now suppose there are two data centers with enough hardware in total for all the shards, but where each data center on its own can hold only half of them. With plain awareness, if one data center fails, ES will try to allocate all of the shards that need recovering onto the remaining data center, which does not have the capacity for them.
Forced awareness solves this problem: it simply never allows all of the shard copies to be allocated inside a single zone.
For example, with an awareness attribute called zone and two data centers, zone1 and zone2, the configuration looks like this:
cluster.routing.allocation.awareness.attributes: zone
cluster.routing.allocation.awareness.force.zone.values: zone1,zone2
With this configuration, if 2 nodes are assigned to zone1 and an index with 5 primary shards and 5 replica shards is created, only the 5 primary shards are allocated in zone1; the replica shards are not allocated until nodes are started in zone2.
Once awareness attributes are configured, replica selection defaults to preferring shard copies on the local rack. This is handled in IndexShardRoutingTable; the snippet below is the body of the helper that collects the shard copies whose node shares the local node's attribute value, so that they are iterated first:
    final ArrayList<ShardRouting> to = new ArrayList<>();
    for (final String attribute : key.attributes) {
        final String localAttributeValue = nodes.getLocalNode().getAttributes().get(attribute);
        if (localAttributeValue != null) {
            for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
                ShardRouting fromShard = iterator.next();
                final DiscoveryNode discoveryNode = nodes.get(fromShard.currentNodeId());
                if (discoveryNode == null) {
                    iterator.remove(); // node is not present anymore - ignore shard
                } else if (localAttributeValue.equals(discoveryNode.getAttributes().get(attribute))) {
                    iterator.remove();
                    to.add(fromShard);
                }
            }
        }
    }
    return Collections.unmodifiableList(to);
}
Default replica selection
In versions before 7.0 (where ARS becomes the default), a search starts from a random seed and each subsequent request rotates round-robin through the shard copies:
/**
 * Returns an iterator over active and initializing shards. Making sure though that
 * its random within the active shards, and initializing shards are the last to iterate through.
 */
public ShardIterator activeInitializingShardsRandomIt() {
    return activeInitializingShardsIt(shuffler.nextSeed());
}

@Override
public int nextSeed() {
    return seed.getAndIncrement();
}

@Override
public List<ShardRouting> shuffle(List<ShardRouting> shards, int seed) {
    return CollectionUtils.rotate(shards, seed);
}

/**
 * Returns an iterator over active and initializing shards. Making sure though that
 * its random within the active shards, and initializing shards are the last to iterate through.
 */
public ShardIterator activeInitializingShardsIt(int seed) {
    // started shards always come before shards that are still initializing
    if (allInitializingShards.isEmpty()) {
        return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
    }
    ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
    ordered.addAll(shuffler.shuffle(activeShards, seed));
    ordered.addAll(allInitializingShards);
    return new PlainShardIterator(shardId, ordered);
}

/**
 * Return a rotated view of the given list with the given distance.
 */
// the auto-incrementing seed (or the routing hash) rotates the list, so successive
// requests start from a different shard copy
public static <T> List<T> rotate(final List<T> list, int distance) {
    if (list.isEmpty()) {
        return list;
    }
    int d = distance % list.size();
    if (d < 0) {
        d += list.size();
    }
    if (d == 0) {
        return list;
    }
    return new RotatedList<>(list, d);
}
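To make the rotation concrete, here is a small self-contained sketch. It re-implements the rotation locally instead of calling the Elasticsearch CollectionUtils class, and starts the seed at 0 for readability (in ES the initial seed is random):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RotationDemo {
    // Same idea as CollectionUtils.rotate: a view of the list shifted by `distance`.
    static List<String> rotate(List<String> list, int distance) {
        int d = ((distance % list.size()) + list.size()) % list.size();
        String[] out = new String[list.size()];
        for (int i = 0; i < list.size(); i++) {
            out[i] = list.get((i + d) % list.size());
        }
        return Arrays.asList(out);
    }

    public static void main(String[] args) {
        List<String> copies = Arrays.asList("copy-0 (primary)", "copy-1", "copy-2");
        AtomicInteger seed = new AtomicInteger(); // plays the role of shuffler.nextSeed()
        for (int request = 0; request < 4; request++) {
            // the first element of the rotated list is the copy the request goes to
            System.out.println("request " + request + " -> " + rotate(copies, seed.getAndIncrement()));
        }
        // request 0 -> [copy-0 (primary), copy-1, copy-2]
        // request 1 -> [copy-1, copy-2, copy-0 (primary)]
        // request 2 -> [copy-2, copy-0 (primary), copy-1]
        // request 3 -> [copy-0 (primary), copy-1, copy-2]
    }
}
```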
Now suppose there are three copies of a shard with the following response latencies:
- shard copy 1: 100ms
- shard copy 2 (degraded): 1350ms
- shard copy 3: 150ms
Long garbage-collection pauses, heavy disk I/O, saturated network links, or heterogeneous node hardware can easily leave one node degraded, as with shard copy 2 above.
Requests sent to shard copy 2 respond slowly, so to make replica selection smarter the Elasticsearch engineers built adaptive replica selection (ARS).
Adaptive replica selection (ARS)
Elastic's blog post introducing ARS describes the approach as follows: "Our ARS implementation is based on a formula where, for each search request, Elasticsearch ranks each copy of the shard to determine which is likeliest to be the 'best' copy to send the request to. Instead of sending requests in a round-robin fashion to each copy of the shard, Elasticsearch selects the 'best' copy and routes the request there."
The ARS formula initially seems complex, but let's break it down:
Ψ(s) = R(s) - 1/µ̄(s) + (q̂(s))³ / µ̄(s)

where q̂(s) is:

q̂(s) = 1 + (os(s) * n) + q(s)

In the Elasticsearch source, this rank is computed by ResponseCollectorService.ComputedNodeStats#innerRank:
private double innerRank(long outstandingRequests) {
    // the concurrency compensation is defined as the number of
    // outstanding requests from the client to the node times the number
    // of clients in the system
    double concurrencyCompensation = outstandingRequests * clientNum;

    // Cubic queue adjustment factor. The paper chose 3 though we could
    // potentially make this configurable if desired.
    int queueAdjustmentFactor = 3;

    // EWMA of queue size
    double qBar = queueSize;
    double qHatS = 1 + concurrencyCompensation + qBar;

    // EWMA of response time
    double rS = responseTime / FACTOR;
    // EWMA of service time
    double muBarS = serviceTime / FACTOR;

    // The final formula
    double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS);
    return rank;
}
And looking at the individual pieces:
- os(s): the number of outstanding search requests to the node.
- n: the number of nodes in the system.
- R(s): the exponentially weighted moving average of the response time, as seen from the coordinating node, in milliseconds.
- q(s): the exponentially weighted moving average of the number of searches waiting in the node's search queue.
- µ̄(s): the exponentially weighted moving average of the search service time on the data node.
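To get a feel for how the rank reacts to load, here is a small self-contained sketch that re-implements the innerRank formula with hypothetical values (times are already in milliseconds, so the FACTOR conversion from the snippet above is omitted; nothing here calls the real ResponseCollectorService):

```java
public class ArsRankDemo {
    // Re-implementation of the rank formula from ComputedNodeStats.innerRank,
    // with all times expressed directly in milliseconds.
    static double rank(double responseTimeMs, double serviceTimeMs, double queueEwma,
                       long outstandingRequests, int clientNum) {
        double concurrencyCompensation = outstandingRequests * clientNum;
        int queueAdjustmentFactor = 3;
        double qHatS = 1 + concurrencyCompensation + queueEwma;
        return responseTimeMs - (1.0 / serviceTimeMs)
                + (Math.pow(qHatS, queueAdjustmentFactor) / serviceTimeMs);
    }

    public static void main(String[] args) {
        // Hypothetical values for a healthy copy and a degraded copy, loosely
        // matching the 100 ms vs 1350 ms latency example earlier in the article.
        System.out.printf("healthy  copy rank: %.1f%n", rank(100, 80, 2, 1, 3));     // ~102.7
        System.out.printf("degraded copy rank: %.1f%n", rank(1350, 1200, 10, 1, 3)); // ~1352.3
        // The copy with the lowest rank wins; note how the cubed queue term
        // punishes copies that are building up a backlog.
    }
}
```

Under these assumed numbers the healthy copy receives the request. On the coordinating node, the per-data-node inputs to this formula are tracked in ResponseCollectorService.NodeStatistics: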
private static class NodeStatistics {
    final String nodeId;
    final ExponentiallyWeightedMovingAverage queueSize;
    final ExponentiallyWeightedMovingAverage responseTime;
    double serviceTime;

    NodeStatistics(String nodeId,
                   ExponentiallyWeightedMovingAverage queueSizeEWMA,
                   ExponentiallyWeightedMovingAverage responseTimeEWMA,
                   double serviceTimeEWMA) {
        this.nodeId = nodeId;
        this.queueSize = queueSizeEWMA;
        this.responseTime = responseTimeEWMA;
        this.serviceTime = serviceTimeEWMA;
    }
}

These statistics are exposed per node in the adaptive_selection section of the node stats API (GET _nodes/stats), for example:

"adaptive_selection": {
    "5BN2QxfZQ3yzotzQhUXzlg": {
        "outgoing_searches": 0,
        "avg_queue_size": 0,
        "avg_service_time_ns": 2976073,
        "avg_response_time_ns": 3396261,
        "rank": "3.4"
    },
    "eAznL5r5RreHLIU16XpczA": {
        "outgoing_searches": 0,
        "avg_queue_size": 0,
        "avg_service_time_ns": 8884750,
        "avg_response_time_ns": 15520622,
        "rank": "15.5"
    }
}
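The avg_queue_size and avg_response_time_ns values above are exponentially weighted moving averages, so recent samples dominate older ones. A minimal self-contained sketch of the idea (the alpha value of 0.3 is illustrative, not necessarily what Elasticsearch uses):

```java
public class EwmaDemo {
    private final double alpha;   // weight given to the newest sample
    private double average;

    EwmaDemo(double alpha, double initialAverage) {
        this.alpha = alpha;
        this.average = initialAverage;
    }

    void addValue(double sample) {
        // newer samples dominate; old measurements decay exponentially
        average = alpha * sample + (1 - alpha) * average;
    }

    double getAverage() {
        return average;
    }

    public static void main(String[] args) {
        EwmaDemo responseTime = new EwmaDemo(0.3, 100); // start around 100 ms
        for (double sample : new double[]{110, 95, 400, 420, 390}) { // node degrades halfway through
            responseTime.addValue(sample);
            System.out.printf("sample=%.0f ms, ewma=%.1f ms%n", sample, responseTime.getAverage());
        }
    }
}
```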
Elastic's benchmark charts (see the ARS blog post in the references) show that even when the cluster is under no particular load, ARS still improves throughput and reduces latency.
With one data node under heavy load, throughput improves substantially while the median latency rises somewhat: to steer around the overloaded node, more requests are sent to the less loaded nodes, which raises their latency. The ranking itself is done in OperationRouting#rankShardsAndUpdateStats:
private static List<ShardRouting> rankShardsAndUpdateStats(List<ShardRouting> shards, final ResponseCollectorService collector,
                                                           final Map<String, Long> nodeSearchCounts) {
    if (collector == null || nodeSearchCounts == null || shards.size() <= 1) {
        return shards;
    }

    // Retrieve which nodes we can potentially send the query to
    final Set<String> nodeIds = getAllNodeIds(shards);
    final int nodeCount = nodeIds.size();

    final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector);

    // Retrieve all the nodes the shards exist on
    final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts);

    // sort all shards based on the shard rank
    ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);
    Collections.sort(sortedShards, new NodeRankComparator(nodeRanks));

    // adjust the non-winner nodes' stats so they will get a chance to receive queries
    if (sortedShards.size() > 1) {
        ShardRouting minShard = sortedShards.get(0);
        // If the winning shard is not started we are ranking initializing
        // shards, don't bother to do adjustments
        if (minShard.started()) {
            String minNodeId = minShard.currentNodeId();
            Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);
            if (maybeMinStats.isPresent()) {
                adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());
                // Increase the number of searches for the "winning" node by one.
                // Note that this doesn't actually affect the "real" counts, instead
                // it only affects the captured node search counts, which is
                // captured once for each query in TransportSearchAction
                nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);
            }
        }
    }

    return sortedShards;
}
For example, to keep a node that once scored badly from never being sent requests again, after each selection the search count of the winning node is incremented and the statistics of the nodes that were not chosen are adjusted towards the winner's:
/**
 * Adjust the for all other nodes' collected stats. In the original ranking paper there is no need to adjust other nodes' stats because
 * Cassandra sends occasional requests to all copies of the data, so their stats will be updated during that broadcast phase. In
 * Elasticsearch, however, we do not have that sort of broadcast-to-all behavior. In order to prevent a node that gets a high score and
 * then never gets any more requests, we must ensure it eventually returns to a more normal score and can be a candidate for serving
 * requests.
 *
 * This adjustment takes the "winning" node's statistics and adds the average of those statistics with each non-winning node. Let's say
 * the winning node had a queue size of 10 and a non-winning node had a queue of 18. The average queue size is (10 + 18) / 2 = 14 so the
 * non-winning node will have statistics added for a queue size of 14. This is repeated for the response time and service times as well.
 */
private static void adjustStats(final ResponseCollectorService collector,
                                final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats,
                                final String minNodeId,
                                final ResponseCollectorService.ComputedNodeStats minStats) {
    if (minNodeId != null) {
        for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
            final String nodeId = entry.getKey();
            final Optional<ResponseCollectorService.ComputedNodeStats> maybeStats = entry.getValue();
            if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) {
                final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
                final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
                final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
                final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;
                collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
            }
        }
    }
}
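On the 6.x line discussed here, ARS is off by default and is controlled by the cluster.routing.use_adaptive_replica_selection setting (from 7.0 onwards it is enabled by default). A minimal sketch of turning it on through the high-level REST client, assuming the 6.4+ client; host and port are placeholders:

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class EnableArs {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
            // persist the setting so it survives full cluster restarts
            request.persistentSettings(Settings.builder()
                    .put("cluster.routing.use_adaptive_replica_selection", true));
            client.cluster().putSettings(request, RequestOptions.DEFAULT);
        }
    }
}
```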
References
- https://elasticsearch.cn/article/334
- https://juejin.im/post/5b83b1d5e51d4538da22ef50
- https://doc.yonyoucloud.com/doc/mastering-elasticsearch/chapter-4/45_README.html
- https://www.elastic.co/guide/en/elasticsearch/reference/6.7/allocation-awareness.html
- http://zh1cheung.com/zhi1cheung.github.io/elk/2018/10/02/elk/
- https://pdfs.semanticscholar.org/99c7/f437d672abf56fdc9438c0c46a7ef716e8c7.pdf
- https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-preference.html
- https://jobs.zalando.com/tech/blog/a-closer-look-at-elasticsearch-express/?gh_src=4n3gxh1
- https://www.elastic.co/blog/improving-response-latency-in-elasticsearch-with-adaptive-replica-selection
- https://www.elastic.co/guide/cn/elasticsearch/guide/current/_search_options.html