ES系列(三):網絡通信模塊解析

  ES是一個分佈式搜索引擎,其除了用戶提供必要的通信服務外,集群間也必須保持緊密的通信聯繫,才能在必要的時候給出正確的結果。其則必然涉及到各種繁多且要求高的通信場景,那麼如何實現高性能的通信,則是其必須要考慮的問題。

  今天,我們就以es的transportService的實現為窺點,觀察es的高性能的通信模塊實現吧。

 

1. 前言概要

  談到高性能的網絡通信,相信很多人都明白大概是什麼道理,或者看過我之前的一些文章,也必然清楚其核心原理。總結來說,其實就是利用io多路復用技術,充分利用帶寬,從而達到高性能的目標。

  而具體到java語言上來,能聊的點也許就更少了。比如nio, netty, akka… 

  所以,其實本文所討論的目標,看起來沒有那麼神秘,也沒必要神秘。我們僅站在研究ES實現細節的方向,去深入理解一些實際的問題,目的僅是為了解惑。

  

2. transportService的初始化

  es中幾乎所有的模塊,都是在服務啟動的時候進行初始化的,這是自然。一來是啟動時緩慢一點是可以的,二來是啟動的時候有非常多的上下文信息可用非常方便各種初始化,三來是能夠提前發現問題而不是運行了很久之後才發現不可解決的問題。

  而transportService是在創建Node時進行初始化的。

    // org.elasticsearch.node.Node#start
    /**
     * Constructs a node
     *
     * @param initialEnvironment         the initial environment for this node, which will be added to by plugins
     * @param classpathPlugins           the plugins to be loaded from the classpath
     * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
     *                                   test framework for tests that rely on being able to set private settings
     */
    protected Node(final Environment initialEnvironment,
                   Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
        ...
        try {
            ...
            new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
            final Transport transport = networkModule.getTransportSupplier().get();
            Set<String> taskHeaders = Stream.concat(
                pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
                Stream.of(Task.X_OPAQUE_ID)
            ).collect(Collectors.toSet());
            // 創建 transportService
            final TransportService transportService = newTransportService(settings, transport, threadPool,
                networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
            final GatewayMetaState gatewayMetaState = new GatewayMetaState();
            ...
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to bind service", ex);
        } finally {
            if (!success) {
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
    }

  即要初始化 transportService , 重點就要看 newTransportService() 如何處理了。在當然了,要進行這個方法的調用,它其實比較多的前提,即各種入參的初始化。重要一點的就是:線程池的創建,transport 的初始化。線程池咱們略去不說,主要是它會在非常多的地方用到,單獨在這裡講也不合適。那麼就主要看看 transport 是如何初始化的即可。

 

2.1. NetworkModule 的實例化

  從上面的實現中,我們看到要獲取 transport 實例,還需要先拿到 networkModule ,這又是如何初始化的呢?

            // 在 Node() 的構造方法中,直接new出來的 。
            final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                networkService, restController, clusterService.getClusterSettings());
    // org.elasticsearch.common.network.NetworkModule#NetworkModule
    /**
     * Creates a network module that custom networking classes can be plugged into.
     * @param settings The settings for the node
     * @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
     */
    public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                         BigArrays bigArrays,
                         PageCacheRecycler pageCacheRecycler,
                         CircuitBreakerService circuitBreakerService,
                         NamedWriteableRegistry namedWriteableRegistry,
                         NamedXContentRegistry xContentRegistry,
                         NetworkService networkService, HttpServerTransport.Dispatcher dispatcher,
                         ClusterSettings clusterSettings) {
        this.settings = settings;
        this.transportClient = transportClient;
        // 這裡的 plugin 可能有多個,如 XPackPlugin, Netty4Plugin, Security, VotingOnlyNodePlugin
        for (NetworkPlugin plugin : plugins) {
            Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                pageCacheRecycler, circuitBreakerService, xContentRegistry, networkService, dispatcher, clusterSettings);
            if (transportClient == false) {
                for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                    // 向 transportHttpFactories 中註冊相關信息
                    registerHttpTransport(entry.getKey(), entry.getValue());
                }
            }
            Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
                circuitBreakerService, namedWriteableRegistry, networkService);
            for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
                // 向 transportFactories 中註冊相關信息
                registerTransport(entry.getKey(), entry.getValue());
            }
            List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
                threadPool.getThreadContext());
            for (TransportInterceptor interceptor : transportInterceptors) {
                // 向 transportIntercetors 中註冊攔截器
                registerTransportInterceptor(interceptor);
            }
        }
    }

  可見,整個 NetworkModule 的工作,重點在於註冊相關的組件到自身,以便將來取用。這個容器則有可能是 map 形式的,也有可能是 list 形式的。總之,能夠起到註冊的作用即可。感興趣的同學可以展開以下查看更多註冊實現:

    private final Map<String, Supplier<Transport>> transportFactories = new HashMap<>();
    private final Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();
    private final List<TransportInterceptor> transportIntercetors = new ArrayList<>();

    /** Adds an http transport implementation that can be selected by setting {@link #HTTP_TYPE_KEY}. */
    // TODO: we need another name than "http transport"....so confusing with transportClient...
    private void registerHttpTransport(String key, Supplier<HttpServerTransport> factory) {
        if (transportClient) {
            throw new IllegalArgumentException("Cannot register http transport " + key + " for transport client");
        }
        if (transportHttpFactories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }
    /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
    private void registerTransport(String key, Supplier<Transport> factory) {
        if (transportFactories.putIfAbsent(key, factory) != null) {
            throw new IllegalArgumentException("transport for name: " + key + " is already registered");
        }
    }

    /**
     * Registers a new {@link TransportInterceptor}
     */
    private void registerTransportInterceptor(TransportInterceptor interceptor) {
        this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
    }

View Code

  當然了,還有很重要的東西,就是 NetworkPlugin 中的方法的實現。因為所有的註冊來源,都是基於這些方法的。這也就為我們的 plugin 提供了方便的入口,我們先來看看ES都會 NetworkPlugin 提供了哪些入口:

// org.elasticsearch.plugins
/**
 * Plugin for extending network and transport related classes
 */
public interface NetworkPlugin {

    /**
     * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
     * transport (inter-node) requests. This must not return <code>null</code>
     *
     * @param namedWriteableRegistry registry of all named writeables registered
     * @param threadContext a {@link ThreadContext} of the current nodes or clients {@link ThreadPool} that can be used to set additional
     *                      headers in the interceptors
     */
    default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
                                                                ThreadContext threadContext) {
        return Collections.emptyList();
    }

    /**
     * Returns a map of {@link Transport} suppliers.
     * See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
     */
    default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                           CircuitBreakerService circuitBreakerService,
                                                           NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        return Collections.emptyMap();
    }

    /**
     * Returns a map of {@link HttpServerTransport} suppliers.
     * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
     */
    default Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                         PageCacheRecycler pageCacheRecycler,
                                                                         CircuitBreakerService circuitBreakerService,
                                                                         NamedXContentRegistry xContentRegistry,
                                                                         NetworkService networkService,
                                                                         HttpServerTransport.Dispatcher dispatcher,
                                                                         ClusterSettings clusterSettings) {
        return Collections.emptyMap();
    }
}

  方法不多,剛好夠前面的 NetworkModule 初始化場景用。且都有默認實現方法,即如果相應plugin不關注這塊東西,就直接忽略即可。

  因為我們是衝著es的高性能服務來的,所以有必要看看netty的相關實現。netty中,將實現了兩個獲取 transport 的方法,而攔截器都未做處理,因為業務處理框架有需要。

    // org.elasticsearch.transport.Netty4Plugin#getTransports
    @Override
    public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
                                                          CircuitBreakerService circuitBreakerService,
                                                          NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
            networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
    }

    @Override
    public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                        PageCacheRecycler pageCacheRecycler,
                                                                        CircuitBreakerService circuitBreakerService,
                                                                        NamedXContentRegistry xContentRegistry,
                                                                        NetworkService networkService,
                                                                        HttpServerTransport.Dispatcher dispatcher,
                                                                        ClusterSettings clusterSettings) {
        return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
            () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
                clusterSettings, getSharedGroupFactory(settings)));
    }

  無它,返回兩個Netty相關的服務實例,備用。

 

