1. 一個簡單的索引請求示例
首先,我們來看一個索引請求:
curl -XPUT 127.0.0.1:9200/item/show/28589790
{
"id": 28589790,
"text": "這是一個索引文本"
}
這個請求的主要作用是向item索引中添加一個索引文檔,文檔信息:
文檔 id: 28589790
字段id: 28589790
字段text: 這是一個索引文本
如果索引中已經包含id為28589790的文檔,elasticsearch將會使用這條數據進行覆蓋
2. 索引時序圖
3. 索引請求轉發
- 在elasticsearch啟動時,會注入RestIndexAction對象,并且會把方法、URI 和當前對象注冊到內存中
public class RestIndexAction extends BaseRestHandler {

    /**
     * Wires the REST routes used for indexing documents.
     *
     * <p>This handler itself serves {@code POST /{index}/{type}} plus {@code PUT} and
     * {@code POST} on {@code /{index}/{type}/{id}}. A separate {@code CreateHandler} is
     * registered for the {@code /_create} variants of the id-based route.
     *
     * @param settings   settings passed through to the base handler and the create handler
     * @param controller controller on which the routes are registered
     * @param client     client passed through to the base handler and the create handler
     */
    @Inject
    public RestIndexAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);

        // Routes answered by this handler. The id-less POST lets the id be auto-generated.
        controller.registerHandler(POST, "/{index}/{type}", this);
        controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
        controller.registerHandler(POST, "/{index}/{type}/{id}", this);

        // Routes answered by the dedicated create handler.
        final CreateHandler explicitCreate = new CreateHandler(settings, controller, client);
        controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", explicitCreate);
        controller.registerHandler(POST, "/{index}/{type}/{id}/_create", explicitCreate);
    }
}
elasticsearch使用HttpRequestHandler.messageReceived()方法接收用戶請求,然后調用dispatchRequest()方法對請求進行轉發。
當請求跳轉到RestController時,會調用getHandler()方法根據請求的Path獲取對應的handler,由上文可以看出item/show/28589790
會匹配到RestIndexAction
public class RestController extends AbstractLifecycleComponent<RestController> {

    /**
     * Dispatches a REST request to the handler resolved by {@code getHandler(request)}.
     *
     * <p>When no handler is found, an {@code OPTIONS} request is answered with {@code OK}
     * and any other request with {@code BAD_REQUEST} and an explanatory message.
     *
     * @param request the incoming REST request
     * @param channel the channel the response is written to
     * @throws Exception whatever the resolved handler throws
     */
    void executeHandler(RestRequest request, RestChannel channel) throws Exception {
        final RestHandler target = getHandler(request);
        if (target != null) {
            target.handleRequest(request, channel);
            return;
        }

        if (request.method() == RestRequest.Method.OPTIONS) {
            // An OPTIONS request gets a plain OK by default
            // (the Access Control Origin header gets added automatically).
            channel.sendResponse(new BytesRestResponse(OK));
            return;
        }

        final String reason = "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]";
        channel.sendResponse(new BytesRestResponse(BAD_REQUEST, reason));
    }
}
handler.handleRequest()方法最終會調用RestIndexAction.handleRequest()方法對索引參數進行解析,創建索引請求對象indexRequest,然后調用client.index()開始創建索引
public class RestIndexAction extends BaseRestHandler {

