開篇
?這篇文章主要目的是想梳理下elasticserach在啟動(dòng)過程中的核心步驟,宏觀上講解清楚elasticsearch啟動(dòng)過程中都做了哪些事情。
?原本想通過流程圖來進(jìn)行畫,后來網(wǎng)上有人通過xmind來分析整個(gè)過程祷舀,發(fā)現(xiàn)也能夠講解的非常清楚,因此同樣采用xmind來自上而下講解整個(gè)過程。
啟動(dòng)過程圖
說明:
1.通過XMind記錄ES啟動(dòng)流程的整個(gè)過程洒疚。
2.閱讀順序從上往下,標(biāo)紅色旗子的部分是核心流程坯屿。
3.核心流程我概括為:配置加載油湖;Bootstrap 初始化; Bootstrap setup過程;Bootstrap start過程领跛。
4.每個(gè)步驟當(dāng)中細(xì)分下去很多邏輯乏德,這里只講解能夠串聯(lián)整個(gè)過程的邏輯。
配置加載過程
Bootstrap 初始化
- Elasticsearch的一個(gè)重要作用是解析命令參數(shù)吠昭。
- 執(zhí)行帶 -h 參數(shù)的Elasticsearch啟動(dòng)命令喊括。
- Elasticsearch的構(gòu)造函數(shù)如下所示,跟幫助信息是一致的矢棚。
// elasticsearch啟動(dòng)命令幫助
Elasticsearch() {
super("starts elasticsearch", () -> {});
versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
"Prints elasticsearch version information and exits");
daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
"Starts Elasticsearch in the background")
.availableUnless(versionOption);
pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
"Creates a pid file in the specified path on start")
.availableUnless(versionOption)
.withRequiredArg()
.withValuesConvertedBy(new PathConverter());
quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
"Turns off standard output/error streams logging in console")
.availableUnless(versionOption)
.availableUnless(daemonizeOption);
}
Elasticsearch.main過程
// elasticsearch啟動(dòng)入口函數(shù)
public static void main(final String[] args) throws Exception {
System.setSecurityManager(new SecurityManager() {
@Override
public void checkPermission(Permission perm) {
}
});
LogConfigurator.registerErrorListener();
final Elasticsearch elasticsearch = new Elasticsearch();
int status = main(args, elasticsearch, Terminal.DEFAULT);
if (status != ExitCodes.OK) {
exit(status);
}
}
// 調(diào)用elasticsearch對(duì)象的main函數(shù)
static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
return elasticsearch.main(args, terminal);
}
1.創(chuàng)建 SecurityManager 安全管理器
2.LogConfigurator.registerErrorListener() 注冊(cè)偵聽器
3.創(chuàng)建Elasticsearch對(duì)象
4.進(jìn)入elasticsearch.main()過程郑什。
elasticsearch.main過程
說明:
- Elasticsearch繼承關(guān)系如上圖。
- elasticsearch.main()過程當(dāng)中會(huì)調(diào)用EnvironmentAwareCommand蒲肋、Command等類蘑拯。
class Elasticsearch extends EnvironmentAwareCommand {
static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
return elasticsearch.main(args, terminal);
}
}
public abstract class Command implements Closeable {
public final int main(String[] args, Terminal terminal) throws Exception {
if (addShutdownHook()) {
shutdownHookThread = new Thread(() -> {
try {
this.close();
} catch (final IOException e) {
try (
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
e.printStackTrace(pw);
terminal.println(sw.toString());
} catch (final IOException impossible) {
}
}
});
Runtime.getRuntime().addShutdownHook(shutdownHookThread);
}
beforeMain.run();
try {
mainWithoutErrorHandling(args, terminal);
} catch (OptionException e) {
return ExitCodes.USAGE;
} catch (UserException e) {
return e.exitCode;
}
return ExitCodes.OK;
}
void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
final OptionSet options = parser.parse(args);
if (options.has(helpOption)) {
printHelp(terminal);
return;
}
if (options.has(silentOption)) {
terminal.setVerbosity(Terminal.Verbosity.SILENT);
} else if (options.has(verboseOption)) {
terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
} else {
terminal.setVerbosity(Terminal.Verbosity.NORMAL);
}
execute(terminal, options);
}
}
5.elasticsearch.main()直接進(jìn)入Command.main()方法钝满。
6.Command.main()給Runtime類添加一個(gè)hook線程,該線程作用是:當(dāng)Runtime異常關(guān)閉時(shí)打印異常信息申窘。
7.Command.mainWithoutErrorHandling 方法弯蚜,根據(jù)命令行參數(shù)打印或者設(shè)置參數(shù),然后執(zhí)行命令偶洋。
8.進(jìn)入EnvironmentAwareCommand.execute()方法熟吏。
EnvironmentAwareCommand.execute過程
public abstract class EnvironmentAwareCommand extends Command {
protected void execute(Terminal terminal, OptionSet options) throws Exception {
final Map<String, String> settings = new HashMap<>();
for (final KeyValuePair kvp : settingOption.values(options)) {
if (kvp.value.isEmpty()) {
throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
}
if (settings.containsKey(kvp.key)) {
final String message = String.format(
Locale.ROOT,
"setting [%s] already set, saw [%s] and [%s]",
kvp.key,
settings.get(kvp.key),
kvp.value);
throw new UserException(ExitCodes.USAGE, message);
}
settings.put(kvp.key, kvp.value);
}
putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
execute(terminal, options, createEnv(terminal, settings));
}
protected Environment createEnv(final Terminal terminal, final Map<String, String> settings) throws UserException {
final String esPathConf = System.getProperty("es.path.conf");
if (esPathConf == null) {
throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
}
return InternalSettingsPreparer.prepareEnvironment(Settings.EMPTY, terminal, settings, getConfigPath(esPathConf));
}
protected abstract void execute(Terminal terminal, OptionSet options,
Environment env) throws Exception;
}
9.EnvironmentAwareCommand.execute,確保 es.path.data, es.path.home, es.path.logs 等參數(shù)已設(shè)置玄窝,否則從 System.properties 中讀取牵寺。
10.createEnv最后返回一個(gè) Environment 對(duì)象,包含plugins恩脂,bin帽氓,lib,modules等目錄下的文件信息
11.進(jìn)入elasticsearch的execute()方法俩块。
prepareEnvironment過程
public class InternalSettingsPreparer {
public static Environment prepareEnvironment(Settings input, Terminal terminal,
Map<String, String> properties, Path configPath) {
// just create enough settings to build the environment, to get the config dir
Settings.Builder output = Settings.builder();
initializeSettings(output, input, properties);
Environment environment = new Environment(output.build(), configPath);
if (Files.exists(environment.configFile().resolve("elasticsearch.yaml"))) {
throw new SettingsException("elasticsearch.yaml was deprecated in 5.5.0 and must be renamed to elasticsearch.yml");
}
if (Files.exists(environment.configFile().resolve("elasticsearch.json"))) {
throw new SettingsException("elasticsearch.json was deprecated in 5.5.0 and must be converted to elasticsearch.yml");
}
output = Settings.builder(); // start with a fresh output
Path path = environment.configFile().resolve("elasticsearch.yml");
if (Files.exists(path)) {
try {
output.loadFromPath(path);
} catch (IOException e) {
throw new SettingsException("Failed to load settings from " + path.toString(), e);
}
}
// re-initialize settings now that the config file has been loaded
initializeSettings(output, input, properties);
finalizeSettings(output, terminal);
environment = new Environment(output.build(), configPath);
// we put back the path.logs so we can use it in the logging configuration file
output.put(Environment.PATH_LOGS_SETTING.getKey(), environment.logsFile().toAbsolutePath().normalize().toString());
return new Environment(output.build(), configPath);
}
}
public class Environment {
public Environment(final Settings settings, final Path configPath) {
final Path homeFile;
if (PATH_HOME_SETTING.exists(settings)) {
homeFile = PathUtils.get(PATH_HOME_SETTING.get(settings)).normalize();
} else {
throw new IllegalStateException(PATH_HOME_SETTING.getKey() + " is not configured");
}
if (configPath != null) {
configFile = configPath.normalize();
} else {
configFile = homeFile.resolve("config");
}
pluginsFile = homeFile.resolve("plugins");
List<String> dataPaths = PATH_DATA_SETTING.get(settings);
final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
if (DiscoveryNode.nodeRequiresLocalStorage(settings)) {
if (dataPaths.isEmpty() == false) {
dataFiles = new Path[dataPaths.size()];
dataWithClusterFiles = new Path[dataPaths.size()];
for (int i = 0; i < dataPaths.size(); i++) {
dataFiles[i] = PathUtils.get(dataPaths.get(i));
dataWithClusterFiles[i] = dataFiles[i].resolve(clusterName.value());
}
} else {
dataFiles = new Path[]{homeFile.resolve("data")};
dataWithClusterFiles = new Path[]{homeFile.resolve("data").resolve(clusterName.value())};
}
} else {
if (dataPaths.isEmpty()) {
dataFiles = dataWithClusterFiles = EMPTY_PATH_ARRAY;
} else {
final String paths = String.join(",", dataPaths);
throw new IllegalStateException("node does not require local storage yet path.data is set to [" + paths + "]");
}
}
if (PATH_SHARED_DATA_SETTING.exists(settings)) {
sharedDataFile = PathUtils.get(PATH_SHARED_DATA_SETTING.get(settings)).normalize();
} else {
sharedDataFile = null;
}
List<String> repoPaths = PATH_REPO_SETTING.get(settings);
if (repoPaths.isEmpty()) {
repoFiles = EMPTY_PATH_ARRAY;
} else {
repoFiles = new Path[repoPaths.size()];
for (int i = 0; i < repoPaths.size(); i++) {
repoFiles[i] = PathUtils.get(repoPaths.get(i));
}
}
// this is trappy, Setting#get(Settings) will get a fallback setting yet return false for Settings#exists(Settings)
if (PATH_LOGS_SETTING.exists(settings)) {
logsFile = PathUtils.get(PATH_LOGS_SETTING.get(settings)).normalize();
} else {
logsFile = homeFile.resolve("logs");
}
if (PIDFILE_SETTING.exists(settings)) {
pidFile = PathUtils.get(PIDFILE_SETTING.get(settings)).normalize();
} else {
pidFile = null;
}
binFile = homeFile.resolve("bin");
libFile = homeFile.resolve("lib");
modulesFile = homeFile.resolve("modules");
Settings.Builder finalSettings = Settings.builder().put(settings);
finalSettings.put(PATH_HOME_SETTING.getKey(), homeFile);
if (PATH_DATA_SETTING.exists(settings)) {
finalSettings.putList(PATH_DATA_SETTING.getKey(), dataPaths);
}
finalSettings.put(PATH_LOGS_SETTING.getKey(), logsFile.toString());
this.settings = finalSettings.build();
}
}
- 12.createEnv最后返回一個(gè) Environment 對(duì)象黎休。
elasticsearch.execute過程
class Elasticsearch extends EnvironmentAwareCommand {
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
if (options.nonOptionArguments().isEmpty() == false) {
}
if (options.has(versionOption)) {
terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot())
+ ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
+ ", JVM: " + JvmInfo.jvmInfo().version());
return;
}
final boolean daemonize = options.has(daemonizeOption);
final Path pidFile = pidfileOption.value(options);
final boolean quiet = options.has(quietOption);
try {
init(daemonize, pidFile, quiet, env);
} catch (NodeValidationException e) {
}
}
void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
throws NodeValidationException, UserException {
try {
Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
} catch (BootstrapException | RuntimeException e) {
}
}
}
13.Elasticsearch.execute ,讀取daemonize玉凯, pidFile势腮,quiet 的值,并 確保配置的臨時(shí)目錄(temp)是有效目錄
14.進(jìn)入Bootstrap初始化階段
Bootstrap init過程
static void init(
final boolean foreground,
final Path pidFile,
final boolean quiet,
final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
BootstrapInfo.init();
INSTANCE = new Bootstrap();
final SecureSettings keystore = loadSecureSettings(initialEnv);
final Environment environment = createEnvironment(foreground,
pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
try {
LogConfigurator.configure(environment);
} catch (IOException e) {
throw new BootstrapException(e);
}
if (environment.pidFile() != null) {
try {
PidFile.create(environment.pidFile(), true);
} catch (IOException e) {
throw new BootstrapException(e);
}
}
final boolean closeStandardStreams = (foreground == false) || quiet;
try {
if (closeStandardStreams) {
final Logger rootLogger = ESLoggerFactory.getRootLogger();
final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
if (maybeConsoleAppender != null) {
Loggers.removeAppender(rootLogger, maybeConsoleAppender);
}
closeSystOut();
}
checkLucene();
Thread.setDefaultUncaughtExceptionHandler(
new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));
INSTANCE.setup(true, environment);
try {
IOUtils.close(keystore);
} catch (IOException e) {
throw new BootstrapException(e);
}
INSTANCE.start();
if (closeStandardStreams) {
closeSysError();
}
} catch (NodeValidationException | RuntimeException e) {
throw e;
}
}
15.創(chuàng)建Bootstrap對(duì)象漫仆, INSTANCE = new Bootstrap()捎拯。
16.初始化Bootstrap對(duì)象,INSTANCE.setup(true, environment)盲厌。
17.啟動(dòng)Bootstrap對(duì)象INSTANCE.start()署照。
Bootstrap new過程
Bootstrap() {
keepAliveThread = new Thread(new Runnable() {
@Override
public void run() {
try {
keepAliveLatch.await();
} catch (InterruptedException e) {
// bail out
}
}
}, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
keepAliveThread.setDaemon(false);
// keep this thread alive (non daemon thread) until we shutdown
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
keepAliveLatch.countDown();
}
});
}
- 18.創(chuàng)建Bootstrap對(duì)象,該類構(gòu)造函數(shù)會(huì)創(chuàng)建一個(gè)用戶線程添加到Runtime Hook中吗浩,進(jìn)行 countDown 操作建芙。
Bootstrap setup過程
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
Settings settings = environment.settings();
try {
spawner.spawnNativePluginControllers(environment);
} catch (IOException e) {
throw new BootstrapException(e);
}
initializeNatives(
environment.tmpFile(),
BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
// initialize probes before the security manager is installed
initializeProbes();
if (addShutdownHook) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
}
}
});
}
try {
JarHell.checkJarHell();
} catch (IOException | URISyntaxException e) {
}
IfConfig.logIfNecessary();
try {
Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
} catch (IOException | NoSuchAlgorithmException e) {
}
node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks)
throws NodeValidationException {
BootstrapChecks.check(context, boundTransportAddress, checks);
}
};
}
19.為每個(gè)插件生成生成本機(jī)控制類,spawner.spawnNativePluginControllers(environment)懂扼;嘗試為給定模塊生成控制器(native Controller)守護(hù)程序禁荸。 生成的進(jìn)程將通過其stdin,stdout和stderr流保持與此JVM的連接阀湿,但對(duì)此包之外的代碼不能使用對(duì)這些流的引用屡限。
20.初始化本地資源 initializeNatives()。
21.使用 JarHell 檢查重復(fù)的 jar 文件 JarHell.checkJarHell()炕倘。
22.創(chuàng)建 node 節(jié)點(diǎn) new Node(environment),核心:渤拧U中啊央!。
Bootstrap start過程
private void start() throws NodeValidationException {
node.start();
keepAliveThread.start();
}
23.啟動(dòng)node對(duì)象涨醋,node.start()瓜饥。
24.啟動(dòng)守護(hù)進(jìn)程,keepAliveThread.start()浴骂。
node創(chuàng)建過程
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
try {
Settings tmpSettings = Settings.builder().put(environment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
resourcesToClose.add(nodeEnvironment);
} catch (IOException ex) {
}
final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
Logger logger = Loggers.getLogger(Node.class, tmpSettings);
final String nodeId = nodeEnvironment.nodeId();
tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
final JvmInfo jvmInfo = JvmInfo.jvmInfo();
this.pluginsService = new PluginsService(tmpSettings, environment.configFile(),
environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
this.settings = pluginsService.updatedSettings();
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
this.environment = new Environment(this.settings, environment.configFile());
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
DeprecationLogger.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
client = new NodeClient(settings, threadPool);
..................
}
- 25.創(chuàng)建一個(gè) NodeEnvironment 對(duì)象保存節(jié)點(diǎn)環(huán)境信息乓土,如各種數(shù)據(jù)文件的路徑
- 26.讀取JVM信息
- 27.創(chuàng)建 PluginsService 對(duì)象,創(chuàng)建過程中會(huì)讀取并加載所有的模塊和插件
- 28.創(chuàng)建一個(gè)最終的 Environment 對(duì)象
- 29.創(chuàng)建線程池 ThreadPool 后面各類對(duì)象基本都是通過線程來提供服務(wù)溯警,這個(gè)線程池可以管理各類線程
- 30.創(chuàng)建 節(jié)點(diǎn)客戶端 NodeClient
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
..................
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
AnalysisModule analysisModule = new AnalysisModule(this.environment,
pluginsService.filterPlugins(AnalysisPlugin.class));
final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings,
additionalSettingsFilter);
scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
resourcesToClose.add(resourceWatcherService);
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
clusterService.addListener(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class));
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
listener::onNewInfo);
final UsageService usageService = new UsageService(settings);
}
- 31.創(chuàng)建各種服務(wù)類對(duì)象 ResourceWatcherService趣苏、NetworkService、ClusterService梯轻、IngestService食磕、ClusterInfoService、UsageService喳挑、MonitorService彬伦、CircuitBreakerService、MetaStateService伊诵、IndicesService单绑、MetaDataIndexUpgradeService、TemplateUpgradeService曹宴、TransportService搂橙、ResponseCollectorService、SearchTransportService浙炼、NodeService份氧、SearchService、PersistentTasksClusterService,這些服務(wù)類是的功能可以根據(jù)名稱做一個(gè)大概的判斷弯屈。
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
..................
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment,
threadPool, clusterInfoService);
ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
modules.add(new GatewayModule());
ActionModule actionModule = new ActionModule(false, settings,
clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(),
settingsModule.getSettingsFilter(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
modules.add(actionModule);
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, false,
pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController);
Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
pluginsService.filterPlugins(Plugin.class)
.stream().map(Plugin::getCustomMetaDataUpgrader)
.collect(Collectors.toList());
Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getIndexTemplateMetaDataUpgrader)
.collect(Collectors.toList());
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class)
.stream().map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders,
indexTemplateMetaDataUpgraders);
final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings,
xContentRegistry,indicesModule.getMapperRegistry(),
settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);
final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,
metaDataIndexUpgradeService, metaDataUpgrader);
new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
final ResponseCollectorService responseCollectorService =
new ResponseCollectorService(this.settings, clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(settings,
transportService,SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final Consumer<Binder> httpBind;
final HttpServerTransport httpServerTransport;
if (networkModule.isHttpEnabled()) {
httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
httpBind = b -> {
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
};
} else {
httpBind = b -> {
b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
};
httpServerTransport = null;
}
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings,
threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService());
this.nodeService = new NodeService(settings, threadPool, monitorService,
discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService,
scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService,
settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService);
- 32.ModulesBuilder類加入各種模塊 ScriptModule蜗帜、AnalysisModule、SettingsModule资厉、pluginModule厅缺、ClusterModule、IndicesModule宴偿、SearchModule湘捎、GatewayModule、RepositoriesModule窄刘、ActionModule窥妇、NetworkModule、DiscoveryModule
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
..................
modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
b.bind(NodeClient.class).toInstance(client);
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
b.bind(MetaStateService.class).toInstance(metaStateService);
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService));
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
scriptModule.getScriptService()));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
RecoverySettings recoverySettings = new RecoverySettings(settings,
settingsModule.getClusterSettings());
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class).toInstance(
new PeerRecoverySourceService(settings, transportService,
indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class).toInstance(
new PeerRecoveryTargetService(settings, threadPool,
transportService, recoverySettings, clusterService));
}
httpBind.accept(b);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
}
);
injector = modules.createInjector();
clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));
List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
.filter(p -> p instanceof LifecycleComponent)
.map(p -> (LifecycleComponent) p).collect(Collectors.toList());
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
.map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents);
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
() -> clusterService.localNode().getId());
if (NetworkModule.HTTP_ENABLED.get(settings)) {
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
success = true;
} catch (IOException ex) {
} finally {
}
}
- 33.elasticsearch里面的組件基本都進(jìn)行進(jìn)行了模塊化管理娩践,elasticsearch對(duì)guice進(jìn)行了封裝通過ModulesBuilder類構(gòu)建es的模塊.
node啟動(dòng)過程
public Node start() throws NodeValidationException {
if (!lifecycle.moveToStarted()) {
return this;
}
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("starting ...");
pluginLifecycleComponents.forEach(LifecycleComponent::start);
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
nodeService.getMonitorService().start();
final ClusterService clusterService = injector.getInstance(ClusterService.class);
final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
transportService.start();
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
final MetaData onDiskMetadata;
try {
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
} else {
onDiskMetadata = MetaData.EMPTY_META_DATA;
}
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
} catch (IOException e) {
throw new UncheckedIOException(e);
}
validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata),
transportService.boundAddress(), pluginsService
.filterPlugins(Plugin
.class)
.stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
clusterService.addStateApplier(transportService.getTaskManager());
// start after transport service so the local disco is known
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
clusterService.start();
assert clusterService.localNode().equals(localNodeFactory.getNode())
: "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests();
discovery.startInitialJoin();
// tribe nodes don't have a master so we shouldn't register an observer s
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
if (initialStateTimeout.millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
ClusterState clusterState = clusterService.state();
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService,
null, logger, thread.getThreadContext());
if (clusterState.nodes().getMasterNodeId() == null) {
logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
final CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) { latch.countDown(); }
@Override
public void onClusterServiceClose() {
latch.countDown();
}
@Override
public void onTimeout(TimeValue timeout) {
logger.warn("timed out while waiting for initial discovery state - timeout: {}",
initialStateTimeout);
latch.countDown();
}
}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
}
}
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).start();
}
if (WRITE_PORTS_FILE_SETTING.get(settings)) {
if (NetworkModule.HTTP_ENABLED.get(settings)) {
HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
writePortsFile("http", http.boundAddress());
}
TransportService transport = injector.getInstance(TransportService.class);
writePortsFile("transport", transport.boundAddress());
}
pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
return this;
}
- 通過 injector 獲取各個(gè)類的對(duì)象活翩,調(diào)用 start() 方法啟動(dòng)(實(shí)際進(jìn)入各個(gè)類的中 doStart 方法): LifecycleComponent烹骨、IndicesService、IndicesClusterStateService材泄、SnapshotsService沮焕、SnapshotShardsService、RoutingService拉宗、SearchService峦树、MonitorService、NodeConnectionsService旦事、ResourceWatcherService魁巩、GatewayService、Discovery族檬、TransportService
- IndicesService:索引管理
IndicesClusterStateService:跨集群同步
SnapshotsService:負(fù)責(zé)創(chuàng)建快照
SnapshotShardsService:此服務(wù)在數(shù)據(jù)和主節(jié)點(diǎn)上運(yùn)行歪赢,并控制這些節(jié)點(diǎn)上當(dāng)前快照的分片。
RoutingService:偵聽集群狀態(tài)单料,當(dāng)它收到集群改變事件將驗(yàn)證集群狀態(tài)埋凯,路由表可能會(huì)更新
SearchService:搜索服務(wù)
MonitorService:監(jiān)控
NodeConnectionsService:此組件負(fù)責(zé)在節(jié)點(diǎn)添加到群集狀態(tài)后連接到節(jié)點(diǎn),并在刪除它們時(shí)斷開連接扫尖。 此外白对,它會(huì)定期檢查所有連接是否仍處于打開狀態(tài),并在需要時(shí)還原它們换怖。 請(qǐng)注意甩恼,如果節(jié)點(diǎn)斷開/不響應(yīng)ping,則此組件不負(fù)責(zé)從群集中刪除節(jié)點(diǎn)沉颂。 這是由NodesFaultDetection完成的条摸。 主故障檢測(cè)由鏈接MasterFaultDetection完成。
ResourceWatcherService:通用資源觀察器服務(wù)
GatewayService:網(wǎng)關(guān)
- IndicesService:索引管理
- 集群發(fā)現(xiàn)與監(jiān)控等铸屉,啟動(dòng) HttpServerTransport钉蒲, 綁定服務(wù)端口。
線程池
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
super(settings);
assert Node.NODE_NAME_SETTING.exists(settings);
final Map<String, ExecutorBuilder> builders = new HashMap<>();
final int availableProcessors = EsExecutors.numberOfProcessors(settings);
final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4,
genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.INDEX,
new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
builders.put(Names.BULK,
new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200));
builders.put(Names.GET,
new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.SEARCH,
new AutoQueueAdjustingExecutorBuilder(settings,
Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
builders.put(Names.MANAGEMENT,
new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5,
TimeValue.timeValueMinutes(5)));
builders.put(Names.LISTENER,
new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
builders.put(Names.FLUSH,
new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.REFRESH,
new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER,
new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT,
new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.FETCH_SHARD_STARTED,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
builders.put(Names.FETCH_SHARD_STORE,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
for (final ExecutorBuilder<?> builder : customBuilders) {
if (builders.containsKey(builder.name())) {
throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
}
builders.put(builder.name(), builder);
}
this.builders = Collections.unmodifiableMap(builders);
threadContext = new ThreadContext(settings);
final Map<String, ExecutorHolder> executors = new HashMap<>();
for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
if (executors.containsKey(executorHolder.info.getName())) {
}
logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
executors.put(entry.getKey(), executorHolder);
}
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
this.executors = unmodifiableMap(executors);
this.scheduler = Scheduler.initScheduler(settings);
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(
settings, "[timer]"), estimatedTimeInterval.millis());
this.cachedTimeThread.start();
}
線程池類型 ThreadPoolType
fixed(固定):fixed線程池?fù)碛泄潭〝?shù)量的線程來處理請(qǐng)求彻坛,在沒有空閑線程時(shí)請(qǐng)求將被掛在隊(duì)列中顷啼。queue_size參數(shù)可以控制在沒有空閑線程時(shí),能排隊(duì)掛起的請(qǐng)求數(shù)
fixed_auto_queue_size:此類型為實(shí)驗(yàn)性的昌屉,將被更改或刪除钙蒙,不關(guān)注
scaling(彈性):scaling線程池?fù)碛械木€程數(shù)量是動(dòng)態(tài)的,這個(gè)數(shù)字介于core和max參數(shù)的配置之間變化间驮。keep_alive參數(shù)用來控制線程在線程池中空閑的最長(zhǎng)時(shí)間
direct:此類線程是一種不支持關(guān)閉的線程,就意味著一旦使用,則會(huì)一直存活下去.
一些重要的線程池
generic:用于通用的請(qǐng)求(例如:后臺(tái)節(jié)點(diǎn)發(fā)現(xiàn))躬厌,線程池類型為 scaling。
index:用于index/delete請(qǐng)求竞帽,線程池類型為 fixed烤咧, 大小的為處理器數(shù)量偏陪,隊(duì)列大小為200,最大線程數(shù)為 1 + 處理器數(shù)量煮嫌。
search:用于count/search/suggest請(qǐng)求。線程池類型為 fixed抱虐, 大小的為 int((處理器數(shù)量 3) / 2) +1昌阿,隊(duì)列大小為1000。*
get:用于get請(qǐng)求恳邀。線程池類型為 fixed懦冰,大小的為處理器數(shù)量,隊(duì)列大小為1000谣沸。
analyze:用于analyze請(qǐng)求刷钢。線程池類型為 fixed,大小的1乳附,隊(duì)列大小為16
write:用于單個(gè)文檔的 index/delete/update 請(qǐng)求以及 bulk 請(qǐng)求内地,線程池類型為 fixed,大小的為處理器數(shù)量赋除,隊(duì)列大小為200阱缓,最大線程數(shù)為 1 + 處理器數(shù)量。
snapshot:用于snaphost/restore請(qǐng)求举农。線程池類型為 scaling荆针,線程保持存活時(shí)間為5分鐘,最大線程數(shù)為min(5, (處理器數(shù)量)/2)颁糟。
warmer:用于segment warm-up請(qǐng)求航背。線程池類型為 scaling,線程保持存活時(shí)間為5分鐘棱貌,最大線程數(shù)為min(5, (處理器數(shù)量)/2)玖媚。
refresh:用于refresh請(qǐng)求。線程池類型為 scaling,線程空閑保持存活時(shí)間為5分鐘肝劲,最大線程數(shù)為min(10, (處理器數(shù)量)/2)损搬。
listener:主要用于Java客戶端線程監(jiān)聽器被設(shè)置為true時(shí)執(zhí)行動(dòng)作。線程池類型為 scaling涡贱,最大線程數(shù)為min(10, (處理器數(shù)量)/2)。
PluginsService插件服務(wù)
public static final String ES_PLUGIN_PROPERTIES = "plugin-descriptor.properties";
public static final String ES_PLUGIN_POLICY = "plugin-security.policy";
public static PluginInfo readFromProperties(final Path path) throws IOException {
final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);
final Properties props = new Properties();
try (InputStream stream = Files.newInputStream(descriptor)) {
props.load(stream);
}
final String name = props.getProperty("name");
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException(
"property [name] is missing in [" + descriptor + "]");
}
final String description = props.getProperty("description");
if (description == null) {
throw new IllegalArgumentException(
"property [description] is missing for plugin [" + name + "]");
}
final String version = props.getProperty("version");
if (version == null) {
throw new IllegalArgumentException(
"property [version] is missing for plugin [" + name + "]");
}
final String esVersionString = props.getProperty("elasticsearch.version");
if (esVersionString == null) {
throw new IllegalArgumentException(
"property [elasticsearch.version] is missing for plugin [" + name + "]");
}
final Version esVersion = Version.fromString(esVersionString);
if (esVersion.equals(Version.CURRENT) == false) {
final String message = String.format(
Locale.ROOT,
"plugin [%s] is incompatible with version [%s]; was designed for version [%s]",
name,
Version.CURRENT.toString(),
esVersionString);
throw new IllegalArgumentException(message);
}
final String javaVersionString = props.getProperty("java.version");
if (javaVersionString == null) {
throw new IllegalArgumentException(
"property [java.version] is missing for plugin [" + name + "]");
}
JarHell.checkVersionFormat(javaVersionString);
JarHell.checkJavaVersion(name, javaVersionString);
final String classname = props.getProperty("classname");
if (classname == null) {
throw new IllegalArgumentException(
"property [classname] is missing for plugin [" + name + "]");
}
final String hasNativeControllerValue = props.getProperty("has.native.controller");
final boolean hasNativeController;
if (hasNativeControllerValue == null) {
hasNativeController = false;
} else {
switch (hasNativeControllerValue) {
case "true":
hasNativeController = true;
break;
case "false":
hasNativeController = false;
break;
default:
final String message = String.format(
Locale.ROOT,
"property [%s] must be [%s], [%s], or unspecified but was [%s]",
"has_native_controller",
"true",
"false",
hasNativeControllerValue);
throw new IllegalArgumentException(message);
}
}
final String requiresKeystoreValue = props.getProperty("requires.keystore", "false");
final boolean requiresKeystore;
try {
requiresKeystore = Booleans.parseBoolean(requiresKeystoreValue);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("property [requires.keystore] must be [true] or [false]," +
" but was [" + requiresKeystoreValue + "]", e);
}
return new PluginInfo(name, description, version, classname, hasNativeController, requiresKeystore);
}
讀取模塊的配置文件 plugin-descriptor.properties惹想,解析出內(nèi)容并存儲(chǔ)到 Map中问词。
分別校驗(yàn) name, description, version, elasticsearch.version, java.version, classname, extended.plugins, has.native.controller, requires.keystore 這些配置項(xiàng),缺失或者不按要求則拋出異常嘀粱。
根據(jù)配置項(xiàng)構(gòu)造一個(gè) PluginInfo 對(duì)象返回激挪。