Elasticsearch shard split analysis (part 2)

Internal implementation analysis

First, RestSplitIndexAction is registered as the handler for the split API by calling RestController's registerHandler method:

RestSplitIndexAction

controller.registerHandler(RestRequest.Method.PUT, "/{index}/_split/{target}", this);
controller.registerHandler(RestRequest.Method.POST, "/{index}/_split/{target}", this);
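
For reference, a split request against this endpoint looks roughly like the following (the index names and the target shard count are illustrative; number_of_shards in the body is the standard split setting):

POST /source_index/_split/target_index
{
  "settings": {
    "index.number_of_shards": 4
  }
}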

When Elasticsearch receives a request, it enters RestController's dispatchRequest method:

RestController

boolean dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client,
                            final Optional<RestHandler> mHandler) throws Exception {
        final int contentLength = request.hasContent() ? request.content().length() : 0;

        RestChannel responseChannel = channel;
        // Indicator of whether a response was sent or not
        boolean requestHandled;

        if (contentLength > 0 && mHandler.map(h -> hasContentType(request, h) == false).orElse(false)) {
           ...
        } else if (contentLength > 0 && mHandler.map(h -> h.supportsContentStream()).orElse(false) &&
            request.getXContentType() != XContentType.JSON && request.getXContentType() != XContentType.SMILE) {
           ...
        } else if (mHandler.isPresent()) {
            // the request is handled in this branch
            try {
                if (canTripCircuitBreaker(mHandler)) {
                    inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
                } else {
                    inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
                }
                // iff we could reserve bytes for the request we need to send the response also over this channel
                responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);

                final RestHandler wrappedHandler = mHandler.map(h -> handlerWrapper.apply(h)).get();
                // wrappedHandler is simply the handler registered with the RestController, i.e. RestSplitIndexAction
                wrappedHandler.handleRequest(request, responseChannel, client);
                requestHandled = true;
            } catch (Exception e) {
                responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
                // We "handled" the request by returning a response, even though it was an error
                requestHandled = true;
            }
        } else {
            ...
        }
        // Return true if the request was handled, false otherwise.
        return requestHandled;
    }

Some parts of the code above have been omitted; the interesting branch is the mHandler.isPresent() one. Since the split handler was registered with the RestController at startup, mHandler is guaranteed to be present. handleRequest here is actually a method of BaseRestHandler, so let's step into it.

BaseRestHandler

public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        // prepare the request for execution; has the side effect of touching the request parameters
        final RestChannelConsumer action = prepareRequest(request, client);
        ...
        usageCount.increment();
        // execute the action
        // the real execution happens here: invoke the channel consumer returned by prepareRequest, passing the channel to it
        action.accept(channel);
    }

This method first calls prepareRequest, which returns a RestChannelConsumer. Elasticsearch is full of consumer interfaces like this one; they are simply Java 8 functional interfaces that declare a single accept method taking one argument. The consumer is whatever prepareRequest returns, and RestSplitIndexAction overrides prepareRequest.
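
As a rough sketch of this pattern (generic placeholder names, not the actual Elasticsearch classes), a consumer-style functional interface and the kind of lambda prepareRequest returns look like this:

@FunctionalInterface
interface ChannelConsumer<T> {
    void accept(T channel) throws Exception;
}

public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        // prepareRequest() would return a lambda like this, capturing the parsed request...
        ChannelConsumer<String> action = channel -> System.out.println("sending response over " + channel);
        // ...and handleRequest() later triggers it with the channel, just like action.accept(channel) above
        action.accept("http-channel-1");
    }
}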

RestSplitIndexAction

public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        if (request.param("target") == null) {
            throw new IllegalArgumentException("no target index");
        }
        if (request.param("index") == null) {
            throw new IllegalArgumentException("no source index");
        }
        ResizeRequest shrinkIndexRequest = new ResizeRequest(request.param("target"), request.param("index"));
        shrinkIndexRequest.setResizeType(ResizeType.SPLIT);
        request.applyContentParser(parser -> ResizeRequest.PARSER.parse(parser, shrinkIndexRequest, null));
        shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout()));
        shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout()));
        shrinkIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards")));
        return channel -> client.admin().indices().resizeIndex(shrinkIndexRequest, new AcknowledgedRestListener<ResizeResponse>(channel) {
            @Override
            public void addCustomFields(XContentBuilder builder, ResizeResponse response) throws IOException {
                response.addCustomFields(builder);
            }
        });
    }

This method constructs a ResizeRequest, passing in the source and target index names, and sets the resize type to SPLIT. Elasticsearch supports two resize operations: split, which splits shards, and shrink, which merges them. After setting a few timeout-related parameters, the method returns a lambda, which handleRequest in BaseRestHandler eventually invokes; execution then moves into the resizeIndex method of IndicesAdmin.

IndicesAdmin

public void resizeIndex(ResizeRequest request, ActionListener<ResizeResponse> listener) {
            execute(ResizeAction.INSTANCE, request, listener);
        }

This method calls execute, passing in the ResizeAction instance. In Elasticsearch, every externally exposed API is handled by an internal action object. Since Elasticsearch provides both a REST interface and an RPC interface (the transport client), there are two sets of actions: a request arriving over REST is first handled by a Rest*-prefixed action (the mapping between REST endpoints and their handling actions is maintained by the RestController), and a translation layer then finds the corresponding Transport*-prefixed action to do the real work. For the split API, the request is first handled by RestSplitIndexAction, which leads into resizeIndex in IndicesAdmin, and that method simply executes the ResizeAction.

IndicesAdmin

public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
                Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
            client.execute(action, request, listener);
        }

It then calls the client's execute method:

AbstractClient

public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
            Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        listener = threadedWrapper.wrap(listener);
        doExecute(action, request, listener);
    }

隨后再調(diào)用doExecute方法青灼,因為在IndicesAdmin中的client是NodeClient暴心,所以直接進入NodeClient的doExecute方法。

NodeClient

public <    Request extends ActionRequest,
                Response extends ActionResponse,
                RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>
            > void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        // Discard the task because the Client interface doesn't use it.
        executeLocally(action, request, listener);
    }

which then calls executeLocally:

public <    Request extends ActionRequest,
                Response extends ActionResponse
            > Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
        return transportAction(action).execute(request, listener);
    }

Here transportAction is called first, with the ResizeAction passed in; this step resolves the transport-layer handler.

private <    Request extends ActionRequest,
                Response extends ActionResponse
            > TransportAction<Request, Response> transportAction(GenericAction<Request, Response> action) {
        if (actions == null) {
            throw new IllegalStateException("NodeClient has not been initialized");
        }
        // look up the corresponding transport-layer action for the given action in the actions map
        TransportAction<Request, Response> transportAction = actions.get(action);
        if (transportAction == null) {
            throw new IllegalStateException("failed to find action [" + action + "] to execute");
        }
        return transportAction;
    }

actions is simply a map; setupActions in ActionModule registers every transport-layer action into it.

ActionModule

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
        // Subclass NamedRegistry for easy registration
        class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
            ActionRegistry() {
                super("action");
            }

            public void register(ActionHandler<?, ?> handler) {
                register(handler.getAction().name(), handler);
            }

            public <Request extends ActionRequest, Response extends ActionResponse> void register(
                    GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
                    Class<?>... supportTransportActions) {
                register(new ActionHandler<>(action, transportAction, supportTransportActions));
            }
        }
        ActionRegistry actions = new ActionRegistry();

        ...
        // many other action registrations are omitted here
        actions.register(ResizeAction.INSTANCE, TransportResizeAction.class);
        ...

        actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);

        return unmodifiableMap(actions.getRegistry());
    }

As you can see, ResizeAction is handled by TransportResizeAction. Back in NodeClient's executeLocally, once the corresponding transport action has been found, its execute method is called.

TransportAction

public final Task execute(Request request, ActionListener<Response> listener) {
        /*
         * While this version of execute could delegate to the TaskListener
         * version of execute that'd add yet another layer of wrapping on the
         * listener and prevent us from using the listener bare if there isn't a
         * task. That just seems like too many objects. Thus the two versions of
         * this method.
         */
        Task task = taskManager.register("transport", actionName, request);
        if (task == null) {
            execute(null, request, listener);
        } else {
            execute(task, request, new ActionListener<Response>() {
                @Override
                public void onResponse(Response response) {
                    taskManager.unregister(task);
                    listener.onResponse(response);
                }

                @Override
                public void onFailure(Exception e) {
                    taskManager.unregister(task);
                    listener.onFailure(e);
                }
            });
        }
        return task;
    }

Here a task is first registered with the taskManager. A task is simply a wrapper around a unit of work, carrying its type, creation time, the action being executed, the task id, and parent-task information. Once the task is created, the other overloaded execute method is called, with the listener wrapped once more; the point of this wrapping is to call taskManager.unregister from the listener so the task is removed when it finishes.

public final void execute(Task task, Request request, ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }

        if (task != null && request.getShouldStoreResult()) {
            listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
        }

        RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
        requestFilterChain.proceed(task, actionName, request, listener);
    }

This execute method first calls request.validate() to check that the request is valid; if validation passes, a RequestFilterChain is constructed.

RequestFilterChain

public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
            int i = index.getAndIncrement();
            try {
                if (i < this.action.filters.length) {
                    this.action.filters[i].apply(task, actionName, request, listener, this);
                } else if (i == this.action.filters.length) {
                    this.action.doExecute(task, request, listener);
                } else {
                    listener.onFailure(new IllegalStateException("proceed was called too many times"));
                }
            } catch(Exception e) {
                logger.trace("Error during transport action execution.", e);
                listener.onFailure(e);
            }
        }

The action holds an array of filters. If there are any, they are applied one after another until the action's doExecute method is finally called. TransportResizeAction registers no filters, so action.doExecute is called directly; and since TransportResizeAction extends TransportMasterNodeAction, execution ends up in TransportMasterNodeAction's doExecute method.

TransportMasterNodeAction

protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
        new AsyncSingleAction(task, request, listener).start();
    }

Here an AsyncSingleAction is created and its start method is called.

AsyncSingleAction

public void start() {
            ClusterState state = clusterService.state();
            this.observer = new ClusterStateObserver(state, clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext());
            doStart(state);
        }

It first obtains the current cluster state, then calls doStart:

protected void doStart(ClusterState clusterState) {
            final Predicate<ClusterState> masterChangePredicate = MasterNodeChangePredicate.build(clusterState);
            final DiscoveryNodes nodes = clusterState.nodes();
            if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
                // check for block, if blocked, retry, else, execute locally
                final ClusterBlockException blockException = checkBlock(request, clusterState);
                if (blockException != null) {
                    if (!blockException.retryable()) {
                        listener.onFailure(blockException);
                    } else {
                        logger.trace("can't execute due to a cluster block, retrying", blockException);
                        retry(blockException, newState -> {
                            ClusterBlockException newException = checkBlock(request, newState);
                            return (newException == null || !newException.retryable());
                        });
                    }
                } else {
                    ActionListener<Response> delegate = new ActionListener<Response>() {
                        @Override
                        public void onResponse(Response response) {
                            listener.onResponse(response);
                        }

                        @Override
                        public void onFailure(Exception t) {
                            if (t instanceof Discovery.FailedToCommitClusterStateException
                                    || (t instanceof NotMasterException)) {
                                logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
                                retry(t, masterChangePredicate);
                            } else {
                                listener.onFailure(t);
                            }
                        }
                    };
                    threadPool.executor(executor).execute(new ActionRunnable(delegate) {
                        @Override
                        protected void doRun() throws Exception {
                            masterOperation(task, request, clusterState, delegate);
                        }
                    });
                }
            } else {
                if (nodes.getMasterNode() == null) {
                    logger.debug("no known master node, scheduling a retry");
                    retry(null, masterChangePredicate);
                } else {
                    DiscoveryNode masterNode = nodes.getMasterNode();
                    final String actionName = getMasterActionName(masterNode);
                    transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler<Response>(listener,
                        TransportMasterNodeAction.this::newResponse) {
                        @Override
                        public void handleException(final TransportException exp) {
                            Throwable cause = exp.unwrapCause();
                            if (cause instanceof ConnectTransportException) {
                                // we want to retry here a bit to see if a new master is elected
                                logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
                                        actionName, nodes.getMasterNode(), exp.getDetailedMessage());
                                retry(cause, masterChangePredicate);
                            } else {
                                listener.onFailure(exp);
                            }
                        }
                    });
                }
            }
        }

This first checks whether the local node is the elected master. If it is not, the master node is looked up and the request is forwarded to it; otherwise the request is executed locally. For local execution the listener is wrapped once more, this time so that certain failures can trigger a retry. With the preparation done, an ActionRunnable is created and its run method is executed. Note that execution is still synchronous at this point: as the next snippet shows, the executor used here simply runs the command on the calling thread.

EsExecutor

public void execute(Runnable command) {
            command.run();
        }

AbstractRunnable

public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

ActionRunnable

protected void doRun() throws Exception {
          masterOperation(task, request, clusterState, delegate);
   }

TransportMasterNodeAction

protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
        masterOperation(request, state, listener);
    }

Here the task argument is simply ignored, and ultimately TransportResizeAction's masterOperation method is invoked.

TransportResizeAction

protected void masterOperation(final ResizeRequest resizeRequest, final ClusterState state,
                                   final ActionListener<ResizeResponse> listener) {

        // there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
        final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
        final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
        client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
            @Override
            public void onResponse(IndicesStatsResponse indicesStatsResponse) {
                CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
                    (i) -> {
                        IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
                        return shard == null ? null : shard.getPrimary().getDocs();
                    }, sourceIndex, targetIndex);
                createIndexService.createIndex(
                    updateRequest,
                    ActionListener.wrap(response ->
                            listener.onResponse(new ResizeResponse(response.isAcknowledged(), response.isShardsAcked(),
                                updateRequest.index())), listener::onFailure
                    )
                );
            }

            @Override
            public void onFailure(Exception e) {
                listener.onFailure(e);
            }
        });

    }

Another task appears to be executed here; it fetches the index's doc stats. Those stats only really matter for the shrink API, so we won't analyze them here. Once the stats have been fetched, the listener's onResponse is called. Note that by the time we are inside onResponse we are already on a different thread. onResponse then immediately calls prepareCreateIndexRequest.

static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ResizeRequest resizeRequest, final ClusterState state
       , final IntFunction<DocsStats> perShardDocStats, String sourceIndexName, String targetIndexName) {
     // some code omitted here
       ...
       // for each shard of the target index
       for (int i = 0; i < numShards; i++) {
           if (resizeRequest.getResizeType() == ResizeType.SHRINK) {
               Set<ShardId> shardIds = IndexMetaData.selectShrinkShards(i, metaData, numShards);
               long count = 0;
               for (ShardId id : shardIds) {
                   DocsStats docsStats = perShardDocStats.apply(id.id());
                   if (docsStats != null) {
                       count += docsStats.getCount();
                   }
                   if (count > IndexWriter.MAX_DOCS) {
                       throw new IllegalStateException("Can't merge index with more than [" + IndexWriter.MAX_DOCS
                           + "] docs - too many documents in shards " + shardIds);
                   }
               }
           } else {
               // for each target shard, pick its source shard; throws if the source shard would be null
               Objects.requireNonNull(IndexMetaData.selectSplitShard(i, metaData, numShards));
               // we just execute this to ensure we get the right exceptions if the number of shards is wrong or less then etc.
           }
       }

       if (IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) {
           throw new IllegalArgumentException("cannot provide a routing partition size value when resizing an index");
       }
       if (IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(targetIndexSettings)) {
           throw new IllegalArgumentException("cannot provide index.number_of_routing_shards on resize");
       }
       String cause = resizeRequest.getResizeType().name().toLowerCase(Locale.ROOT) + "_index";
       targetIndex.cause(cause);
       Settings.Builder settingsBuilder = Settings.builder().put(targetIndexSettings);
       settingsBuilder.put("index.number_of_shards", numShards);
       targetIndex.settings(settingsBuilder);

       return new CreateIndexClusterStateUpdateRequest(targetIndex,
           cause, targetIndex.index(), targetIndexName, true)
           // mappings are updated on the node when creating in the shards, this prevents race-conditions since all mapping must be
           // applied once we took the snapshot and if somebody messes things up and switches the index read/write and adds docs we miss
           // the mappings for everything is corrupted and hard to debug
           .ackTimeout(targetIndex.timeout())
           .masterNodeTimeout(targetIndex.masterNodeTimeout())
           .settings(targetIndex.settings())
           .aliases(targetIndex.aliases())
           .customs(targetIndex.customs())
           .waitForActiveShards(targetIndex.waitForActiveShards())
           .recoverFrom(metaData.getIndex())
           .resizeType(resizeRequest.getResizeType());
   }

This method is fairly long; its most important step is this line:

Objects.requireNonNull(IndexMetaData.selectSplitShard(i, metaData, numShards));

This single line captures how Elasticsearch splits an index, i.e. which source shard each target shard is split from.

IndexMetaData

public static ShardId selectSplitShard(int shardId, IndexMetaData sourceIndexMetadata, int numTargetShards) {
        if (shardId >= numTargetShards) {
            throw new IllegalArgumentException("the number of target shards (" + numTargetShards + ") must be greater than the shard id: "
                + shardId);
        }
        int numSourceShards = sourceIndexMetadata.getNumberOfShards();
        if (numSourceShards > numTargetShards) {
            throw new IllegalArgumentException("the number of source shards [" + numSourceShards
                 + "] must be less that the number of target shards [" + numTargetShards + "]");
        }
        int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
        // now we verify that the numRoutingShards is valid in the source index
        int routingNumShards = sourceIndexMetadata.getRoutingNumShards();
        if (routingNumShards % numTargetShards != 0) {
            throw new IllegalStateException("the number of routing shards ["
                + routingNumShards + "] must be a multiple of the target shards [" + numTargetShards + "]");
        }
        // this is just an additional assertion that ensures we are a factor of the routing num shards.
        assert getRoutingFactor(numTargetShards, sourceIndexMetadata.getRoutingNumShards()) >= 0;
        return new ShardId(sourceIndexMetadata.getIndex(), shardId/routingFactor);
    }

This method takes three arguments: a shard id in the target index, the source index's metadata, and the total number of shards in the target index. It first computes routingFactor, which here means the factor by which the split multiplies the shard count, i.e. numTargetShards / numSourceShards. (routingFactor is computed again in a later step, but there it means something different.) From the return value, the final expression is:

  sourceShardId = targetShardId / (numTargetShards / numSourceShards)

For example, if a source index with two shards is split into four, the relationship between source and target shard ids is:

source shard    target shard
0               0
0               1
1               2
1               3
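
A minimal sketch of this mapping, pure arithmetic and independent of the Elasticsearch classes, reproduces the table above:

public class SplitShardMapping {
    // sourceShardId = targetShardId / routingFactor, where routingFactor = numTargetShards / numSourceShards
    static int selectSourceShard(int targetShardId, int numSourceShards, int numTargetShards) {
        int routingFactor = numTargetShards / numSourceShards; // how many target shards each source shard splits into
        return targetShardId / routingFactor;
    }

    public static void main(String[] args) {
        // splitting 2 shards into 4: target shards 0 and 1 come from source shard 0, target shards 2 and 3 from source shard 1
        for (int targetShard = 0; targetShard < 4; targetShard++) {
            System.out.println("source shard " + selectSourceShard(targetShard, 2, 4) + " -> target shard " + targetShard);
        }
    }
}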

Back in prepareCreateIndexRequest: once validation passes, a CreateIndexClusterStateUpdateRequest is constructed. As the name suggests, this is a cluster-state update request, specifically one that creates an index. A number of properties are then set on it; the most important is arguably recoverFrom, which determines where the target index gets its data from. Returning to masterOperation, the result of prepareCreateIndexRequest is assigned to updateRequest and passed to createIndexService.createIndex, while the listener is wrapped yet again, this time to swap the response object for a ResizeResponse. createIndexService is a MetaDataCreateIndexService, the service responsible for index-creation requests.

MetaDataCreateIndexService

public void createIndex(final CreateIndexClusterStateUpdateRequest request,
                            final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
        onlyCreateIndex(request, ActionListener.wrap(response -> {
            if (response.isAcknowledged()) {
                activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
                    shardsAcked -> {
                        if (shardsAcked == false) {
                            logger.debug("[{}] index created, but the operation timed out while waiting for " +
                                             "enough shards to be started.", request.index());
                        }
                        listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcked));
                    }, listener::onFailure);
            } else {
                listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));
            }
        }, listener::onFailure));
    }

Here the listener is wrapped once more, this time to check whether the cluster-state change was acknowledged; if it was, the code waits for the required number of shards to become active. As you can see, Elasticsearch leans heavily on asynchronous execution, and the many layers of listener wrapping make the code easy to lose track of; with relatively few comments, it is not easy reading. After this wrapping, execution enters onlyCreateIndex, which, as the name says, only creates the index. Creating the index and waiting for its shards to become active are therefore asynchronous with respect to each other: the index may be created successfully while its shards have not yet been started.

private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
                                 final ActionListener<ClusterStateUpdateResponse> listener) {
        Settings.Builder updatedSettingsBuilder = Settings.builder();
        Settings build = updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
        indexScopedSettings.validate(build, true); // we do validate here - index setting must be consistent
        request.settings(build);
        clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
            new IndexCreationTask(logger, allocationService, request, listener, indicesService, aliasValidator, xContentRegistry, settings,
                this::validate));
    }

At this point a cluster-state update task is submitted to the cluster service, with the reason for the operation specified and the work wrapped in an IndexCreationTask.

ClusterService

public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
        void submitStateUpdateTask(String source, T updateTask) {
        submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
    }
public <T> void submitStateUpdateTask(String source, T task,
                                          ClusterStateTaskConfig config,
                                          ClusterStateTaskExecutor<T> executor,
                                          ClusterStateTaskListener listener) {
        submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
    }
public <T> void submitStateUpdateTasks(final String source,
                                           final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                           final ClusterStateTaskExecutor<T> executor) {
        masterService.submitStateUpdateTasks(source, tasks, config, executor);
    }

From here the call is delegated to the MasterService.

MasterService

public <T> void submitStateUpdateTasks(final String source,
                                           final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
                                           final ClusterStateTaskExecutor<T> executor) {
        if (!lifecycle.started()) {
            return;
        }
        try {
            List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
                .map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
                .collect(Collectors.toList());
            taskBatcher.submitTasks(safeTasks, config.timeout());
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }

This uses Java 8 stream syntax to wrap each task into an UpdateTask object, and then submits the tasks through the TaskBatcher.

UpdateTask

UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
                       ClusterStateTaskExecutor<?> executor) {
                super(priority, source, executor, task);
                this.listener = listener;
            }

TaskBatcher

public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
        if (tasks.isEmpty()) {
            return;
        }
        final BatchedTask firstTask = tasks.get(0);
        assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
            "tasks submitted in a batch should share the same batching key: " + tasks;
        // convert to an identity map to check for dups based on task identity
        final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
            BatchedTask::getTask,
            Function.identity(),
            (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
            IdentityHashMap::new));

        synchronized (tasksPerBatchingKey) {
            LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
                k -> new LinkedHashSet<>(tasks.size()));
            for (BatchedTask existing : existingTasks) {
                // check that there won't be two tasks with the same identity for the same batching key
                BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
                if (duplicateTask != null) {
                    throw new IllegalStateException("task [" + duplicateTask.describeTasks(
                        Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
                }
            }
            existingTasks.addAll(tasks);
        }

        if (timeout != null) {
            threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
        } else {
            threadExecutor.execute(firstTask);
        }
    }

This method performs two duplicate checks: the first verifies that the tasks passed in contain no duplicates among themselves, and the second verifies that the tasks being submitted do not duplicate tasks that are already queued. tasksPerBatchingKey tracks all tasks that share the same batchingKey. The task is then handed to the thread executor.
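
The first of those checks relies on collecting the tasks into an identity-based map and failing as soon as two entries refer to the same task object. A small self-contained sketch of that idiom (using plain Objects rather than BatchedTask):

import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateTaskCheck {
    public static void main(String[] args) {
        Object taskA = new Object();
        List<Object> tasks = Arrays.asList(taskA, new Object(), taskA); // taskA submitted twice

        try {
            // same idea as the tasksIdentity map above: keys are compared by identity, and a duplicate triggers the merge function
            Map<Object, Object> tasksIdentity = tasks.stream().collect(Collectors.toMap(
                Function.identity(),
                Function.identity(),
                (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
                IdentityHashMap::new));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // "cannot add duplicate task: java.lang.Object@..."
        }
    }
}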

PrioritizedEsThreadPoolExecutor

public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
        command = wrapRunnable(command);
        doExecute(command);
        if (timeout.nanos() >= 0) {
            if (command instanceof TieBreakingPrioritizedRunnable) {
                ((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
            } else {
                // We really shouldn't be here. The only way we can get here if somebody created PrioritizedFutureTask
                // and passed it to execute, which doesn't make much sense
                throw new UnsupportedOperationException("Execute with timeout is not supported for future tasks");
            }
        }
    }

The command is wrapped first; let's look at what that wrapping actually does.

protected Runnable wrapRunnable(Runnable command) {
        if (command instanceof PrioritizedRunnable) {
            if ((command instanceof TieBreakingPrioritizedRunnable)) {
                return command;
            }
            Priority priority = ((PrioritizedRunnable) command).priority();
          // the UpdateTask ends up wrapped in this object
            return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());
        } else if (command instanceof PrioritizedFutureTask) {
            return command;
        } else { // it might be a callable wrapper...
            if (command instanceof TieBreakingPrioritizedRunnable) {
                return command;
            }
            return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());
        }
    }

The command here is the UpdateTask mentioned above. It is a prioritized runnable, so it ends up wrapped in a TieBreakingPrioritizedRunnable, which implements Runnable's run method:

TieBreakingPrioritizedRunnable

public void run() {
            synchronized (this) {
                // make the task as stared. This is needed for synchronization with the timeout handling
                // see  #scheduleTimeout()
                started = true;
                FutureUtils.cancel(timeoutFuture);
            }
            runAndClean(runnable);
        }

Back in execute: after wrapping the command, doExecute is called directly. That method belongs to EsThreadPoolExecutor, the parent class of PrioritizedEsThreadPoolExecutor.

EsThreadPoolExecutor

protected void doExecute(final Runnable command) {
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();

                }
            } else {
                throw ex;
            }
        }
    }

EsThreadPoolExecutor extends Java's ThreadPoolExecutor (Elasticsearch's executors will be analyzed in a later article). Here doExecute calls the parent class's execute method, which finally submits the task to the thread pool for execution. Back in execute, if a timeout was set, a timeout callback is scheduled to fire after the given period.
At this point the index-creation task has been submitted. The next article will analyze how that task is executed.
