The previous article ended by noting that once index creation completes, a cluster state change event is generated, and that event is sent to every node in the cluster through clusterStatePublisher. This article continues the shard split analysis and looks at how the underlying data of the target shards is recovered.
clusterStatePublisher is a functional interface instance, and it is wired to the publish method of the ZenDiscovery module.
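As a rough illustration of that wiring, the sketch below binds a method reference to a functional interface. Every name in it (the interface, the demo classes, the String-based signature) is made up for the example; it is not the real Elasticsearch publisher type, only the same binding pattern.

import java.util.function.BiConsumer;

// Hypothetical stand-in for the publisher interface: a single abstract method
// whose shape matches a publish(changeEvent, ackListener) call.
interface ClusterStatePublisherDemo {
    void publish(String changeEvent, BiConsumer<String, Exception> ackListener);
}

class ZenDiscoveryDemo {
    void publish(String changeEvent, BiConsumer<String, Exception> ackListener) {
        System.out.println("publishing " + changeEvent);
        ackListener.accept("local-node", null); // pretend the local node acknowledged
    }
}

public class PublisherWiringDemo {
    public static void main(String[] args) {
        ZenDiscoveryDemo discovery = new ZenDiscoveryDemo();
        // The publisher field is simply set to a method reference on the discovery module.
        ClusterStatePublisherDemo clusterStatePublisher = discovery::publish;
        clusterStatePublisher.publish("create-index [demo-split]", (node, e) -> { });
    }
}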
ZenDiscovery
public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) {
ClusterState newState = clusterChangedEvent.state();
assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();
// state got changed locally (maybe because another master published to us)
if (clusterChangedEvent.previousState() != this.committedState.get()) {
throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
}
pendingStatesQueue.addPending(newState);
try {
//Send this cluster state change to the other nodes. Internally it waits until at least minimumMasterNodes nodes have responded before marking the change as committed, and then sends the commit message to the other nodes.
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) {
// cluster service logs a WARN message
logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
newState.version(), electMaster.minimumMasterNodes());
synchronized (stateMutex) {
pendingStatesQueue.failAllStatesAndClear(
new ElasticsearchException("failed to publish cluster state"));
rejoin("zen-disco-failed-to-publish");
}
throw t;
}
//Reaching this point means the cluster state has been committed.
final DiscoveryNode localNode = newState.getNodes().getLocalNode();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean processedOrFailed = new AtomicBoolean();
pendingStatesQueue.markAsCommitted(newState.stateUUID(),
new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
processedOrFailed.set(true);
latch.countDown();
ackListener.onNodeAck(localNode, null);
}
@Override
public void onNewClusterStateFailed(Exception e) {
processedOrFailed.set(true);
latch.countDown();
ackListener.onNodeAck(localNode, e);
logger.warn(
(org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"failed while applying cluster state locally [{}]",
clusterChangedEvent.source()),
e);
}
});
synchronized (stateMutex) {
if (clusterChangedEvent.previousState() != this.committedState.get()) {
throw new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes");
}
//Apply this cluster state change locally. Only the node that published the state (the master) takes this path; the other nodes call this function to apply the change after they receive the commit message.
boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +
" committed version [" + newState.version() + "] source [" + clusterChangedEvent.source() + "]");
if (sentToApplier == false && processedOrFailed.get() == false) {
assert false : "cluster state published locally neither processed nor failed: " + newState;
logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",
newState.version());
return;
}
}
// indefinitely wait for cluster state to be applied locally
try {
latch.await();
} catch (InterruptedException e) {
logger.debug(
(org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"interrupted while applying cluster state locally [{}]",
clusterChangedEvent.source()),
e);
Thread.currentThread().interrupt();
}
}
How the cluster state is published is not covered in detail here either. Briefly: the function sends the new cluster state to every node in the cluster except the local one. Once at least minimumMasterNodes nodes have responded, the change is marked as committed and a commit message is sent to the other nodes. When a node receives the commit message, it calls processNextCommittedClusterState to handle the committed cluster state. That function is where the cluster state change is actually applied, keeping the underlying data consistent with the committed cluster state.
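Conceptually, the send phase and the commit phase behave like the sketch below. It is only a simplified illustration of the control flow behind publishClusterState.publish, not the actual implementation; the node names, the timeout and the fake transport are all invented for the example.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified two-phase publish: send the uncommitted state to every other node,
// wait for enough acknowledgements, then send commit messages.
public class TwoPhasePublishDemo {

    static void publish(String newState, List<String> otherNodes, int minMasterNodes) throws InterruptedException {
        // The local master counts as one ack, so wait for (minMasterNodes - 1) remote acks.
        CountDownLatch enoughAcks = new CountDownLatch(Math.max(0, minMasterNodes - 1));

        // Phase 1: send the new state; the callback runs when a node acknowledges it.
        for (String node : otherNodes) {
            sendStateAsync(node, newState, enoughAcks::countDown);
        }

        if (!enoughAcks.await(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("not enough nodes acknowledged, min master nodes [" + minMasterNodes + "]");
        }

        // Phase 2: the change is now committed; tell every node to apply it.
        for (String node : otherNodes) {
            System.out.println("commit [" + newState + "] -> " + node);
        }
    }

    static void sendStateAsync(String node, String state, Runnable onAck) {
        new Thread(onAck).start(); // fake transport: pretend the node acks immediately
    }

    public static void main(String[] args) throws InterruptedException {
        publish("cluster-state v2", List.of("node-1", "node-2"), 2);
    }
}

Back on the receiving side, processNextCommittedClusterState is where a committed state is handed over for application: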
boolean processNextCommittedClusterState(String reason) {
assert Thread.holdsLock(stateMutex);
...
//The omitted code above is mostly error and state checking; this is the real entry point for applying the cluster state.
clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
this::clusterState,
new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
pendingStatesQueue.markAsProcessed(newClusterState);
} catch (Exception e) {
onFailure(source, e);
}
}
@Override
public void onFailure(String source, Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure applying [{}]", reason), e);
try {
// TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around
// for too long.
pendingStatesQueue.markAsFailed(newClusterState, e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
}
}
});
return true;
}
Some error-handling code is omitted here so we can go straight to the entry point for applying the cluster state. It calls the onNewClusterState function of clusterApplier, which is a ClusterApplierService instance, and passes in a listener; once processing completes, the listener's clusterStateProcessed function is called to mark this cluster state change as processed.
ClusterApplierService
public void onNewClusterState(final String source, final java.util.function.Supplier<ClusterState> clusterStateSupplier,
final ClusterStateTaskListener listener) {
Function<ClusterState, ClusterState> applyFunction = currentState -> {
ClusterState nextState = clusterStateSupplier.get();
if (nextState != null) {
return nextState;
} else {
return currentState;
}
};
submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener);
}
private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
final Function<ClusterState, ClusterState> executor,
final ClusterStateTaskListener listener) {
if (!lifecycle.started()) {
return;
}
try {
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor);
if (config.timeout() != null) {
threadPoolExecutor.execute(updateTask, config.timeout(),
() -> threadPool.generic().execute(
() -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source))));
} else {
threadPoolExecutor.execute(updateTask);
}
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
}
}
}
This should look familiar: it closely resembles the index creation task execution described earlier. The index creation task is submitted through ClusterService, while the cluster state application task is submitted through ClusterApplierService. Index creation runs as an asynchronous task whose end result is a change to the cluster state, producing a new cluster state. Once that new cluster state has been produced and committed, another asynchronous task is created, this time submitted through ClusterApplierService, to bring the underlying data in line with the new cluster state.
Here again an UpdateTask is created. The config passed in uses Priority.HIGH and sets no timeout, so threadPoolExecutor's execute method is called directly. This threadPoolExecutor is also a PrioritizedEsThreadPoolExecutor, so the task execution process is the same as for the index creation task and is not analyzed again. Let's go straight to UpdateTask's run method.
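Before that, a quick aside on what priority-ordered execution means in practice. The sketch below is plain Java built around a PriorityBlockingQueue, not PrioritizedEsThreadPoolExecutor itself, but it shows why a Priority.HIGH task such as applying a committed cluster state jumps ahead of lower-priority work already waiting in the queue.

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Minimal illustration of priority-ordered task execution on a single worker thread.
public class PriorityExecutorDemo {

    // A runnable with a priority; a smaller number runs earlier.
    static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final String source;

        PrioritizedTask(int priority, String source) {
            this.priority = priority;
            this.source = source;
        }

        @Override
        public void run() {
            System.out.println("running [" + source + "]");
            try {
                Thread.sleep(100); // simulate some work so the queue has time to fill up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(this.priority, other.priority);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());

        executor.execute(new PrioritizedTask(0, "warm-up"));                    // taken by the worker right away
        executor.execute(new PrioritizedTask(5, "background housekeeping"));    // queued
        executor.execute(new PrioritizedTask(1, "apply cluster state (HIGH)")); // queued ahead of the line above

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}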
ClusterApplierService.UpdateTask
public void run() {
runTask(this);
}
ClusterApplierService
protected void runTask(UpdateTask task) {
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
return;
}
logger.debug("processing [{}]: execute", task.source);
final ClusterState previousClusterState = state.get();
long startTimeNS = currentTimeInNanos();
final ClusterState newClusterState;
try {
//Get the new cluster state to be applied.
newClusterState = task.apply(previousClusterState);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
if (logger.isTraceEnabled()) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
executionTime,
previousClusterState.version(),
task.source,
previousClusterState.nodes(),
previousClusterState.routingTable(),
previousClusterState.getRoutingNodes()),
e);
}
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onFailure(task.source, e);
return;
}
if (previousClusterState == newClusterState) {
task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
} else {
//The new cluster state differs from the previous cluster state.
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
}
try {
applyChanges(task, previousClusterState, newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, task.source);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
final long version = newClusterState.version();
final String stateUUID = newClusterState.stateUUID();
final String fullState = newClusterState.toString();
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
executionTime,
version,
stateUUID,
task.source,
fullState),
e);
// TODO: do we want to call updateTask.onFailure here?
}
}
}
When the new cluster state differs from the previous one, the applyChanges method is called.
private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) {
//Wrap the change in a ClusterChangedEvent.
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String summary = nodesDelta.shortSummary();
if (summary.length() > 0) {
logger.info("{}, reason: {}", summary, task.source);
}
}
//Connect to all nodes in the new cluster state to make sure they are reachable.
nodeConnectionsService.connectToNodes(newClusterState.nodes());
logger.debug("applying cluster state version {}", newClusterState.version());
try {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
clusterSettings.applySettings(incomingSettings);
}
} catch (Exception ex) {
logger.warn("failed to apply cluster settings", ex);
}
//This is the entry point.
logger.debug("apply cluster state with version {}", newClusterState.version());
callClusterStateAppliers(clusterChangedEvent);
nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
logger.debug("set locally applied cluster state to version {}", newClusterState.version());
state.set(newClusterState);
callClusterStateListeners(clusterChangedEvent);
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
}
Go straight into the callClusterStateAppliers function.
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
clusterStateAppliers.forEach(applier -> {
try {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
applier.applyClusterState(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateApplier", ex);
}
});
}
Here applyClusterState is called once for every applier registered in clusterStateAppliers. Let's look directly at IndicesClusterStateService, which is itself a ClusterStateApplier; this is where the change is ultimately handled.
IndicesClusterStateService
public synchronized void applyClusterState(final ClusterChangedEvent event) {
if (!lifecycle.started()) {
return;
}
final ClusterState state = event.state();
// we need to clean the shards and indices we have on this node, since we
// are going to recover them again once state persistence is disabled (no master / not recovered)
// TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
if (state.blocks().disableStatePersistence()) {
for (AllocatedIndex<? extends Shard> indexService : indicesService) {
indicesService.removeIndex(indexService.index(), NO_LONGER_ASSIGNED,
"cleaning index (disabled block persistence)"); // also cleans shards
}
return;
}
updateFailedShardsCache(state);
deleteIndices(event); // also deletes shards of deleted indices
removeUnallocatedIndices(event); // also removes shards of removed indices
failMissingShards(state);
removeShards(state); // removes any local shards that doesn't match what the master expects
updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache
createIndices(state);
createOrUpdateShards(state);
}
This is where all the different state transitions are handled in one place: deleting indices, removing shards, and so on. Since a split is essentially a form of index creation, we go straight into the createIndices function.
private void createIndices(final ClusterState state) {
// we only create indices for shards that are allocated
//As seen here, each node only creates the indices that are allocated to it.
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (localRoutingNode == null) {
return;
}
// create map of indices to create with shards to fail if index creation fails
final Map<Index, List<ShardRouting>> indicesToCreate = new HashMap<>();
for (ShardRouting shardRouting : localRoutingNode) {
//Iterate over every shard allocated to this node.
if (failedShardsCache.containsKey(shardRouting.shardId()) == false) {
final Index index = shardRouting.index();
//If this index has not been created on this node yet, add it to indicesToCreate so that it will be created.
if (indicesService.indexService(index) == null) {
indicesToCreate.computeIfAbsent(index, k -> new ArrayList<>()).add(shardRouting);
}
}
}
for (Map.Entry<Index, List<ShardRouting>> entry : indicesToCreate.entrySet()) {
final Index index = entry.getKey();
final IndexMetaData indexMetaData = state.metaData().index(index);
logger.debug("[{}] creating index", index);
AllocatedIndex<? extends Shard> indexService = null;
try {
//Index creation starts here; the execution process is the same as described before, so it is not analyzed again.
indexService = indicesService.createIndex(indexMetaData, buildInIndexListener);
if (indexService.updateMapping(indexMetaData) && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(indexMetaData.getIndex().getName(),
indexMetaData.getIndexUUID(), state.nodes().getLocalNodeId())
);
}
} catch (Exception e) {
final String failShardReason;
if (indexService == null) {
failShardReason = "failed to create index";
} else {
failShardReason = "failed to update mapping for index";
indicesService.removeIndex(index, FAILURE, "removing index (mapping update failed)");
}
for (ShardRouting shardRouting : entry.getValue()) {
sendFailShard(shardRouting, failShardReason, e, state);
}
}
}
}
Some readers may wonder why the index is being created here again when it was already created earlier. The earlier creation happened on the master node and produced a cluster state change event, which was then sent to the other nodes. At that moment the other nodes had not yet created the index; only after they receive the cluster state change event do they reach this step and create it inside createIndices. Index creation here also goes through IndicesService's createIndex function, the same process described in the previous article.
Once the index has been created, control returns to applyClusterState, which still has one more step: creating the shards. Next let's see how createOrUpdateShards is implemented.
private void createOrUpdateShards(final ClusterState state) {
//Likewise, only the shards allocated to this node are created.
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (localRoutingNode == null) {
return;
}
DiscoveryNodes nodes = state.nodes();
RoutingTable routingTable = state.routingTable();
for (final ShardRouting shardRouting : localRoutingNode) {
//Iterate over each shard.
ShardId shardId = shardRouting.shardId();
if (failedShardsCache.containsKey(shardId) == false) {
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
//The index must have been created before its shards.
assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";
Shard shard = indexService.getShardOrNull(shardId.id());
if (shard == null) {
assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
//Create the shard.
createShard(nodes, routingTable, shardRouting, state);
} else {
updateShard(nodes, shardRouting, shard, routingTable, state);
}
}
}
}
This iterates over every shard allocated to the local node; if a shard has not been created yet, createShard is called to create it.
private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;
DiscoveryNode sourceNode = null;
if (shardRouting.recoverySource().getType() == Type.PEER) {
//If the shard recovers from a peer, find the source node to recover from. As mentioned before, the primary shards of an index created by a split use the LOCAL_SHARDS recovery type, i.e. they recover from shards on the local node.
sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);
if (sourceNode == null) {
logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
return;
}
}
try {
logger.debug("{} creating shard", shardRouting.shardId());
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
repositoriesService, failedShardHandler, globalCheckpointSyncer);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
}
}
This directly calls IndicesService's createShard function to create the shard.
IndicesService
public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer) throws IOException {
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
//Create the IndexShard object.
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
indexShard.addShardFailureCallback(onShardFailure);
//Start recovering the shard's data.
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
(type, mapping) -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS:
"mapping update consumer only required by local shards recovery";
try {
client.admin().indices().preparePutMapping()
.setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid
.setType(type)
.setSource(mapping.source().string(), XContentType.JSON)
.get();
} catch (IOException ex) {
throw new ElasticsearchException("failed to stringify mapping source", ex);
}
}, this);
return indexShard;
}
IndexService's createShard is called first; it only constructs an IndexShard object and determines some of the shard's metadata, such as its data directory, and then stores the new shard in the IndexService's shards member. At this point the shard metadata exists, but the shard's underlying data does not, so IndexShard's startRecovery function is called to prepare the shard's data.
IndexShard
public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
IndicesService indicesService) {
// TODO: Create a proper object to encapsulate the recovery context
// all of the current methods here follow a pattern of:
// resolve context which isn't really dependent on the local shards and then async
// call some external method with this pointer.
// with a proper recovery context object we can simply change this to:
// startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {
// markAsRecovery("from " + source.getShortDescription(), recoveryState);
// threadPool.generic().execute() {
// onFailure () { listener.failure() };
// doRun() {
// if (source.recover(this)) {
// recoveryListener.onRecoveryDone(recoveryState);
// }
// }
// }}
// }
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
switch (recoveryState.getRecoverySource().getType()) {
...
//The other recovery types are omitted, because the primary shards of a split use this one.
case LOCAL_SHARDS:
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
final Index resizeSourceIndex = indexMetaData.getResizeSourceIndex();
final List<IndexShard> startedShards = new ArrayList<>();
final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);
final Set<ShardId> requiredShards;
final int numShards;
if (sourceIndexService != null) {
//Select the source shards to recover from.
requiredShards = IndexMetaData.selectRecoverFromShards(shardId().id(),
sourceIndexService.getMetaData(), indexMetaData.getNumberOfShards());
for (IndexShard shard : sourceIndexService) {
if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {
//Collect the source shards that are in the STARTED state.
startedShards.add(shard);
}
}
numShards = requiredShards.size();
} else {
numShards = -1;
requiredShards = Collections.emptySet();
}
//Only proceed when every required source shard is STARTED; otherwise an exception is thrown below.
if (numShards == startedShards.size()) {
assert requiredShards.isEmpty() == false;
markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
//Start a separate thread to perform the data recovery.
threadPool.generic().execute(() -> {
try {
if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
.filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
} else {
final RuntimeException e;
if (numShards == -1) {
e = new IndexNotFoundException(resizeSourceIndex);
} else {
e = new IllegalStateException("not all required shards of index " + resizeSourceIndex
+ " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "
+ shardId());
}
throw e;
}
break;
default:
throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
}
}
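One note on the selectRecoverFromShards call above: for a split, each target shard recovers from exactly one source shard, and as far as I can tell the mapping is the target shard id divided by the routing factor (numTargetShards / numSourceShards). Treat the sketch below as my reading of that selection logic rather than a definitive statement; the class and method names are invented for the example.

// Assumed mapping from a split target shard to its source shard; verify against
// IndexMetaData.selectRecoverFromShards in your Elasticsearch version before relying on it.
public class SplitSourceShardDemo {

    static int selectSourceShard(int targetShardId, int numSourceShards, int numTargetShards) {
        if (numTargetShards % numSourceShards != 0) {
            throw new IllegalArgumentException("target shard count must be a multiple of the source shard count");
        }
        // Each source shard is split into routingFactor target shards.
        int routingFactor = numTargetShards / numSourceShards;
        return targetShardId / routingFactor;
    }

    public static void main(String[] args) {
        // Splitting a 2-shard index into 6 shards: routingFactor = 3, so target shards
        // 0-2 recover from source shard 0 and target shards 3-5 from source shard 1.
        for (int target = 0; target < 6; target++) {
            System.out.println("target shard " + target + " <- source shard " + selectSourceShard(target, 2, 6));
        }
    }
}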
Data recovery starts a new thread that executes the recoverFromLocalShards function.
public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, List<IndexShard> localShards) throws IOException {
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " + recoveryState.getRecoverySource();
final List<LocalShardSnapshot> snapshots = new ArrayList<>();
try {
for (IndexShard shard : localShards) {
snapshots.add(new LocalShardSnapshot(shard));
}
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots);
} finally {
IOUtils.close(snapshots);
}
}
Here a StoreRecovery object is used to recover the data.
StoreRecovery
boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, final IndexShard indexShard, final List<LocalShardSnapshot> shards) throws IOException {
if (canRecover(indexShard)) {
...
//Execute the recovery.
return executeRecovery(indexShard, () -> {
logger.debug("starting recovery from local shards {}", shards);
try {
final Directory directory = indexShard.store().directory(); // don't close this directory!!
final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);
final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
final long maxUnsafeAutoIdTimestamp =
shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong();
//This call builds the shard's underlying Lucene index.
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp,
indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested);
internalRecoverFromStore(indexShard);
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false, false, false);
} catch (IOException ex) {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
}
});
}
return false;
}
void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,
final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split,
boolean hasNested) throws IOException {
// clean target directory (if previous recovery attempt failed) and create a fresh segment file with the proper lucene version
Lucene.cleanLuceneIndex(target);
assert sources.length > 0;
final int luceneIndexCreatedVersionMajor = Lucene.readSegmentInfos(sources[0]).getIndexCreatedVersionMajor();
new SegmentInfos(luceneIndexCreatedVersionMajor).commit(target);
//If hard links are supported, the target index's files are hard links pointing at the source index's files; otherwise the source files have to be copied.
final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
if (indexSort != null) {
iwc.setIndexSort(indexSort);
}
try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
//Add the files under the source index directories to the target index.
writer.addIndexes(sources);
if (split) {
//Use delete-by-query to remove the documents that do not belong to this shard; ShardSplittingQuery matches the documents of the source shard that, after the split, do not belong to this target shard.
writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId, hasNested));
}
/*
* We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
* the source shards. This ensures that history after this maximum sequence number can advance and we have correct
* document-level semantics.
*/
writer.setLiveCommitData(() -> {
final HashMap<String, String> liveCommitData = new HashMap<>(3);
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
return liveCommitData.entrySet().iterator();
});
writer.commit();
}
}
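The deleteDocuments(new ShardSplittingQuery(...)) call above is what makes the split correct: after the source shard's segments have been linked or copied into the target directory, every document whose routing no longer maps to this target shard has to be removed. The sketch below shows that per-document ownership decision in spirit only; the real query relies on the index's routing hash (Murmur3) and routing factor, neither of which is reproduced here, and the plain hashCode is just a stand-in.

import java.util.List;

// Conceptual sketch of the decision behind ShardSplittingQuery: recompute the owning
// shard under the new shard count and drop the documents that no longer belong here.
public class SplitOwnershipDemo {

    static int owningShard(String routing, int numShards) {
        return Math.floorMod(routing.hashCode(), numShards); // stand-in for the real routing hash
    }

    public static void main(String[] args) {
        int numTargetShards = 4;
        int thisTargetShard = 3;
        List<String> idsCopiedFromSource = List.of("doc-1", "doc-2", "doc-3", "doc-4");

        for (String id : idsCopiedFromSource) {
            boolean belongsHere = owningShard(id, numTargetShards) == thisTargetShard;
            // Documents with belongsHere == false are the ones the delete-by-query removes.
            System.out.println(id + (belongsHere ? " stays on" : " is deleted from") + " shard " + thisTargetShard);
        }
    }
}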
After addIndices has built the shard's underlying Lucene index, internalRecoverFromStore creates the shard's engine, and the shard is essentially recovered. Now back to IndicesClusterStateService's createShard function: when it called IndicesService's createShard, it passed in a RecoveryListener object.
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
private final ShardRouting shardRouting;
private RecoveryListener(ShardRouting shardRouting) {
this.shardRouting = shardRouting;
}
@Override
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}
When recovery is finished, that object's onRecoveryDone method is called; inside it, a shardStarted action is issued, which changes the shard's state from INITIALIZING to STARTED.
At this point the shard has been fully created and can serve external requests.