    /**
     * Translates a REST index request into an {@link IndexRequest} and executes it.
     *
     * <p>The index, type and id come from the request path parameters. The optional
     * parameters {@code routing}, {@code parent}, {@code timestamp}, {@code ttl},
     * {@code timeout}, {@code refresh}, {@code version}, {@code version_type},
     * {@code op_type}, {@code replication} and {@code consistency} are copied onto the
     * index request, and the request body becomes the document source.
     *
     * <p>If {@code op_type} cannot be parsed, a {@code BAD_REQUEST} response is sent and
     * the method returns without indexing anything.
     *
     * @param request the REST request carrying path parameters, query parameters and body
     * @param channel the channel used to send the response
     * @param client  the client used to execute the index request
     */
    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
        indexRequest.listenerThreaded(false);
        indexRequest.operationThreaded(true);
        indexRequest.routing(request.param("routing"));
        indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
        indexRequest.timestamp(request.param("timestamp"));
        if (request.hasParam("ttl")) {
            indexRequest.ttl(request.paramAsTime("ttl", null).millis());
        }
        indexRequest.source(request.content());
        indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
        indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
        indexRequest.version(RestActions.parseVersion(request));
        indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));

        String sOpType = request.param("op_type");
        if (sOpType != null) {
            try {
                indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
            } catch (ElasticsearchIllegalArgumentException eia) {
                try {
                    XContentBuilder builder = channel.newErrorBuilder();
                    channel.sendResponse(new BytesRestResponse(BAD_REQUEST, builder.startObject().field("error", eia.getMessage()).endObject()));
                } catch (IOException e1) {
                    logger.warn("Failed to send response", e1);
                }
                // The request was rejected: stop here whether or not the error response
                // could be written. Falling through would index the document anyway and
                // send a second response on the same channel.
                return;
            }
        }

        String replicationType = request.param("replication");
        if (replicationType != null) {
            indexRequest.replicationType(ReplicationType.fromString(replicationType));
        }
        String consistencyLevel = request.param("consistency");
        if (consistencyLevel != null) {
            indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
        }

        client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
            /**
             * Renders the index response as an object holding index, type, id, version
             * and the created flag; the status is CREATED for a newly created document
             * and OK otherwise.
             */
            @Override
            public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
                builder.startObject()
                        .field(Fields._INDEX, response.getIndex())
                        .field(Fields._TYPE, response.getType())
                        .field(Fields._ID, response.getId())
                        .field(Fields._VERSION, response.getVersion())
                        .field(Fields.CREATED, response.isCreated());
                builder.endObject();
                RestStatus status = OK;
                if (response.isCreated()) {
                    status = CREATED;
                }
                return new BytesRestResponse(status, builder);
            }
        });
    }
}
在索引請求中,支持下列參數:
routing: 路由信息,具有相同路由信息的文檔存儲在同一分片上
parent: 文檔的parent id,如果未設置路由,則會自動將其設置為路由
timestamp: 文檔產生的時間戳
ttl: 過期時間
timeout: 超時時間
refresh: 此索引操作之后是否執行刷新,從而使文檔可被搜索,默認為false
version: 文檔的版本號
version_type: 版本類型,默認internal,支持internal、external、external_gt、external_gte和force
op_type: 索引操作類型,支持create和index
replication: 副本類型,支持async、sync和default
consistency: 一致性,支持one、quorum、all和default
請求的content即索引的source,也就是文檔內容
在封裝完索引請求后,就要調用 client.index() 執行索引
4. 創建索引入口
在index()方法中,使用的Action是IndexAction.INSTANCE
public abstract class AbstractClient implements Client {
// Executes an index request by delegating to execute() with the IndexAction.INSTANCE
// action; the request and listener are handed over unchanged.
@Override
public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
execute(IndexAction.INSTANCE, request, listener);
}
}
這個action在ActionModule中被TransportIndexAction注冊
public class ActionModule extends AbstractModule {
// Registers TransportIndexAction as the transport-level implementation for
// IndexAction.INSTANCE (only this registration is shown in the excerpt).
@Override
protected void configure() {
registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
}
}
因此在NodeClient的execute()方法中根據action獲取到的transport action為TransportIndexAction
public class NodeClient extends AbstractClient {

    /**
     * Executes an action on this node: applies the client headers to the request, looks up
     * the transport action registered for {@code action} and runs it.
     *
     * @param action   the client-side action; for an index request this maps to TransportIndexAction
     * @param request  the request to execute
     * @param listener the listener handed to the transport action
     */
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(
            Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);