2.2. transportService 的實例化

  上一節只是講一些必要條件,本節才講真正的初始化的邏輯。

    // org.elasticsearch.node.Node#newTransportService
    protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
                                                   TransportInterceptor interceptor,
                                                   Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
                                                   ClusterSettings clusterSettings, Set<String> taskHeaders) {
        return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
    }

  沒有多餘的,TransportService 就是一個完整的實現類。看一下其構建方法即可。

    // org.elasticsearch.transport.TransportService#TransportService
    /**
     * Build the service.
     *
     * @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
     *    updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
     */
    public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                            Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                            Set<String> taskHeaders) {
        // ClusterConnectionManager 重要
        this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
            new ClusterConnectionManager(settings, transport));
    }
    
    public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                            Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                            Set<String> taskHeaders, ConnectionManager connectionManager) {

        final boolean isTransportClient = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey()));

        // If we are a transport client then we skip the check that the remote node has a compatible build hash
        this.requireCompatibleBuild = isTransportClient == false;

        // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
        this.validateConnections = isTransportClient == false || TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
        // 保存各配置及服務上下文
        this.transport = transport;
        transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
        this.threadPool = threadPool;
        this.localNodeFactory = localNodeFactory;
        this.connectionManager = connectionManager;
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
        setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
        tracerLog = Loggers.getLogger(logger, ".tracer");
        // 任務管理器
        taskManager = createTaskManager(settings, threadPool, taskHeaders);
        // 攔截器獲取
        this.interceptor = transportInterceptor;
        this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
        this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
        // 集群服務管理
        remoteClusterService = new RemoteClusterService(settings, this);
        responseHandlers = transport.getResponseHandlers();
        if (clusterSettings != null) {
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
            if (remoteClusterClient) {
                // 監聽配置更新操作
                remoteClusterService.listenForUpdates(clusterSettings);
            }
            clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, transport::setSlowLogThreshold);
        }
        // 註冊握手方法的處理器 internal:transport/handshake
        registerRequestHandler(
            HANDSHAKE_ACTION_NAME,
            ThreadPool.Names.SAME,
            false, false,
            HandshakeRequest::new,
            (request, channel, task) -> channel.sendResponse(
                new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName)));

        if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) {
            logger.warn("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" +
                    PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning");
            DeprecationLogger.getLogger(TransportService.class).deprecate("permit_handshake_from_incompatible_builds",
                "system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed");
        }
    }

  無它,就是實例化各種必要的服務,保存必要配置信息。其中每個點都值得去深挖,但這不是我們的目的。我們只需了解大致即可。不過有一個 ClusterConnectionManager 還是需要我們重視,因為它的作用是維持和集群各節點通信的特性,此處實例化後,後面將會被完美利用。實例化時,更多的是保存 transport 實例,以便真正實現遠程連接。

    // org.elasticsearch.transport.ClusterConnectionManager#ClusterConnectionManager
    public ClusterConnectionManager(Settings settings, Transport transport) {
        this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
    }

    public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
        this.transport = transport;
        this.defaultProfile = connectionProfile;
    }
    // org.elasticsearch.transport.ConnectionProfile#buildDefaultConnectionProfile
    /**
     * Builds a default connection profile based on the provided settings.
     *
     * @param settings to build the connection profile from
     * @return the connection profile
     */
    public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
        int connectionsPerNodeRecovery = TransportSettings.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
        int connectionsPerNodeBulk = TransportSettings.CONNECTIONS_PER_NODE_BULK.get(settings);
        int connectionsPerNodeReg = TransportSettings.CONNECTIONS_PER_NODE_REG.get(settings);
        int connectionsPerNodeState = TransportSettings.CONNECTIONS_PER_NODE_STATE.get(settings);
        int connectionsPerNodePing = TransportSettings.CONNECTIONS_PER_NODE_PING.get(settings);
        Builder builder = new Builder();
        builder.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
        builder.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
        builder.setPingInterval(TransportSettings.PING_SCHEDULE.get(settings));
        builder.setCompressionEnabled(TransportSettings.TRANSPORT_COMPRESS.get(settings));
        builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
        builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
        // if we are not master eligible we don't need a dedicated channel to publish the state
        builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
        // if we are not a data-node we don't need any dedicated channels for recovery
        builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
        builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
        return builder.build();
    }

  到此,整個 transportService 的實例化工作就算是完成了。至於其何真正work起來,則需要留到整個es框架的start的生命周期節點時才會體現。且看下節分解。

 

