[Elasticsearch Source Code] Node Startup Analysis

衣舞晨风 · Published: 2022-03-16 16:56:40

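Learning the source code with questions in mind, part 5: Elasticsearch node startup analysis. The analysis is based on https://github.com/jiankunking/elasticsearch (Elasticsearch 7.10.2+).

Goal

Before reading the source, here are the points about the node startup flow that I was unsure about:

  • Which checks does a node run at startup?
  • What does a node initialize at startup?
  • After a node has started, where is data migration handled?

Source Code Analysis

Start from the startup script, which points to the entry class: org.elasticsearch.bootstrap.Elasticsearch.

Now look at org.elasticsearch.bootstrap.Elasticsearch, beginning with the main entry method: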

    /**
     * Main entry point for starting elasticsearch
     */
    public static void main(final String[] args) throws Exception {
        // Reads es.networkaddress.cache.ttl and es.networkaddress.cache.negative.ttl (configured in jvm.options)
        // and uses them to override networkaddress.cache.ttl and networkaddress.cache.negative.ttl in JVM Security
        overrideDnsCachePolicyProperties();
        /*
         * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
         * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
         * forces such policies to take effect immediately.
         */
        System.setSecurityManager(new SecurityManager() {

            @Override
            public void checkPermission(Permission perm) {
                // grant all permissions so that we can later set the security manager to the one that we want
            }

        });
        LogConfigurator.registerErrorListener();
        final Elasticsearch elasticsearch = new Elasticsearch();
        // The core checks and processing all happen in main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal)
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            final String basePath = System.getProperty("es.logs.base_path");
            // It's possible to fail before logging has been configured, in which case there's no point
            // suggesting that the user look in the log file.
            if (basePath != null) {
                Terminal.DEFAULT.errorPrintln(
                    "ERROR: Elasticsearch did not exit normally - check the logs at "
                        + basePath
                        + System.getProperty("file.separator")
                        + System.getProperty("es.logs.cluster_name") + ".log"
                );
            }
            exit(status);
        }
    }
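
The DNS cache override at the top of main can be sketched as follows. This is a minimal stand-in, not the Elasticsearch method itself: the class name DnsCachePolicySketch is made up for illustration, and the body assumes that each `es.`-prefixed system property is simply copied into the java.security property of the same name after an integer parse check.

```java
import java.security.Security;

// Simplified stand-in for the DNS cache override step; the class name is hypothetical.
final class DnsCachePolicySketch {

    private static final String[] PROPERTIES = {
        "networkaddress.cache.ttl",
        "networkaddress.cache.negative.ttl"
    };

    // Copies es.<property> system properties (typically passed as -D flags via jvm.options)
    // into the java.security properties of the same name.
    static void overrideDnsCachePolicyProperties() {
        for (String property : PROPERTIES) {
            String overrideProperty = "es." + property;
            String overrideValue = System.getProperty(overrideProperty);
            if (overrideValue == null) {
                continue; // nothing configured, keep the JVM default
            }
            try {
                // Round-trip through an int so that a malformed value fails fast.
                Security.setProperty(property, Integer.toString(Integer.parseInt(overrideValue)));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                    "failed to parse [" + overrideProperty + "] with value [" + overrideValue + "]", e);
            }
        }
    }

    public static void main(String[] args) {
        System.setProperty("es.networkaddress.cache.ttl", "60");
        System.setProperty("es.networkaddress.cache.negative.ttl", "10");
        overrideDnsCachePolicyProperties();
        System.out.println(Security.getProperty("networkaddress.cache.ttl"));
        System.out.println(Security.getProperty("networkaddress.cache.negative.ttl"));
    }
}
```

Installing a permissive SecurityManager right after this step, as main does, is what makes the JVM honor the new cache policy immediately.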

The call chain starting from main is as follows:

Elasticsearch main(final String[] args)=>
Elasticsearch main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal)=>
Command main(String[] args, Terminal terminal)=>
EnvironmentAwareCommand execute(Terminal terminal, OptionSet options)=>
Elasticsearch execute(Terminal terminal, OptionSet options, Environment env)=>
Bootstrap static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv)=>
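
The chain above has a template-method shape: a base class owns the entry point and error handling, an intermediate class prepares the environment, and the concrete class does the real work. The sketch below mirrors only that shape. All class names (SketchElasticsearch, SketchEnvironmentAwareCommand, SketchCommand) are hypothetical stand-ins, a PrintStream replaces Terminal, and a plain String replaces Environment.

```java
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;

// Concrete command: stands in for Elasticsearch.execute(terminal, options, env).
final class SketchElasticsearch extends SketchEnvironmentAwareCommand {
    @Override
    void execute(String[] options, String env) {
        trace.add("Elasticsearch.execute");
        trace.add("Bootstrap.init"); // the real chain hands over to Bootstrap.init here
    }

    public static void main(String[] args) {
        SketchElasticsearch es = new SketchElasticsearch();
        int status = es.main(new String[] {"-Ecluster.name=demo"}, System.out);
        es.trace.forEach(System.out::println);
        System.out.println("status=" + status);
    }
}

// Intermediate layer: stands in for EnvironmentAwareCommand.execute(terminal, options).
abstract class SketchEnvironmentAwareCommand extends SketchCommand {
    @Override
    final void execute(String[] options) throws Exception {
        trace.add("EnvironmentAwareCommand.execute");
        String env = "env(" + String.join(",", options) + ")"; // placeholder for building an Environment
        execute(options, env);
    }

    abstract void execute(String[] options, String env) throws Exception;
}

// Base layer: stands in for Command.main(args, terminal), which wraps execution in error handling.
abstract class SketchCommand {
    final List<String> trace = new ArrayList<>();

    final int main(String[] args, PrintStream terminal) {
        trace.add("Command.main");
        try {
            execute(args);
            return 0; // OK
        } catch (Exception e) {
            terminal.println("ERROR: " + e.getMessage());
            return 1;
        }
    }

    abstract void execute(String[] options) throws Exception;
}
```

Each layer only knows about the next abstract hook, which is why the real call chain hops between Command, EnvironmentAwareCommand, and Elasticsearch before it reaches Bootstrap.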

Next, look at Bootstrap.init:

     /**
     * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
     */
    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
        // force the class initializer for BootstrapInfo to run before
        // the security manager is installed
        BootstrapInfo.init();

        INSTANCE = new Bootstrap();

        final SecureSettings keystore = loadSecureSettings(initialEnv);
        final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

        // the LogConfigurator will replace System.out and System.err with redirects to our logfile, so we need to capture
        // the stream objects before calling LogConfigurator to be able to close them when appropriate
        final Runnable sysOutCloser = getSysOutCloser();
        final Runnable sysErrorCloser = getSysErrorCloser();

        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
        try {
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }


        try {
            final boolean closeStandardStreams = (foreground == false) || quiet;
            if (closeStandardStreams) {
                final Logger rootLogger = LogManager.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                sysOutCloser.run();
            }

            // fail if somebody replaced the lucene jars
            // Check the Lucene version: each ES release requires a specific Lucene version.
            // The check runs here to catch anyone swapping in an incompatible jar.
            checkLucene();

            // install the default uncaught exception handler; must be done before security is
            // initialized as we do not want to grant the runtime permission
            // setDefaultUncaughtExceptionHandler
            // The handler picks a different exit code depending on the exception:
            // InternalError 128
            // OutOfMemoryError 127
            // StackOverflowError 126
            // UnknownError 125
            // IOError 124
            // anything else 1
            Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

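            // Check the user that is starting ES
            // Check JNA (system calls)
            // Check MEMORY_LOCK
            // Check MaxNumberOfThreads
            // Check MaxSizeVirtualMemory
            // Check MaxFileSize
            // init lucene random seed
            // Register the JVM shutdown hook via addShutdownHook (used when the Node exits)
            // Check for jar conflicts (jar hell)
            // Initialize JVM Security
            // Create the Node instance, with validateNodeBeforeAcceptingRequests added to it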
            INSTANCE.setup(true, environment);

            try {
                // any secure settings must be read during node construction
                IOUtils.close(keystore);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }

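            // 1. Start each submodule.
            // Submodules are created and started in the Node class.
            // A submodule's start method mostly initializes internal data and creates and starts thread pools.
            // 2. Call keepAliveThread.start() to start the keepalive thread; the thread itself does no real work.
            // The main thread exits once the startup flow finishes, leaving the keepalive thread as the only user thread.
            // Its job is to keep the process running: a Java program needs at least one user thread,
            // and the process exits when the number of user threads drops to zero.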
            INSTANCE.start();

            // We don't close stderr if `--quiet` is passed, because that
            // hides fatal startup errors. For example, if Elasticsearch is
            // running via systemd, the init script only specifies
            // `--quiet`, not `-d`, so we want users to be able to see
            // startup errors via journalctl.
            if (foreground == false) {
                sysErrorCloser.run();
            }

        } catch (NodeValidationException | RuntimeException e) {
            // disable console logging, so user does not see the exception twice (jvm will show it already)
            final Logger rootLogger = LogManager.getRootLogger();
            final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
            if (foreground && maybeConsoleAppender != null) {
                Loggers.removeAppender(rootLogger, maybeConsoleAppender);
            }
            Logger logger = LogManager.getLogger(Bootstrap.class);
            // HACK, it sucks to do this, but we will run users out of disk space otherwise
            if (e instanceof CreationException) {
                // guice: log the shortened exc to the log file
                ByteArrayOutputStream os = new ByteArrayOutputStream();
                PrintStream ps = null;
                try {
                    ps = new PrintStream(os, false, "UTF-8");
                } catch (UnsupportedEncodingException uee) {
                    assert false;
                    e.addSuppressed(uee);
                }
                new StartupException(e).printStackTrace(ps);
                ps.flush();
                try {
                    logger.error("Guice Exception: {}", os.toString("UTF-8"));
                } catch (UnsupportedEncodingException uee) {
                    assert false;
                    e.addSuppressed(uee);
                }
            } else if (e instanceof NodeValidationException) {
                logger.error("node validation exception\n{}", e.getMessage());
            } else {
                // full exception
                logger.error("Exception", e);
            }
            // re-enable it if appropriate, so they can see any logging during the shutdown process
            if (foreground && maybeConsoleAppender != null) {
                Loggers.addAppender(rootLogger, maybeConsoleAppender);
            }

            throw e;
        }
    }
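
The keepalive mechanism described in the comments before INSTANCE.start() can be illustrated with a latch and a non-daemon thread. This is a minimal sketch under assumptions: the class KeepAliveSketch and its method names are made up for illustration, and the one-second release in main exists only so that the demo terminates on its own.

```java
import java.util.concurrent.CountDownLatch;

// Minimal sketch of the keep-alive idea; names are illustrative, not the Elasticsearch classes.
final class KeepAliveSketch {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Thread keepAliveThread;

    KeepAliveSketch() {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await(); // parks here and does no real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out
            }
        }, "sketch[keepAlive]");
        // A non-daemon (user) thread keeps the JVM alive after main returns.
        keepAliveThread.setDaemon(false);
    }

    void start() {
        keepAliveThread.start();
    }

    // Releases the latch so the keep-alive thread can finish.
    void stop() {
        keepAliveLatch.countDown();
    }

    boolean isAlive() {
        return keepAliveThread.isAlive();
    }

    // Waits up to the given time and reports whether the keep-alive thread has ended.
    boolean awaitTermination(long millis) {
        try {
            keepAliveThread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !keepAliveThread.isAlive();
    }

    public static void main(String[] args) {
        KeepAliveSketch sketch = new KeepAliveSketch();
        // On JVM shutdown (for example Ctrl-C), release the latch.
        Runtime.getRuntime().addShutdownHook(new Thread(sketch::stop));
        sketch.start();

        // Demo only: a daemon thread releases the latch after one second.
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ignored) {
                // fall through and release
            }
            sketch.stop();
        });
        releaser.setDaemon(true);
        releaser.start();

        System.out.println("main returns; the keep-alive thread holds the process open");
    }
}
```

Without the keep-alive thread, this demo process would exit as soon as main returned, because the only remaining thread is a daemon.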

Next, look at how the Node instance is initialized and started:

  // The Environment mainly carries the node's configuration:
  // dataFiles, configFile, pluginsFile, modulesFile, and so on
  // https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/env/Environment.java
  public Node(Environment environment) {
    this(environment, Collections.emptyList(), true);
  }

  /**
   * Constructs a node
   *
   * @param initialEnvironment         the initial environment for this node, which will be added to by plugins
   * @param classpathPlugins           the plugins to be loaded from the classpath
   * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
   *                                   test framework for tests that rely on being able to set private settings
   */
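  protected Node(
    final Environment initialEnvironment,
    Collection<Class<? extends Plugin>> classpathPlugins,
    boolean forbidPrivateIndexSettings
  ) {
      // ... (part of the constructor body is missing from this excerpt) ...
      for (final ExecutorBuilder<?> builder : threadPool.builders()) {
        additionalSettings.addAll(builder.getRegisteredSettings());
      }
      // Create the NodeClient
      client = new NodeClient(settings, threadPool);

      // Create the various *Service objects and *Module objects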
      final ScriptModule scriptModule = new ScriptModule(
        settings,
        pluginsService.filterPlugins(ScriptPlugin.class)
      );
      final ScriptService scriptService = newScriptService(
        settings,
        scriptModule.engines,
        scriptModule.contexts
      );
      AnalysisModule analysisModule = new AnalysisModule(
        this.environment,
        pluginsService.filterPlugins(AnalysisPlugin.class)
      );
      // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
      // so we might be late here already

      final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
      if (consistentSettings.isEmpty() == false) {
        clusterService.addLocalNodeMasterListener(
          new ConsistentSettingsService(
            settings,
            clusterService,
            consistentSettings
          )
            .newHashPublisher()
        );
      }
      final IngestService ingestService = new IngestService(
        clusterService,
        threadPool,
        this.environment,
        scriptService,
        analysisModule.getAnalysisRegistry(),
        pluginsService.filterPlugins(IngestPlugin.class),
        client
      );
      final SetOnce repositoriesServiceReference = new SetOnce();
      final ClusterInfoService clusterInfoService = newClusterInfoService(
        settings,
        clusterService,
        threadPool,
        client
      );
      final UsageService usageService = new UsageService();

      ModulesBuilder modules = new ModulesBuilder();
      final MonitorService monitorService = new MonitorService(
        settings,
        nodeEnvironment,
        threadPool
      );
      final FsHealthService fsHealthService = new FsHealthService(
        settings,
        clusterService.getClusterSettings(),
        threadPool,
        nodeEnvironment
      );
      final SetOnce rerouteServiceReference = new SetOnce();
      final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
        settings,
        clusterService,
        repositoriesServiceReference::get,
        rerouteServiceReference::get
      );
      final ClusterModule clusterModule = new ClusterModule(
        settings,
        clusterService,
        clusterPlugins,
        clusterInfoService,
        snapshotsInfoService,
        threadPool.getThreadContext()
      );
      modules.add(clusterModule);
      IndicesModule indicesModule = new IndicesModule(
        pluginsService.filterPlugins(MapperPlugin.class)
      );
      modules.add(indicesModule);

      SearchModule searchModule = new SearchModule(
        settings,
        pluginsService.filterPlugins(SearchPlugin.class)
      );
      List pluginCircuitBreakers = pluginsService
        .filterPlugins(CircuitBreakerPlugin.class)
        .stream()
        .map(plugin -> plugin.getCircuitBreaker(settings))
        .collect(Collectors.toList());
      final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
        settingsModule.getSettings(),
        pluginCircuitBreakers,
        settingsModule.getClusterSettings()
      );
      pluginsService
        .filterPlugins(CircuitBreakerPlugin.class)
        .forEach(
          plugin -> {
            CircuitBreaker breaker = circuitBreakerService.getBreaker(
              plugin.getCircuitBreaker(settings).getName()
            );
            plugin.setCircuitBreaker(breaker);
          }
        );
      resourcesToClose.add(circuitBreakerService);
      modules.add(new GatewayModule());

      PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
      BigArrays bigArrays = createBigArrays(
        pageCacheRecycler,
        circuitBreakerService
      );
      modules.add(settingsModule);
      List namedWriteables = Stream
        .of(
          NetworkModule.getNamedWriteables().stream(),
          IndicesModule.getNamedWriteables().stream(),
          searchModule.getNamedWriteables().stream(),
          pluginsService
            .filterPlugins(Plugin.class)
            .stream()
            .flatMap(p -> p.getNamedWriteables().stream()),
          ClusterModule.getNamedWriteables().stream()
        )
        .flatMap(Function.identity())
        .collect(Collectors.toList());
      final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
        namedWriteables
      );
      NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
        Stream
          .of(
            NetworkModule.getNamedXContents().stream(),
            IndicesModule.getNamedXContents().stream(),
            searchModule.getNamedXContents().stream(),
            pluginsService
              .filterPlugins(Plugin.class)
              .stream()
              .flatMap(p -> p.getNamedXContent().stream()),
            ClusterModule.getNamedXWriteables().stream()
          )
          .flatMap(Function.identity())
          .collect(toList())
      );
      final MetaStateService metaStateService = new MetaStateService(
        nodeEnvironment,
        xContentRegistry
      );
      final PersistedClusterStateService lucenePersistedStateFactory = new PersistedClusterStateService(
        nodeEnvironment,
        xContentRegistry,
        bigArrays,
        clusterService.getClusterSettings(),
        threadPool::relativeTimeInMillis
      );

      // collect engine factory providers from plugins
      final Collection enginePlugins = pluginsService.filterPlugins(
        EnginePlugin.class
      );
      final Collection engineFactoryProviders = enginePlugins
        .stream()
        .map(
          plugin ->
            (Function) plugin::getEngineFactory
        )
        .collect(Collectors.toList());

      final Map indexStoreFactories = pluginsService
        .filterPlugins(IndexStorePlugin.class)
        .stream()
        .map(IndexStorePlugin::getDirectoryFactories)
        .flatMap(m -> m.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

      final Map recoveryStateFactories = pluginsService
        .filterPlugins(IndexStorePlugin.class)
        .stream()
        .map(IndexStorePlugin::getRecoveryStateFactories)
        .flatMap(m -> m.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

      final List indexFoldersDeletionListeners = pluginsService
        .filterPlugins(IndexStorePlugin.class)
        .stream()
        .map(IndexStorePlugin::getIndexFoldersDeletionListeners)
        .flatMap(List::stream)
        .collect(Collectors.toList());

      final Map snapshotCommitSuppliers = pluginsService
        .filterPlugins(IndexStorePlugin.class)
        .stream()
        .map(IndexStorePlugin::getSnapshotCommitSuppliers)
        .flatMap(m -> m.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

      final Map systemIndexDescriptorMap = pluginsService
        .filterPlugins(SystemIndexPlugin.class)
        .stream()
        .collect(
          Collectors.toUnmodifiableMap(
            plugin -> plugin.getClass().getSimpleName(),
            plugin -> plugin.getSystemIndexDescriptors(settings)
          )
        );
      final SystemIndices systemIndices = new SystemIndices(
        systemIndexDescriptorMap
      );

      final SystemIndexManager systemIndexManager = new SystemIndexManager(
        systemIndices,
        client
      );
      clusterService.addListener(systemIndexManager);

      final RerouteService rerouteService = new BatchedRerouteService(
        clusterService,
        clusterModule.getAllocationService()::reroute
      );
      rerouteServiceReference.set(rerouteService);
      clusterService.setRerouteService(rerouteService);

      final IndicesService indicesService = new IndicesService(
        settings,
        pluginsService,
        nodeEnvironment,
        xContentRegistry,
        analysisModule.getAnalysisRegistry(),
        clusterModule.getIndexNameExpressionResolver(),
        indicesModule.getMapperRegistry(),
        namedWriteableRegistry,
        threadPool,
        settingsModule.getIndexScopedSettings(),
        circuitBreakerService,
        bigArrays,
        scriptService,
        clusterService,
        client,
        metaStateService,
        engineFactoryProviders,
        indexStoreFactories,
        searchModule.getValuesSourceRegistry(),
        recoveryStateFactories,
        indexFoldersDeletionListeners,
        snapshotCommitSuppliers
      );

      final AliasValidator aliasValidator = new AliasValidator();

      final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(
        settings,
        clusterService
      );
      final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
        settings,
        clusterService,
        indicesService,
        clusterModule.getAllocationService(),
        aliasValidator,
        shardLimitValidator,
        environment,
        settingsModule.getIndexScopedSettings(),
        threadPool,
        xContentRegistry,
        systemIndices,
        forbidPrivateIndexSettings
      );
      pluginsService
        .filterPlugins(Plugin.class)
        .forEach(
          p ->
            p
              .getAdditionalIndexSettingProviders()
              .forEach(
                metadataCreateIndexService::addAdditionalIndexSettingProvider
              )
        );

      final MetadataCreateDataStreamService metadataCreateDataStreamService = new MetadataCreateDataStreamService(
        threadPool,
        clusterService,
        metadataCreateIndexService
      );

      Collection pluginComponents = pluginsService
        .filterPlugins(Plugin.class)
        .stream()
        .flatMap(
          p ->
            p
              .createComponents(
                client,
                clusterService,
                threadPool,
                resourceWatcherService,
                scriptService,
                xContentRegistry,
                environment,
                nodeEnvironment,
                namedWriteableRegistry,
                clusterModule.getIndexNameExpressionResolver(),
                repositoriesServiceReference::get
              )
              .stream()
        )
        .collect(Collectors.toList());

      ActionModule actionModule = new ActionModule(
        settings,
        clusterModule.getIndexNameExpressionResolver(),
        settingsModule.getIndexScopedSettings(),
        settingsModule.getClusterSettings(),
        settingsModule.getSettingsFilter(),
        threadPool,
        pluginsService.filterPlugins(ActionPlugin.class),
        client,
        circuitBreakerService,
        usageService,
        systemIndices,
        getRestCompatibleFunction()
      );
      modules.add(actionModule);

      final RestController restController = actionModule.getRestController();
      final NetworkModule networkModule = new NetworkModule(
        settings,
        pluginsService.filterPlugins(NetworkPlugin.class),
        threadPool,
        bigArrays,
        pageCacheRecycler,
        circuitBreakerService,
        namedWriteableRegistry,
        xContentRegistry,
        networkService,
        restController,
        clusterService.getClusterSettings()
      );
      Collection indexTemplateMetadataUpgraders = pluginsService
        .filterPlugins(Plugin.class)
        .stream()
        .map(Plugin::getIndexTemplateMetadataUpgrader)
        .collect(Collectors.toList());
      final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(
        indexTemplateMetadataUpgraders
      );
      final MetadataIndexUpgradeService metadataIndexUpgradeService = new MetadataIndexUpgradeService(
        settings,
        xContentRegistry,
        indicesModule.getMapperRegistry(),
        settingsModule.getIndexScopedSettings(),
        systemIndices,
        scriptService
      );
      if (DiscoveryNode.isMasterNode(settings)) {
        clusterService.addListener(
          new SystemIndexMetadataUpgradeService(systemIndices, clusterService)
        );
      }
      new TemplateUpgradeService(
        client,
        clusterService,
        threadPool,
        indexTemplateMetadataUpgraders
      );
      final Transport transport = networkModule.getTransportSupplier().get();
      Set taskHeaders = Stream
        .concat(
          pluginsService
            .filterPlugins(ActionPlugin.class)
            .stream()
            .flatMap(p -> p.getTaskHeaders().stream()),
          Stream.of(Task.X_OPAQUE_ID)
        )
        .collect(Collectors.toSet());
      final TransportService transportService = newTransportService(
        settings,
        transport,
        threadPool,
        networkModule.getTransportInterceptor(),
        localNodeFactory,
        settingsModule.getClusterSettings(),
        taskHeaders
      );
      final GatewayMetaState gatewayMetaState = new GatewayMetaState();
      final ResponseCollectorService responseCollectorService = new ResponseCollectorService(
        clusterService
      );
      final SearchTransportService searchTransportService = new SearchTransportService(
        transportService,
        client,
        SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
      );
      final HttpServerTransport httpServerTransport = newHttpTransport(
        networkModule
      );
      final IndexingPressure indexingLimits = new IndexingPressure(settings);

      final RecoverySettings recoverySettings = new RecoverySettings(
        settings,
        settingsModule.getClusterSettings()
      );
      RepositoriesModule repositoriesModule = new RepositoriesModule(
        this.environment,
        pluginsService.filterPlugins(RepositoryPlugin.class),
        transportService,
        clusterService,
        bigArrays,
        xContentRegistry,
        recoverySettings
      );
      RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
      repositoriesServiceReference.set(repositoryService);
      SnapshotsService snapshotsService = new SnapshotsService(
        settings,
        clusterService,
        clusterModule.getIndexNameExpressionResolver(),
        repositoryService,
        transportService,
        actionModule.getActionFilters()
      );
      SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
        settings,
        clusterService,
        repositoryService,
        transportService,
        indicesService
      );
      RestoreService restoreService = new RestoreService(
        clusterService,
        repositoryService,
        clusterModule.getAllocationService(),
        metadataCreateIndexService,
        metadataIndexUpgradeService,
        clusterService.getClusterSettings(),
        shardLimitValidator
      );

      final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
        settings,
        clusterService::state,
        clusterService.getClusterSettings(),
        client,
        threadPool::relativeTimeInMillis,
        rerouteService
      );
      clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

      final DiscoveryModule discoveryModule = new DiscoveryModule(
        settings,
        transportService,
        namedWriteableRegistry,
        networkService,
        clusterService.getMasterService(),
        clusterService.getClusterApplierService(),
        clusterService.getClusterSettings(),
        pluginsService.filterPlugins(DiscoveryPlugin.class),
        clusterModule.getAllocationService(),
        environment.configFile(),
        gatewayMetaState,
        rerouteService,
        fsHealthService
      );
      this.nodeService =
        new NodeService(
          settings,
          threadPool,
          monitorService,
          discoveryModule.getDiscovery(),
          transportService,
          indicesService,
          pluginsService,
          circuitBreakerService,
          scriptService,
          httpServerTransport,
          ingestService,
          clusterService,
          settingsModule.getSettingsFilter(),
          responseCollectorService,
          searchTransportService,
          indexingLimits,
          searchModule.getValuesSourceRegistry().getUsageService()
        );

      final SearchService searchService = newSearchService(
        clusterService,
        indicesService,
        threadPool,
        scriptService,
        bigArrays,
        searchModule.getFetchPhase(),
        responseCollectorService,
        circuitBreakerService
      );

      // ... (the excerpt is cut off here; the rest of the constructor is not shown)
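
One pattern worth noting in the constructor above: InternalSnapshotsInfoService receives repositoriesServiceReference::get and rerouteServiceReference::get before either service exists, and the references are filled in later with set(...). A plausible reading is that this late binding lets services be built in one pass even when they depend on objects created further down. The sketch below shows the idea with hypothetical stand-in classes (SetOnceRef, RerouteSketch, SnapshotsInfoSketch); it does not use the real SetOnce class.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch of late binding through a write-once reference; all names are hypothetical.
final class LateBindingSketch {

    // Write-once holder, a small stand-in for a SetOnce-style reference.
    static final class SetOnceRef<T> {
        private final AtomicReference<T> ref = new AtomicReference<>();

        void set(T value) {
            Objects.requireNonNull(value, "value");
            if (!ref.compareAndSet(null, value)) {
                throw new IllegalStateException("already set");
            }
        }

        T get() {
            return ref.get(); // null until set(...) has been called
        }
    }

    // Stand-in for a service that is created late in the constructor.
    static final class RerouteSketch {
        String reroute(String reason) {
            return "reroute: " + reason;
        }
    }

    // Stand-in for a service that is created early and only keeps a Supplier.
    static final class SnapshotsInfoSketch {
        private final Supplier<RerouteSketch> rerouteSupplier;

        SnapshotsInfoSketch(Supplier<RerouteSketch> rerouteSupplier) {
            this.rerouteSupplier = rerouteSupplier;
        }

        // The supplier is resolved only when the dependency is actually needed.
        String onNewInfo() {
            return rerouteSupplier.get().reroute("snapshot info updated");
        }
    }

    public static void main(String[] args) {
        SetOnceRef<RerouteSketch> rerouteRef = new SetOnceRef<>();
        // Created first, while rerouteRef is still empty.
        SnapshotsInfoSketch info = new SnapshotsInfoSketch(rerouteRef::get);
        // Created later and published through the reference.
        rerouteRef.set(new RerouteSketch());
        System.out.println(info.onNewInfo());
    }
}
```

The trade-off is that calling the early service before the reference is set would fail at runtime, so the wiring order inside the constructor still matters.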