        // The registry is keyed by ClientAction, hence the cast before the lookup.
        final ClientAction registryKey = (ClientAction) action;
        final TransportAction<Request, Response> resolved = actions.get(registryKey);
        resolved.execute(request, listener);
    }
}
由于TransportIndexAction繼承了TransportAction,因此調用過程為NodeClient.execute() -> TransportAction.execute() -> TransportIndexAction.doExecute()
索引的大體流程為:先判斷是否需要創建索引,如果是則先創建索引,然后寫入文檔數據,否則直接寫入文檔數據
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {

    /**
     * Entry point of the index operation.
     *
     * <p>If the target index should be auto-created, a create-index request is issued
     * first and the document is indexed once it completes. A failure caused by the index
     * already existing is treated as success. Otherwise the document is indexed directly.
     *
     * @param request  the index request
     * @param listener notified with the index response or the failure
     */
    @Override
    protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        if (!autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
            // No auto-creation needed: index right away.
            innerExecute(request, listener);
            return;
        }

        final CreateIndexRequest autoCreate = new CreateIndexRequest(request);
        autoCreate.index(request.index());
        autoCreate.mapping(request.type());
        autoCreate.cause("auto(index api)");
        autoCreate.masterNodeTimeout(request.timeout());

        createIndexAction.execute(autoCreate, new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse created) {
                innerExecute(request, listener);
            }

            @Override
            public void onFailure(Throwable failure) {
                if (!(ExceptionsHelper.unwrapCause(failure) instanceof IndexAlreadyExistsException)) {
                    listener.onFailure(failure);
                    return;
                }
                // The index already exists, so go ahead and index the document.
                try {
                    innerExecute(request, listener);
                } catch (Throwable indexingFailure) {
                    listener.onFailure(indexingFailure);
                }
            }
        });
    }
}
elasticsearch主要使用AutoCreateIndex.shouldAutoCreate()方法來判斷是否需要創建索引
public class AutoCreateIndex {

    /**
     * Reads the {@code action.auto_create_index} setting.
     *
     * <ul>
     *   <li>unset or explicitly true: checking is enabled and there is no pattern list;</li>
     *   <li>explicitly false: auto-creation is globally disabled;</li>
     *   <li>anything else: the value is split on commas into patterns, and a second array
     *       holds each pattern with its first character removed (used for entries that
     *       start with {@code +} or {@code -}).</li>
     * </ul>
     *
     * @param settings the settings the configuration value is read from
     */
    public AutoCreateIndex(Settings settings) {
        final String configured = settings.get("action.auto_create_index");

        boolean disabled = false;
        String[] patterns = null;
        String[] patternsWithoutPrefix = null;

        if (configured != null && !Booleans.isExplicitTrue(configured)) {
            if (Booleans.isExplicitFalse(configured)) {
                disabled = true;
            } else {
                patterns = Strings.commaDelimitedListToStringArray(configured);
                patternsWithoutPrefix = new String[patterns.length];
                int slot = 0;
                for (String pattern : patterns) {
                    patternsWithoutPrefix[slot++] = pattern.substring(1);
                }
            }
        }

        needToCheck = !disabled;
        globallyDisabled = disabled;
        matches = patterns;
        matches2 = patternsWithoutPrefix;
    }

