ES Series (3): An Analysis of the Network Communication Module

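  ES is a distributed search engine. Besides the communication service it must offer to users, the nodes of a cluster also have to stay in close contact with each other so that correct results can be returned when needed. This inevitably involves many demanding communication scenarios, so achieving high-performance communication is a problem it has to solve.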

  Today we take ES's TransportService implementation as our window into how ES builds its high-performance communication module.

 

1. Overview

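  When it comes to high-performance network communication, most people have a rough idea of how it works, and readers of my earlier articles will already know the core principle. In short, it relies on I/O multiplexing: a small number of threads serve many connections, so the available bandwidth is used fully instead of threads sitting blocked on individual sockets.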
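
  To make the idea of I/O multiplexing concrete, here is a minimal sketch using only the JDK's java.nio API (not ES or Netty code). A single thread waits on one Selector and serves every registered channel. The class name SelectorEchoDemo and its methods are hypothetical, chosen only for this illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class: one selector thread echoes bytes back to any client.
class SelectorEchoDemo {

    /** Starts a one-thread echo server plus a blocking client; returns what the client reads back. */
    static String roundTrip(String message) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick a free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            Thread loop = new Thread(() -> runLoop(selector), "selector-loop");
            loop.setDaemon(true);
            loop.start();

            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                byte[] payload = message.getBytes(StandardCharsets.UTF_8);
                client.write(ByteBuffer.wrap(payload));
                ByteBuffer reply = ByteBuffer.allocate(payload.length);
                while (reply.hasRemaining() && client.read(reply) >= 0) {
                    // blocking client: keep reading until the whole echo has arrived
                }
                return new String(reply.array(), 0, reply.position(), StandardCharsets.UTF_8);
            } finally {
                loop.interrupt(); // wakes up select() so the loop can exit
                loop.join(1000);
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The multiplexing loop: one thread handles accept and read events for all channels. */
    static void runLoop(Selector selector) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select(); // blocks until at least one registered channel is ready
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
                        if (accepted != null) {
                            accepted.configureBlocking(false);
                            accepted.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        buffer.clear();
                        if (channel.read(buffer) < 0) {
                            channel.close(); // peer closed the connection
                            continue;
                        }
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            channel.write(buffer); // fine for a tiny demo payload
                        }
                    }
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // demo only: stop the loop on I/O errors or once the selector is closed
        }
    }
}
```

  Calling SelectorEchoDemo.roundTrip("ping") sends the text through the loop and returns the echoed bytes. Netty wraps this same select-and-dispatch pattern in its event loops, which is why the rest of this article mostly deals with configuration rather than raw socket handling.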

  In Java specifically, there are perhaps even fewer options to discuss: NIO, Netty, Akka, and so on.

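  So the topic of this article is not as mysterious as it may seem, nor does it need to be. We simply study ES's implementation details to understand some practical questions in depth and clear up our own doubts.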

  

2. Initialization of transportService

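  Almost all modules in ES are initialized when the service starts, which is natural. First, a slightly slower startup is acceptable. Second, a great deal of context is available at startup, which makes initialization convenient. Third, problems are discovered early rather than after the service has been running for a long time.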

  The transportService is initialized when the Node is constructed.

    // org.elasticsearch.node.Node#Node
    /**
     * Constructs a node
     *
     * @param initialEnvironment         the initial environment for this node, which will be added to by plugins
     * @param classpathPlugins           the plugins to be loaded from the classpath
     * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
     *                                   test framework for tests that rely on being able to set private settings
     */
    protected Node(final Environment initialEnvironment,
                   Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
        ...
        try {
            ...
            new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
            final Transport transport = networkModule.getTransportSupplier().get();
            Set<String> taskHeaders = Stream.concat(
                pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
                Stream.of(Task.X_OPAQUE_ID)
            ).collect(Collectors.toSet());
            // create the transportService
            final TransportService transportService = newTransportService(settings, transport, threadPool,
                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
            final GatewayMetaState gatewayMetaState = new GatewayMetaState();
            ...
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to bind service", ex);
        } finally {
            if (!success) {
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
    }

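  So to initialize the transportService, the key is how newTransportService() works. Of course, calling this method has quite a few prerequisites, namely the initialization of its arguments. The important ones are the creation of the thread pool and the initialization of the transport. We skip the thread pool here: it is used in so many places that covering it in this section would be out of place. So we mainly look at how the transport is initialized.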

 

2.1. Instantiating NetworkModule

  In the code above, obtaining the transport instance first requires a networkModule. How is that initialized?

            // created directly with new in the Node() constructor
            final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                networkService, restController, clusterService.getClusterSettings());
    // org.elasticsearch.common.network.NetworkModule#NetworkModule
    /**
     * Creates a network module that custom networking classes can be plugged into.
     * @param settings The settings for the node
     * @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
     */
    public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                         BigArrays bigArrays,
                         PageCacheRecycler pageCacheRecycler,
                         CircuitBreakerService circuitBreakerService,
                         NamedWriteableRegistry namedWriteableRegistry,
                         NamedXContentRegistry xContentRegistry,
                         NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
                         ClusterSettings clusterSettings) {
        this.settings = settings;
        this.transportClient = transportClient;
        // there may be several plugins here, e.g. XPackPlugin, Netty4Plugin, Security, VotingOnlyNodePlugin
        for (NetworkPlugin plugin : plugins) {
            Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
            if (transportClient == false) {
                for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                    // register into transportHttpFactories
                    registerHttpTransport(entry.getKey(), entry.getValue());
                }
            }
            Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
                circuitBreakerService, namedWriteableRegistry, networkService);
            for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
                // register into transportFactories
                registerTransport(entry.getKey(), entry.getValue());
            }
            List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
                threadPool.getThreadContext());
            for (TransportInterceptor interceptor : transportInterceptors) {
                // register the interceptor into transportIntercetors
                registerTransportInterceptor(interceptor);
            }
        }
    }

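  As we can see, the main job of NetworkModule is to register the relevant components with itself for later use. The container may be a map or a list; all that matters is that it serves as a registry. The registration methods are as follows: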

    private final Map<String, Supplier<Transport>> transportFactories = new HashMap<>();
    private final Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();
    private final List<TransportInterceptor> transportIntercetors = new ArrayList<>();

    /** Adds an http transport implementation that can be selected by setting {@link #HTTP_TYPE_KEY}. */
    // TODO: we need another name than "http transport"....so confusing with transportClient...
    private void registerHttpTransport(String key, Supplier<HttpServerTransport> factory) {
        if (transportClient) {
            throw new IllegalArgumentException("Cannot register http transport " + key + " for transport client");
        }
        if (transportHttpFactories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }
    /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
    private void registerTransport(String key, Supplier<Transport> factory) {
        if (transportFactories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }

    /**
     * Registers a new {@link TransportInterceptor}
     */
    private void registerTransportInterceptor(TransportInterceptor interceptor) {
        this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
    }
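
  The register-then-look-up flow can be sketched in isolation as follows. This is a simplified illustration, not the ES source: the class TransportRegistrySketch and its lookup() method are hypothetical, and a plain String stands in for the Transport type. It only shows what a registry of this kind has to do: reject duplicate names on registration and fail clearly when the configured type is unknown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified registry; String stands in for the real Transport type.
class TransportRegistrySketch {

    private final Map<String, Supplier<String>> factories = new HashMap<>();

    /** Registers a factory under a name; a second registration with the same name is an error. */
    void register(String key, Supplier<String> factory) {
        if (factories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }

    /** Looks up the factory for the configured type (assumed behaviour, for illustration only). */
    Supplier<String> lookup(String configuredType) {
        Supplier<String> factory = factories.get(configuredType);
        if (factory == null) {
            throw new IllegalStateException("Unsupported transport type [" + configuredType + "]");
        }
        return factory;
    }
}
```

  Storing a Supplier rather than an instance means that registering a transport costs nothing; only the implementation that is actually selected ever gets constructed.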

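  There is one more important piece: the implementations of the NetworkPlugin methods, because everything that gets registered comes from them. This also gives plugins a convenient entry point. Let's first look at which entry points ES defines in NetworkPlugin: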

// org.elasticsearch.plugins.NetworkPlugin
/**
 * Plugin for extending network and transport related classes
 */
public interface NetworkPlugin {

    /**
     * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
     * transport (inter-node) requests. This must not return <code>null</code>
     *
     * @param namedWriteableRegistry registry of all named writeables registered
     * @param threadContext a {@link ThreadContext} of the current nodes or clients {@link ThreadPool} that can be used to set additional
     *                      headers in the interceptors
     */
    default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
                                                                ThreadContext threadContext) {
        return Collections.emptyList();
    }

    /**
     * Returns a map of {@link Transport} suppliers.
     * See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
     */
    default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                           CircuitBreakerService circuitBreakerService,
                                                           NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        return Collections.emptyMap();
    }

    /**
     * Returns a map of {@link HttpServerTransport} suppliers.
     * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
     */
    default Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                         PageCacheRecycler pageCacheRecycler,
                                                                         CircuitBreakerService circuitBreakerService,
                                                                         NamedXContentRegistry xContentRegistry,
                                                                         NetworkService networkService,
                                                                         HttpServerTransport.Dispatcher dispatcher,
                                                                         ClusterSettings clusterSettings) {
        return Collections.emptyMap();
    }
}

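  There are not many methods, just enough for the NetworkModule initialization shown earlier. They all have default implementations, so a plugin that does not care about this area can simply ignore them.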
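
  The effect of those default methods can be shown with a stripped-down sketch. The interface SketchNetworkPlugin and both plugin classes are hypothetical, and String again stands in for the real transport types. A plugin that overrides nothing contributes nothing, while another overrides only the one method it cares about.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified counterpart of a plugin interface with default methods.
interface SketchNetworkPlugin {
    default Map<String, Supplier<String>> getTransports() {
        return Collections.emptyMap();
    }

    default Map<String, Supplier<String>> getHttpTransports() {
        return Collections.emptyMap();
    }
}

// A plugin with no interest in networking: it inherits the empty defaults.
class NoNetworkingPlugin implements SketchNetworkPlugin {
}

// A plugin that only contributes a node-to-node transport.
class OnlyTransportPlugin implements SketchNetworkPlugin {
    @Override
    public Map<String, Supplier<String>> getTransports() {
        return Collections.singletonMap("sketch-transport", () -> "transport instance");
    }
}
```

  Because the defaults return empty collections rather than null, the caller can iterate over every plugin's result without any null checks, which is exactly how the registration loop shown earlier is written.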

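  Since we are here for ES's high-performance networking, it is worth looking at the Netty implementation. Netty4Plugin overrides the two methods that supply transports and leaves the interceptor method at its default, since interception belongs to the request-processing layer rather than to the network layer.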

    // org.elasticsearch.transport.Netty4Plugin#getTransports
    @Override
    public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                          CircuitBreakerService circuitBreakerService,
                                                          NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
            networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
    }

    @Override
    public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                        PageCacheRecycler pageCacheRecycler,
                                                                        CircuitBreakerService circuitBreakerService,
                                                                        NamedXContentRegistry xContentRegistry,
                                                                        NetworkService networkService,
                                                                        HttpServerTransport.Dispatcher dispatcher,
                                                                        ClusterSettings clusterSettings) {
        return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
            () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
                clusterSettings, getSharedGroupFactory(settings)));
    }

  Nothing more to it: each method returns a supplier for a Netty-based service, to be used later.

 

2.2. Instantiating transportService

  The previous section covered only the prerequisites; this section covers the actual initialization logic.

    // org.elasticsearch.node.Node#newTransportService
    protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
                                                   TransportInterceptor interceptor,
                                                   Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
                                                   ClusterSettings clusterSettings, Set<String> taskHeaders) {
        return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
    }

  Nothing extra here: TransportService is a concrete class, so we just need to look at its constructor.

    // org.elasticsearch.transport.TransportService#TransportService
    /**
     * Build the service.
     *
     * @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
     *    updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
     */
    public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                            Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                            Set<String> taskHeaders) {
        // ClusterConnectionManager is important
        this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
            new ClusterConnectionManager(settings, transport));
    }
    
    public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                            Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                            Set<String> taskHeaders, ConnectionManager connectionManager) {

        final boolean isTransportClient = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey()));

        // If we are a transport client then we skip the check that the remote node has a compatible build hash
        this.requireCompatibleBuild = isTransportClient == false;

        // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
        this.validateConnections = isTransportClient == false || TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
        // keep the configuration and service context
        this.transport = transport;
        transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
        this.threadPool = threadPool;
        this.localNodeFactory = localNodeFactory;
        this.connectionManager = connectionManager;
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
        setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
        tracerLog = Loggers.getLogger(logger, ".tracer");
        // task manager
        taskManager = createTaskManager(settings, threadPool, taskHeaders);
        // obtain the interceptor
        this.interceptor = transportInterceptor;
        this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
        this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
        // remote cluster service management
        remoteClusterService = new RemoteClusterService(settings, this);
        responseHandlers = transport.getResponseHandlers();
        if (clusterSettings != null) {
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
            if (remoteClusterClient) {
                // listen for setting updates
                remoteClusterService.listenForUpdates(clusterSettings);
            }
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold);
        }
        // register the handler for the handshake action internal:transport/handshake
        registerRequestHandler(
            HANDSHAKE_ACTION_NAME,
            ThreadPool.Names.SAME,
            false, false,
            HandshakeRequest::new,
            (request, channel, task) -> channel.sendResponse(
                new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName)));

        if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) {
            logger.warn("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" +
                    PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning");
            DeprecationLogger.getLogger(TransportService.class).deprecate("permit_handshake_from_incompatible_builds",
                "system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed");
        }
    }

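  Again, it simply instantiates the necessary services and stores the necessary configuration. Every one of these points deserves a deeper look, but that is not our goal; a general understanding is enough. One thing that does deserve attention is ClusterConnectionManager, because it maintains the connections to the other nodes of the cluster. It is instantiated here and put to use later. Its constructor mostly just stores the transport instance, which is what actually establishes remote connections.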

    // org.elasticsearch.transport.ClusterConnectionManager#ClusterConnectionManager
    public ClusterConnectionManager(Settings settings, Transport transport) {
        this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
    }

    public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
        this.transport = transport;
        this.defaultProfile = connectionProfile;
    }
    // org.elasticsearch.transport.ConnectionProfile#buildDefaultConnectionProfile
    /**
     * Builds a default connection profile based on the provided settings.
     *
     * @param settings to build the connection profile from
     * @return the connection profile
     */
    public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
        int connectionsPerNodeRecovery = TransportSettings.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
        int connectionsPerNodeBulk = TransportSettings.CONNECTIONS_PER_NODE_BULK.get(settings);
        int connectionsPerNodeReg = TransportSettings.CONNECTIONS_PER_NODE_REG.get(settings);
        int connectionsPerNodeState = TransportSettings.CONNECTIONS_PER_NODE_STATE.get(settings);
        int connectionsPerNodePing = TransportSettings.CONNECTIONS_PER_NODE_PING.get(settings);
        Builder builder = new Builder();
        builder.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
        builder.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
        builder.setPingInterval(TransportSettings.PING_SCHEDULE.get(settings));
        builder.setCompressionEnabled(TransportSettings.TRANSPORT_COMPRESS.get(settings));
        builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
        builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
        // if we are not master eligible we don't need a dedicated channel to publish the state
        builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
        // if we are not a data-node we don't need any dedicated channels for recovery
        builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
        builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
        return builder.build();
    }
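
  To see what such a profile amounts to, the sketch below adds up the channels opened to each peer node, mirroring the role checks in buildDefaultConnectionProfile(). The class ConnectionProfileSketch is hypothetical, and the per-type counts (recovery 2, bulk 3, reg 6, state 1, ping 1) are assumed values used for illustration; check TransportSettings in your ES version for the real defaults.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch: how node roles change the number of channels per peer.
class ConnectionProfileSketch {

    enum Type { RECOVERY, BULK, REG, STATE, PING }

    /** Channel counts per request type; the numbers are assumptions, not read from ES. */
    static Map<Type, Integer> channelsPerType(boolean masterEligible, boolean dataNode) {
        Map<Type, Integer> channels = new EnumMap<>(Type.class);
        channels.put(Type.BULK, 3);
        channels.put(Type.PING, 1);
        // a node that is not master eligible needs no dedicated channel to publish state
        channels.put(Type.STATE, masterEligible ? 1 : 0);
        // a node that holds no data needs no dedicated recovery channels
        channels.put(Type.RECOVERY, dataNode ? 2 : 0);
        channels.put(Type.REG, 6);
        return channels;
    }

    /** Total number of channels opened to one peer node. */
    static int total(Map<Type, Integer> channels) {
        return channels.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

  Under these assumed counts a node that is both master eligible and a data node opens 13 channels per peer, while a node with neither role opens 10. Splitting channels by request type keeps heavy traffic such as bulk or recovery from delaying small, latency-sensitive messages such as pings or cluster state updates.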

  At this point the instantiation of transportService is complete. How it actually starts working only shows up at the start phase of the ES lifecycle, which the next section covers.

 

3. The core of starting transportService

  That is: how does it start working, and what does its working model look like?

  In fact, the whole ES framework is organized around lifecycle management, and all of its components are actually started together in the start() phase.

    // org.elasticsearch.node.Node#start
    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() throws NodeValidationException {
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        logger.info("starting ...");
        pluginLifecycleComponents.forEach(LifecycleComponent::start);
        ...
        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        // obtain the transportService instance
        TransportService transportService = injector.getInstance(TransportService.class);
        // set the two task-related services on the taskManager
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
        // start the transportService lifecycle
        transportService.start();
        assert localNodeFactory.getNode() != null;
        assert transportService.getLocalNode().equals(localNodeFactory.getNode())
            : "transportService has a different local node than the factory provided";
        injector.getInstance(PeerRecoverySourceService.class).start();
        ...
        logger.info("started");

        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }

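  Many components are handled in this phase; we pick only the one we care about, the startup of transportService, to see how it behaves. The process is: obtain the previously instantiated transportService from the injector, set the required properties on its taskManager, and finally call transportService.start() to really start the service.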

  So the core is transportService.start(), a uniform lifecycle entry method:

    // org.elasticsearch.common.component.AbstractLifecycleComponent#start
    @Override
    public void start() {
        synchronized (lifecycle) {
            // start safely: repeated starts or starts from an invalid state are not allowed
            if (!lifecycle.canMoveToStarted()) {
                return;
            }
            // notify listeners before start
            for (LifecycleListener listener : listeners) {
                listener.beforeStart();
            }
            // where each concrete component does its real work
            doStart();
            // mark the state as started, which later checks rely on
            lifecycle.moveToStarted();
            // notify listeners after start
            for (LifecycleListener listener : listeners) {
                listener.afterStart();
            }
        }
    }

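  This lifecycle management is quite complete. First, it is thread-safe; second, it rejects repeated starts and starts from an invalid state; third, it offers before and after listener hooks for extension. The listeners need no further comment, but how the component state is managed is worth a look: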

    // org.elasticsearch.common.component.Lifecycle#canMoveToStarted
    public boolean canMoveToStarted() throws IllegalStateException {
        State localState = this.state;
        if (localState == State.INITIALIZED || localState == State.STOPPED) {
            return true;
        }
        if (localState == State.STARTED) {
            return false;
        }
        if (localState == State.CLOSED) {
            throw new IllegalStateException("Can't move to started state when closed");
        }
        throw new IllegalStateException("Can't move to started with unknown state");
    }
    
    public synchronized boolean moveToStarted() throws IllegalStateException {
        State localState = this.state;
        if (localState == State.INITIALIZED || localState == State.STOPPED) {
            state = State.STARTED;
            return true;
        }
        if (localState == State.STARTED) {
            return false;
        }
        if (localState == State.CLOSED) {
            throw new IllegalStateException("Can't move to started state when closed");
        }
        throw new IllegalStateException("Can't move to started with unknown state");
    }

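  The state checks speak for themselves: a start is allowed from INITIALIZED or STOPPED, is a no-op when already STARTED, and throws once the component is CLOSED.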
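
  Putting the start() template and the state checks together, a self-contained sketch of the pattern might look like this. LifecycleSketch is a hypothetical class, not the ES implementation: it records events in a list instead of calling real listeners, so the order of operations can be observed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a lifecycle-managed component with a guarded start().
class LifecycleSketch {

    enum State { INITIALIZED, STARTED, STOPPED, CLOSED }

    private final Object lock = new Object();
    private State state = State.INITIALIZED;
    final List<String> events = new ArrayList<>(); // stands in for real listeners

    private boolean canMoveToStarted() {
        if (state == State.INITIALIZED || state == State.STOPPED) {
            return true;
        }
        if (state == State.STARTED) {
            return false; // already running: start() becomes a no-op
        }
        throw new IllegalStateException("Can't move to started state when closed");
    }

    /** Template method: guard, pre-hook, component work, state change, post-hook. */
    final void start() {
        synchronized (lock) {
            if (!canMoveToStarted()) {
                return;
            }
            events.add("beforeStart");
            doStart();
            state = State.STARTED;
            events.add("afterStart");
        }
    }

    /** Subclasses would put their real startup work here. */
    void doStart() {
        events.add("doStart");
    }

    void stop() {
        synchronized (lock) {
            if (state == State.STARTED) {
                state = State.STOPPED;
            }
        }
    }

    void close() {
        synchronized (lock) {
            state = State.CLOSED;
        }
    }
}
```

  Calling start() twice in a row runs doStart() only once, a stopped component can be started again, and a closed one refuses to start. Every component gets these guarantees simply by implementing doStart().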

  Next comes the actual startup of transportService, namely the transportService.doStart() method:

    // org.elasticsearch.transport.TransportService#doStart
    @Override
    protected void doStart() {
        // start the transport first
        transport.setMessageListener(this);
        connectionManager.addListener(this);
        transport.start();
        if (transport.boundAddress() != null && logger.isInfoEnabled()) {
            logger.info("{}", transport.boundAddress());
            for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {
                logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
            }
        }
        // build the local node identity
        localNode = localNodeFactory.apply(transport.boundAddress());
        // connect to remote clusters
        if (remoteClusterClient) {
            // here we start to connect to the remote clusters
            remoteClusterService.initializeRemoteClusters();
        }
    }

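  The transportService.doStart() above does not seem to do much real work: it hands start() over to the transport component and only performs some preparation and follow-up itself. That makes sense: the transport is responsible for connecting the nodes, so it is the natural place to actually start network communication.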

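  Likewise, the transport is also a component managed by the ES lifecycle, so the same start() sequence we saw above runs once more for it, only now with more listeners attached. Its doStart() shows how it works.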

    // org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport#doStart
    @Override
    protected void doStart() {
        super.doStart();
        if (authenticator != null) {
            authenticator.setBoundTransportAddress(boundAddress(), profileBoundAddresses());
        }
    }
    // org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport#doStart
    @Override
    protected void doStart() {
        super.doStart();
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#doStart
    @Override
    protected void doStart() {
        boolean success = false;
        try {
            // obtain the shared netty event loop group, for reuse
            sharedGroup = sharedGroupFactory.getTransportGroup();
            // create the bootstrap, client side
            clientBootstrap = createClientBootstrap(sharedGroup);
            if (NetworkService.NETWORK_SERVER.get(settings)) {
                for (ProfileSettings profileSettings : profileSettings) {
                    // create the bootstrap, server side
                    createServerBootstrap(profileSettings, sharedGroup);
                    bindServer(profileSettings);
                }
            }
            // TcpTransport#doStart is empty by default
            super.doStart();
            success = true;
        } finally {
            if (success == false) {
                doStop();
            }
        }
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#createClientBootstrap
    private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGroup) {
        // creating a netty Bootstrap, the usual boilerplate
        // set the configurable options
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(sharedGroup.getLowLevelGroup());

        // NettyAllocator will return the channel type designed to work with the configured allocator
        assert Netty4NioSocketChannel.class.isAssignableFrom(NettyAllocator.getChannelType());
        bootstrap.channel(NettyAllocator.getChannelType());
        bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());

        bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
        bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
        if (TransportSettings.TCP_KEEP_ALIVE.get(settings)) {
            // Note that Netty logs a warning if it can't set the option
            if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
                final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
                if (keepIdleOption != null) {
                    bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
                }
            }
            if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
                final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
                if (keepIntervalOption != null) {
                    bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
                }
            }
            if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
                final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
                if (keepCountOption != null) {
                    bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
                }
            }
        }

        final ByteSizeValue tcpSendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE.get(settings);
        if (tcpSendBufferSize.getBytes() > 0) {
            bootstrap.option(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
        }

        final ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings);
        if (tcpReceiveBufferSize.getBytes() > 0) {
            bootstrap.option(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
        }

        bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

        final boolean reuseAddress = TransportSettings.TCP_REUSE_ADDRESS.get(settings);
        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);

        return bootstrap;
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#createServerBootstrap
    private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupFactory.SharedGroup sharedGroup) {
        String name = profileSettings.profileName;
        if (logger.isDebugEnabled()) {
            logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]",
                name, sharedGroupFactory.getTransportWorkerCount(), profileSettings.portOrRange, profileSettings.bindHosts,
                profileSettings.publishHosts, receivePredictorMin, receivePredictorMax);
        }
        // the usual ServerBootstrap boilerplate
        final ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(sharedGroup.getLowLevelGroup());

        // NettyAllocator will return the channel type designed to work with the configuredAllocator
        serverBootstrap.channel(NettyAllocator.getServerChannelType());

        // Set the allocators for both the server channel and the child channels created
        serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
        serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
        // set the handler; incoming data will enter through here
        serverBootstrap.childHandler(getServerChannelInitializer(name));
        serverBootstrap.handler(new ServerChannelExceptionHandler());

        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
        if (profileSettings.tcpKeepAlive) {
            // Note that Netty logs a warning if it can't set the option
            if (profileSettings.tcpKeepIdle >= 0) {
                final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
                if (keepIdleOption != null) {
                    serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
                }
            }
            if (profileSettings.tcpKeepInterval >= 0) {
                final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
                if (keepIntervalOption != null) {
                    serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
                }

            }
            if (profileSettings.tcpKeepCount >= 0) {
                final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
                if (keepCountOption != null) {
                    serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
                }
            }
        }

        if (profileSettings.sendBufferSize.getBytes() != -1) {
            serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(profileSettings.sendBufferSize.getBytes()));
        }

        if (profileSettings.receiveBufferSize.getBytes() != -1) {
            serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(profileSettings.receiveBufferSize.bytesAsInt()));
        }

        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

        serverBootstrap.option(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
        serverBootstrap.validate();

        serverBootstraps.put(name, serverBootstrap);
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#getServerChannelInitializer
    protected ChannelHandler getServerChannelInitializer(String name) {
        return new ServerChannelInitializer(name);
    }
    
    protected class ServerChannelInitializer extends ChannelInitializer<Channel> {

        protected final String name;
        private final NettyByteBufSizer sizer = new NettyByteBufSizer();

        protected ServerChannelInitializer(String name) {
            this.name = name;
        }

        @Override
        protected void initChannel(Channel ch) throws Exception {
            addClosedExceptionLogger(ch);
            assert ch instanceof Netty4NioSocketChannel;
            NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
            Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
            ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
            ch.pipeline().addLast("byte_buf_sizer", sizer);
            // The "logging" handler records request logs
            ch.pipeline().addLast("logging", new ESLoggingHandler());
            // The "dispatcher" handler dispatches requests for processing
            ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
            serverAcceptedChannel(nettyTcpChannel);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ExceptionsHelper.maybeDieOnAnotherThread(cause);
            super.exceptionCaught(ctx, cause);
        }
    }

  bindServer takes the serverBootstrap we initialized and binds it to a port, so that it can actually listen for incoming requests.

    // org.elasticsearch.transport.TcpTransport#bindServer
    protected void bindServer(ProfileSettings profileSettings) {
        // Bind and start to accept incoming connections.
        InetAddress[] hostAddresses;
        List<String> profileBindHosts = profileSettings.bindHosts;
        try {
            hostAddresses = networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY));
        } catch (IOException e) {
            throw new BindTransportException("Failed to resolve host " + profileBindHosts, e);
        }
        if (logger.isDebugEnabled()) {
            String[] addresses = new String[hostAddresses.length];
            for (int i = 0; i < hostAddresses.length; i++) {
                addresses[i] = NetworkAddress.format(hostAddresses[i]);
            }
            logger.debug("binding server bootstrap to: {}", (Object) addresses);
        }

        assert hostAddresses.length > 0;

        List<InetSocketAddress> boundAddresses = new ArrayList<>();
        for (InetAddress hostAddress : hostAddresses) {
            // Call bindToPort() to bind the serverBootstrap to a port on this address
            boundAddresses.add(bindToPort(profileSettings.profileName, hostAddress, profileSettings.portOrRange));
        }
        // Record the bound and published address information
        final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(profileSettings, boundAddresses);

        if (profileSettings.isDefaultProfile) {
            this.boundAddress = boundTransportAddress;
        } else {
            profileBoundAddresses.put(profileSettings.profileName, boundTransportAddress);
        }
    }

    private InetSocketAddress bindToPort(final String name, final InetAddress hostAddress, String port) {
        PortsRange portsRange = new PortsRange(port);
        final AtomicReference<Exception> lastException = new AtomicReference<>();
        final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
        closeLock.writeLock().lock();
        try {
            // No need for locking here since Lifecycle objects can't move from STARTED to INITIALIZED
            if (lifecycle.initialized() == false && lifecycle.started() == false) {
                throw new IllegalStateException("transport has been stopped");
            }
            // Iterate over the candidate ports, e.g. try 9300-9400 in order, and serve on the first one that binds
            boolean success = portsRange.iterate(portNumber -> {
                try {
                    // Bind the serverBootstrap to this port
                    TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
                    serverChannels.computeIfAbsent(name, k -> new ArrayList<>()).add(channel);
                    boundSocket.set(channel.getLocalAddress());
                } catch (Exception e) {
                    lastException.set(e);
                    return false;
                }
                return true;
            });
            if (!success) {
                throw new BindTransportException(
                    "Failed to bind to " + NetworkAddress.format(hostAddress, portsRange),
                    lastException.get()
                );
            }
        } finally {
            closeLock.writeLock().unlock();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
        }

        return boundSocket.get();
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#bind
    @Override
    protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
        // Call Netty's bind method; from this point on the service is open to the outside
        Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();
        Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel);
        channel.attr(SERVER_CHANNEL_KEY).set(esChannel);
        return esChannel;
    }

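  That is the initialization process of the ES transport. Connecting to other nodes and pushing cluster updates to them afterwards is a separate set of questions. At the very least, we now know that ES uses Netty to open the service port and then dispatches network requests through Netty4MessageChannelHandler.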
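
  The port-range walk in bindToPort can be illustrated without Netty. The following is only a minimal sketch using plain java.net sockets: the class PortRangeBinder and its method bindFirstFree are hypothetical names made up for this illustration and are not part of Elasticsearch. It tries each port in order, remembers the last failure, and returns on the first successful bind.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

/** Hypothetical helper (not Elasticsearch code): binds to the first free port in [from, to]. */
final class PortRangeBinder {

    static ServerSocket bindFirstFree(InetAddress host, int from, int to) throws IOException {
        IOException last = null;
        for (int port = from; port <= to; port++) {
            ServerSocket socket = new ServerSocket(); // created unbound
            try {
                socket.bind(new InetSocketAddress(host, port));
                return socket; // the first successful bind wins
            } catch (IOException e) {
                last = e;       // remember why this port failed, then try the next one
                socket.close();
            }
        }
        // Every port in the range failed; report the last cause, as bindToPort does
        throw new IOException("Failed to bind to " + host + ":[" + from + "-" + to + "]", last);
    }

    public static void main(String[] args) throws IOException {
        // 9300-9400 mirrors the range mentioned in the comments above
        try (ServerSocket socket = bindFirstFree(InetAddress.getLoopbackAddress(), 9300, 9400)) {
            System.out.println("bound to port " + socket.getLocalPort());
        }
    }
}
```

  If 9300 is already taken by another process, the sketch simply moves on to 9301, which is presumably why a second ES node started on the same machine ends up on the next port.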

 

4. Initialization of the HTTP request handler

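  The analysis above covered how ES instantiates and initializes transportService, and gave a rough picture of how it handles network requests. On closer inspection, however, the service started there listens on port 9300, not the //localhost:9200 we usually see. Digging a little further shows that another service is also started at startup, HttpServerTransport, and this is the one that serves our HTTP query requests.

  Its workflow is essentially the same as the one above; it simply installs a different set of Netty handlers.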

    // org.elasticsearch.node.Node#start
    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() throws NodeValidationException {
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        logger.info("starting ...");
        pluginLifecycleComponents.forEach(LifecycleComponent::start);
        ...
        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        // Get the transportService instance
        TransportService transportService = injector.getInstance(TransportService.class);
        // Give the taskManager its two services: task results and task cancellation
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
        // Start the transportService lifecycle
        transportService.start();
        assert localNodeFactory.getNode() != null;
        assert transportService.getLocalNode().equals(localNodeFactory.getNode())
            : "transportService has a different local node than the factory provided";
        injector.getInstance(PeerRecoverySourceService.class).start();
        ...
        
        // Start the HTTP query service; the concrete implementation here is SecurityNetty4HttpServerTransport
        injector.getInstance(HttpServerTransport.class).start();
        ...
        logger.info("started");

        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }

    // org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport#doStart
    @Override
    protected void doStart() {
        super.doStart();
        ipFilter.setBoundHttpTransportAddress(this.boundAddress());
    }
    // org.elasticsearch.http.netty4.Netty4HttpServerTransport#doStart
    @Override
    protected void doStart() {
        boolean success = false;
        try {
            sharedGroup = sharedGroupFactory.getHttpGroup();
            serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(sharedGroup.getLowLevelGroup());

            // NettyAllocator will return the channel type designed to work with the configuredAllocator
            serverBootstrap.channel(NettyAllocator.getServerChannelType());

            // Set the allocators for both the server channel and the child channels created
            serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
            serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
            // This is where the handlers differ from the transport server
            serverBootstrap.childHandler(configureServerChannelHandler());
            serverBootstrap.handler(new ServerChannelExceptionHandler(this));

            serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));

            if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) {
                // Netty logs a warning if it can't set the option, so try this only on supported platforms
                if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
                    if (SETTING_HTTP_TCP_KEEP_IDLE.get(settings) >= 0) {
                        final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
                        if (keepIdleOption != null) {
                            serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), SETTING_HTTP_TCP_KEEP_IDLE.get(settings));
                        }
                    }
                    if (SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings) >= 0) {
                        final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
                        if (keepIntervalOption != null) {
                            serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption),
                                SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings));
                        }
                    }
                    if (SETTING_HTTP_TCP_KEEP_COUNT.get(settings) >= 0) {
                        final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
                        if (keepCountOption != null) {
                            serverBootstrap.childOption(NioChannelOption.of(keepCountOption), SETTING_HTTP_TCP_KEEP_COUNT.get(settings));
                        }
                    }
                }
            }

            final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
            if (tcpSendBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
            }

            final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
            if (tcpReceiveBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
            }

            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
            serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

            final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
            serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
            serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

            bindServer();
            success = true;
        } finally {
            if (success == false) {
                doStop(); // otherwise we leak threads since we never moved to started
            }
        }
    }
    // org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport#configureServerChannelHandler
    @Override
    public ChannelHandler configureServerChannelHandler() {
        return new HttpSslChannelHandler();
    }
        // org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport.HttpSslChannelHandler#HttpSslChannelHandler
        HttpSslChannelHandler() {
            super(SecurityNetty4HttpServerTransport.this, handlingSettings);
        }
        // org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#HttpChannelHandler
        protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
            this.transport = transport;
            this.handlingSettings = handlingSettings;
            this.byteBufSizer =  new NettyByteBufSizer();
            this.requestCreator =  new Netty4HttpRequestCreator();
            this.requestHandler = new Netty4HttpRequestHandler(transport);
            this.responseCreator = new Netty4HttpResponseCreator();
        }
        // org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#initChannel
        @Override
        protected void initChannel(Channel ch) throws Exception {
            Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
            // Quite a few handlers are configured here, naturally because HTTP handling is more complex
            ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
            ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
            ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
            final HttpRequestDecoder decoder = new HttpRequestDecoder(
                handlingSettings.getMaxInitialLineLength(),
                handlingSettings.getMaxHeaderSize(),
                handlingSettings.getMaxChunkSize());
            decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            ch.pipeline().addLast("decoder", decoder);
            ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
            ch.pipeline().addLast("encoder", new HttpResponseEncoder());
            final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
            aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
            ch.pipeline().addLast("aggregator", aggregator);
            if (handlingSettings.isCompression()) {
                ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
            }
            ch.pipeline().addLast("request_creator", requestCreator);
            ch.pipeline().addLast("response_creator", responseCreator);
            // The last two handlers, "pipelining" and "handler", do the real business processing
            ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
            ch.pipeline().addLast("handler", requestHandler);
            transport.serverAcceptedChannel(nettyHttpChannel);
        }

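  That is the overall flow; at its core it is the standard Netty programming model. The most important part is the last few handlers added to the pipeline, which is also where users of the Netty framework focus their attention.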
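
  To see why the order of the addLast calls matters, here is a toy model of an inbound handler chain in plain Java. It is only a sketch: MiniPipeline, fireRead and the three lambda handlers are hypothetical names for this illustration and are not Netty's ChannelPipeline API (real Netty also has outbound handlers, which run in the opposite direction). Each handler transforms the message and passes it on, so the last handler only ever sees fully decoded input.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model of an inbound handler chain; NOT Netty's ChannelPipeline API. */
final class MiniPipeline {
    // LinkedHashMap keeps insertion order, which is the order handlers run in
    private final Map<String, Function<Object, Object>> handlers = new LinkedHashMap<>();

    MiniPipeline addLast(String name, Function<Object, Object> handler) {
        if (handlers.putIfAbsent(name, handler) != null) {
            throw new IllegalArgumentException("duplicate handler name: " + name);
        }
        return this;
    }

    /** Passes the message through every handler, first added to last added. */
    Object fireRead(Object msg) {
        Object current = msg;
        for (Function<Object, Object> handler : handlers.values()) {
            current = handler.apply(current);
        }
        return current;
    }

    List<String> names() {
        return new ArrayList<>(handlers.keySet());
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
            // bytes -> text
            .addLast("decoder", raw -> new String((byte[]) raw, StandardCharsets.UTF_8))
            // text -> [method, path]
            .addLast("request_creator", text -> ((String) text).trim().split(" "))
            // business logic runs last, on the decoded request
            .addLast("handler", parts -> "handled " + ((String[]) parts)[0] + " " + ((String[]) parts)[1]);

        // prints "handled GET /_search"
        System.out.println(pipeline.fireRead("GET /_search \n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

  Swapping "decoder" and "handler" in this model would make the business handler receive raw bytes and fail with a ClassCastException. The same reasoning explains why, in the ES pipeline above, "handler" is added only after the decoder, the aggregator and request_creator.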

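  The HTTP server described in this section listens on port 9200 by default, while the server from the previous section defaults to port 9300. The difference lies mainly in the use case, or rather in the protocol: one is based on HTTP, the other works directly on top of TCP. HTTP is a higher-level protocol and comparatively easy to use, whereas working with TCP directly has a higher barrier to entry but performs better, which makes it a very good fit for communication between cluster nodes.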

  These differences do not affect our understanding of the overall logic of transportService.

 

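  As for how ES processes a network request once it has been received, what its handling framework looks like, and what it is actually capable of: that is for the next installment.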