3. transportService的啟動核心

  即它是如何開始工作的,以及它的工作模式是怎麼樣的?

  事實上,整個ES的框架,是一個生命周期管理模式存在的。而它的所有組件真正的啟動時機,也是在start() 周期中統一進行的的。

    // org.elasticsearch.node.Node#start
    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() throws NodeValidationException {
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        logger.info("starting ...");
        pluginLifecycleComponents.forEach(LifecycleComponent::start);
        ...
        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        // 獲取 transportService 實例
        TransportService transportService = injector.getInstance(TransportService.class);
        // 設置 taskManager 的兩個任務管理器
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
        // transportService 生命周期開始
        transportService.start();
        assert localNodeFactory.getNode() != null;
        assert transportService.getLocalNode().equals(localNodeFactory.getNode())
            : "transportService has a different local node than the factory provided";
        injector.getInstance(PeerRecoverySourceService.class).start();
        ...
        logger.info("started");

        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }

  在該周期內,要處理的組件非常多,而我們則只挑關注點:transportService 的初始化,一窺其行為。其過程主要為,通過injector獲取前面實例化的 transportService, 然後設置taskManager的必要屬性, 最後調用transportService的start()方法,開啟真正的服務。

  即核心就是 transportService.start() , 這是一個統一的生命周期入口方法:

    // org.elasticsearch.common.component.AbstractLifecycleComponent#start
    @Override
    public void start() {
        synchronized (lifecycle) {
            // 安全啟動,不允許重複初始化,或者其他
            if (!lifecycle.canMoveToStarted()) {
                return;
            }
            // 監聽者處理
            for (LifecycleListener listener : listeners) {
                listener.beforeStart();
            }
            // 各真實組件完成必要任務的地方
            doStart();
            // 設置狀態為已啟動,為下次判斷做好依據
            lifecycle.moveToStarted();
            // 後置監聽
            for (LifecycleListener listener : listeners) {
                listener.afterStart();
            }
        }
    }

  這一生命周期管理,可以非常完整了。首先,它是線程安全的,然後不允許重複初始化或在不必要的時候初始化,然後還有前置和後置監聽鉤子供用戶擴展。監聽處理自不必多說,但如何管理組件的狀態,可以一起看看:

    // org.elasticsearch.common.component.Lifecycle#canMoveToStarted
    public boolean canMoveToStarted() throws IllegalStateException {
        State localState = this.state;
        if (localState == State.INITIALIZED || localState == State.STOPPED) {
            return true;
        }
        if (localState == State.STARTED) {
            return false;
        }
        if (localState == State.CLOSED) {
            throw new IllegalStateException("Can't move to started state when closed");
        }
        throw new IllegalStateException("Can't move to started with unknown state");
    }
    
    public synchronized boolean moveToStarted() throws IllegalStateException {
        State localState = this.state;
        if (localState == State.INITIALIZED || localState == State.STOPPED) {
            state = State.STARTED;
            return true;
        }
        if (localState == State.STARTED) {
            return false;
        }
        if (localState == State.CLOSED) {
            throw new IllegalStateException("Can't move to started state when closed");
        }
        throw new IllegalStateException("Can't move to started with unknown state");
    }

  狀態判定,一切盡在代碼中。

  接下來,是我們真正的 transportService 的啟動實現了。即 transport.doStart() 方法:

    // org.elasticsearch.transport.TransportService#doStart
    @Override
    protected void doStart() {
        // transport 先start
        transport.setMessageListener(this);
        connectionManager.addListener(this);
        transport.start();
        if (transport.boundAddress() != null && logger.isInfoEnabled()) {
            logger.info("{}", transport.boundAddress());
            for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {
                logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
            }
        }
        // 設置本地節點標識
        localNode = localNodeFactory.apply(transport.boundAddress());
        // 連接到集群
        if (remoteClusterClient) {
            // here we start to connect to the remote clusters
            remoteClusterService.initializeRemoteClusters();
        }
    }

  以上 transportService.doStart(), 看起來並沒有實際什麼工作,而只是將start()又交給了 transport 組件了。而 transportService 只是一些前置和後置工作。也難怪,transport 承擔著各節點的連接能力,由其進行真正的網絡通信啟動,再合適不過了。

  而同樣的,transport 也是一個受ES生命周期管理的組件,如同前面我們看到的一樣的,它會再執行一遍。只是此時,它擁有了更多的監聽器了。而它的 doStart() 則體現了其工作過程。

    // org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport#doStart
    @Override
    protected void doStart() {
        super.doStart();
        if (authenticator != null) {
            authenticator.setBoundTransportAddress(boundAddress(), profileBoundAddresses());
        }
    }
    // org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport#doStart
    @Override
    protected void doStart() {
        super.doStart();
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#doStart
    @Override
    protected void doStart() {
        boolean success = false;
        try {
            // 獲取 netty 的 eventGroup, 復用目的
            sharedGroup = sharedGroupFactory.getTransportGroup();
            // 創建 bootstrap, client 版本
            clientBootstrap = createClientBootstrap(sharedGroup);
            if (NetworkService.NETWORK_SERVER.get(settings)) {
                for (ProfileSettings profileSettings : profileSettings) {
                    // 創建 bootsrap, server 版本
                    createServerBootstrap(profileSettings, sharedGroup);
                    bindServer(profileSettings);
                }
            }
            // TcpTransport 默認為空 
            super.doStart();
            success = true;
        } finally {
            if (success == false) {
                doStop();
            }
        }
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#createClientBootstrap
    private Bootstrap createClientBootstrap(SharedGroupFactory.SharedGroup sharedGroup) {
        // netty 的 bootsrap 的創建過程,編程範式而已
        // 設置各可控參數
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(sharedGroup.getLowLevelGroup());

        // NettyAllocator will return the channel type designed to work with the configured allocator
        assert Netty4NioSocketChannel.class.isAssignableFrom(NettyAllocator.getChannelType());
        bootstrap.channel(NettyAllocator.getChannelType());
        bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());

        bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
        bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
        if (TransportSettings.TCP_KEEP_ALIVE.get(settings)) {
            // Note that Netty logs a warning if it can't set the option
            if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
                final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
                if (keepIdleOption != null) {
                    bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
                }
            }
            if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
                final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
                if (keepIntervalOption != null) {
                    bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
                }
            }
            if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
                final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
                if (keepCountOption != null) {
                    bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
                }
            }
        }

        final ByteSizeValue tcpSendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE.get(settings);
        if (tcpSendBufferSize.getBytes() > 0) {
            bootstrap.option(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
        }

        final ByteSizeValue tcpReceiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings);
        if (tcpReceiveBufferSize.getBytes() > 0) {
            bootstrap.option(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
        }

        bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

        final boolean reuseAddress = TransportSettings.TCP_REUSE_ADDRESS.get(settings);
        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);

        return bootstrap;
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#createServerBootstrap
    private void createServerBootstrap(ProfileSettings profileSettings, SharedGroupFactory.SharedGroup sharedGroup) {
        String name = profileSettings.profileName;
        if (logger.isDebugEnabled()) {
            logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]",
                name, sharedGroupFactory.getTransportWorkerCount(), profileSettings.portOrRange, profileSettings.bindHosts,
                profileSettings.publishHosts, receivePredictorMin, receivePredictorMax);
        }
        // serverBootstrap 的編程範式
        final ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(sharedGroup.getLowLevelGroup());

        // NettyAllocator will return the channel type designed to work with the configuredAllocator
        serverBootstrap.channel(NettyAllocator.getServerChannelType());

        // Set the allocators for both the server channel and the child channels created
        serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
        serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
        // 設置handler, 未來數據處理入口從此入
        serverBootstrap.childHandler(getServerChannelInitializer(name));
        serverBootstrap.handler(new ServerChannelExceptionHandler());

        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
        if (profileSettings.tcpKeepAlive) {
            // Note that Netty logs a warning if it can't set the option
            if (profileSettings.tcpKeepIdle >= 0) {
                final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
                if (keepIdleOption != null) {
                    serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
                }
            }
            if (profileSettings.tcpKeepInterval >= 0) {
                final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
                if (keepIntervalOption != null) {
                    serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
                }

            }
            if (profileSettings.tcpKeepCount >= 0) {
                final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
                if (keepCountOption != null) {
                    serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
                }
            }
        }

        if (profileSettings.sendBufferSize.getBytes() != -1) {
            serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(profileSettings.sendBufferSize.getBytes()));
        }

        if (profileSettings.receiveBufferSize.getBytes() != -1) {
            serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(profileSettings.receiveBufferSize.bytesAsInt()));
        }

        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

        serverBootstrap.option(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, profileSettings.reuseAddress);
        serverBootstrap.validate();

        serverBootstraps.put(name, serverBootstrap);
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#getServerChannelInitializer
    protected ChannelHandler getServerChannelInitializer(String name) {
        return new ServerChannelInitializer(name);
    }
    
    protected class ServerChannelInitializer extends ChannelInitializer<Channel> {

        protected final String name;
        private final NettyByteBufSizer sizer = new NettyByteBufSizer();

        protected ServerChannelInitializer(String name) {
            this.name = name;
        }

        @Override
        protected void initChannel(Channel ch) throws Exception {
            addClosedExceptionLogger(ch);
            assert ch instanceof Netty4NioSocketChannel;
            NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
            Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
            ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
            ch.pipeline().addLast("byte_buf_sizer", sizer);
            // 通過 logging 記錄請求日誌
            ch.pipeline().addLast("logging", new ESLoggingHandler());
            // 通過 dispatcher 分發處理請求
            ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
            serverAcceptedChannel(nettyTcpChannel);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ExceptionsHelper.maybeDieOnAnotherThread(cause);
            super.exceptionCaught(ctx, cause);
        }
    }

    bindServer 是將我們初始化好的 serverBootstrap, 綁定到某個端口上,以便其可以真正監聽請求的到來。
    // org.elasticsearch.transport.TcpTransport#bindServer
    protected void bindServer(ProfileSettings profileSettings) {
        // Bind and start to accept incoming connections.
        InetAddress[] hostAddresses;
        List<String> profileBindHosts = profileSettings.bindHosts;
        try {
            hostAddresses = networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY));
        } catch (IOException e) {
            throw new BindTransportException("Failed to resolve host " + profileBindHosts, e);
        }
        if (logger.isDebugEnabled()) {
            String[] addresses = new String[hostAddresses.length];
            for (int i = 0; i < hostAddresses.length; i++) {
                addresses[i] = NetworkAddress.format(hostAddresses[i]);
            }
            logger.debug("binding server bootstrap to: {}", (Object) addresses);
        }

        assert hostAddresses.length > 0;

        List<InetSocketAddress> boundAddresses = new ArrayList<>();
        for (InetAddress hostAddress : hostAddresses) {
            // 調用 bindToPort() 綁定端口到 serverBootstrap 上
            boundAddresses.add(bindToPort(profileSettings.profileName, hostAddress, profileSettings.portOrRange));
        }
        // 保存已發佈的端口信息
        final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(profileSettings, boundAddresses);

        if (profileSettings.isDefaultProfile) {
            this.boundAddress = boundTransportAddress;
        } else {
            profileBoundAddresses.put(profileSettings.profileName, boundTransportAddress);
        }
    }

    private InetSocketAddress bindToPort(final String name, final InetAddress hostAddress, String port) {
        PortsRange portsRange = new PortsRange(port);
        final AtomicReference<Exception> lastException = new AtomicReference<>();
        final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
        closeLock.writeLock().lock();
        try {
            // No need for locking here since Lifecycle objects can't move from STARTED to INITIALIZED
            if (lifecycle.initialized() == false && lifecycle.started() == false) {
                throw new IllegalStateException("transport has been stopped");
            }
            // 此處將會迭代可用端口,比如從 9300-9400 依次查找可用端口,提供服務
            boolean success = portsRange.iterate(portNumber -> {
                try {
                    // 綁定到 serverBootstrap 中
                    TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
                    serverChannels.computeIfAbsent(name, k -> new ArrayList<>()).add(channel);
                    boundSocket.set(channel.getLocalAddress());
                } catch (Exception e) {
                    lastException.set(e);
                    return false;
                }
                return true;
            });
            if (!success) {
                throw new BindTransportException(
                    "Failed to bind to " + NetworkAddress.format(hostAddress, portsRange),
                    lastException.get()
                );
            }
        } finally {
            closeLock.writeLock().unlock();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
        }

        return boundSocket.get();
    }
    // org.elasticsearch.transport.netty4.Netty4Transport#bind
    @Override
    protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
        // 調用 netty 的端口綁定方法,到此對外服務功能開啟
        Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();
        Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel);
        channel.attr(SERVER_CHANNEL_KEY).set(esChannel);
        return esChannel;
    }

  以上,就是es的transport的初始化過程了。至於後續連接或更新集群信息到其他節點,則是另一堆問題了。至少我們明白了,es是通過netty來開啟服務端口,然後通過 Netty4MessageChannelHandler 來分髮網絡請求。

 