    /**
     * Decides whether {@code index} should be created automatically.
     *
     * <p>Returns {@code false} when checking is off, when the index already exists in
     * {@code state}, or when auto-creation is globally disabled. It returns {@code true}
     * when no pattern list is configured. Otherwise the patterns are evaluated in order
     * and the first match decides: a {@code -} entry rejects, a {@code +} or unprefixed
     * entry accepts. With no match the result is {@code false}.
     *
     * @param index the index name of the request
     * @param state the current cluster state
     * @return whether the index should be auto-created
     */
    public boolean shouldAutoCreate(String index, ClusterState state) {
        if (!needToCheck || state.metaData().hasConcreteIndex(index) || globallyDisabled) {
            return false;
        }
        if (matches == null) {
            return true;
        }
        for (int i = 0; i < matches.length; i++) {
            final String pattern = matches[i];
            switch (pattern.charAt(0)) {
                case '-':
                    if (Regex.simpleMatch(matches2[i], index)) {
                        return false;
                    }
                    break;
                case '+':
                    if (Regex.simpleMatch(matches2[i], index)) {
                        return true;
                    }
                    break;
                default:
                    if (Regex.simpleMatch(pattern, index)) {
                        return true;
                    }
                    break;
            }
        }
        return false;
    }
}
其中配置項action.auto_create_index以及needToCheck、globallyDisabled的含義:
action.auto_create_index: elasticsearch配置文件的配置項,表示是否允許自動創建索引
needToCheck: 是否需要檢查能否創建索引,只有當action.auto_create_index為false時不需要檢查,直接返回無法創建索引
globallyDisabled: 是否全局禁用創建索引,只有當action.auto_create_index為false時全局禁用創建索引,直接返回無法創建索引
如果當前集群中已經包含了要創建的索引,那么也不需要創建索引。其他情況則根據action.auto_create_index配置的匹配模式(可帶 + 或 - 前綴)來判斷
如果允許創建索引,則開始創建索引名的流程
5. 創建索引名
首先創建「創建索引」的請求createIndexRequest,設置了4個參數,分別是索引名index、索引mapping、創建索引的原因cause和master節點超時時間masterNodeTimeout
// Build the create-index request from the incoming index request.
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
// Target index name, taken from the index request.
createIndexRequest.index(request.index());
// Hands the document type of the index request to mapping().
createIndexRequest.mapping(request.type());
// Records why the index is being created.
createIndexRequest.cause("auto(index api)");
// Reuses the index request's timeout as the master-node timeout.
createIndexRequest.masterNodeTimeout(request.timeout());
然后開始調用createIndexAction.execute()方法創建索引名
public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
public final void execute(Request request, ActionListener<Response> listener) {
if (forceThreadedListener()) {
request.listenerThreaded(true);
}
if (request.listenerThreaded()) {
listener = new ThreadedActionListener<>(threadPool, listener, logger);
}
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
if (filters.length == 0) {
try {
// TransportAction 子類都要重寫這個方法
doExecute(request, listener);
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
}
} else {
RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(actionName, request, listener);
}
}
rotected abstract void doExecute(Request request, ActionListener<Response> listener);
}
從下面的類圖可以看出,TransportCreateIndexAction繼承了TransportMasterNodeOperationAction,調用過程即TransportAction.execute() -> TransportMasterNodeOperationAction.doExecute()方法來完成操作
在TransportMasterNodeOperationAction中主要是保證操作在master節點上執行
/**
 * Base class for actions whose work is done in masterOperation(). The operation runs
 * locally when the local node is the master (or localExecute(request) returns true);
 * otherwise the request is forwarded to the master node. Cluster blocks, a missing
 * master and connection failures to the master are handled by waiting for a cluster
 * state change and re-running innerExecute().
 */
public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
// Starts the operation with a fresh ClusterStateObserver bounded by the request's
// master-node timeout; "retrying" starts out false.
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
}
// Runs the operation against the cluster state currently seen by the observer.
// "retrying" is true only when re-entered after waiting for a master to appear; in that
// case a still-missing master fails the request instead of scheduling another wait.
private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
final ClusterState clusterState = observer.observedState();
final DiscoveryNodes nodes = clusterState.nodes();
// Case 1: the operation can be executed on this node.
if (nodes.localNodeMaster() || localExecute(request)) {
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
// A non-retryable block fails the request immediately.
if (!blockException.retryable()) {
listener.onFailure(blockException);
return;
}
logger.trace("can't execute due to a cluster block: [{}], retrying", blockException);
// Wait for a cluster state in which the block is gone or no longer retryable,
// then run innerExecute() again; close/timeout report the original block.
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, false);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(blockException);
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(blockException);
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
ClusterBlockException blockException = checkBlock(request, newState);
return (blockException == null || !blockException.retryable());
}
}
);
} else {
// No block: run masterOperation() on the configured executor, reporting any
// failure (including a failure to submit the task) to the listener.
try {
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
masterOperation(request, clusterService.state(), listener);
} catch (Throwable e) {
listener.onFailure(e);
}
}
});
} catch (Throwable t) {
listener.onFailure(t);
}
}
} else {
// Case 2: the operation has to run on another node (the master).
if (nodes.masterNode() == null) {
if (retrying) {
listener.onFailure(new MasterNotDiscoveredException());
} else {
logger.debug("no known master node, scheduling a retry");
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
// the cluster state changed, run this method again (flagged as a retry)
innerExecute(request, listener, observer, true);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
}
}, new ClusterStateObserver.ChangePredicate() {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
return newState.nodes().masterNodeId() != null;
}
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
);
}
return;
}
// A master is known: forward the request to it and relay its response.
processBeforeDelegationToMaster(request, clusterState);
transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleException(final TransportException exp) {
// Only connection failures are retried (after the master changes); every other
// failure is passed straight to the listener.
if (exp.unwrapCause() instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
nodes.masterNode(), exp.getDetailedMessage());
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, false);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException());
}
}, new ClusterStateObserver.EventPredicate() {
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
);
} else {
listener.onFailure(exp);
}
}
});
}
}
}
這個操作主要保證了兩點:
(1)如果當前節點不是master,則將請求發送到master節點執行masterOperation()方法
(2)如果當前集群block了,則等待集群狀態更新,然后重新執行完整的innerExecute()方法
然后進入到TransportCreateIndexAction.masterOperation()方法中,創建CreateIndexClusterStateUpdateRequest對象,它是創建索引時用來更新集群狀態信息的請求,其中settings和mappings及aliases默認為空集合
public class TransportCreateIndexAction extends TransportMasterNodeOperationAction<CreateIndexRequest, CreateIndexResponse> {

