1.前面我们在《(一)elasticsearch 编译和启动》和《(二)elasticsearch 源码目录》中简单了解了 es(elasticsearch,下同),现在我们来看下启动代码。
(相关资料图)
下面是启动流程图,我们按照流程图的顺序依次描述
2.启动流程
org.elasticsearch.bootstrap.Elasticsearch
/**
 * JVM entry point of Elasticsearch.
 *
 * Overrides the DNS cache policy properties, installs a placeholder security manager,
 * registers the logging error listener, and then delegates to
 * {@code main(args, elasticsearch, Terminal.DEFAULT)}. When the returned status is not
 * {@code ExitCodes.OK}, {@code exit(status)} is called with it.
 *
 * @param args the command-line arguments, passed through unchanged
 * @throws Exception propagated from any of the startup steps
 */
public static void main(final String[] args) throws Exception {
    overrideDnsCachePolicyProperties();
    /*
     * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
     * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
     * forces such policies to take effect immediately.
     */
    System.setSecurityManager(new SecurityManager() {

        @Override
        public void checkPermission(Permission perm) {
            // grant all permissions so that we can later set the security manager to the one that we want
        }

    });
    LogConfigurator.registerErrorListener();
    final Elasticsearch elasticsearch = new Elasticsearch();
    int status = main(args, elasticsearch, Terminal.DEFAULT);
    if (status != ExitCodes.OK) {
        exit(status);
    }
}
后续执行 Elasticsearch.execute -> Elasticsearch.init -> Bootstrap.init
org.elasticsearch.bootstrap.Bootstrap
/**
 * Initializes the bootstrap singleton and starts the node.
 *
 * In order: loads the secure settings, builds the final {@code Environment}, configures
 * logging, logs a deprecation message when the running Java version is older than 11,
 * creates the pid file if one is configured, optionally detaches the console appender and
 * closes the standard streams, installs the default uncaught-exception handler, and then
 * calls {@code INSTANCE.setup(true, environment)} followed by {@code INSTANCE.start()}.
 *
 * @param foreground when {@code false}, the console appender is removed and the standard streams are closed
 * @param pidFile    passed to {@code createEnvironment}
 * @param quiet      when {@code true}, the console appender is removed and the standard streams are closed
 * @param initialEnv the initial environment; supplies the settings and config file for the final environment
 * @throws BootstrapException wraps an {@code IOException} raised while configuring logging,
 *                            creating the pid file, or closing the keystore
 */
