Elasticsearch Source Code (1): Service Startup and Initialization

  • March 11, 2020
  • Notes

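Downloading and Building the Source

  1. Download the elasticsearch source from GitHub.
  2. Set up the matching JDK and Gradle environments.
  3. Open a command prompt in the repository root and run gradlew idea to generate the IntelliJ IDEA project.
  4. Open the generated project in IDEA.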

Startup Flow Analysis

This article walks through the startup flow of the server module, one step at a time.

The main method of Elasticsearch

    /**
     * Main entry point for starting elasticsearch
     */
    public static void main(final String[] args) throws Exception {
        overrideDnsCachePolicyProperties();
        /*
         * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
         * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
         * forces such policies to take effect immediately.
         */
        System.setSecurityManager(new SecurityManager() {

            @Override
            public void checkPermission(Permission perm) {
                // grant all permissions so that we can later set the security manager to the one that we want
            }

        });
        LogConfigurator.registerErrorListener();
        // create the instance; the constructor registers the command-line options and hands a beforeMain Runnable to the superclass
        final Elasticsearch elasticsearch = new Elasticsearch();
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            final String basePath = System.getProperty("es.logs.base_path");
            // It's possible to fail before logging has been configured, in which case there's no point
            // suggesting that the user look in the log file.
            if (basePath != null) {
                Terminal.DEFAULT.errorPrintln(
                    "ERROR: Elasticsearch did not exit normally - check the logs at "
                        + basePath
                        + System.getProperty("file.separator")
                        + System.getProperty("es.logs.cluster_name") + ".log"
                );
            }
            exit(status);
        }
    }

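The main method performs the following steps:

  • overrideDnsCachePolicyProperties reads the JVM system properties es.networkaddress.cache.ttl and es.networkaddress.cache.negative.ttl, checks that each value parses as an Integer, and writes it back into the corresponding java.security.Security property.
  • It installs a permissive security manager so that the JVM behaves as if a security manager were already present.
  • new Elasticsearch() creates the Elasticsearch instance; the instantiation is analyzed below. In terms of class structure, Elasticsearch extends EnvironmentAwareCommand, which in turn extends Command.
  • main(args, elasticsearch, Terminal.DEFAULT) delegates to the instance's own main method, which is inherited from org.elasticsearch.cli.Command#main.
  • Some logging-related handling: LogConfigurator.registerErrorListener() is called before the instance is created, and if the returned status is not ExitCodes.OK, an error message pointing at the log file is printed before the process exits.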
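The first step is easy to try in isolation. The following is a minimal sketch of the behaviour described for overrideDnsCachePolicyProperties, written from that description rather than copied from the Elasticsearch source; the class name DnsCachePolicySketch and the exact error message are assumptions made for illustration.

```java
import java.security.Security;

class DnsCachePolicySketch {

    // Copies es.-prefixed JVM system properties into the matching java.security.Security properties.
    static void overrideDnsCachePolicyProperties() {
        for (String property : new String[] {"networkaddress.cache.ttl", "networkaddress.cache.negative.ttl"}) {
            String overrideProperty = "es." + property;
            String overrideValue = System.getProperty(overrideProperty);
            if (overrideValue == null) {
                continue; // nothing to override for this property
            }
            try {
                // Round-trip through Integer so that a non-numeric value fails fast.
                Security.setProperty(property, Integer.toString(Integer.parseInt(overrideValue)));
            } catch (NumberFormatException e) {
                // Message text is hypothetical.
                throw new IllegalArgumentException(
                    "failed to parse [" + overrideProperty + "] with value [" + overrideValue + "]", e);
            }
        }
    }

    public static void main(String[] args) {
        System.setProperty("es.networkaddress.cache.ttl", "60");
        overrideDnsCachePolicyProperties();
        System.out.println(Security.getProperty("networkaddress.cache.ttl"));
    }
}
```

Going through Integer before writing the value back means a typo such as -Des.networkaddress.cache.ttl=abc is rejected at startup instead of silently ending up in the security properties.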

The following sections look at the more important of these steps in detail.

Instantiating Elasticsearch

Constructor

    // visible for testing
    Elasticsearch() {
        // call the superclass constructor
        super("Starts Elasticsearch", () -> {}); // we configure logging later so we override the base class from configuring logging
        // register the command-line options
        versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
            "Prints Elasticsearch version information and exits");
        // whether to run as a daemon
        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        // pid file
        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
            "Turns off standard output/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }

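The constructor registers the command-line options with the parser and stores each one in the corresponding option field. It also calls the superclass constructor with a Runnable (not a thread); this is the beforeMain Runnable of the parent class that comes up again below.

Next, look at the static main method of Elasticsearch: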

    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
        // calls the instance's main method, which is inherited through the parent class EnvironmentAwareCommand
        return elasticsearch.main(args, terminal);
    }

The instance's main method is inherited from the parent class and is defined in org.elasticsearch.cli.Command#main:

    /** Parses options for this command from args and executes it. */
    public final int main(String[] args, Terminal terminal) throws Exception {
        if (addShutdownHook()) {
            // hook thread that runs when the process is shutting down
            shutdownHookThread = new Thread(() -> {
                try {
                    // see org.elasticsearch.bootstrap.Elasticsearch#close; a template method for subclasses to override
                    this.close();
                } catch (final IOException e) {
                    // ----- some code omitted -----
                }
            });
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);
        }
        // the Runnable passed in when Elasticsearch was instantiated; note that run() is called directly, no thread is started
        beforeMain.run();

        try {
            mainWithoutErrorHandling(args, terminal);
        } catch (OptionException e) {
        ...

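This method does the following:

  • It registers a shutdown hook thread that is called back when the process shuts down and invokes the close method of elasticsearch.
  • It runs the beforeMain Runnable supplied when Elasticsearch was instantiated, which here is a no-op.
  • It calls mainWithoutErrorHandling, which is examined next.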
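The shape of Command#main (a final template method that registers a shutdown hook, runs beforeMain synchronously, then hands off to subclass code) can be sketched with plain JDK classes. MiniCommand and CommandSketch below are hypothetical stand-ins invented for illustration, not the real Elasticsearch classes, and the error handling is reduced to a single catch.

```java
import java.util.ArrayList;
import java.util.List;

class CommandSketch {

    /** Hypothetical, heavily simplified stand-in for org.elasticsearch.cli.Command. */
    abstract static class MiniCommand {
        private final Runnable beforeMain;

        MiniCommand(Runnable beforeMain) {
            this.beforeMain = beforeMain;
        }

        /** Template method: the skeleton is fixed, subclasses supply execute() and close(). */
        final int main(String[] args) {
            // Registered now; the JVM runs it on its own thread at shutdown.
            Runtime.getRuntime().addShutdownHook(new Thread(this::close));
            // run(), not start(): executes synchronously on the calling thread.
            beforeMain.run();
            try {
                execute(args);
                return 0;
            } catch (Exception e) {
                System.err.println("ERROR: " + e.getMessage());
                return 1;
            }
        }

        abstract void execute(String[] args) throws Exception;

        void close() {
            // no-op by default; subclasses release resources here
        }
    }

    /** Runs a tiny command and returns the order in which its callbacks fired. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniCommand command = new MiniCommand(
                () -> log.add("beforeMain on " + Thread.currentThread().getName())) {
            @Override
            void execute(String[] args) {
                log.add("execute on " + Thread.currentThread().getName());
            }
        };
        command.main(new String[0]);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Both log entries carry the name of the calling thread, which is the point of the comment in the original code: calling run() on a Runnable does not create a new thread.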

The code of org.elasticsearch.cli.Command#mainWithoutErrorHandling:

    void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
        final OptionSet options = parser.parse(args);

        if (options.has(helpOption)) {
            printHelp(terminal, false);
            return;
        }

        if (options.has(silentOption)) {
            terminal.setVerbosity(Terminal.Verbosity.SILENT);
        } else if (options.has(verboseOption)) {
            terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
        } else {
            terminal.setVerbosity(Terminal.Verbosity.NORMAL);
        }

        execute(terminal, options);
    }

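  • Parameters: args holds the command-line arguments passed when the process was started, and terminal is Terminal.DEFAULT.
  • The parser configured during initialization parses the command-line arguments, and the terminal verbosity is set accordingly.
  • execute is called with terminal and options. The method that actually runs is org.elasticsearch.cli.EnvironmentAwareCommand#execute(org.elasticsearch.cli.Terminal, joptsimple.OptionSet):
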
    @Override
    protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        for (final KeyValuePair kvp : settingOption.values(options)) {
            if (kvp.value.isEmpty()) {
                throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
            }
            if (settings.containsKey(kvp.key)) {
                final String message = String.format(
                        Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]",
                        kvp.key,
                        settings.get(kvp.key),
                        kvp.value);
                throw new UserException(ExitCodes.USAGE, message);
            }
            settings.put(kvp.key, kvp.value);
        }
        // set path.data, path.home, path.logs and similar properties
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        // call the execute method implemented in the subclass
        execute(terminal, options, createEnv(settings));
    }

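Step by step, this method does the following:

  1. Copies the settings supplied as command-line options into the settings map, rejecting empty values and duplicate keys.
  2. Calls putSystemPropertyIfSettingIsMissing to fill in path.data, path.home and path.logs from the system properties es.path.data, es.path.home and es.path.logs. The logic is:
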
    /** Ensure the given setting exists, reading it from system properties if not already set. */
    private static void putSystemPropertyIfSettingIsMissing(final Map<String, String> settings, final String setting, final String key) {
        final String value = System.getProperty(key);
        if (value != null) {
            if (settings.containsKey(setting)) {
                // ----- part omitted -----
                throw new IllegalArgumentException(message);
            } else {
                settings.put(setting, value);
            }
        }
    }

  3. Calls createEnv(settings), which builds the Environment object from the configuration in settings. The creation flow is:

    /** Create an {@link Environment} for the command to use. Overrideable for tests. */
    protected Environment createEnv(final Map<String, String> settings) throws UserException {
        // create the Environment object from the settings
        return createEnv(Settings.EMPTY, settings);
    }

    /** Create an {@link Environment} for the command to use. Overrideable for tests. */
    protected final Environment createEnv(final Settings baseSettings, final Map<String, String> settings) throws UserException {
        final String esPathConf = System.getProperty("es.path.conf");
        if (esPathConf == null) {
            throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
        }
        return InternalSettingsPreparer.prepareEnvironment(baseSettings, settings,
            getConfigPath(esPathConf),
            // HOSTNAME is set by elasticsearch-env and elasticsearch-env.bat so it is always available
            () -> System.getenv("HOSTNAME"));
    }

  4. Calls execute with terminal, options and the Environment object as arguments. The code of org.elasticsearch.bootstrap.Elasticsearch#execute is:

    @Override
    protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        if (options.nonOptionArguments().isEmpty() == false) {
            throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
        }
        if (options.has(versionOption)) {
            // print the version information
            final String versionOutput = String.format(
                Locale.ROOT,
                "Version: %s, Build: %s/%s/%s/%s, JVM: %s",
                // ----- part omitted -----
                JvmInfo.jvmInfo().version()
            );
            terminal.println(versionOutput);
            return;
        }

        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);

        // a misconfigured java.io.tmpdir can cause hard-to-diagnose problems later, so reject it immediately
        try {
            env.validateTmpFile();
        } catch (IOException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }

        try {
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }
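The settings handling in steps 1 and 2 can be exercised without Elasticsearch on the classpath. The sketch below is a simplified approximation: it takes the settings as plain "key=value" strings (an assumption, the real code receives KeyValuePair objects from the option parser), throws IllegalArgumentException where the real code throws UserException, and uses a made-up message for the branch whose message is omitted in the listing above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SettingsSketch {

    /** Collects key=value pairs into a map, rejecting empty values and duplicate keys. */
    static Map<String, String> collect(List<String> keyValuePairs) {
        Map<String, String> settings = new HashMap<>();
        for (String pair : keyValuePairs) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("expected key=value but got [" + pair + "]");
            }
            String key = pair.substring(0, eq);
            String value = pair.substring(eq + 1);
            if (value.isEmpty()) {
                throw new IllegalArgumentException("setting [" + key + "] must not be empty");
            }
            if (settings.containsKey(key)) {
                throw new IllegalArgumentException(
                    "setting [" + key + "] already set, saw [" + settings.get(key) + "] and [" + value + "]");
            }
            settings.put(key, value);
        }
        return settings;
    }

    /** Fills in a setting from a system property; having both sources set is treated as an error. */
    static void putSystemPropertyIfSettingIsMissing(Map<String, String> settings, String setting, String key) {
        String value = System.getProperty(key);
        if (value != null) {
            if (settings.containsKey(setting)) {
                // Hypothetical message; the original text is elided in the article.
                throw new IllegalArgumentException(
                    "setting [" + setting + "] and system property [" + key + "] are both present");
            }
            settings.put(setting, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> settings = collect(Arrays.asList("cluster.name=demo"));
        System.setProperty("es.path.home", "/tmp/es-home");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        System.out.println(settings);
    }
}
```

Note that the helper's name is slightly misleading: when both the explicit setting and the system property are present, it does not prefer one over the other but fails, which surfaces conflicting configuration early.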

Here the daemonize, pidFile and quiet values parsed from the command line, together with the env object created above, are passed to the init method, which runs as follows:

    void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
        throws NodeValidationException, UserException {
        try {
            // start the bootstrap
            Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
        } catch (BootstrapException | RuntimeException e) {
            // format exceptions to the console in a special way
            // to avoid 2MB stacktraces from guice, etc.
            throw new StartupException(e);
        }
    }

As shown, this calls Bootstrap.init, which is defined as:

    /**
     * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
     */
    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
        // force the class initializer for BootstrapInfo to run before
        // the security manager is installed
        // (this mainly concerns the static initializer blocks of BootstrapInfo)
        BootstrapInfo.init();

        INSTANCE = new Bootstrap();
        // load the secure settings
        final SecureSettings keystore = loadSecureSettings(initialEnv);
        // build the Environment object from the environment's configuration
        final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

        // the LogConfigurator will replace System.out and System.err with redirects to our logfile, so we need to capture
        // the stream objects before calling LogConfigurator to be able to close them when appropriate
        final Runnable sysOutCloser = getSysOutCloser();
        final Runnable sysErrorCloser = getSysErrorCloser();

        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
        try {
            // configure logging from the initialized environment
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                // create the pid file
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }

        try {
            final boolean closeStandardStreams = (foreground == false) || quiet;
            if (closeStandardStreams) {
                final Logger rootLogger = LogManager.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                sysOutCloser.run();
            }

            // fail if somebody replaced the lucene jars
            // (validates the Lucene version)
            checkLucene();

            // install the default uncaught exception handler; must be done before security is
            // initialized as we do not want to grant the runtime permission
            // setDefaultUncaughtExceptionHandler
            Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

            INSTANCE.setup(true, environment);

            try {
                // any secure settings must be read during node construction
                IOUtils.close(keystore);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }

            INSTANCE.start();
            // ----- some code omitted -----
        } catch (NodeValidationException | RuntimeException e) {
            // ----- some code omitted -----
            throw e;
        }
    }
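One detail in Bootstrap.init that is easy to misread is the double negation around the standard streams: Elasticsearch#init passes !daemonize as foreground, and Bootstrap.init then tests foreground == false. The small sketch below only restates that boolean logic from the two listings; the class and method names are made up for illustration.

```java
class StreamPolicySketch {

    /** Mirrors the flag computation: streams are closed when daemonized or when quiet is requested. */
    static boolean closeStandardStreams(boolean daemonize, boolean quiet) {
        boolean foreground = !daemonize; // what Elasticsearch#init passes to Bootstrap.init
        return (foreground == false) || quiet;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean daemonize : flags) {
            for (boolean quiet : flags) {
                System.out.println("daemonize=" + daemonize + ", quiet=" + quiet
                    + " -> closeStandardStreams=" + closeStandardStreams(daemonize, quiet));
            }
        }
    }
}
```

In other words, console logging survives only for a foreground process started without the quiet option. The daemonize=true, quiet=true row is included for completeness; the option parser shown earlier declares quiet as unavailable when daemonize is given.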

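From the method above, the analysis focuses on the three most important calls:

  • INSTANCE = new Bootstrap() instantiates the Bootstrap object.
  • INSTANCE.setup(true, environment) initializes the Bootstrap instance from the environment configuration.
  • INSTANCE.start() starts the Bootstrap instance.

The following sections go through them one by one.

new Bootstrap()

Here is the code: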

    Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // the thread waits here until the process is shut down; the shutdown hook registered below releases it
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        // mark it as a non-daemon thread
        keepAliveThread.setDaemon(false);
        // keep this thread alive (non daemon thread) until we shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });
    }

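The constructor creates keepAliveThread, a non-daemon thread that blocks on keepAliveLatch (a CountDownLatch), and registers a shutdown hook that counts the latch down. The thread therefore stays in a waiting state and is released only when the process shuts down. Note that the constructor only creates the thread; it is started later, in Bootstrap#start.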
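The keep-alive mechanism can be reproduced with nothing but java.util.concurrent. The following is a minimal sketch under one deliberate change: it releases the latch through an explicit release() method rather than a shutdown hook, so that the demo terminates on its own. The class name KeepAliveSketch and its helper methods are invented for this illustration.

```java
import java.util.concurrent.CountDownLatch;

class KeepAliveSketch {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Thread keepAliveThread;

    KeepAliveSketch() {
        keepAliveThread = new Thread(() -> {
            try {
                // Blocks until the latch reaches zero.
                keepAliveLatch.await();
            } catch (InterruptedException e) {
                // bail out
            }
        }, "keepAlive-sketch");
        // A non-daemon thread prevents the JVM from exiting while it is running.
        keepAliveThread.setDaemon(false);
    }

    void start() {
        keepAliveThread.start();
    }

    /** In Bootstrap this happens inside a shutdown hook; here it is called directly. */
    void release() {
        keepAliveLatch.countDown();
    }

    boolean isAlive() {
        return keepAliveThread.isAlive();
    }

    void join(long millis) throws InterruptedException {
        keepAliveThread.join(millis);
    }

    public static void main(String[] args) throws InterruptedException {
        KeepAliveSketch keepAlive = new KeepAliveSketch();
        keepAlive.start();
        System.out.println("alive before release: " + keepAlive.isAlive());
        keepAlive.release();
        keepAlive.join(1000);
        System.out.println("alive after release: " + keepAlive.isAlive());
    }
}
```

The design reason for such a thread is that the JVM exits once only daemon threads remain; a single parked non-daemon thread guarantees the process stays up after main returns, at essentially no cost.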

The org.elasticsearch.bootstrap.Bootstrap#setup method

Here is the code:

    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
        Settings settings = environment.settings();

        try {
            // load the configuration from the environment and start native controller processes if any are required
            spawner.spawnNativeControllers(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        // initialize native facilities (temp directory, memory locking, system call filters, console ctrl handler)
        initializeNatives(
                environment.tmpFile(),
                BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
                BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
                BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

        // initialize probes before the security manager is installed
        initializeProbes();

        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    // ----- shutdown hook body omitted -----
                }
            });
        }

        // ----- part omitted -----
        // install SM after natives, shutdown hooks, etc.
        try {
            // configure security
            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new BootstrapException(e);
        }
        // create the Node
        node = new Node(environment) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
    }

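This method performs the following operations:

  1. Loads the configuration from the environment and starts native controller processes if any are required.
  2. Initializes native facilities, such as the temporary file location.
  3. Configures security.
  4. Creates the Node. For reasons of length, this part is analyzed in the next article.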
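The way setup creates the Node, an anonymous subclass that overrides a protected hook, is how Bootstrap plugs its bootstrap checks into the node without Node depending on Bootstrap. A rough sketch of that pattern follows. MiniNode is a hypothetical stand-in, and the point at which the hook is invoked inside start() is an assumption inferred from the hook's name, since the article defers Node internals to a later part.

```java
import java.util.ArrayList;
import java.util.List;

class NodeHookSketch {

    /** Hypothetical stand-in for Node with an overridable validation hook. */
    static class MiniNode {
        final List<String> log = new ArrayList<>();

        void start() {
            log.add("services started");
            // Assumed call site: validate just before the node begins serving requests.
            validateBeforeAcceptingRequests();
            log.add("accepting requests");
        }

        protected void validateBeforeAcceptingRequests() {
            // no checks by default
        }
    }

    /** Mirrors Bootstrap#setup: the caller injects checks by overriding the hook. */
    static MiniNode createWithChecks() {
        return new MiniNode() {
            @Override
            protected void validateBeforeAcceptingRequests() {
                log.add("bootstrap checks ran");
            }
        };
    }

    public static void main(String[] args) {
        MiniNode node = createWithChecks();
        node.start();
        node.log.forEach(System.out::println);
    }
}
```

A plain MiniNode (for example in tests or embedded use) runs no checks at all, while the instance created by the bootstrap code enforces them; the node class itself stays unaware of which variant it is.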

The org.elasticsearch.bootstrap.Bootstrap#start method

The code:

    private void start() throws NodeValidationException {
        // start the node
        node.start();
        // start the latch-controlled keep-alive thread, which waits until the process is shut down
        keepAliveThread.start();
    }

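The start method of node is covered separately in the next article, which introduces Node.

This concludes the first article on the Elasticsearch initialization flow. The following articles are dedicated to the instantiation of Node and the node startup process.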