    private final MetaDataCreateIndexService createIndexService;

    /**
     * Turns the create-index request into a cluster-state update request and hands it to
     * the {@code MetaDataCreateIndexService}.
     *
     * <p>An empty cause on the request is replaced with {@code "api"}. The acknowledged
     * flag of the cluster-state update is relayed to the listener as a
     * {@code CreateIndexResponse}; failures are logged and passed on unchanged.
     *
     * @param request  the create-index request
     * @param state    the cluster state supplied by the caller (not read here)
     * @param listener notified with the response or the failure
     */
    @Override
    protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException {
        final String requestedCause = request.cause();
        final String cause = requestedCause.length() == 0 ? "api" : requestedCause;

        final CreateIndexClusterStateUpdateRequest updateRequest =
                new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
                        .ackTimeout(request.timeout())
                        .masterNodeTimeout(request.masterNodeTimeout())
                        .settings(request.settings())
                        .mappings(request.mappings())
                        .aliases(request.aliases())
                        .customs(request.customs());

        // Perform the actual index creation.
        createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse updateResponse) {
                listener.onResponse(new CreateIndexResponse(updateResponse.isAcknowledged()));
            }

            @Override
            public void onFailure(Throwable failure) {
                // An already-existing index is logged at trace level, anything else at debug.
                final boolean alreadyExists = failure instanceof IndexAlreadyExistsException;
                if (alreadyExists) {
                    logger.trace("[{}] failed to create", failure, request.index());
                } else {
                    logger.debug("[{}] failed to create", failure, request.index());
                }
                listener.onFailure(failure);
            }
        });
    }
}
然后調用MetaDataCreateIndexService的createIndex()方法,如果能獲取到鎖信息則直接執行重載的createIndex()方法,否則交給線程池去執行
public class MetaDataCreateIndexService extends AbstractComponent {

    /**
     * Creates an index while holding the per-index metadata lock.
     *
     * <p>If the lock is free it is taken on the calling thread and the creation proceeds
     * immediately. Otherwise the wait is moved to the MANAGEMENT thread pool, bounded by
     * the request's master-node timeout; if the lock is not obtained in time the listener
     * fails with a {@code ProcessClusterEventTimeoutException}.
     *
     * @param request  the cluster-state update request describing the index
     * @param listener notified with the outcome
     */
    public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
        final Semaphore indexLock = metaDataService.indexMetaDataLock(request.index());

