Elasticsearch 查詢分片選擇





  • 根據偏好參數指定分片副本
  • 感知副本選擇
  • 自適應副本選擇
  • 輪詢分片的方式選擇副本(默認)
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,
                                                        DiscoveryNodes nodes, @Nullable String preference,
                                                        @Nullable ResponseCollectorService collectorService,
                                                        @Nullable Map<String, Long> nodeCounts) {
        if (preference == null || preference.isEmpty()) {
            if (awarenessAttributes.isEmpty()) {
                if (useAdaptiveReplicaSelection) {
                    return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
                } else {
                    return indexShard.activeInitializingShardsRandomIt();
            } else {
                return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
        if (preference.charAt(0) == '_') {
            Preference preferenceType = Preference.parse(preference);
            if (preferenceType == Preference.SHARDS) {
                // starts with _shards, so execute on specific ones
                int index = preference.indexOf('|');

                String shards;
                if (index == -1) {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1);
                } else {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1, index);
                String[] ids = Strings.splitStringByCommaToArray(shards);
                boolean found = false;
                for (String id : ids) {
                    if (Integer.parseInt(id) == indexShard.shardId().id()) {
                        found = true;
                if (!found) {
                    return null;
                // no more preference
                if (index == -1 || index == preference.length() - 1) {
                    if (awarenessAttributes.isEmpty()) {
                        if (useAdaptiveReplicaSelection) {
                            return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
                        } else {
                            return indexShard.activeInitializingShardsRandomIt();
                    } else {
                        return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
                } else {
                    // update the preference and continue
                    preference = preference.substring(index + 1);
            preferenceType = Preference.parse(preference);
            switch (preferenceType) {
                case PREFER_NODES:
                    final Set<String> nodesIds =
                                    preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
                    return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
                case LOCAL:
                    return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
                case PRIMARY:
                    deprecationLogger.deprecated("[_primary] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.primaryActiveInitializingShardIt();
                case REPLICA:
                    deprecationLogger.deprecated("[_replica] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.replicaActiveInitializingShardIt();
                case PRIMARY_FIRST:
                    deprecationLogger.deprecated("[_primary_first] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.primaryFirstActiveInitializingShardsIt();
                case REPLICA_FIRST:
                    deprecationLogger.deprecated("[_replica_first] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.replicaFirstActiveInitializingShardsIt();
                case ONLY_LOCAL:
                    return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
                case ONLY_NODES:
                    String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
                    return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
                    throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
        // if not, then use it as the index
        int routingHash = Murmur3HashFunction.hash(preference);
        if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
            // The AllocationService lists shards in a fixed order based on nodes
            // so earlier versions of this class would have a tendency to
            // select the same node across different shardIds.
            // Better overall balancing can be achieved if each shardId opts
            // for a different element in the list by also incorporating the
            // shard ID into the hash of the user-supplied preference key.
            routingHash = 31 * routingHash + indexShard.shardId.hashCode();
        if (awarenessAttributes.isEmpty()) {
            return indexShard.activeInitializingShardsIt(routingHash);
        } else {
            return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);


偏好 說明
_primary 發送到集群的相關操作請求只會在主分片上執行。
_primary_first 指查詢會先在主分片中查詢,如果主分片找不到(掛了),就會在副本中查詢。
_replica 發送到集群的相關操作請求只會在副本上執行。
_replica_first 指查詢會先在副本中查詢,如果副本找不到(掛了),就會在主分片中查詢。
_local 指查詢操作會優先在本地節點有的分片中查詢,沒有的話再在其它節點查詢。
_only_local 僅在本地節點上的分片上執行查詢。
_prefer_nodes:abc,xyz 在提供的節點上優先執行(在這種情況下為'abc'或'xyz')
_shards:2,3 限制操作到指定的分片(此處為 2 和 3)。這個偏好可以與其他偏好組合,但必須首先出現,例如:_shards:2,3|_primary。
_only_nodes:node1,node2 指在指定id的節點里面進行查詢,如果該節點只有要查詢索引的部分分片,就只在這部分分片中查找,不同節點之間用“,”分隔。
custom(自定義) 任何不以_開頭的值。如果兩個搜索都為其首選項提供相同的自定義字符串值,并且基礎集群狀態不會更改,則將使用相同的分片順序進行搜索。這并不能保證每次都使用完全相同的分片:群集狀態以及所選分片可能會因包括分片重定位和分片失敗在內的多種原因而發生變化,并且節點有時可能會拒絕搜索,導致回退到備用節點。然而,在實踐中,分片的排序趨于長時間保持穩定。自定義首選項值的良好候選者類似于Web會話ID或用戶名。


  • custom preference參數不能以下劃線"_"開頭。custom preference的作用是保證搜索到的結果是按照相同的順序呈現的。
  • _only_local首選項僅保證在本地節點上使用分片副本,這有時對故障排除很有用。所有其他選項不能完全保證在搜索中使用任何特定的分片副本,并且在變化的索引上,這可能意味著在不同的刷新狀態的不同分片副本上執行重復搜索,則重復搜索可能產生不同的結果。
  • _primary,_primary_first,_replica和_replica_first已棄用,因為不建議使用它們。


如果es可以感知到硬件的物理布局,就可以確保說,primary shard和replica shard一定是分配到不同的物理機,或者物理機架,或者不同的機房,這樣可以最小化物理機,機架,機房崩潰的風險。

shard allocation awareness可以告訴es我們的硬件架構

舉個栗子,如果我們有多個機架,那么我們啟動一個node的時候,就要告訴這個node它在哪個機架上,可以給它一個rack_id,比如下面的命令:./bin/elasticsearch -Enode.attr.rack_id=rack_one,也可以在elasticsearch.yml中設置這個機架id

cluster.routing.allocation.awareness.attributes: rack_id


如果啟動兩個node,都在一個機架上,此時創建一個有5個primary shard和5個replica shard的索引,此時shards會被分配到兩個節點上

如果再啟動兩個node,設置為另外一個機架,此時es會將shard移動到新的node上,去確保說,不會讓primary shard和其replica shard在同一個機架上。但是如果機架2故障了,為了恢復集群,那么還是會在恢復的時候,將shards全部在機架1上分配的

prefer local shard機制:在執行search或者get請求的時候,如果啟用了shard awareness特性,那么es會盡量使用local shard來執行請求,也就是在同一個awareness group中的shard來執行請求,也就是說盡量用一個機架或者一個機房中的shard來執行請求,而不要跨機架或者跨機房來執行請求


cluster.routing.allocation.awareness.attributes: rack_id,zone




cluster.routing.allocation.awareness.attributes: zone
cluster.routing.allocation.awareness.force.zone.values: zone1,zone2


那么此時如果將2個node分配給zone1機房,然后創建一個索引,5個primary shard和5個replica shard,但是此時只會在zone1機房分配5個primary shard,只有我們啟動一批node在zone2機房,才會分配replica shard


        // Excerpt (method header not shown in this article): moves every shard copy in `from`
        // whose node has the same awareness-attribute value as the local node into `to`.
        final ArrayList<ShardRouting> to = new ArrayList<>();
        for (final String attribute : key.attributes) {
            final String localAttributeValue = nodes.getLocalNode().getAttributes().get(attribute);
            if (localAttributeValue != null) {
                for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
                    ShardRouting fromShard = iterator.next();
                    final DiscoveryNode discoveryNode = nodes.get(fromShard.currentNodeId());
                    if (discoveryNode == null) {
                        iterator.remove(); // node is not present anymore - ignore shard
                    } else if (localAttributeValue.equals(discoveryNode.getAttributes().get(attribute))) {
                        // Same attribute value as the local node: take the copy out of `from`
                        // so it cannot be collected again for a later attribute.
                        iterator.remove();
                        to.add(fromShard);
                    }
                }
            }
        }
        return Collections.unmodifiableList(to);



    /**
     * Returns an iterator over active and initializing shards. Making sure though that
     * it is random within the active shards, and initializing shards are the last to iterate through.
     */
    public ShardIterator activeInitializingShardsRandomIt() {
        return activeInitializingShardsIt(shuffler.nextSeed());
    public int nextSeed() {
        return seed.getAndIncrement();
    public List<ShardRouting> shuffle(List<ShardRouting> shards, int seed) {
        return CollectionUtils.rotate(shards, seed);
    /**
     * Returns an iterator over active and initializing shards. Making sure though that
     * it is random within the active shards, and initializing shards are the last to iterate through.
     */
    public ShardIterator activeInitializingShardsIt(int seed) {
      // 保證初始化好的排在正在初始化中的前面
        if (allInitializingShards.isEmpty()) {
            return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
        ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
        ordered.addAll(shuffler.shuffle(activeShards, seed));
        return new PlainShardIterator(shardId, ordered);
    /**
     * Return a rotated view of the given list with the given distance.
     */
    // 根據seed的自增和哈希來保證不同的分片被輪詢到
    public static <T> List<T> rotate(final List<T> list, int distance) {
        if (list.isEmpty()) {
            return list;

        int d = distance % list.size();
        if (d < 0) {
            d += list.size();

        if (d == 0) {
            return list;

        return new RotatedList<>(list, d);


  • shard copy 1: 100ms
  • shard copy 2 (degraded): 1350ms
  • shard copy 3: 150ms
    長時間的垃圾回收、高磁盤IO、網絡帶寬滿、節點硬件異構等情況下很容易出現某個節點性能惡化,例如shard copy 2。
    發送到shard copy 2上的請求響應會變慢,為了使副本選擇更加智能,ES的工程師開發了ARS的功能。


Our ARS implementation is based on a formula where, for each search request, 
Elasticsearch ranks each copy of the shard to determine which is likeliest to be the "best" 
copy to send the request to. Instead of sending requests in a round-robin fashion to each 
copy of the shard, Elasticsearch selects the "best" copy and routes the request there.

The ARS formula initially seems complex, but let's break it down:

$$\Psi(s) = R(s) - \frac{1}{\bar{\mu}(s)} + \frac{(\hat{q}(s))^3}{\bar{\mu}(s)}$$

Where $\hat{q}(s)$ is:

$$\hat{q}(s) = 1 + (os(s) \cdot n) + q(s)$$
 private double innerRank(long outstandingRequests) {
            // the concurrency compensation is defined as the number of
            // outstanding requests from the client to the node times the number
            // of clients in the system
            double concurrencyCompensation = outstandingRequests * clientNum;

            // Cubic queue adjustment factor. The paper chose 3 though we could
            // potentially make this configurable if desired.
            int queueAdjustmentFactor = 3;

            // EWMA of queue size
            double qBar = queueSize;
            double qHatS = 1 + concurrencyCompensation + qBar;

            // EWMA of response time
            double rS = responseTime / FACTOR;
            // EWMA of service time
            double muBarS = serviceTime / FACTOR;

            // The final formula
            double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS);
            return rank;

And looking at the individual pieces:

  • os(s),節點未完成的搜索請求數。
  • n,系統中節點的數量。
  • R(s),從協調節點上得到的響應時間的加權平均值,單位毫秒。
  • q(s),搜索隊列中等待任務的加權平均值。
  • $\bar{\mu}(s)$,數據節點上搜索時間的加權平均值。
private static class NodeStatistics {
        final String nodeId;
        final ExponentiallyWeightedMovingAverage queueSize;
        final ExponentiallyWeightedMovingAverage responseTime;
        double serviceTime;

        NodeStatistics(String nodeId,
                       ExponentiallyWeightedMovingAverage queueSizeEWMA,
                       ExponentiallyWeightedMovingAverage responseTimeEWMA,
                       double serviceTimeEWMA) {
            this.nodeId = nodeId;
            this.queueSize = queueSizeEWMA;
            this.responseTime = responseTimeEWMA;
            this.serviceTime = serviceTimeEWMA;

"adaptive_selection": {
                "5BN2QxfZQ3yzotzQhUXzlg": {
                    "outgoing_searches": 0,
                    "avg_queue_size": 0,
                    "avg_service_time_ns": 2976073,
                    "avg_response_time_ns": 3396261,
                    "rank": "3.4"
                },
                "eAznL5r5RreHLIU16XpczA": {
                    "outgoing_searches": 0,
                    "avg_queue_size": 0,
                    "avg_service_time_ns": 8884750,
                    "avg_response_time_ns": 15520622,
                    "rank": "15.5"
                }
}



在某個數據節點處于高負載的情況下,吞吐有了很大的提高,延遲中位數有所增加,這是為了繞開高負載的節點,增加了壓力較低的節點的負載,從而增加了延遲。

private static List<ShardRouting> rankShardsAndUpdateStats(List<ShardRouting> shards, final ResponseCollectorService collector,
                                                               final Map<String, Long> nodeSearchCounts) {
        if (collector == null || nodeSearchCounts == null || shards.size() <= 1) {
            return shards;

        // Retrieve which nodes we can potentially send the query to
        final Set<String> nodeIds = getAllNodeIds(shards);
        final int nodeCount = nodeIds.size();

        final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector);

        // Retrieve all the nodes the shards exist on
        final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts);

        // sort all shards based on the shard rank
        ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);
        Collections.sort(sortedShards, new NodeRankComparator(nodeRanks));

        // adjust the non-winner nodes' stats so they will get a chance to receive queries
        if (sortedShards.size() > 1) {
            ShardRouting minShard = sortedShards.get(0);
            // If the winning shard is not started we are ranking initializing
            // shards, don't bother to do adjustments
            if (minShard.started()) {
                String minNodeId = minShard.currentNodeId();
                Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);
                if (maybeMinStats.isPresent()) {
                    adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());
                    // Increase the number of searches for the "winning" node by one.
                    // Note that this doesn't actually affect the "real" counts, instead
                    // it only affects the captured node search counts, which is
                    // captured once for each query in TransportSearchAction
                    nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);

        return sortedShards;

eg. 防止某些節點一直不處理請求,會在每次選擇完節點后,對選出的節點的計數+1,并且調整沒有選中的節點的統計信息。

    /**
     * Adjust all other nodes' collected stats. In the original ranking paper there is no need to adjust other nodes' stats because
     * Cassandra sends occasional requests to all copies of the data, so their stats will be updated during that broadcast phase. In
     * Elasticsearch, however, we do not have that sort of broadcast-to-all behavior. In order to prevent a node that gets a high score and
     * then never gets any more requests, we must ensure it eventually returns to a more normal score and can be a candidate for serving
     * requests.
     *
     * This adjustment takes the "winning" node's statistics and adds the average of those statistics with each non-winning node. Let's say
     * the winning node had a queue size of 10 and a non-winning node had a queue of 18. The average queue size is (10 + 18) / 2 = 14 so the
     * non-winning node will have statistics added for a queue size of 14. This is repeated for the response time and service times as well.
     */
    private static void adjustStats(final ResponseCollectorService collector,
                                    final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats,
                                    final String minNodeId,
                                    final ResponseCollectorService.ComputedNodeStats minStats) {
        if (minNodeId != null) {
            for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
                final String nodeId = entry.getKey();
                final Optional<ResponseCollectorService.ComputedNodeStats> maybeStats = entry.getValue();
                if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) {
                    final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
                    final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
                    final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
                    final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;
                    collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);


