Elasticsearch Source Code, Part 1: Server Startup and Initialization
- March 11, 2020
- Notes
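Downloading and Building the Source
- Download the Elasticsearch source code from GitHub;
- Set up a matching JDK and Gradle environment;
- Open a command prompt (cmd) in the repository root and run gradlew idea to generate the IntelliJ IDEA project;
- Open the generated project in IDEA.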
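Analyzing the Startup Flow in the Source
This post focuses on the startup flow of the server module and walks through it step by step.
The main method of Elasticsearch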
```java
/**
 * Main entry point for starting elasticsearch
 */
public static void main(final String[] args) throws Exception {
    overrideDnsCachePolicyProperties();
    /*
     * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
     * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
     * forces such policies to take effect immediately.
     */
    System.setSecurityManager(new SecurityManager() {

        @Override
        public void checkPermission(Permission perm) {
            // grant all permissions so that we can later set the security manager to the one that we want
        }

    });
    LogConfigurator.registerErrorListener();
    // Create the instance: the constructor registers the command-line options and a (no-op) beforeMain Runnable
    final Elasticsearch elasticsearch = new Elasticsearch();
    int status = main(args, elasticsearch, Terminal.DEFAULT);
    if (status != ExitCodes.OK) {
        final String basePath = System.getProperty("es.logs.base_path");
        // It's possible to fail before logging has been configured, in which case there's no point
        // suggesting that the user look in the log file.
        if (basePath != null) {
            Terminal.DEFAULT.errorPrintln(
                "ERROR: Elasticsearch did not exit normally - check the logs at "
                    + basePath
                    + System.getProperty("file.separator")
                    + System.getProperty("es.logs.cluster_name") + ".log"
            );
        }
        exit(status);
    }
}
```
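The main method consists of the following steps:
- overrideDnsCachePolicyProperties reads the JVM system properties es.networkaddress.cache.ttl and es.networkaddress.cache.negative.ttl. It checks that each value parses as an Integer, then writes it to the corresponding java.security.Security property (networkaddress.cache.ttl and networkaddress.cache.negative.ttl).
- It installs a permissive security manager that grants every permission. This makes the JVM behave as if a security manager is present (for example, for the DNS cache policy). The real security manager is installed later.
- new Elasticsearch() creates the Elasticsearch instance; its instantiation is analyzed below. In the class hierarchy, Elasticsearch extends EnvironmentAwareCommand, which in turn extends org.elasticsearch.cli.Command.
- main(args, elasticsearch, Terminal.DEFAULT) ends up calling elasticsearch.main(args, terminal), which Elasticsearch inherits from org.elasticsearch.cli.Command#main.
- It handles logging. LogConfigurator.registerErrorListener() is called before startup. If main returns a non-OK status, the user is told where to find the log file (when es.logs.base_path is set), and the process exits with that status.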
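To make the first step concrete, here is a simplified sketch of what overrideDnsCachePolicyProperties does, written against the standard java.security.Security API. It is a re-creation based on the description above, not the Elasticsearch source, and the class name DnsCachePolicySketch is hypothetical.

```java
import java.security.Security;

/**
 * Hypothetical sketch: copy "es."-prefixed JVM system properties into
 * java.security.Security properties, validating that each value is an integer.
 */
final class DnsCachePolicySketch {
    private static final String[] PROPERTIES = {
        "networkaddress.cache.ttl", "networkaddress.cache.negative.ttl"
    };

    private DnsCachePolicySketch() {}

    static void overrideDnsCachePolicyProperties() {
        for (String property : PROPERTIES) {
            String overrideProperty = "es." + property;
            String overrideValue = System.getProperty(overrideProperty);
            if (overrideValue != null) {
                try {
                    // Round-trip through an int so that a malformed value fails fast.
                    Security.setProperty(property, Integer.toString(Integer.parseInt(overrideValue)));
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException(
                        "failed to parse [" + overrideProperty + "] with value [" + overrideValue + "]", e);
                }
            }
        }
    }
}
```

For example, starting the JVM with -Des.networkaddress.cache.ttl=30 would make the security property networkaddress.cache.ttl equal to 30. A non-numeric value fails immediately with an IllegalArgumentException.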
Next, let's look at the more important details of these steps one by one.
Instantiating Elasticsearch
The constructor
```java
// visible for testing
Elasticsearch() {
    // Call the parent constructor; the second argument is the beforeMain Runnable (a no-op here)
    super("Starts Elasticsearch", () -> {}); // we configure logging later so we override the base class from configuring logging
    // Register the command-line options
    versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
        "Prints Elasticsearch version information and exits");
    // Whether to run in the background (daemonize)
    daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
        "Starts Elasticsearch in the background")
        .availableUnless(versionOption);
    // Path of the pid file
    pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
        "Creates a pid file in the specified path on start")
        .availableUnless(versionOption)
        .withRequiredArg()
        .withValuesConvertedBy(new PathConverter());
    quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
        "Turns off standard output/error streams logging in console")
        .availableUnless(versionOption)
        .availableUnless(daemonizeOption);
}
```
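The constructor mainly declares the command-line options to be parsed and stores them in the corresponding Option fields. It also calls the parent constructor with a Runnable, () -> {}. This is the beforeMain hook discussed below, and for Elasticsearch it does nothing.
Next, let's look at the static main method of Elasticsearch: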
```java
static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
    // Call the instance main method, which Elasticsearch inherits (via EnvironmentAwareCommand) from Command
    return elasticsearch.main(args, terminal);
}
```
The instance main method is inherited from the parent class and is defined in org.elasticsearch.cli.Command#main:
```java
/** Parses options for this command from args and executes it. */
public final int main(String[] args, Terminal terminal) throws Exception {
    if (addShutdownHook()) {
        // Hook thread that runs when the process shuts down
        shutdownHookThread = new Thread(() -> {
            try {
                // See org.elasticsearch.bootstrap.Elasticsearch#close: a template method for subclasses to override
                this.close();
            } catch (final IOException e) {
                // ---------- some code omitted ----------
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHookThread);
    }

    // The Runnable passed in when Elasticsearch was instantiated; note that run() is called here, not start()
    beforeMain.run();

    try {
        mainWithoutErrorHandling(args, terminal);
    } catch (OptionException e) {
        ...
```
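This method mainly does the following:
- It registers a shutdown hook thread. The JVM calls this thread when the process shuts down, and it runs the close method of Elasticsearch.
- It runs the beforeMain Runnable supplied when Elasticsearch was instantiated. For Elasticsearch this is a no-op. Because run() is called rather than start(), it executes synchronously on the current thread.
- It calls mainWithoutErrorHandling, which we look at in detail next.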
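Command#main is essentially a template method. The final main fixes the order of the steps, and subclasses plug in behavior through beforeMain, close and execute. The following is a minimal, self-contained sketch of that shape. MiniCommand and its methods are hypothetical names, option parsing and error handling are left out, and the sketch uses AutoCloseable where the real Command implements Closeable.

```java
/**
 * Hypothetical sketch of the Command#main structure: optional shutdown hook,
 * synchronous beforeMain.run(), then the subclass-specific execute step.
 */
abstract class MiniCommand implements AutoCloseable {
    private final Runnable beforeMain;
    private Thread shutdownHookThread;

    MiniCommand(Runnable beforeMain) {
        this.beforeMain = beforeMain;
    }

    /** Final entry point: subclasses customize behavior only through the hooks below. */
    final int main(String[] args) throws Exception {
        if (addShutdownHook()) {
            shutdownHookThread = new Thread(() -> {
                try {
                    close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);
        }
        // run(), not start(): beforeMain executes on the calling thread.
        beforeMain.run();
        execute(args);
        return 0;
    }

    /** Subclasses (e.g. tests) may opt out of registering the shutdown hook. */
    protected boolean addShutdownHook() {
        return true;
    }

    protected abstract void execute(String[] args) throws Exception;

    @Override
    public void close() throws Exception {
        // Default: nothing to release.
    }
}
```

A test subclass would typically override addShutdownHook() to return false so that no JVM-wide hook is left behind. The if (addShutdownHook()) check in the code above suggests the real Command uses the same kind of switch.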
The code of org.elasticsearch.cli.Command#mainWithoutErrorHandling:
```java
void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
    final OptionSet options = parser.parse(args);

    if (options.has(helpOption)) {
        printHelp(terminal, false);
        return;
    }

    if (options.has(silentOption)) {
        terminal.setVerbosity(Terminal.Verbosity.SILENT);
    } else if (options.has(verboseOption)) {
        terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
    } else {
        terminal.setVerbosity(Terminal.Verbosity.NORMAL);
    }

    execute(terminal, options);
}
```
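- Parameters: args holds the command-line arguments passed when the process starts, and terminal is Terminal.DEFAULT.
- The parser set up at construction time parses the command-line arguments. If the help option is present, help is printed and the method returns. Otherwise the terminal's verbosity is set to silent, verbose or normal according to the options.
- It calls execute with terminal and options. This resolves to org.elasticsearch.cli.EnvironmentAwareCommand#execute(org.elasticsearch.cli.Terminal, joptsimple.OptionSet):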
```java
@Override
protected void execute(Terminal terminal, OptionSet options) throws Exception {
    final Map<String, String> settings = new HashMap<>();
    for (final KeyValuePair kvp : settingOption.values(options)) {
        if (kvp.value.isEmpty()) {
            throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
        }
        if (settings.containsKey(kvp.key)) {
            final String message = String.format(
                    Locale.ROOT,
                    "setting [%s] already set, saw [%s] and [%s]",
                    kvp.key,
                    settings.get(kvp.key),
                    kvp.value);
            throw new UserException(ExitCodes.USAGE, message);
        }
        settings.put(kvp.key, kvp.value);
    }

    // Fill in path.data, path.home and path.logs from system properties if they were not given
    putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
    putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
    putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

    // Call the execute overload implemented by the subclass
    execute(terminal, options, createEnv(settings));
}
```
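Let's walk through this method step by step. It mainly does the following:
- It copies the settings passed as options (settingOption, i.e. -E key=value) into the settings map. Empty values and keys that are set twice are rejected.
- It calls putSystemPropertyIfSettingIsMissing to fill in path.data, path.home and path.logs from the es.path.data, es.path.home and es.path.logs system properties. If the same setting was also given on the command line, it throws. The code is: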
```java
/** Ensure the given setting exists, reading it from system properties if not already set. */
private static void putSystemPropertyIfSettingIsMissing(final Map<String, String> settings, final String setting, final String key) {
    final String value = System.getProperty(key);
    if (value != null) {
        if (settings.containsKey(setting)) {
            // ---------- some code omitted ----------
            throw new IllegalArgumentException(message);
        } else {
            settings.put(setting, value);
        }
    }
}
```
- It calls createEnv(settings), which builds an Environment object from the settings. The creation flow is:
```java
/** Create an {@link Environment} for the command to use. Overrideable for tests. */
protected Environment createEnv(final Map<String, String> settings) throws UserException {
    // Build the Environment from the settings
    return createEnv(Settings.EMPTY, settings);
}

/** Create an {@link Environment} for the command to use. Overrideable for tests. */
protected final Environment createEnv(final Settings baseSettings, final Map<String, String> settings) throws UserException {
    final String esPathConf = System.getProperty("es.path.conf");
    if (esPathConf == null) {
        throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
    }
    return InternalSettingsPreparer.prepareEnvironment(baseSettings, settings,
        getConfigPath(esPathConf),
        // HOSTNAME is set by elasticsearch-env and elasticsearch-env.bat so it is always available
        () -> System.getenv("HOSTNAME"));
}
```
- It calls the execute overload with terminal, options and the Environment object. The code of org.elasticsearch.bootstrap.Elasticsearch#execute is:
```java
@Override
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
    if (options.nonOptionArguments().isEmpty() == false) {
        throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
    }
    if (options.has(versionOption)) {
        // Print the version information
        final String versionOutput = String.format(
            Locale.ROOT,
            "Version: %s, Build: %s/%s/%s/%s, JVM: %s",
            // ---------- some arguments omitted ----------
            JvmInfo.jvmInfo().version()
        );
        terminal.println(versionOutput);
        return;
    }

    final boolean daemonize = options.has(daemonizeOption);
    final Path pidFile = pidfileOption.value(options);
    final boolean quiet = options.has(quietOption);

    // a misconfigured java.io.tmpdir can cause hard-to-diagnose problems later, so reject it immediately
    try {
        env.validateTmpFile();
    } catch (IOException e) {
        throw new UserException(ExitCodes.CONFIG, e.getMessage());
    }

    try {
        init(daemonize, pidFile, quiet, env);
    } catch (NodeValidationException e) {
        throw new UserException(ExitCodes.CONFIG, e.getMessage());
    }
}
```
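The settings-merging rules of EnvironmentAwareCommand#execute and putSystemPropertyIfSettingIsMissing can be condensed into the following sketch. It assumes the -E pairs have already been parsed into Map.Entry objects, whereas the real code uses jopt-simple and KeyValuePair. SettingsMergeSketch and its exception messages are hypothetical, and the real code throws UserException for command-line problems.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: -E pairs first (empty values and duplicates rejected),
 * then path.* settings filled from es.path.* system properties only when the
 * command line did not already set them.
 */
final class SettingsMergeSketch {
    private SettingsMergeSketch() {}

    static Map<String, String> buildSettings(List<Map.Entry<String, String>> commandLinePairs) {
        Map<String, String> settings = new HashMap<>();
        for (Map.Entry<String, String> kv : commandLinePairs) {
            if (kv.getValue().isEmpty()) {
                throw new IllegalArgumentException("setting [" + kv.getKey() + "] must not be empty");
            }
            if (settings.containsKey(kv.getKey())) {
                throw new IllegalArgumentException("setting [" + kv.getKey() + "] already set");
            }
            settings.put(kv.getKey(), kv.getValue());
        }
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        return settings;
    }

    private static void putSystemPropertyIfSettingIsMissing(
            Map<String, String> settings, String setting, String key) {
        String value = System.getProperty(key);
        if (value != null) {
            if (settings.containsKey(setting)) {
                // Supplied both as -E and as a system property: refuse instead of guessing.
                throw new IllegalArgumentException("duplicate setting [" + setting + "]");
            }
            settings.put(setting, value);
        }
    }
}
```

The design choice worth noting is that neither source silently wins. A setting given twice on the command line, or both on the command line and as a system property, is treated as a usage error.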
Here the daemonize, pidFile and quiet values parsed from the command line are passed to init, together with the env object created above. init works as follows:
```java
void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
    throws NodeValidationException, UserException {
    try {
        // Start the bootstrap process
        Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
    } catch (BootstrapException | RuntimeException e) {
        // format exceptions to the console in a special way
        // to avoid 2MB stacktraces from guice, etc.
        throw new StartupException(e);
    }
}
```
As you can see, this calls Bootstrap.init and passes !daemonize as its foreground parameter. The method is defined as:
```java
/**
 * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
 */
static void init(
        final boolean foreground,
        final Path pidFile,
        final boolean quiet,
        final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
    // force the class initializer for BootstrapInfo to run before
    // the security manager is installed (i.e. its static initializers run first)
    BootstrapInfo.init();

    INSTANCE = new Bootstrap();

    // Load the secure settings (the keystore)
    final SecureSettings keystore = loadSecureSettings(initialEnv);
    // Build the final Environment from the initial settings, the keystore and the config directory
    final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

    // the LogConfigurator will replace System.out and System.err with redirects to our logfile, so we need to capture
    // the stream objects before calling LogConfigurator to be able to close them when appropriate
    final Runnable sysOutCloser = getSysOutCloser();
    final Runnable sysErrorCloser = getSysErrorCloser();

    LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));

    try {
        // Configure logging from the environment
        LogConfigurator.configure(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }
    if (environment.pidFile() != null) {
        try {
            // Create the pid file
            PidFile.create(environment.pidFile(), true);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
    }

    try {
        final boolean closeStandardStreams = (foreground == false) || quiet;
        if (closeStandardStreams) {
            final Logger rootLogger = LogManager.getRootLogger();
            final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
            if (maybeConsoleAppender != null) {
                Loggers.removeAppender(rootLogger, maybeConsoleAppender);
            }
            sysOutCloser.run();
        }

        // fail if somebody replaced the lucene jars
        checkLucene();

        // install the default uncaught exception handler; must be done before security is
        // initialized as we do not want to grant the runtime permission
        // setDefaultUncaughtExceptionHandler
        Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

        INSTANCE.setup(true, environment);

        try {
            // any secure settings must be read during node construction
            IOUtils.close(keystore);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }

        INSTANCE.start();

        // ---------- some code omitted ----------
    } catch (NodeValidationException | RuntimeException e) {
        // ---------- some code omitted ----------
        throw e;
    }
}
```
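Bootstrap.init installs its handler through the plain JDK API Thread.setDefaultUncaughtExceptionHandler. The sketch below shows the mechanism with a hypothetical RecordingUncaughtExceptionHandler that only remembers the last failure. The real ElasticsearchUncaughtExceptionHandler also logs the exception and treats fatal errors more drastically; that part is not reproduced here.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical handler: once installed as the JVM-wide default, it is called
 * for any thread that dies from an exception and has no handler of its own.
 */
final class RecordingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    final AtomicReference<String> lastFailure = new AtomicReference<>();

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // Record "<thread name>: <message>" so callers can inspect what failed.
        lastFailure.set(t.getName() + ": " + e.getMessage());
    }
}
```

The handler runs on the dying thread itself, before that thread terminates. Joining the thread is therefore enough to observe the recorded failure, as in this usage: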
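Of this method, we focus on the following key parts:
- INSTANCE = new Bootstrap() instantiates the Bootstrap object;
- INSTANCE.setup(true, environment) initializes the Bootstrap instance from the environment configuration;
- INSTANCE.start() starts the Bootstrap instance.
Let's go through them one at a time.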
new Bootstrap()
Straight to the code:
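```java
Bootstrap() {
    keepAliveThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // Block until the process shuts down; the latch is released by the
                // shutdown hook registered via Runtime.getRuntime().addShutdownHook below
                keepAliveLatch.await();
            } catch (InterruptedException e) {
                // bail out
            }
        }
    }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
    // Mark it as a non-daemon thread
    keepAliveThread.setDaemon(false);
    // keep this thread alive (non daemon thread) until we shutdown
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            keepAliveLatch.countDown();
        }
    });
}
```
The constructor creates a non-daemon keepAlive thread, which is only started later in Bootstrap#start, and registers a shutdown hook. keepAliveLatch is a CountDownLatch, not a thread. The keepAlive thread blocks in keepAliveLatch.await(), and the shutdown hook calls countDown() when the process shuts down, which lets the thread finish. A running non-daemon thread prevents the JVM from exiting normally, so this thread keeps the process alive until shutdown.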
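The keep-alive mechanism is easy to reproduce with java.util.concurrent.CountDownLatch. The sketch below mirrors the constructor above plus the keepAliveThread.start() call in Bootstrap#start. The KeepAlive class and its release() method are hypothetical. release() stands in for what the shutdown hook does, so the behavior can be observed without shutting down the JVM.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch of the Bootstrap keep-alive pattern. */
final class KeepAlive {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Thread keepAliveThread;

    KeepAlive(String name, boolean registerShutdownHook) {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await(); // parks until countDown() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out, preserving the interrupt flag
            }
        }, name);
        // A running non-daemon thread prevents the JVM from exiting normally.
        keepAliveThread.setDaemon(false);
        if (registerShutdownHook) {
            // On JVM shutdown, release the latch so the keep-alive thread can finish.
            Runtime.getRuntime().addShutdownHook(new Thread(keepAliveLatch::countDown));
        }
    }

    /** Corresponds to keepAliveThread.start() in Bootstrap#start. */
    void start() {
        keepAliveThread.start();
    }

    /** Does what the shutdown hook does: lets the keep-alive thread exit. */
    void release() {
        keepAliveLatch.countDown();
    }

    Thread thread() {
        return keepAliveThread;
    }
}
```

In the real flow only the shutdown hook counts the latch down. The thread therefore stays up until the process receives a termination signal or System.exit is called, and the JVM stays up with it.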
org.elasticsearch.bootstrap.Bootstrap#setup方法
Straight to the code:
```java
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
    Settings settings = environment.settings();

    try {
        // Spawn native controller processes for any modules that ship one
        spawner.spawnNativeControllers(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }

    // Native (OS-level) initialization: memory lock, system call filter, Windows console ctrl handler
    initializeNatives(
            environment.tmpFile(),
            BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
            BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
            BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

    // initialize probes before the security manager is installed
    initializeProbes();

    if (addShutdownHook) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // ---------- shutdown hook body omitted ----------
            }
        });
    }

    // ---------- some code omitted ----------

    // install SM after natives, shutdown hooks, etc.
    try {
        // Install the security manager and its policy
        Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
    } catch (IOException | NoSuchAlgorithmException e) {
        throw new BootstrapException(e);
    }

    // Create the Node; the anonymous subclass runs the bootstrap checks before the node accepts requests
    node = new Node(environment) {
        @Override
        protected void validateNodeBeforeAcceptingRequests(
            final BootstrapContext context,
            final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
            BootstrapChecks.check(context, boundTransportAddress, checks);
        }
    };
}
```
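This method mainly does the following:
- It spawns native controller processes for the modules that provide them, if any.
- It performs native (OS-level) initialization: memory locking, the system call filter and the Windows console control handler, as selected by the three settings passed in. The tmp directory is handed over as well.
- It installs the security manager (Security.configure), deliberately after the natives and shutdown hooks are set up.
- It creates the Node. For reasons of length, this part is analyzed in the next post.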
org.elasticsearch.bootstrap.Bootstrap#start方法
The code:
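```java
private void start() throws NodeValidationException {
    // Start the node
    node.start();
    // Start the keep-alive thread, which blocks on the latch until shutdown
    keepAliveThread.start();
}
```
The start method of Node will be covered in the next post, which is dedicated to Node.
That concludes this first post on the Elasticsearch initialization flow. Upcoming posts will look in detail at how the Node is instantiated and started.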