        if (mdLockAcquiredImmediately(indexLock)) {
            createIndex(request, listener, indexLock);
        } else {
            threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
                @Override
                public void doRun() throws InterruptedException {
                    final boolean acquired = indexLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS);
                    if (acquired) {
                        createIndex(request, listener, indexLock);
                    } else {
                        listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
                    }
                }
            });
        }
    }

    /** Attempts to take the lock without blocking. */
    private static boolean mdLockAcquiredImmediately(Semaphore lock) {
        return lock.tryAcquire();
    }
}
在重載的createIndex()方法中,通過提交一個更新集群狀態的任務來實現創建索引的具體邏輯
public class MetaDataCreateIndexService extends AbstractComponent {
// Creates the index by submitting an URGENT cluster-state update task. The caller passes
// in the already-acquired per-index lock (mdLock); the task releases it in each of its
// completion callbacks (all nodes acked, ack timeout, failure).
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {
// Normalize the requested settings with the index setting prefix and write them back
// onto the request.
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
request.settings(updatedSettingsBuilder.build());
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
Priority.URGENT,
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}
// Each completion callback releases the lock before delegating to the superclass.
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
mdLock.release();
super.onAllNodesAcked(t);
}
@Override
public void onAckTimeout() {
mdLock.release();
super.onAckTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
mdLock.release();
super.onFailure(source, t);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// the actual index-creation logic (elided in this excerpt)
// ...
}
});
}
}
提交StateUpdateTask任務時,會創建一個UpdateTask對象來包裝傳入的任務(即MetaDataCreateIndexService中創建的AckedClusterStateUpdateTask匿名對象),然后交給執行器執行其run()方法
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {

    /**
     * Queues a cluster-state update task on the update-task executor.
     *
     * <p>Nothing happens if the service has not been started. A
     * {@code TimeoutClusterStateUpdateTask} is submitted together with its timeout; when
     * the timeout fires, the task's {@code onFailure} is invoked on the generic thread
     * pool with a {@code ProcessClusterEventTimeoutException}. A rejected submission is
     * swallowed only when the service is stopped or closed, and rethrown otherwise.
     *
     * @param source     description of what triggered the update
     * @param priority   priority of the task
     * @param updateTask the task that computes the new cluster state
     */
    public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
        if (!lifecycle.started()) {
            return;
        }
        try {
            final UpdateTask task = new UpdateTask(source, priority, updateTask);
            if (!(updateTask instanceof TimeoutClusterStateUpdateTask)) {
                updateTasksExecutor.execute(task);
                return;
            }

            final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;

            // Reports the timeout to the task itself.
            final Runnable failWithTimeout = new Runnable() {
                @Override
                public void run() {
                    timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
                }
            };
            // Invoked when the timeout elapses; hands the notification to the generic pool.
            final Runnable onTimeout = new Runnable() {
                @Override
                public void run() {
                    threadPool.generic().execute(failWithTimeout);
                }
            };
            updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), onTimeout);
        } catch (EsRejectedExecutionException rejected) {
            if (!lifecycle.stoppedOrClosed()) {
                throw rejected;
            }
        }
    }
}
在UpdateTask的run()方法中,會調用ClusterStateUpdateTask.execute()方法獲取新的集群狀態:
/**
 * Runnable wrapper that executes a {@link ClusterStateUpdateTask} and applies the cluster
 * state it produces: versions are bumped on the master, the new state is published,
 * listeners are notified and the task's callbacks are invoked.
 */
class UpdateTask extends TimedPrioritizedRunnable {

    public final ClusterStateUpdateTask updateTask;