4. http請求處理器的初始化

  上面的分析中,我們看到了es對於transportService的實例化和初始化過程,大致明白了其處理網絡請求的方式。但是,當我們細查時,發現以上提供的服務為9300端口的服務,而非我們常看到的 //localhost:9200 那種。最終,再經過一番查看後,發現原來,在啟動時還會有另外一個服務會被啟動,那就是 HttpServerTransport , 這才是為我們提供http查詢請求的服務。

  其工作流程與以上過程基本一致,只是其初始化不同的 netty handler 而已。

    // org.elasticsearch.node.Node#start
    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() throws NodeValidationException {
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        logger.info("starting ...");
        pluginLifecycleComponents.forEach(LifecycleComponent::start);
        ...
        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        // 獲取 transportService 實例
        TransportService transportService = injector.getInstance(TransportService.class);
        // 設置 taskManager 的兩個任務管理器
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
        // transportService 生命周期開始
        transportService.start();
        assert localNodeFactory.getNode() != null;
        assert transportService.getLocalNode().equals(localNodeFactory.getNode())
            : "transportService has a different local node than the factory provided";
        injector.getInstance(PeerRecoverySourceService.class).start();
        ...
        
        // 初始化 http 查詢服務, 其對應的具體實現類是 SecurityNetty4ServerTransport 
        injector.getInstance(HttpServerTransport.class).start();
        ...
        logger.info("started");

        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }

    // org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport#doStart
    @Override
    protected void doStart() {
        super.doStart();
        ipFilter.setBoundHttpTransportAddress(this.boundAddress());
    }
    // org.elasticsearch.http.netty4.Netty4HttpServerTransport#doStart
    @Override
    protected void doStart() {
        boolean success = false;
        try {
            sharedGroup = sharedGroupFactory.getHttpGroup();
            serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(sharedGroup.getLowLevelGroup());

            // NettyAllocator will return the channel type designed to work with the configuredAllocator
            serverBootstrap.channel(NettyAllocator.getServerChannelType());

            // Set the allocators for both the server channel and the child channels created
            serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
            serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
            // 具體handler 的差異在此體現
            serverBootstrap.childHandler(configureServerChannelHandler());
            serverBootstrap.handler(new ServerChannelExceptionHandler(this));

            serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));

            if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) {
                // Netty logs a warning if it can't set the option, so try this only on supported platforms
                if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
                    if (SETTING_HTTP_TCP_KEEP_IDLE.get(settings) >= 0) {
                        final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
                        if (keepIdleOption != null) {
                            serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), SETTING_HTTP_TCP_KEEP_IDLE.get(settings));
                        }
                    }
                    if (SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings) >= 0) {
                        final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
                        if (keepIntervalOption != null) {
                            serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption),
                                SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings));
                        }
                    }
                    if (SETTING_HTTP_TCP_KEEP_COUNT.get(settings) >= 0) {
                        final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
                        if (keepCountOption != null) {
                            serverBootstrap.childOption(NioChannelOption.of(keepCountOption), SETTING_HTTP_TCP_KEEP_COUNT.get(settings));
                        }
                    }
                }
            }

            final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
            if (tcpSendBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
            }

            final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
            if (tcpReceiveBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
            }

            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
            serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

            final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
            serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
            serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

            bindServer();
            success = true;
        } finally {
            if (success == false) {
                doStop(); // otherwise we leak threads since we never moved to started
            }
        }
    }
    // org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport#configureServerChannelHandler
    @Override
    public ChannelHandler configureServerChannelHandler() {
        return new HttpSslChannelHandler();
    }
        // org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport.HttpSslChannelHandler#HttpSslChannelHandler
        HttpSslChannelHandler() {
            super(SecurityNetty4HttpServerTransport.this, handlingSettings);
        }
        // org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#HttpChannelHandler
        protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
            this.transport = transport;
            this.handlingSettings = handlingSettings;
            this.byteBufSizer =  new NettyByteBufSizer();
            this.requestCreator =  new Netty4HttpRequestCreator();
            this.requestHandler = new Netty4HttpRequestHandler(transport);
            this.responseCreator = new Netty4HttpResponseCreator();
        }
        // org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#initChannel
        @Override
        protected void initChannel(Channel ch) throws Exception {
            Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
            // 此處 handler 配置的相當多, 自然是因其功能複雜的原因
            ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
            ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
            ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
            final HttpRequestDecoder decoder = new HttpRequestDecoder(
                handlingSettings.getMaxInitialLineLength(),
                handlingSettings.getMaxHeaderSize(),
                handlingSettings.getMaxChunkSize());
            decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            ch.pipeline().addLast("decoder", decoder);
            ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
            ch.pipeline().addLast("encoder", new HttpResponseEncoder());
            final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
            aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
            ch.pipeline().addLast("aggregator", aggregator);
            if (handlingSettings.isCompression()) {
                ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
            }
            ch.pipeline().addLast("request_creator", requestCreator);
            ch.pipeline().addLast("response_creator", responseCreator);
            // 最後兩個處理器, pipelineing, handler, 則處理真正的業務
            ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
            ch.pipeline().addLast("handler", requestHandler);
            transport.serverAcceptedChannel(nettyHttpChannel);
        }

  整體流程就是這樣,核心就是 netty 的編程範式。最關鍵的就是引入最後幾個處理器,這也是netty框架使用者的關注點所在。

  其中,本節所講的http server, 對應的服務端口默認是9200, 而上一節所講對應的默認端口則是9300. 其實差別主要在於應用場景不同或者說使用的協議不同,一個是基於http協議的,一個是基於tcp協議的。http屬於高層協議,其應用相對容易些,而tcp則使用起來有一些門檻,但其性能更好,用於集群間的通信則再好不過。

  以上差異,並不影響我們理解 transportService 的整體邏輯。

 

  至於ES如何接收到網絡請求後,如何處理的業務,其框架如何,其又有何具體能力?且聽下回分解。