static void init(
        final boolean foreground,
        final Path pidFile,
        final boolean quiet,
        final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
    // force the class initializer for BootstrapInfo to run before
    // the security manager is installed
    BootstrapInfo.init();

    INSTANCE = new Bootstrap();

    // load the secure settings (keystore)
    final SecureSettings keystore = loadSecureSettings(initialEnv);
    final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

    LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
    try {
        LogConfigurator.configure(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }
    // log a deprecation message when running on a Java version older than 11
    if (JavaVersion.current().compareTo(JavaVersion.parse("11")) < 0) {
        final String message = String.format(
                Locale.ROOT,
                "future versions of Elasticsearch will require Java 11; " +
                        "your Java version from [%s] does not meet this requirement",
                System.getProperty("java.home"));
        new DeprecationLogger(LogManager.getLogger(Bootstrap.class)).deprecated(message);
    }
    // create the pid file if one is configured
    if (environment.pidFile() != null) {
        try {
            PidFile.create(environment.pidFile(), true);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
    }

    // when started in the background (or in quiet mode), do not print logs to the console
    final boolean closeStandardStreams = (foreground == false) || quiet;
    try {
        if (closeStandardStreams) {
            final Logger rootLogger = LogManager.getRootLogger();
            final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
            if (maybeConsoleAppender != null) {
                Loggers.removeAppender(rootLogger, maybeConsoleAppender);
            }
            closeSystOut();
        }

        // fail if somebody replaced the lucene jars
        checkLucene();

        // global exception handling:
        // install the default uncaught exception handler; must be done before security is
        // initialized as we do not want to grant the runtime permission
        // setDefaultUncaughtExceptionHandler
        Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

        INSTANCE.setup(true, environment);

        try {
            // any secure settings must be read during node construction
            IOUtils.close(keystore);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }

        INSTANCE.start();

        if (closeStandardStreams) {
            closeSysError();
        }
    }
    // NOTE: the excerpt stops here; the catch clauses of the try block above and the
    // closing brace of init(...) are not shown.
这里我们可以关注下 INSTANCE.setup(true, environment);
org.elasticsearch.bootstrap.Bootstrapprivate void setup(boolean addShutdownHook, Environment environment) throws BootstrapException { Settings settings = environment.settings(); try { spawner.spawnNativeControllers(environment); } catch (IOException e) { throw new BootstrapException(e); } // 检查一些mlock设定 initializeNatives( environment.tmpFile(), BootstrapSettings.MEMORY_LOCK_SETTING.get(settings), BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings), BootstrapSettings.CTRLHANDLER_SETTING.get(settings)); // 探针 // initialize probes before the security manager is installed initializeProbes(); if (addShutdownHook) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { IOUtils.close(node, spawner); LoggerContext context = (LoggerContext) LogManager.getContext(false); Configurator.shutdown(context); if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) { throw new IllegalStateException("Node didn"t stop within 10 seconds. " + "Any outstanding requests or tasks might get killed."); } } catch (IOException ex) { throw new ElasticsearchException("failed to stop node", ex); } catch (InterruptedException e) { LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown."); Thread.currentThread().interrupt(); } } }); } try { // 检查类加载的一些问题 // look for jar hell final Logger logger = LogManager.getLogger(JarHell.class); JarHell.checkJarHell(logger::debug); } catch (IOException | URISyntaxException e) { throw new BootstrapException(e); } // Log ifconfig output before SecurityManager is installed IfConfig.logIfNecessary(); // 安全处理 // install SM after natives, shutdown hooks, etc. 
try { Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings)); } catch (IOException | NoSuchAlgorithmException e) { throw new BootstrapException(e); } node = new Node(environment) { @Override protected void validateNodeBeforeAcceptingRequests( final BootstrapContext context, final BoundTransportAddress boundTransportAddress, List checks) throws NodeValidationException { BootstrapChecks.check(context, boundTransportAddress, checks); } }; }
最后一句 node = new Node(environment) 初始化了节点,里面做了许多工作
org.elasticsearch.node.Node
/**
 * Constructs the node (abridged excerpt; "..." marks code elided by the article).
 *
 * The visible parts log version/JVM information, create the {@code PluginsService},
 * compute the set of possible node roles from the built-in roles plus the roles
 * contributed by plugins, create the local node factory, and register the Guice bindings.
 *
 * @param environment               the environment the node runs in
 * @param classpathPlugins          plugin classes handed to the {@code PluginsService}
 * @param forbidPrivateIndexSettings not used in the visible part of the excerpt
 */
protected Node(
        final Environment environment,
        Collection<Class<? extends Plugin>> classpathPlugins,
        boolean forbidPrivateIndexSettings) {
    ...
    // log JVM information
    final JvmInfo jvmInfo = JvmInfo.jvmInfo();
    logger.info(
            "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
            Build.CURRENT.getQualifiedVersion(),
            jvmInfo.pid(),
            Build.CURRENT.flavor().displayName(),
            Build.CURRENT.type().displayName(),
            Build.CURRENT.hash(),
            Build.CURRENT.date(),
            Constants.OS_NAME,
            Constants.OS_VERSION,
            Constants.OS_ARCH,
            Constants.JVM_VENDOR,
            Constants.JVM_NAME,
            Constants.JAVA_VERSION,
            Constants.JVM_VERSION);
    ...
    // initialize the services and their dependencies
    this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),
            environment.pluginsFile(), classpathPlugins);
    final Settings settings = pluginsService.updatedSettings();
    // possible roles = built-in roles + roles contributed by plugins
    final Set<DiscoveryNodeRole> possibleRoles = Stream.concat(
            DiscoveryNodeRole.BUILT_IN_ROLES.stream(),
            pluginsService.filterPlugins(Plugin.class)
                    .stream()
                    .map(Plugin::getRoles)
                    .flatMap(Set::stream))
            .collect(Collectors.toSet());
    DiscoveryNode.setPossibleRoles(possibleRoles);
    localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
    ...
    // Guice bindings
    modules.add(b -> {
        b.bind(Node.class).toInstance(this);
        b.bind(NodeService.class).toInstance(nodeService);
        b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
        b.bind(PluginsService.class).toInstance(pluginsService);
        b.bind(Client.class).toInstance(client);
        b.bind(NodeClient.class).toInstance(client);
        b.bind(Environment.class).toInstance(this.environment);
        b.bind(ThreadPool.class).toInstance(threadPool);
        // NOTE: the excerpt stops here; the remaining bindings and the rest of the
        // constructor are not shown.
es 使用 guice注入框架,guice是个非常轻量级的依赖注入框架,既然各个组件都已经注入好了,我们现在可以启动了。
INSTANCE.start -> Bootstrap.start
org.elasticsearch.bootstrap.Bootstrap
/**
 * Starts the node and then the keep-alive thread.
 *
 * @throws NodeValidationException declared by this method; presumably raised by {@code node.start()}
 */
private void start() throws NodeValidationException {
    node.start();
    keepAliveThread.start();
}
node.start中启动各个组件。es中的各个组件继承了 AbstractLifecycleComponent。start方法会调用组件的doStart方法。
org.elasticsearch.node.Node
/**
 * Starts the node's components (abridged excerpt; "..." marks code elided by the article).
 *
 * Returns immediately when {@code lifecycle.moveToStarted()} reports {@code false};
 * otherwise starts the plugin lifecycle components and then the individual services
 * obtained from the injector, in the order listed below.
 *
 * @return this node
 * @throws NodeValidationException declared by this method; the throwing code is not in the visible excerpt
 */
public Node start() throws NodeValidationException {
    // nothing to do when the lifecycle could not be moved to the started state
    if (!lifecycle.moveToStarted()) {
        return this;
    }

    logger.info("starting ...");
    pluginLifecycleComponents.forEach(LifecycleComponent::start);

    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SnapshotShardsService.class).start();
    injector.getInstance(SearchService.class).start();
    nodeService.getMonitorService().start();

    final ClusterService clusterService = injector.getInstance(ClusterService.class);

    // the node connections service is started first and then handed to the cluster service
    final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);
    ...
下面我们具体看两个比较重要的服务,首先是 transportService.start():
org.elasticsearch.transport.TransportService
/**
 * Starts the transport service.
 *
 * Registers this service as the transport's message listener and as a connection-manager
 * listener, starts the underlying transport, logs the bound addresses when info logging
 * is enabled, creates the local node from the bound address, and initializes the remote
 * clusters when {@code connectToRemoteCluster} is set.
 */
@Override
protected void doStart() {
    transport.setMessageListener(this);
    connectionManager.addListener(this);
    // establish the network connection
    transport.start();
    if (transport.boundAddress() != null && logger.isInfoEnabled()) {
        logger.info("{}", transport.boundAddress());
        for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {
            logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
        }
    }
    localNode = localNodeFactory.apply(transport.boundAddress());

    if (connectToRemoteCluster) {
        // here we start to connect to the remote clusters
        remoteClusterService.initializeRemoteClusters();
    }
}
启动transport的实现类是 SecurityNetty4HttpServerTransport
另一个比较重要的服务,discovery.start(),具体实现类是 Coordinator
org.elasticsearch.cluster.coordination.Coordinator
/**
 * Starts the coordinator (abridged excerpt; "..." marks code elided by the article).
 *
 * While holding {@code mutex}: builds the {@code CoordinationState} from the persisted
 * state, passes the current term to the peer finder, starts the configured-hosts
 * resolver, logs the cluster UUID when it is committed, and rejects single-node discovery
 * when the local node does not have a quorum in the last committed voting configuration.
 *
 * @throws IllegalStateException when single-node discovery is configured, the voting configuration
 *                               is non-empty, and the local node alone does not form a quorum in it
 */
@Override
protected void doStart() {
    synchronized (mutex) {
        CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
        coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));
        peerFinder.setCurrentTerm(getCurrentTerm());
        configuredHostsResolver.start();
        final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
        if (lastAcceptedState.metaData().clusterUUIDCommitted()) {
            logger.info("cluster UUID [{}]", lastAcceptedState.metaData().clusterUUID());
        }
        final VotingConfiguration votingConfiguration = lastAcceptedState.getLastCommittedConfiguration();
        // single-node discovery requires the local node by itself to be a quorum
        if (singleNodeDiscovery &&
                votingConfiguration.isEmpty() == false &&
                votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {
            throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +
                    DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +
                    " does not have quorum in voting configuration " + votingConfiguration);
        }
        ...