Learning the Source Code with Questions in Mind, Part 5: Elasticsearch Node Startup Analysis
The code analysis is based on https://github.com/jiankunking/elasticsearch (Elasticsearch 7.10.2+).
Purpose: before reading the source code, first lay out the questions I have about the node startup flow:
- What checks are performed when a node starts?
- What gets initialized when a node starts?
- Once the node is up, where is data migration handled?
Start by locating the entry class referenced in the startup script: org.elasticsearch.bootstrap.Elasticsearch.
Let's look at org.elasticsearch.bootstrap.Elasticsearch, beginning with the main entry point:
/**
* Main entry point for starting elasticsearch
*/
public static void main(final String[] args) throws Exception {
// Read es.networkaddress.cache.ttl and es.networkaddress.cache.negative.ttl (set in jvm.options)
// and use them to override the JVM security properties networkaddress.cache.ttl and
// networkaddress.cache.negative.ttl (a simplified sketch of this mechanism follows the listing below)
overrideDnsCachePolicyProperties();
/*
* We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
* presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
* forces such policies to take effect immediately.
*/
System.setSecurityManager(new SecurityManager() {
@Override
public void checkPermission(Permission perm) {
// grant all permissions so that we can later set the security manager to the one that we want
}
});
LogConfigurator.registerErrorListener();
final Elasticsearch elasticsearch = new Elasticsearch();
// The core checks and processing happen in main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal)
int status = main(args, elasticsearch, Terminal.DEFAULT);
if (status != ExitCodes.OK) {
final String basePath = System.getProperty("es.logs.base_path");
// It's possible to fail before logging has been configured, in which case there's no point
// suggesting that the user look in the log file.
if (basePath != null) {
Terminal.DEFAULT.errorPrintln(
"ERROR: Elasticsearch did not exit normally - check the logs at "
+ basePath
+ System.getProperty("file.separator")
+ System.getProperty("es.logs.cluster_name") + ".log"
);
}
exit(status);
}
}
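To make the DNS cache override above concrete, here is a minimal sketch of the mechanism overrideDnsCachePolicyProperties relies on: read a system property (typically set in jvm.options via -Des.networkaddress.cache.ttl=...) and write it into the JVM-wide security property networkaddress.cache.ttl. The property names are the real ones quoted in the comments above; the helper class and its error handling are simplified assumptions, not the actual Elasticsearch implementation.
import java.security.Security;

// Minimal sketch (not the actual overrideDnsCachePolicyProperties implementation):
// copy an es.* system property into the corresponding JVM security property.
public final class DnsCacheOverrideSketch {
    public static void main(String[] args) {
        copy("es.networkaddress.cache.ttl", "networkaddress.cache.ttl");
        copy("es.networkaddress.cache.negative.ttl", "networkaddress.cache.negative.ttl");
        // The JDK resolver consults these security properties when caching DNS lookups.
        System.out.println(Security.getProperty("networkaddress.cache.ttl"));
    }

    private static void copy(String systemProperty, String securityProperty) {
        final String value = System.getProperty(systemProperty);
        if (value != null) {
            Integer.parseInt(value);                 // must be an integer number of seconds
            Security.setProperty(securityProperty, value);
        }
    }
}
The important detail is that java.security.Security properties are process-wide, so the override has to happen before any DNS lookup is cached.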
The call chain behind main is as follows:
Elasticsearch.main(final String[] args) =>
Elasticsearch.main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) =>
Command.main(String[] args, Terminal terminal) =>
EnvironmentAwareCommand.execute(Terminal terminal, OptionSet options) =>
Elasticsearch.execute(Terminal terminal, OptionSet options, Environment env) =>
Bootstrap.init(final boolean foreground, final Path pidFile, final boolean quiet, final Environment initialEnv)
Now let's look at Bootstrap.init:
/**
* This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
*/
static void init(
final boolean foreground,
final Path pidFile,
final boolean quiet,
final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
// force the class initializer for BootstrapInfo to run before
// the security manager is installed
BootstrapInfo.init();
INSTANCE = new Bootstrap();
final SecureSettings keystore = loadSecureSettings(initialEnv);
final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
// the LogConfigurator will replace System.out and System.err with redirects to our logfile, so we need to capture
// the stream objects before calling LogConfigurator to be able to close them when appropriate
final Runnable sysOutCloser = getSysOutCloser();
final Runnable sysErrorCloser = getSysErrorCloser();
LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
try {
LogConfigurator.configure(environment);
} catch (IOException e) {
throw new BootstrapException(e);
}
if (environment.pidFile() != null) {
try {
PidFile.create(environment.pidFile(), true);
} catch (IOException e) {
throw new BootstrapException(e);
}
}
try {
final boolean closeStandardStreams = (foreground == false) || quiet;
if (closeStandardStreams) {
final Logger rootLogger = LogManager.getRootLogger();
final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
if (maybeConsoleAppender != null) {
Loggers.removeAppender(rootLogger, maybeConsoleAppender);
}
sysOutCloser.run();
}
// fail if somebody replaced the lucene jars
// Check the Lucene version: each ES release requires a specific Lucene version,
// and the check here prevents starting against an incompatible, swapped-in Lucene jar.
checkLucene();
// install the default uncaught exception handler; must be done before security is
// initialized as we do not want to grant the runtime permission
// setDefaultUncaughtExceptionHandler
// The handler picks the exit code based on the type of the fatal error:
// InternalError -> 128
// OutOfMemoryError -> 127
// StackOverflowError -> 126
// UnknownError -> 125
// IOError -> 124
// anything else -> 1
// (a simplified sketch of such a handler follows the call below)
Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
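// For illustration only (a simplified sketch, not the actual ElasticsearchUncaughtExceptionHandler):
// a handler of roughly this shape maps the fatal error type to the exit codes listed above, e.g.
//   Thread.UncaughtExceptionHandler sketch = (thread, error) -> {
//       int code = (error instanceof InternalError) ? 128
//               : (error instanceof OutOfMemoryError) ? 127
//               : (error instanceof StackOverflowError) ? 126
//               : (error instanceof UnknownError) ? 125
//               : (error instanceof java.io.IOError) ? 124
//               : 1;
//       Runtime.getRuntime().halt(code); // after logging the error
//   };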
// INSTANCE.setup(true, environment) does, among other things:
// - check the user starting ES
// - initialize JNA (native system calls)
// - check MEMORY_LOCK (bootstrap.memory_lock)
// - check MaxNumberOfThreads
// - check MaxSizeVirtualMemory
// - check MaxFileSize
// - init the Lucene random seed
// - register a JVM shutdown hook (used when the node exits)
// - check for jar conflicts (jar hell)
// - initialize JVM security (SecurityManager)
// - attach validateNodeBeforeAcceptingRequests to the Node and construct the Node instance
// (a generic sketch of this "run every check, fail fast" pattern follows the call below)
INSTANCE.setup(true, environment);
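// For illustration only (not the real BootstrapChecks API): the checks listed above follow a simple
// "run every check, abort startup on the first failure" pattern, conceptually:
//   interface StartupCheck { void check() throws NodeValidationException; }
//   for (StartupCheck check : checks) {   // hypothetical 'checks' holding the user/JNA/memory-lock/... checks
//       check.check();                    // throwing here prevents the node from starting
//   }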
try {
// any secure settings must be read during node construction
IOUtils.close(keystore);
} catch (IOException e) {
throw new BootstrapException(e);
}
// 1. Start the sub-modules.
//    The sub-modules are created and started inside the Node class; their start() methods
//    mostly initialize internal data, create thread pools, and start them.
// 2. Call keepAliveThread.start() to launch the keepalive thread, which does no actual work.
//    After the startup flow finishes, the main thread exits, leaving the keepalive thread as the
//    only user (non-daemon) thread; its sole job is to keep the process alive, because a Java
//    process exits once its user thread count drops to zero.
//    (a standalone sketch of this keepalive pattern follows the init() listing below)
INSTANCE.start();
// We don't close stderr if `--quiet` is passed, because that
// hides fatal startup errors. For example, if Elasticsearch is
// running via systemd, the init script only specifies
// `--quiet`, not `-d`, so we want users to be able to see
// startup errors via journalctl.
if (foreground == false) {
sysErrorCloser.run();
}
} catch (NodeValidationException | RuntimeException e) {
// disable console logging, so user does not see the exception twice (jvm will show it already)
final Logger rootLogger = LogManager.getRootLogger();
final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
if (foreground && maybeConsoleAppender != null) {
Loggers.removeAppender(rootLogger, maybeConsoleAppender);
}
Logger logger = LogManager.getLogger(Bootstrap.class);
// HACK, it sucks to do this, but we will run users out of disk space otherwise
if (e instanceof CreationException) {
// guice: log the shortened exc to the log file
ByteArrayOutputStream os = new ByteArrayOutputStream();
PrintStream ps = null;
try {
ps = new PrintStream(os, false, "UTF-8");
} catch (UnsupportedEncodingException uee) {
assert false;
e.addSuppressed(uee);
}
new StartupException(e).printStackTrace(ps);
ps.flush();
try {
logger.error("Guice Exception: {}", os.toString("UTF-8"));
} catch (UnsupportedEncodingException uee) {
assert false;
e.addSuppressed(uee);
}
} else if (e instanceof NodeValidationException) {
logger.error("node validation exception\n{}", e.getMessage());
} else {
// full exception
logger.error("Exception", e);
}
// re-enable it if appropriate, so they can see any logging during the shutdown process
if (foreground && maybeConsoleAppender != null) {
Loggers.addAppender(rootLogger, maybeConsoleAppender);
}
throw e;
}
}
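The keepalive trick mentioned above is worth seeing in isolation. The sketch below is not the Elasticsearch Bootstrap code; it only demonstrates the underlying JVM rule it relies on: a process stays alive as long as at least one non-daemon (user) thread is running, so a thread that simply blocks on a CountDownLatch keeps the process up until shutdown is requested.
import java.util.concurrent.CountDownLatch;

// Minimal sketch of the keepalive pattern (an assumed simplification of Bootstrap's keepAliveThread).
public final class KeepAliveSketch {
    public static void main(String[] args) {
        final CountDownLatch keepAliveLatch = new CountDownLatch(1);
        final Thread keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await();      // does no work; just blocks until shutdown
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "keep-alive");
        keepAliveThread.setDaemon(false);    // a user thread: keeps the JVM alive after main() returns
        keepAliveThread.start();

        // On shutdown (e.g. SIGTERM), release the latch so the keepalive thread, and the process, can exit.
        Runtime.getRuntime().addShutdownHook(new Thread(keepAliveLatch::countDown, "keep-alive-release"));

        // main() returns here; the process keeps running because of the keepalive thread.
    }
}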
Next, let's look at how the Node instance is constructed and started:
// The Environment mainly carries the node's configuration:
// dataFiles, configFile, pluginsFile, modulesFile, and so on.
// https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/env/Environment.java
public Node(Environment environment) {
this(environment, Collections.emptyList(), true);
}
/**
* Constructs a node
*
* @param initialEnvironment the initial environment for this node, which will be added to by plugins
* @param classpathPlugins the plugins to be loaded from the classpath
* @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
* test framework for tests that rely on being able to set private settings
*/
protected Node(
final Environment initialEnvironment,
Collection<Class<? extends Plugin>> classpathPlugins,
boolean forbidPrivateIndexSettings) {
// ... (node environment setup, PluginsService creation, settings merging and ThreadPool creation omitted here) ...
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
// create the NodeClient
client = new NodeClient(settings, threadPool);
// create the various *Service and *Module objects
final ScriptModule scriptModule = new ScriptModule(
settings,
pluginsService.filterPlugins(ScriptPlugin.class)
);
final ScriptService scriptService = newScriptService(
settings,
scriptModule.engines,
scriptModule.contexts
);
AnalysisModule analysisModule = new AnalysisModule(
this.environment,
pluginsService.filterPlugins(AnalysisPlugin.class)
);
// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
// so we might be late here already
final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
if (consistentSettings.isEmpty() == false) {
clusterService.addLocalNodeMasterListener(
new ConsistentSettingsService(
settings,
clusterService,
consistentSettings
)
.newHashPublisher()
);
}
final IngestService ingestService = new IngestService(
clusterService,
threadPool,
this.environment,
scriptService,
analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class),
client
);
final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
final ClusterInfoService clusterInfoService = newClusterInfoService(
settings,
clusterService,
threadPool,
client
);
final UsageService usageService = new UsageService();
ModulesBuilder modules = new ModulesBuilder();
final MonitorService monitorService = new MonitorService(
settings,
nodeEnvironment,
threadPool
);
final FsHealthService fsHealthService = new FsHealthService(
settings,
clusterService.getClusterSettings(),
threadPool,
nodeEnvironment
);
final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
settings,
clusterService,
repositoriesServiceReference::get,
rerouteServiceReference::get
);
final ClusterModule clusterModule = new ClusterModule(
settings,
clusterService,
clusterPlugins,
clusterInfoService,
snapshotsInfoService,
threadPool.getThreadContext()
);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(
pluginsService.filterPlugins(MapperPlugin.class)
);
modules.add(indicesModule);
SearchModule searchModule = new SearchModule(
settings,
pluginsService.filterPlugins(SearchPlugin.class)
);
List<BreakerSettings> pluginCircuitBreakers = pluginsService
.filterPlugins(CircuitBreakerPlugin.class)
.stream()
.map(plugin -> plugin.getCircuitBreaker(settings))
.collect(Collectors.toList());
final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
settingsModule.getSettings(),
pluginCircuitBreakers,
settingsModule.getClusterSettings()
);
pluginsService
.filterPlugins(CircuitBreakerPlugin.class)
.forEach(
plugin -> {
CircuitBreaker breaker = circuitBreakerService.getBreaker(
plugin.getCircuitBreaker(settings).getName()
);
plugin.setCircuitBreaker(breaker);
}
);
resourcesToClose.add(circuitBreakerService);
modules.add(new GatewayModule());
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(
pageCacheRecycler,
circuitBreakerService
);
modules.add(settingsModule);
List<NamedWriteableRegistry.Entry> namedWriteables = Stream
.of(
NetworkModule.getNamedWriteables().stream(),
IndicesModule.getNamedWriteables().stream(),
searchModule.getNamedWriteables().stream(),
pluginsService
.filterPlugins(Plugin.class)
.stream()
.flatMap(p -> p.getNamedWriteables().stream()),
ClusterModule.getNamedWriteables().stream()
)
.flatMap(Function.identity())
.collect(Collectors.toList());
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
namedWriteables
);
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
Stream
.of(
NetworkModule.getNamedXContents().stream(),
IndicesModule.getNamedXContents().stream(),
searchModule.getNamedXContents().stream(),
pluginsService
.filterPlugins(Plugin.class)
.stream()
.flatMap(p -> p.getNamedXContent().stream()),
ClusterModule.getNamedXWriteables().stream()
)
.flatMap(Function.identity())
.collect(toList())
);
final MetaStateService metaStateService = new MetaStateService(
nodeEnvironment,
xContentRegistry
);
final PersistedClusterStateService lucenePersistedStateFactory = new PersistedClusterStateService(
nodeEnvironment,
xContentRegistry,
bigArrays,
clusterService.getClusterSettings(),
threadPool::relativeTimeInMillis
);
// collect engine factory providers from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(
EnginePlugin.class
);
final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders = enginePlugins
.stream()
.map(
plugin ->
(Function<IndexSettings, Optional<EngineFactory>>) plugin::getEngineFactory
)
.collect(Collectors.toList());
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = pluginsService
.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getDirectoryFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = pluginsService
.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getRecoveryStateFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners = pluginsService
.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getIndexFoldersDeletionListeners)
.flatMap(List::stream)
.collect(Collectors.toList());
final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers = pluginsService
.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getSnapshotCommitSuppliers)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = pluginsService
.filterPlugins(SystemIndexPlugin.class)
.stream()
.collect(
Collectors.toUnmodifiableMap(
plugin -> plugin.getClass().getSimpleName(),
plugin -> plugin.getSystemIndexDescriptors(settings)
)
);
final SystemIndices systemIndices = new SystemIndices(
systemIndexDescriptorMap
);
final SystemIndexManager systemIndexManager = new SystemIndexManager(
systemIndices,
client
);
clusterService.addListener(systemIndexManager);
final RerouteService rerouteService = new BatchedRerouteService(
clusterService,
clusterModule.getAllocationService()::reroute
);
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);
final IndicesService indicesService = new IndicesService(
settings,
pluginsService,
nodeEnvironment,
xContentRegistry,
analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(),
indicesModule.getMapperRegistry(),
namedWriteableRegistry,
threadPool,
settingsModule.getIndexScopedSettings(),
circuitBreakerService,
bigArrays,
scriptService,
clusterService,
client,
metaStateService,
engineFactoryProviders,
indexStoreFactories,
searchModule.getValuesSourceRegistry(),
recoveryStateFactories,
indexFoldersDeletionListeners,
snapshotCommitSuppliers
);
final AliasValidator aliasValidator = new AliasValidator();
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(
settings,
clusterService
);
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
settings,
clusterService,
indicesService,
clusterModule.getAllocationService(),
aliasValidator,
shardLimitValidator,
environment,
settingsModule.getIndexScopedSettings(),
threadPool,
xContentRegistry,
systemIndices,
forbidPrivateIndexSettings
);
pluginsService
.filterPlugins(Plugin.class)
.forEach(
p ->
p
.getAdditionalIndexSettingProviders()
.forEach(
metadataCreateIndexService::addAdditionalIndexSettingProvider
)
);
final MetadataCreateDataStreamService metadataCreateDataStreamService = new MetadataCreateDataStreamService(
threadPool,
clusterService,
metadataCreateIndexService
);
Collection<Object> pluginComponents = pluginsService
.filterPlugins(Plugin.class)
.stream()
.flatMap(
p ->
p
.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
clusterModule.getIndexNameExpressionResolver(),
repositoriesServiceReference::get
)
.stream()
)
.collect(Collectors.toList());
ActionModule actionModule = new ActionModule(
settings,
clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(),
settingsModule.getClusterSettings(),
settingsModule.getSettingsFilter(),
threadPool,
pluginsService.filterPlugins(ActionPlugin.class),
client,
circuitBreakerService,
usageService,
systemIndices,
getRestCompatibleFunction()
);
modules.add(actionModule);
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(
settings,
pluginsService.filterPlugins(NetworkPlugin.class),
threadPool,
bigArrays,
pageCacheRecycler,
circuitBreakerService,
namedWriteableRegistry,
xContentRegistry,
networkService,
restController,
clusterService.getClusterSettings()
);
Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders = pluginsService
.filterPlugins(Plugin.class)
.stream()
.map(Plugin::getIndexTemplateMetadataUpgrader)
.collect(Collectors.toList());
final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(
indexTemplateMetadataUpgraders
);
final MetadataIndexUpgradeService metadataIndexUpgradeService = new MetadataIndexUpgradeService(
settings,
xContentRegistry,
indicesModule.getMapperRegistry(),
settingsModule.getIndexScopedSettings(),
systemIndices,
scriptService
);
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(
new SystemIndexMetadataUpgradeService(systemIndices, clusterService)
);
}
new TemplateUpgradeService(
client,
clusterService,
threadPool,
indexTemplateMetadataUpgraders
);
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream
.concat(
pluginsService
.filterPlugins(ActionPlugin.class)
.stream()
.flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
)
.collect(Collectors.toSet());
final TransportService transportService = newTransportService(
settings,
transport,
threadPool,
networkModule.getTransportInterceptor(),
localNodeFactory,
settingsModule.getClusterSettings(),
taskHeaders
);
final GatewayMetaState gatewayMetaState = new GatewayMetaState();
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(
clusterService
);
final SearchTransportService searchTransportService = new SearchTransportService(
transportService,
client,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
);
final HttpServerTransport httpServerTransport = newHttpTransport(
networkModule
);
final IndexingPressure indexingLimits = new IndexingPressure(settings);
final RecoverySettings recoverySettings = new RecoverySettings(
settings,
settingsModule.getClusterSettings()
);
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
pluginsService.filterPlugins(RepositoryPlugin.class),
transportService,
clusterService,
bigArrays,
xContentRegistry,
recoverySettings
);
RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
repositoriesServiceReference.set(repositoryService);
SnapshotsService snapshotsService = new SnapshotsService(
settings,
clusterService,
clusterModule.getIndexNameExpressionResolver(),
repositoryService,
transportService,
actionModule.getActionFilters()
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,
clusterService,
repositoryService,
transportService,
indicesService
);
RestoreService restoreService = new RestoreService(
clusterService,
repositoryService,
clusterModule.getAllocationService(),
metadataCreateIndexService,
metadataIndexUpgradeService,
clusterService.getClusterSettings(),
shardLimitValidator
);
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
settings,
clusterService::state,
clusterService.getClusterSettings(),
client,
threadPool::relativeTimeInMillis,
rerouteService
);
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
final DiscoveryModule discoveryModule = new DiscoveryModule(
settings,
transportService,
namedWriteableRegistry,
networkService,
clusterService.getMasterService(),
clusterService.getClusterApplierService(),
clusterService.getClusterSettings(),
pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService(),
environment.configFile(),
gatewayMetaState,
rerouteService,
fsHealthService
);
this.nodeService =
new NodeService(
settings,
threadPool,
monitorService,
discoveryModule.getDiscovery(),
transportService,
indicesService,
pluginsService,
circuitBreakerService,
scriptService,
httpServerTransport,
ingestService,
clusterService,
settingsModule.getSettingsFilter(),
responseCollectorService,
searchTransportService,
indexingLimits,
searchModule.getValuesSourceRegistry().getUsageService()
);
final SearchService searchService = newSearchService(
clusterService,
indicesService,
threadPool,
scriptService,
bigArrays,
searchModule.getFetchPhase(),
responseCollectorService,
circuitBreakerService
);
// ... (listing truncated here; the remainder of the Node constructor is omitted) ...
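One pattern repeats throughout the constructor above and is worth calling out: the Node asks pluginsService.filterPlugins(SomePlugin.class) for every plugin implementing a given extension point, then folds the plugin-supplied pieces into a single list or map that a core service is built from. The sketch below shows the shape of that pattern with made-up StoragePlugin/registry types; it is not Elasticsearch code, only an illustration of the stream-collect idiom used for indexStoreFactories, engineFactoryProviders, namedWriteables, and friends.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical extension point, for illustration only.
interface StoragePlugin {
    Map<String, String> directoryFactories();   // name -> factory id (stand-in for a real factory object)
}

public final class PluginCollectionSketch {
    // Fold the contributions of all plugins into one map; Collectors.toMap throws
    // IllegalStateException on a duplicate key, so conflicting plugins fail fast.
    static Map<String, String> collectDirectoryFactories(List<StoragePlugin> plugins) {
        return plugins.stream()
            .map(StoragePlugin::directoryFactories)
            .flatMap(m -> m.entrySet().stream())
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        List<StoragePlugin> plugins = List.of(
            () -> Map.of("smb", "smb-factory"),
            () -> Map.of("hdfs", "hdfs-factory")
        );
        System.out.println(collectDirectoryFactories(plugins)); // {smb=smb-factory, hdfs=hdfs-factory} (order may vary)
    }
}
Keeping every extension point behind this "collect from plugins, hand to one service" step is what lets the rest of the constructor wire concrete services without knowing which plugins are installed.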