Elasticsearch Source Code Analysis: Indexing (Part 1)

1. A simple index request example

First, let's look at an index request:

curl -XPUT 127.0.0.1:9200/item/show/28589790 -d '
{
   "id": 28589790,
   "text": "這是一個索引文本"
}'

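This request adds a document to the item index. The document is:
document id: 28589790
field id: 28589790
field text: 這是一個索引文本 ("this is a piece of text to index")
If the index already contains a document with id 28589790, Elasticsearch overwrites it with this data.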

2. Indexing sequence diagram

3. Forwarding the index request

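When Elasticsearch starts, it injects the RestIndexAction object, whose constructor registers each HTTP method and URI pattern, together with the handler itself, in the RestController's in-memory handler table: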
public class RestIndexAction extends BaseRestHandler {
    @Inject
    public RestIndexAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(POST, "/{index}/{type}", this); // auto id creation
        controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
        controller.registerHandler(POST, "/{index}/{type}/{id}", this);
        CreateHandler createHandler = new CreateHandler(settings, controller, client);
        controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", createHandler);
        controller.registerHandler(POST, "/{index}/{type}/{id}/_create", createHandler);
    }
}

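Elasticsearch receives user requests in HttpRequestHandler.messageReceived() and then forwards them by calling dispatchRequest().
When the request reaches RestController, getHandler() looks up the handler registered for the request's method and path. As the registrations above show, PUT item/show/28589790 matches /{index}/{type}/{id} and is therefore handled by RestIndexAction.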

public class RestController extends AbstractLifecycleComponent<RestController> {
    void executeHandler(RestRequest request, RestChannel channel) throws Exception {
        final RestHandler handler = getHandler(request);
        if (handler != null) {
            handler.handleRequest(request, channel);
        } else {
            if (request.method() == RestRequest.Method.OPTIONS) {
                // when we have OPTIONS request
                // simply send OK by default (with the Access Control Origin header which gets automatically added)
                channel.sendResponse(new BytesRestResponse(OK));
            } else {
                channel.sendResponse(new BytesRestResponse(BAD_REQUEST, "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
            }
        }
    }
}
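To make the lookup concrete, here is a minimal sketch of method-plus-path matching. SimpleRestRouter and its methods are hypothetical; RestController itself uses a path trie, whereas this sketch just scans the registered patterns linearly. That is enough to show how PUT /item/show/28589790 resolves to the /{index}/{type}/{id} registration and how the {index}, {type} and {id} placeholders become request parameters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for RestController's handler lookup.
// A linear scan over registered patterns; the real code uses a path trie.
final class SimpleRestRouter {
    // HTTP method -> list of {pattern, handlerName} pairs, in registration order
    private final Map<String, List<String[]>> routes = new HashMap<>();

    void register(String method, String pattern, String handlerName) {
        routes.computeIfAbsent(method, k -> new ArrayList<>()).add(new String[]{pattern, handlerName});
    }

    // Returns the matching handler name and fills params with {placeholder} values,
    // or returns null when nothing matches.
    String match(String method, String path, Map<String, String> params) {
        String[] pathParts = stripSlashes(path).split("/");
        for (String[] route : routes.getOrDefault(method, Collections.<String[]>emptyList())) {
            String[] patternParts = stripSlashes(route[0]).split("/");
            if (patternParts.length != pathParts.length) {
                continue;
            }
            Map<String, String> found = new HashMap<>();
            boolean ok = true;
            for (int i = 0; i < patternParts.length && ok; i++) {
                String p = patternParts[i];
                if (p.startsWith("{") && p.endsWith("}")) {
                    found.put(p.substring(1, p.length() - 1), pathParts[i]); // placeholder segment
                } else if (!p.equals(pathParts[i])) {
                    ok = false; // literal segment mismatch
                }
            }
            if (ok) {
                params.putAll(found);
                return route[1];
            }
        }
        return null;
    }

    private static String stripSlashes(String s) {
        int start = 0, end = s.length();
        while (start < end && s.charAt(start) == '/') start++;
        while (end > start && s.charAt(end - 1) == '/') end--;
        return s.substring(start, end);
    }
}
```

A null result corresponds to the branch of executeHandler() that replies BAD_REQUEST (or plain OK for an OPTIONS request).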

handler.handleRequest() eventually calls RestIndexAction.handleRequest(), which parses the index parameters, builds an IndexRequest object (indexRequest), and then calls client.index() to start indexing.

public class RestIndexAction extends BaseRestHandler {
    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
        indexRequest.listenerThreaded(false);
        indexRequest.operationThreaded(true);
        indexRequest.routing(request.param("routing"));
        indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
        indexRequest.timestamp(request.param("timestamp"));
        if (request.hasParam("ttl")) {
            indexRequest.ttl(request.paramAsTime("ttl", null).millis());
        }
        indexRequest.source(request.content());
        indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
        indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
        indexRequest.version(RestActions.parseVersion(request));
        indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
        String sOpType = request.param("op_type");
        if (sOpType != null) {
            try {
                indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
            } catch (ElasticsearchIllegalArgumentException eia){
                try {
                    XContentBuilder builder = channel.newErrorBuilder();
                    channel.sendResponse(new BytesRestResponse(BAD_REQUEST, builder.startObject().field("error", eia.getMessage()).endObject()));
                } catch (IOException e1) {
                    logger.warn("Failed to send response", e1);
                    return;
                }
            }
        }
        String replicationType = request.param("replication");
        if (replicationType != null) {
            indexRequest.replicationType(ReplicationType.fromString(replicationType));
        }
        String consistencyLevel = request.param("consistency");
        if (consistencyLevel != null) {
            indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
        }
        client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
            @Override
            public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
                builder.startObject()
                        .field(Fields._INDEX, response.getIndex())
                        .field(Fields._TYPE, response.getType())
                        .field(Fields._ID, response.getId())
                        .field(Fields._VERSION, response.getVersion())
                        .field(Fields.CREATED, response.isCreated());
                builder.endObject();
                RestStatus status = OK;
                if (response.isCreated()) {
                    status = CREATED;
                }
                return new BytesRestResponse(status, builder);
            }
        });
    }
}
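buildResponse() reports created and answers 201 CREATED for a new document and 200 OK otherwise. Combined with the overwrite behavior described in section 1, this can be modeled with a toy in-memory index. ToyIndex and IndexResult are hypothetical, and the sketch assumes the internal version type, where a new document starts at version 1 and each overwrite increments it.

```java
import java.util.HashMap;
import java.util.Map;

// Result of one index operation, mirroring the fields buildResponse() writes.
final class IndexResult {
    final long version;
    final boolean created;

    IndexResult(long version, boolean created) {
        this.version = version;
        this.created = created;
    }

    // Same rule as buildResponse(): 201 CREATED for a new document, 200 OK otherwise.
    int status() {
        return created ? 201 : 200;
    }
}

// Hypothetical in-memory model of internal versioning; not Elasticsearch code.
final class ToyIndex {
    private final Map<String, Long> versions = new HashMap<>();
    private final Map<String, String> sources = new HashMap<>();

    IndexResult index(String id, String source) {
        Long current = versions.get(id);
        long version = (current == null) ? 1L : current + 1; // an overwrite bumps the version
        versions.put(id, version);
        sources.put(id, source);                              // the new source replaces the old one
        return new IndexResult(version, current == null);
    }

    String get(String id) {
        return sources.get(id);
    }
}
```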

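The index request supports the following parameters:
routing: the routing value; documents with the same routing value are stored on the same shard
parent: the id of the document's parent; if no routing is set, the parent id is used as the routing value
timestamp: the timestamp of the document
ttl: time to live, after which the document expires
timeout: the operation timeout, defaulting to IndexRequest.DEFAULT_TIMEOUT
refresh: whether to refresh after this index operation so that the document becomes searchable immediately; defaults to false
version: the document's version number
version_type: the version type; defaults to internal; supported values are internal, external, external_gt, external_gte and force
op_type: the operation type; supports create and index
replication: the replication type; supports async, sync and default
consistency: the write consistency level; supports one, quorum, all and default
The request content is the document source, i.e. the document body.
Once the index request has been built, client.index() is called to execute it.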

4. The indexing entry point

index() uses IndexAction.INSTANCE as its action:

public abstract class AbstractClient implements Client {
    @Override
    public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        execute(IndexAction.INSTANCE, request, listener);
    }
}

ActionModule registers this action and binds it to TransportIndexAction:

public class ActionModule extends AbstractModule {
    @Override
    protected void configure() {
        registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
    }
}

So the transport action that NodeClient.execute() looks up for this action is TransportIndexAction:

public class NodeClient extends AbstractClient {
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);  // TransportIndexAction
        transportAction.execute(request, listener);
    }
}
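The registerAction()/execute() pair is essentially a map from an action singleton to the transport action that serves it. A minimal sketch of that idea; ActionKey, TransportHandler and ActionRegistry are hypothetical names, and the real classes are generic over request, response and builder types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for an action singleton such as IndexAction.INSTANCE.
// No equals()/hashCode() override: the singleton instance itself is the map key.
final class ActionKey {
    final String name;

    ActionKey(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for a transport action: turns a request into a response.
interface TransportHandler extends Function<String, String> {
}

final class ActionRegistry {
    private final Map<ActionKey, TransportHandler> actions = new HashMap<>();

    // Plays the role of ActionModule.registerAction(action, transportActionClass).
    void register(ActionKey action, TransportHandler handler) {
        actions.put(action, handler);
    }

    // Plays the role of NodeClient.execute(): look up the transport action, then run it.
    String execute(ActionKey action, String request) {
        TransportHandler handler = actions.get(action);
        if (handler == null) {
            throw new IllegalStateException("no transport action registered for " + action.name);
        }
        return handler.apply(request);
    }
}
```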

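Because TransportIndexAction extends TransportAction (indirectly, through TransportShardReplicationOperationAction), the call chain is NodeClient.execute() -> TransportAction.execute() -> TransportIndexAction.doExecute().
The overall flow is: first decide whether the index needs to be created; if so, create it and then write the document, otherwise write the document directly. If creation fails because the index already exists (IndexAlreadyExistsException), the write still proceeds.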

public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
    @Override
    protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
            createIndexRequest.index(request.index());
            createIndexRequest.mapping(request.type());
            createIndexRequest.cause("auto(index api)");
            createIndexRequest.masterNodeTimeout(request.timeout());
            createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse result) {
                    innerExecute(request, listener);
                }

                @Override
                public void onFailure(Throwable e) {
                    if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
                        // we have the index, do it
                        try {
                            innerExecute(request, listener);
                        } catch (Throwable e1) {
                            listener.onFailure(e1);
                        }
                    } else {
                        listener.onFailure(e);
                    }
                }
            });
        } else {
            innerExecute(request, listener);
        }
    }
}
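The onFailure() branch matters because shouldAutoCreate() and the actual creation run at different times: another request may create the same index in between, and that lost race is treated as success. A simplified, single-node sketch; AutoCreateFlow and IndexAlreadyExists are hypothetical stand-ins for TransportIndexAction and IndexAlreadyExistsException.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for IndexAlreadyExistsException.
class IndexAlreadyExists extends RuntimeException {
    IndexAlreadyExists(String index) {
        super("[" + index + "] already exists");
    }
}

// Simplified model of doExecute(): auto-create the index, tolerate a lost creation race, then write.
final class AutoCreateFlow {
    private final Set<String> indices = ConcurrentHashMap.newKeySet();

    // Stand-in for createIndexAction: fails if the index was created first by someone else.
    void createIndex(String index) {
        if (!indices.add(index)) {
            throw new IndexAlreadyExists(index);
        }
    }

    boolean exists(String index) {
        return indices.contains(index);
    }

    // Stand-in for innerExecute(): the actual document write.
    String write(String index, String id) {
        return "wrote " + id + " to " + index;
    }

    String index(String index, String id, boolean shouldAutoCreate) {
        if (shouldAutoCreate) {
            try {
                createIndex(index);
            } catch (IndexAlreadyExists e) {
                // the index appeared between the check and the create: write anyway
            }
        }
        return write(index, id);
    }
}
```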

Elasticsearch decides whether the index must be created with AutoCreateIndex.shouldAutoCreate():

public class AutoCreateIndex {
    public AutoCreateIndex(Settings settings) {
        String value = settings.get("action.auto_create_index");
        if (value == null || Booleans.isExplicitTrue(value)) {
            needToCheck = true;
            globallyDisabled = false;
            matches = null;
            matches2 = null;
        } else if (Booleans.isExplicitFalse(value)) {
            needToCheck = false;
            globallyDisabled = true;
            matches = null;
            matches2 = null;
        } else {
            needToCheck = true;
            globallyDisabled = false;
            matches = Strings.commaDelimitedListToStringArray(value);
            matches2 = new String[matches.length];
            for (int i = 0; i < matches.length; i++) {
                matches2[i] = matches[i].substring(1);
            }
        }
    }

    public boolean shouldAutoCreate(String index, ClusterState state) {
        if (!needToCheck) {
            return false;
        }
        if (state.metaData().hasConcreteIndex(index)) {
            return false;
        }
        if (globallyDisabled) {
            return false;
        }
        if (matches == null) {
            return true;
        }
        for (int i = 0; i < matches.length; i++) {
            char c = matches[i].charAt(0);
            if (c == '-') {
                if (Regex.simpleMatch(matches2[i], index)) {
                    return false;
                }
            } else if (c == '+') {
                if (Regex.simpleMatch(matches2[i], index)) {
                    return true;
                }
            } else {
                if (Regex.simpleMatch(matches[i], index)) {
                    return true;
                }
            }
        }
        return false;
    }
}

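The setting and fields mean the following:
action.auto_create_index: an option in the Elasticsearch configuration file that controls whether indices may be created automatically. Besides true or false, it accepts a comma-separated list of index-name patterns, each optionally prefixed with + (allow) or - (deny)
needToCheck: whether a check is needed at all; it is false only when action.auto_create_index is false, in which case shouldAutoCreate() immediately returns false
globallyDisabled: whether automatic index creation is disabled globally; it is true only when action.auto_create_index is false, and it likewise makes shouldAutoCreate() return false
If the cluster already contains the index, it does not need to be created either. In all other cases the decision comes from the patterns configured in action.auto_create_index. These are simple wildcard patterns matched with Regex.simpleMatch (where * matches any characters), not regular expressions; they are checked in order, the first matching pattern decides, and if none matches the index is not created.
If creation is allowed, the index-creation flow begins.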
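The pattern handling in shouldAutoCreate() can be reproduced in a few lines. This sketch assumes the setting is literally true, false, absent, or a comma-separated pattern list (Booleans.isExplicitTrue/isExplicitFalse in the real code may accept more spellings), and its simpleMatch is our own glob matcher supporting only *, written as a stand-in for Regex.simpleMatch. AutoCreateCheck is hypothetical.

```java
// Simplified re-implementation of the pattern logic in AutoCreateIndex.shouldAutoCreate().
final class AutoCreateCheck {
    private final boolean disabled;  // action.auto_create_index: false
    private final String[] patterns; // null means "any index may be auto-created"

    AutoCreateCheck(String setting) {
        if (setting == null || setting.equals("true")) {
            disabled = false;
            patterns = null;
        } else if (setting.equals("false")) {
            disabled = true;
            patterns = null;
        } else {
            disabled = false;
            patterns = setting.split(",");
        }
    }

    boolean shouldAutoCreate(String index, boolean indexExists) {
        if (disabled || indexExists) {
            return false;
        }
        if (patterns == null) {
            return true;
        }
        // Patterns are checked in order; the first one that matches decides.
        for (String raw : patterns) {
            String p = raw.trim();
            if (p.isEmpty()) {
                continue;
            }
            char c = p.charAt(0);
            if (c == '-' || c == '+') {
                if (simpleMatch(p.substring(1), index)) {
                    return c == '+';
                }
            } else if (simpleMatch(p, index)) {
                return true;
            }
        }
        return false; // no pattern matched
    }

    // Glob match where '*' matches any (possibly empty) run of characters.
    static boolean simpleMatch(String pattern, String str) {
        return match(pattern, 0, str, 0);
    }

    private static boolean match(String p, int i, String s, int j) {
        if (i == p.length()) {
            return j == s.length();
        }
        if (p.charAt(i) == '*') {
            for (int k = j; k <= s.length(); k++) {
                if (match(p, i + 1, s, k)) {
                    return true;
                }
            }
            return false;
        }
        return j < s.length() && p.charAt(i) == s.charAt(j) && match(p, i + 1, s, j + 1);
    }
}
```

With "+item*,-*", only indices whose names start with item are auto-created; with "-logs*,+*", everything except logs* is.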

5. Creating the index

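First, a CreateIndexRequest (createIndexRequest) is built with four parameters: the index name (index), the mapping (here just the document type), the reason for creating the index (cause), and the master node timeout (masterNodeTimeout).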

CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
createIndexRequest.index(request.index());
createIndexRequest.mapping(request.type());
createIndexRequest.cause("auto(index api)");
createIndexRequest.masterNodeTimeout(request.timeout());

Then createIndexAction.execute() is called to create the index:

public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
    public final void execute(Request request, ActionListener<Response> listener) {
        if (forceThreadedListener()) {
            request.listenerThreaded(true);
        }
        if (request.listenerThreaded()) {
            listener = new ThreadedActionListener<>(threadPool, listener, logger);
        }

        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }

        if (filters.length == 0) {
            try {
                // every TransportAction subclass must implement this method
                doExecute(request, listener);
            } catch(Throwable t) {
                logger.trace("Error during transport action execution.", t);
                listener.onFailure(t);
            }
        } else {
            RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
            requestFilterChain.proceed(actionName, request, listener);
        }
    }

    protected abstract void doExecute(Request request, ActionListener<Response> listener);
}
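execute() is a template method: the final execute() handles listener threading, validation and filters, and subclasses only implement doExecute(). A simplified sketch of the validation and filter-chain part, assuming each filter must call proceed() to pass the request on. SketchFilter, SketchChain and SimpleTransportAction are hypothetical names, and listener threading is left out.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical filter: it may inspect the request and must call chain.proceed() to continue.
interface SketchFilter {
    void apply(String request, SketchChain chain);
}

interface SketchChain {
    void proceed(String request);
}

// Simplified model of TransportAction.execute(): validate, then either call doExecute()
// directly (no filters) or walk the filters in order, the last step calling doExecute().
abstract class SimpleTransportAction {
    private final List<SketchFilter> filters;

    SimpleTransportAction(List<SketchFilter> filters) {
        this.filters = filters;
    }

    final void execute(String request, Consumer<Exception> onFailure) {
        if (request == null || request.isEmpty()) { // stand-in for request.validate()
            onFailure.accept(new IllegalArgumentException("validation failed: empty request"));
            return;
        }
        try {
            if (filters.isEmpty()) {
                doExecute(request);
            } else {
                proceed(0, request);
            }
        } catch (RuntimeException e) {
            onFailure.accept(e);
        }
    }

    private void proceed(int index, String request) {
        if (index < filters.size()) {
            filters.get(index).apply(request, next -> proceed(index + 1, next));
        } else {
            doExecute(request);
        }
    }

    // Subclasses provide the actual work, as TransportAction subclasses do.
    protected abstract void doExecute(String request);
}
```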

As the class diagram below shows, TransportCreateIndexAction extends TransportMasterNodeOperationAction, so the call chain is TransportAction.execute() -> TransportMasterNodeOperationAction.doExecute().


TransportCreateIndexAction class diagram

The main job of TransportMasterNodeOperationAction is to make sure the operation runs on the master node:

public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
    @Override
    protected void doExecute(final Request request, final ActionListener<Response> listener) {
        innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
    }

    private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
        final ClusterState clusterState = observer.observedState();
        final DiscoveryNodes nodes = clusterState.nodes();
        if (nodes.localNodeMaster() || localExecute(request)) {
            final ClusterBlockException blockException = checkBlock(request, clusterState);
            if (blockException != null) {
                if (!blockException.retryable()) {
                    listener.onFailure(blockException);
                    return;
                }
                logger.trace("can't execute due to a cluster block: [{}], retrying", blockException);
                observer.waitForNextChange(
                        new ClusterStateObserver.Listener() {
                            @Override
                            public void onNewClusterState(ClusterState state) {
                                innerExecute(request, listener, observer, false);
                            }

                            @Override
                            public void onClusterServiceClose() {
                                listener.onFailure(blockException);
                            }

                            @Override
                            public void onTimeout(TimeValue timeout) {
                                listener.onFailure(blockException);
                            }
                        }, new ClusterStateObserver.ValidationPredicate() {
                            @Override
                            protected boolean validate(ClusterState newState) {
                                ClusterBlockException blockException = checkBlock(request, newState);
                                return (blockException == null || !blockException.retryable());
                            }
                        }
                );

            } else {
                try {
                    threadPool.executor(executor).execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                masterOperation(request, clusterService.state(), listener);
                            } catch (Throwable e) {
                                listener.onFailure(e);
                            }
                        }
                    });
                } catch (Throwable t) {
                    listener.onFailure(t);
                }
            }
        } else {
            if (nodes.masterNode() == null) {
                if (retrying) {
                    listener.onFailure(new MasterNotDiscoveredException());
                } else {
                    logger.debug("no known master node, scheduling a retry");
                    observer.waitForNextChange(
                            new ClusterStateObserver.Listener() {
                                @Override
                                public void onNewClusterState(ClusterState state) {
                                    // the cluster state has changed; run this method again
                                    innerExecute(request, listener, observer, true);
                                }

                                @Override
                                public void onClusterServiceClose() {
                                    listener.onFailure(new NodeClosedException(clusterService.localNode()));
                                }

                                @Override
                                public void onTimeout(TimeValue timeout) {
                                    listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
                                }
                            }, new ClusterStateObserver.ChangePredicate() {
                                @Override
                                public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
                                                     ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
                                    return newState.nodes().masterNodeId() != null;
                                }

                                @Override
                                public boolean apply(ClusterChangedEvent event) {
                                    return event.nodesDelta().masterNodeChanged();
                                }
                            }
                    );
                }
                return;
            }
            processBeforeDelegationToMaster(request, clusterState);

            transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
                @Override
                public Response newInstance() {
                    return newResponse();
                }

                @Override
                public void handleResponse(Response response) {
                    listener.onResponse(response);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }

                @Override
                public void handleException(final TransportException exp) {
                    if (exp.unwrapCause() instanceof ConnectTransportException) {
                        // we want to retry here a bit to see if a new master is elected
                        logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
                                nodes.masterNode(), exp.getDetailedMessage());
                        observer.waitForNextChange(new ClusterStateObserver.Listener() {
                                                       @Override
                                                       public void onNewClusterState(ClusterState state) {
                                                           innerExecute(request, listener, observer, false);
                                                       }

                                                       @Override
                                                       public void onClusterServiceClose() {
                                                           listener.onFailure(new NodeClosedException(clusterService.localNode()));
                                                       }

                                                       @Override
                                                       public void onTimeout(TimeValue timeout) {
                                                           listener.onFailure(new MasterNotDiscoveredException());
                                                       }
                                                   }, new ClusterStateObserver.EventPredicate() {
                                                       @Override
                                                       public boolean apply(ClusterChangedEvent event) {
                                                           return event.nodesDelta().masterNodeChanged();
                                                       }
                                                   }
                        );
                    } else {
                        listener.onFailure(exp);
                    }
                }
            });
        }
    }
}

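This method guarantees two things:
(1) If the local node is not the master, the request is forwarded to the master node, which executes masterOperation(). If no master is known yet, it waits for one to be elected and retries once before failing with MasterNotDiscoveredException.
(2) If the cluster is blocked with a retryable block, it waits for the cluster state to change and then reruns the whole innerExecute() method; a non-retryable block fails the request immediately.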
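The branching in innerExecute() boils down to a small decision table. A sketch of one pass through it as a pure function; Route, BlockState and MasterRouting are hypothetical, and executeLocally stands for nodes.localNodeMaster() || localExecute(request). The waiting outcomes correspond to the observer.waitForNextChange() calls, after which the method runs again; the retry on a ConnectTransportException while forwarding is not modeled.

```java
// Hypothetical outcomes of one pass through innerExecute().
enum Route { RUN_MASTER_OPERATION, WAIT_FOR_UNBLOCK, FAIL_WITH_BLOCK, FORWARD_TO_MASTER, WAIT_FOR_MASTER, FAIL_NO_MASTER }

// Hypothetical summary of checkBlock(): no block, a retryable block, or a non-retryable one.
enum BlockState { NONE, RETRYABLE, NON_RETRYABLE }

final class MasterRouting {
    static Route decide(boolean executeLocally, BlockState block, boolean masterKnown, boolean retrying) {
        if (executeLocally) {
            switch (block) {
                case NONE:
                    return Route.RUN_MASTER_OPERATION; // masterOperation() on the executor
                case RETRYABLE:
                    return Route.WAIT_FOR_UNBLOCK;     // waitForNextChange(), then run again
                default:
                    return Route.FAIL_WITH_BLOCK;      // listener.onFailure(blockException)
            }
        }
        if (!masterKnown) {
            // the first attempt waits for a master; the retry after that gives up
            return retrying ? Route.FAIL_NO_MASTER : Route.WAIT_FOR_MASTER;
        }
        return Route.FORWARD_TO_MASTER;                // transportService.sendRequest(master, ...)
    }
}
```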

Execution then reaches TransportCreateIndexAction.masterOperation(), which builds a CreateIndexClusterStateUpdateRequest, the request that updates the cluster state when an index is created. Its settings, mappings and aliases default to empty collections.

public class TransportCreateIndexAction extends TransportMasterNodeOperationAction<CreateIndexRequest, CreateIndexResponse> {
    private final MetaDataCreateIndexService createIndexService;

    @Override
    protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException {
        String cause = request.cause();
        if (cause.length() == 0) {
            cause = "api";
        }

        final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
                .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                .settings(request.settings()).mappings(request.mappings())
                .aliases(request.aliases()).customs(request.customs());

        // create the index
        createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse response) {
                listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
            }

            @Override
            public void onFailure(Throwable t) {
                if (t instanceof IndexAlreadyExistsException) {
                    logger.trace("[{}] failed to create", t, request.index());
                } else {
                    logger.debug("[{}] failed to create", t, request.index());
                }
                listener.onFailure(t);
            }
        });
    }
}

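Next, MetaDataCreateIndexService.createIndex() is called. It first tries to acquire the index's metadata lock (a Semaphore). If it gets the lock immediately, it calls the overloaded createIndex() directly; otherwise it hands the work to the MANAGEMENT thread pool, which waits for the lock for up to masterNodeTimeout and fails with ProcessClusterEventTimeoutException if it cannot get it.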

public class MetaDataCreateIndexService extends AbstractComponent {
    public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
        final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());

        if (mdLock.tryAcquire()) {
            createIndex(request, listener, mdLock);
            return;
        }
        threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
            @Override
            public void doRun() throws InterruptedException {
                if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
                    listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
                    return;
                }
                createIndex(request, listener, mdLock);
            }
        });
    }
}
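The lock handling is a common "try now, otherwise wait with a timeout on another thread" pattern. A minimal sketch with java.util.concurrent; LockedCreator is hypothetical, a daemon single-thread executor stands in for the MANAGEMENT pool, and a TimeoutException stands in for ProcessClusterEventTimeoutException. As in the real flow, a successful create keeps the lock until it is released explicitly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified model of the locking in MetaDataCreateIndexService.createIndex(): try the
// per-index lock on the caller's thread, else wait for it on a background thread.
final class LockedCreator {
    private final Semaphore indexLock = new Semaphore(1);
    // Stand-in for the MANAGEMENT thread pool; daemon threads so the JVM can exit.
    private final ExecutorService management = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "management");
        t.setDaemon(true);
        return t;
    });

    CompletableFuture<String> create(String index, long timeoutMillis) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (indexLock.tryAcquire()) { // fast path: the lock is free right now
            result.complete("created " + index);
            return result;
        }
        management.execute(() -> {
            try {
                if (!indexLock.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    result.completeExceptionally(new TimeoutException("acquire index lock"));
                    return;
                }
                result.complete("created " + index);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    // In the real flow the lock is released from the task's ack, ack-timeout and failure callbacks.
    void release() {
        indexLock.release();
    }
}
```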

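The overloaded createIndex() implements the actual index creation by submitting a cluster-state update task. The lock is released when all nodes acknowledge, when the acknowledgement times out, or when the task fails: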

public class MetaDataCreateIndexService extends AbstractComponent {
    private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {

        ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
        updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
        request.settings(updatedSettingsBuilder.build());

        clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
                Priority.URGENT,
                new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {

            @Override
            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                return new ClusterStateUpdateResponse(acknowledged);
            }

            @Override
            public void onAllNodesAcked(@Nullable Throwable t) {
                mdLock.release();
                super.onAllNodesAcked(t);
            }

            @Override
            public void onAckTimeout() {
                mdLock.release();
                super.onAckTimeout();
            }

            @Override
            public void onFailure(String source, Throwable t) {
                mdLock.release();
                super.onFailure(source, t);
            }

            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                // the concrete index-creation logic
                // ...
            }
        });
    }
}

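submitStateUpdateTask() wraps the task in an UpdateTask object and hands it to updateTasksExecutor. When UpdateTask.run() executes, it calls the wrapped task, i.e. the anonymous AckedClusterStateUpdateTask created in MetaDataCreateIndexService. If the task is a TimeoutClusterStateUpdateTask, a timeout is also scheduled that fails it with ProcessClusterEventTimeoutException.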

public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {
    public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
        if (!lifecycle.started()) {
            return;
        }
        try {
            final UpdateTask task = new UpdateTask(source, priority, updateTask);
            if (updateTask instanceof TimeoutClusterStateUpdateTask) {
                final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;
                updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {
                    @Override
                    public void run() {
                        threadPool.generic().execute(new Runnable() {
                            @Override
                            public void run() {
                                timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
                            }
                        });
                    }
                });
            } else {
                updateTasksExecutor.execute(task);
            }
        } catch (EsRejectedExecutionException e) {
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }
}
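updateTasksExecutor receives each UpdateTask together with a Priority (index creation uses Priority.URGENT). Assuming it runs tasks one at a time, most urgent first and in submission order within the same priority, that ordering can be sketched with a PriorityQueue. PrioritizedQueue and Prio are hypothetical (Prio is assumed to mirror the ordering of the Priority levels), and tasks run on the caller's thread in drain() rather than on a dedicated executor thread.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical priority levels, most urgent first (enum order drives the comparison).
enum Prio { IMMEDIATE, URGENT, HIGH, NORMAL, LOW, LANGUID }

final class PrioritizedQueue {
    private static final class Entry {
        final Prio prio;
        final long seq; // submission order, breaks ties within one priority
        final Runnable task;

        Entry(Prio prio, long seq, Runnable task) {
            this.prio = prio;
            this.seq = seq;
            this.task = task;
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
            Comparator.comparing((Entry e) -> e.prio).thenComparingLong(e -> e.seq));
    private long nextSeq = 0;

    void submit(Prio prio, Runnable task) {
        queue.add(new Entry(prio, nextSeq++, task));
    }

    // Runs queued tasks one at a time: most urgent first, FIFO within a priority.
    void drain() {
        Entry e;
        while ((e = queue.poll()) != null) {
            e.task.run();
        }
    }
}
```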

In UpdateTask.run(), ClusterStateUpdateTask.execute() is called to compute the new cluster state:
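Two details of run() are easy to miss: a task signals "nothing changed" by returning the very same ClusterState object (the check is previousClusterState == newClusterState, a reference comparison), and on the master a changed state gets its version incremented. A minimal sketch of both rules with a hypothetical immutable State class; StateUpdater and its createIndex() task are also hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical immutable cluster state: a version plus index name -> number of shards.
final class State {
    final long version;
    final Map<String, Integer> indices;

    State(long version, Map<String, Integer> indices) {
        this.version = version;
        this.indices = Collections.unmodifiableMap(new HashMap<>(indices));
    }
}

// Simplified model of UpdateTask.run() on the master node.
final class StateUpdater {
    State current = new State(0, Collections.<String, Integer>emptyMap());

    // Returns true when the task produced a new state, which is then versioned and kept.
    boolean apply(UnaryOperator<State> task) {
        State previous = current;
        State next = task.apply(previous);
        if (next == previous) {
            return false; // same object: treated as "no change"
        }
        current = new State(next.version + 1, next.indices); // the master bumps the version
        return true;
    }

    // Example task: add an index, or return the input unchanged if it already exists.
    static UnaryOperator<State> createIndex(String name, int shards) {
        return state -> {
            if (state.indices.containsKey(name)) {
                return state;
            }
            Map<String, Integer> copy = new HashMap<>(state.indices);
            copy.put(name, shards);
            return new State(state.version, copy);
        };
    }
}
```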

class UpdateTask extends TimedPrioritizedRunnable {

        public final ClusterStateUpdateTask updateTask;


        UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
            super(priority, source);
            this.updateTask = updateTask;
        }

        @Override
        public void run() {
            if (!lifecycle.started()) {
                logger.debug("processing [{}]: ignoring, cluster_service not started", source);
                return;
            }
            logger.debug("processing [{}]: execute", source);
            ClusterState previousClusterState = clusterState;
            // is the local node the master?
            if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
                logger.debug("failing [{}]: local node is no longer master", source);
                updateTask.onNoLongerMaster(source);
                return;
            }
            // the new cluster state
            ClusterState newClusterState;
            long startTimeNS = System.nanoTime();
            try {
                // call the task's execute() to compute the new cluster state
                newClusterState = updateTask.execute(previousClusterState);
            } catch (Throwable e) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                if (logger.isTraceEnabled()) {
                    StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
                    sb.append(previousClusterState.nodes().prettyPrint());
                    sb.append(previousClusterState.routingTable().prettyPrint());
                    sb.append(previousClusterState.readOnlyRoutingNodes().prettyPrint());
                    logger.trace(sb.toString(), e);
                }
                warnAboutSlowTaskIfNeeded(executionTime, source);
                updateTask.onFailure(source, e);
                return;
            }

            // the cluster state did not change
            if (previousClusterState == newClusterState) {
                if (updateTask instanceof AckedClusterStateUpdateTask) {
                    //no need to wait for ack if nothing changed, the update can be counted as acknowledged
                    ((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
                }
                if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                    ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
                }
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
                warnAboutSlowTaskIfNeeded(executionTime, source);
                return;
            }

            try {
                Discovery.AckListener ackListener = new NoOpAckListener();
                // the local node is the master
                if (newClusterState.nodes().localNodeMaster()) {
                    // only the master controls the version numbers
                    Builder builder = ClusterState.builder(newClusterState).version(newClusterState.version() + 1);
                    // rebuild the routing table with an incremented version
                    if (previousClusterState.routingTable() != newClusterState.routingTable()) {
                        builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1));
                    }
                    // rebuild the metadata with an incremented version
                    if (previousClusterState.metaData() != newClusterState.metaData()) {
                        builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
                    }
                    newClusterState = builder.build();

                    if (updateTask instanceof AckedClusterStateUpdateTask) {
                        final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask;
                        if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) {
                            ackedUpdateTask.onAckTimeout();
                        } else {
                            try {
                                ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
                            } catch (EsRejectedExecutionException ex) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
                                }
                                //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
                                ackedUpdateTask.onAckTimeout();
                            }
                        }
                    }
                }

                newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);

                if (logger.isTraceEnabled()) {
                    StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
                    sb.append(newClusterState.prettyPrint());
                    logger.trace(sb.toString());
                } else if (logger.isDebugEnabled()) {
                    logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
                }

                ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
                // new cluster state, notify all listeners
                final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
                if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                    String summary = nodesDelta.shortSummary();
                    if (summary.length() > 0) {
                        logger.info("{}, reason: {}", summary, source);
                    }
                }

                // TODO, do this in parallel (and wait)
                for (DiscoveryNode node : nodesDelta.addedNodes()) {
                    if (!nodeRequiresConnection(node)) {
                        continue;
                    }
                    try {
                        transportService.connectToNode(node);
                    } catch (Throwable e) {
                        // the fault detection will detect it as failed as well
                        logger.warn("failed to connect to node [" + node + "]", e);
                    }
                }

                // if we are the master, publish the new state to all nodes
                // we publish here before we send a notification to all the listeners, since if it fails
                // we don't want to notify
                if (newClusterState.nodes().localNodeMaster()) {
                    logger.debug("publishing cluster state version {}", newClusterState.version());
                    discoveryService.publish(newClusterState, ackListener);
                }

                // update the current cluster state
                clusterState = newClusterState;
                logger.debug("set local cluster state to version {}", newClusterState.version());
                for (ClusterStateListener listener : preAppliedListeners) {
                    try {
                        listener.clusterChanged(clusterChangedEvent);
                    } catch (Exception ex) {
                        logger.warn("failed to notify ClusterStateListener", ex);
                    }
                }

                for (DiscoveryNode node : nodesDelta.removedNodes()) {
                    try {
                        transportService.disconnectFromNode(node);
                    } catch (Throwable e) {
                        logger.warn("failed to disconnect to node [" + node + "]", e);
                    }
                }

                newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);

                for (ClusterStateListener listener : postAppliedListeners) {
                    try {
                        listener.clusterChanged(clusterChangedEvent);
                    } catch (Exception ex) {
                        logger.warn("failed to notify ClusterStateListener", ex);
                    }
                }

                //manual ack only from the master at the end of the publish
                if (newClusterState.nodes().localNodeMaster()) {
                    try {
                        ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
                    } catch (Throwable t) {
                        logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
                    }
                }

                // invoke the task's clusterStateProcessed() callback
                if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                    ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
                }

                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {})", source, executionTime, newClusterState.version());
                warnAboutSlowTaskIfNeeded(executionTime, source);
            } catch (Throwable t) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
                sb.append(newClusterState.nodes().prettyPrint());
                sb.append(newClusterState.routingTable().prettyPrint());
                sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
                logger.warn(sb.toString(), t);
                // TODO: do we want to call updateTask.onFailure here?
            }
        }
    }
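The ack handling in the listing (an AckCountDownListener created for an AckedClusterStateUpdateTask, an immediate onAckTimeout() when no ack timeout is set, and the master acking itself at the very end) can be modeled roughly as a countdown over the nodes that still have to acknowledge. The following is a minimal, hypothetical sketch, not Elasticsearch's actual AckCountDownListener: the names AckCountDownSketch, AckCountDown, AckCallback and RecordingCallback are invented for illustration, nodes are plain string ids, and the timeout timer that Elasticsearch schedules on its thread pool is left out (onTimeout() is called by hand).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of acknowledgement counting for one
// published cluster state; this is NOT the real AckCountDownListener.
public class AckCountDownSketch {

    // Stand-in for the callbacks of an acked cluster state update task.
    public interface AckCallback {
        void onAllNodesAcked();

        void onAckTimeout();
    }

    // Tracks which nodes still have to acknowledge the new state.
    public static final class AckCountDown {
        private final Set<String> pendingNodes;
        private final AckCallback callback;
        private boolean done = false;

        public AckCountDown(Set<String> nodeIds, AckCallback callback) {
            this.pendingNodes = new HashSet<>(nodeIds);
            this.callback = callback;
        }

        // Called once a node reports that it applied the published state.
        public synchronized void onNodeAck(String nodeId) {
            // Ignore acks after completion, duplicate acks and unknown nodes.
            if (done || !pendingNodes.remove(nodeId)) {
                return;
            }
            if (pendingNodes.isEmpty()) {
                done = true;
                callback.onAllNodesAcked();
            }
        }

        // Called when the ack timeout expires before every node has acked.
        public synchronized void onTimeout() {
            if (!done) {
                done = true;
                callback.onAckTimeout();
            }
        }
    }

    // Helper that simply counts how often each callback fired.
    public static final class RecordingCallback implements AckCallback {
        public int allAcked = 0;
        public int timedOut = 0;

        @Override
        public void onAllNodesAcked() {
            allAcked++;
        }

        @Override
        public void onAckTimeout() {
            timedOut++;
        }
    }

    public static void main(String[] args) {
        RecordingCallback cb = new RecordingCallback();
        AckCountDown ack = new AckCountDown(Set.of("master", "node-1", "node-2"), cb);
        ack.onNodeAck("node-1");
        ack.onNodeAck("node-2");
        ack.onNodeAck("node-2"); // duplicate ack, ignored
        ack.onNodeAck("master"); // the master acks itself last, as in the listing
        ack.onTimeout();         // arrives after completion, ignored
        System.out.println("allAcked=" + cb.allAcked + ", timedOut=" + cb.timedOut);
    }
}
```

Running main prints allAcked=1, timedOut=0. Guarding both paths with the same done flag means each task gets exactly one outcome, whichever of "all nodes acked" and "timeout" happens first.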

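Once the index has been created, the cluster state has changed. The master increments the cluster state version, publishes the new state to the other nodes through discoveryService.publish(), and only then applies it locally and notifies the listeners, so that every node in the cluster ends up with the same cluster state.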

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者