    /**
     * @param source     description of what triggered the update (used in log messages)
     * @param priority   priority passed to the prioritized runnable
     * @param updateTask the task that computes the new cluster state
     */
    UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
        super(priority, source);
        this.updateTask = updateTask;
    }

    /**
     * Executes the wrapped task against the current cluster state and, if the state
     * changed, applies and (on the master) publishes the new state.
     */
    @Override
    public void run() {
        if (!lifecycle.started()) {
            logger.debug("processing [{}]: ignoring, cluster_service not started", source);
            return;
        }
        logger.debug("processing [{}]: execute", source);
        ClusterState previousClusterState = clusterState;
        // Tasks that may only run on the master are failed when this node is not the master.
        if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
            logger.debug("failing [{}]: local node is no longer master", source);
            updateTask.onNoLongerMaster(source);
            return;
        }
        // The cluster state produced by the task.
        ClusterState newClusterState;
        long startTimeNS = System.nanoTime();
        try {
            // Call the task's execute method to obtain the new cluster state.
            newClusterState = updateTask.execute(previousClusterState);
        } catch (Throwable e) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
            if (logger.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
                sb.append(previousClusterState.nodes().prettyPrint());
                sb.append(previousClusterState.routingTable().prettyPrint());
                sb.append(previousClusterState.readOnlyRoutingNodes().prettyPrint());
                logger.trace(sb.toString(), e);
            }
            warnAboutSlowTaskIfNeeded(executionTime, source);
            updateTask.onFailure(source, e);
            return;
        }

        // The task returned the same instance: the cluster state did not change.
        if (previousClusterState == newClusterState) {
            if (updateTask instanceof AckedClusterStateUpdateTask) {
                //no need to wait for ack if nothing changed, the update can be counted as acknowledged
                ((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
            }
            if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
            }
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
            logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
            warnAboutSlowTaskIfNeeded(executionTime, source);
            return;
        }

        try {
            Discovery.AckListener ackListener = new NoOpAckListener();
            // The local node is the master.
            if (newClusterState.nodes().localNodeMaster()) {
                // only the master controls the version numbers
                Builder builder = ClusterState.builder(newClusterState).version(newClusterState.version() + 1);
                // Rebuild the routing table with a bumped version if it changed.
                if (previousClusterState.routingTable() != newClusterState.routingTable()) {
                    builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1));
                }
                // Rebuild the meta data with a bumped version if it changed.
                if (previousClusterState.metaData() != newClusterState.metaData()) {
                    builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
                }
                newClusterState = builder.build();

                if (updateTask instanceof AckedClusterStateUpdateTask) {
                    final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask;
                    if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) {
                        ackedUpdateTask.onAckTimeout();
                    } else {
                        try {
                            ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
                        } catch (EsRejectedExecutionException ex) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
                            }
                            //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
                            ackedUpdateTask.onAckTimeout();
                        }
                    }
                }
            }

            newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);

            if (logger.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
                sb.append(newClusterState.prettyPrint());
                logger.trace(sb.toString());
            } else if (logger.isDebugEnabled()) {
                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
            }

            ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
            // new cluster state, notify all listeners
            final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
            if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                String summary = nodesDelta.shortSummary();
                if (summary.length() > 0) {
                    logger.info("{}, reason: {}", summary, source);
                }
            }

            // TODO, do this in parallel (and wait)
            for (DiscoveryNode node : nodesDelta.addedNodes()) {
                if (!nodeRequiresConnection(node)) {
                    continue;
                }
                try {
                    transportService.connectToNode(node);
                } catch (Throwable e) {
                    // the fault detection will detect it as failed as well
                    logger.warn("failed to connect to node [" + node + "]", e);
                }
            }

            // if we are the master, publish the new state to all nodes
            // we publish here before we send a notification to all the listeners, since if it fails
            // we don't want to notify
            if (newClusterState.nodes().localNodeMaster()) {
                logger.debug("publishing cluster state version {}", newClusterState.version());
                discoveryService.publish(newClusterState, ackListener);
            }

            // update the current cluster state
            clusterState = newClusterState;
            logger.debug("set local cluster state to version {}", newClusterState.version());

            for (ClusterStateListener listener : preAppliedListeners) {
                try {
                    listener.clusterChanged(clusterChangedEvent);
                } catch (Exception ex) {
                    logger.warn("failed to notify ClusterStateListener", ex);
                }
            }

            for (DiscoveryNode node : nodesDelta.removedNodes()) {
                try {
                    transportService.disconnectFromNode(node);
                } catch (Throwable e) {
                    logger.warn("failed to disconnect to node [" + node + "]", e);
                }
            }

            newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);

            for (ClusterStateListener listener : postAppliedListeners) {
                try {
                    listener.clusterChanged(clusterChangedEvent);
                } catch (Exception ex) {
                    logger.warn("failed to notify ClusterStateListener", ex);
                }
            }

            //manual ack only from the master at the end of the publish
            if (newClusterState.nodes().localNodeMaster()) {
                try {
                    ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
                } catch (Throwable t) {
                    logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
                }
            }

            // Invoke the task's clusterStateProcessed() callback.
            if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
            }

            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
            logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {})", source, executionTime, newClusterState.version());
            warnAboutSlowTaskIfNeeded(executionTime, source);
        } catch (Throwable t) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
            // Include the version of the state that failed to apply; without it the
            // message always read "version []".
            StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
            sb.append(newClusterState.nodes().prettyPrint());
            sb.append(newClusterState.routingTable().prettyPrint());
            sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
            logger.warn(sb.toString(), t);
            // TODO: do we want to call updateTask.onFailure here?
        }
    }
}
在索引創建完成后,集群狀態信息會發生變化,elasticsearch會將這個變化發布到其他節點,以維持集群統一的狀態信息