[Learning Design from Source Code] Ant Financial SOFARegistry: Bringing a Service Online

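0x00 Abstract

SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.

This series focuses on design and architecture: across several articles and from several angles, it works backward from the code to summarize the implementation mechanisms and architectural ideas behind DataServer and SOFARegistry, so that readers can learn how Alibaba approaches design.

This is the thirteenth article. It covers bringing a service online from the SessionServer's point of view.

The article mainly describes the business flow and, along the way, sorts out the logic, design, and patterns involved. Because registration touches many modules, it concentrates only on the Session Server's part of the registration process.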

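0x01 Business Domain

1.1 Application Scenarios

Bringing a service online or offline here means that the service explicitly calls the regular registration (Publisher#register) and deregistration (Publisher#unregister) operations in code. Going offline because of a crash or another unexpected failure is out of scope.

1.1.1 Service Publishing

In a typical "service addressing for RPC calls" scenario, the service provider publishes a service in two steps:

  1. Register: register itself with SOFARegistry in the Publisher role;
  2. Publish: publish the data to be exposed (usually the IP address, port, invocation method, and so on) to SOFARegistry.

Correspondingly, the service consumer makes the call possible through the following steps:

  1. Register: register itself with SOFARegistry in the Subscriber role;
  2. Subscribe: receive the service data pushed by SOFARegistry.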

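1.1.2 Why SessionServer Is Needed

Consider a design in which every Client in SOFARegistry, when registering or subscribing, applies consistent hashing to the dataInfoId, works out which DataServer to access, and then establishes a long-lived connection to that DataServer.

Since each Client usually registers and subscribes to quite a few dataInfoIds, we can expect each Client to hold connections to several DataServers. The problem with this architecture is: "the number of connections each DataServer carries grows with the number of Clients, and in the extreme case every Client has to connect to every DataServer, so scaling out DataServer does not spread the Client connections linearly."

That is why a dedicated connection proxy layer in front of the data sharding layer (DataServer) matters, and why SOFARegistry has the SessionServer layer. As the number of Clients grows, scaling out SessionServer removes the single-machine connection bottleneck.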
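
The routing idea described above can be sketched as a small hash ring. This is only an illustration, not SOFARegistry's implementation: the class name HashRingSketch, the virtual-node count, and the choice of MD5 are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical consistent-hash ring that maps a dataInfoId to a DataServer address. */
class HashRingSketch {
    // Assumed value; more virtual nodes give a smoother key distribution.
    private static final int VIRTUAL_NODES = 100;

    private final TreeMap<Long, String> ring = new TreeMap<>();

    /** Expects a non-empty server list. */
    HashRingSketch(List<String> dataServers) {
        for (String server : dataServers) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                ring.put(hash(server + "#" + i), server);
            }
        }
    }

    /** Returns the first server at or after the key's position, wrapping around the ring. */
    String nodeFor(String dataInfoId) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(dataInfoId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Folds the first 8 bytes of an MD5 digest into a long. */
    private static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch(Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3"));
        // The same dataInfoId always routes to the same server.
        System.out.println(ring.nodeFor("com.alipay.test.demo.service:1.0@DEFAULT"));
    }
}
```

With a ring like this, a Client that touches many dataInfoIds ends up talking to many servers, which is exactly the connection fan-out that the SessionServer layer absorbs.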

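1.2 The Problem

Because SessionServer is a middle layer, it looks fairly simple: on the surface it just accepts and forwards.

In a large system, however, deciding how to split and decouple modules, both logically and physically, is essential.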

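1.3 Alibaba's Approach

We mainly look at the registration part of Alibaba's approach.

1.3.1 Registration Process

Figure - One service online (registration) process

Bringing a service online or offline means that the service performs a normal registration (publisher.register) or deregistration (publisher.unregister) through code; going offline because of a crash or another unexpected failure is not considered. The figure above roughly shows how service data flows internally during "one service registration".

  1. The Client calls publisher.register to register the service with SessionServer.
  2. On receiving the service data (PublisherRegister), SessionServer writes it to memory (SessionServer keeps Client data in memory so that it can later run periodic checks against DataServer), then finds the corresponding DataServer by consistent hashing on the dataInfoId and sends the PublisherRegister to it.
  3. On receiving the PublisherRegister, DataServer also writes it to memory first, aggregating all PublisherRegister entries per dataInfoId. At the same time, DataServer notifies all SessionServers of a change event for that dataInfoId; the event carries the dataInfoId and the version number (version).
  4. Meanwhile, asynchronously, DataServer incrementally synchronizes the data to the other replicas per dataInfoId, because on top of consistent-hash sharding DataServer keeps multiple replicas of each shard (three by default).
  5. On receiving the change notification, SessionServer checks the version of that dataInfoId held in its own memory. If it is lower than the version sent by DataServer, SessionServer actively fetches the full data for the dataInfoId from DataServer, that is, the complete list of PublisherRegister entries for that dataInfoId.
  6. Finally, SessionServer pushes the data to the relevant Clients, which thereby receive the latest service list after this registration.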
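
The version check in step 5 can be sketched as follows. This is a simplified illustration under stated assumptions, not the SessionServer code: the class and method names are hypothetical, and treating a dataInfoId with no local version as "needs a fetch" is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the "fetch only if the notified version is newer" check. */
class VersionCheckSketch {
    // dataInfoId -> last version whose full data this session has fetched
    private final Map<String, Long> localVersions = new ConcurrentHashMap<>();

    /** True when the notified version is newer than the local one, so a fetch is needed. */
    boolean needsFetch(String dataInfoId, long notifiedVersion) {
        Long local = localVersions.get(dataInfoId);
        // Assumption: an unknown dataInfoId is treated as stale.
        return local == null || local < notifiedVersion;
    }

    /** Records the version once the full data has been fetched; never moves backwards. */
    void markFetched(String dataInfoId, long version) {
        localVersions.merge(dataInfoId, version, Long::max);
    }

    public static void main(String[] args) {
        VersionCheckSketch session = new VersionCheckSketch();
        String id = "com.alipay.test.demo.service:1.0@DEFAULT";
        System.out.println(session.needsFetch(id, 1L)); // nothing stored yet
        session.markFetched(id, 5L);
        System.out.println(session.needsFetch(id, 5L)); // same version as stored
        System.out.println(session.needsFetch(id, 6L)); // newer than stored
    }
}
```

Carrying only the dataInfoId and version in the notification keeps the change event small; the full publisher list moves only when a SessionServer is actually behind.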

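1.3.2 Diagram

The figure below shows how the code flows during Publisher registration.

This process also follows the Handler – Task & Strategy – Listener approach; inside the code, the task is handled in essentially the same way as in the subscription process.

Figure - Code flow: Publisher registration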

0x02 Client SDK

PublisherRegistration is the Client-side interface. The key code for publishing data is as follows:

// Build the publisher registration
PublisherRegistration registration = new PublisherRegistration("com.alipay.test.demo.service:1.0@DEFAULT");
registration.setGroup("TEST_GROUP");
registration.setAppName("TEST_APP");

// Register the registration with the client and publish the data
Publisher publisher = registryClient.register(registration, "10.10.1.1:12200?xx=yy");

// To overwrite the previously published data, republish through the publisher
publisher.republish("10.10.1.1:12200?xx=zz");

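The key to publishing data is constructing a PublisherRegistration, which has three properties:

Property   Type     Description
dataId     String   Data ID. Publishers and subscribers must use the same value. The unique data identifier is composed of dataId + group + instanceId.
group      String   Data group. Publishers and subscribers must use the same value. The unique data identifier is composed of dataId + group + instanceId. Defaults to DEFAULT_GROUP.
appName    String   Application name (appName).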

0x03 Session server

The flow now reaches the Session server.

3.1 Bean

A good starting point is the Beans.

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(publisherHandler());
    list.add(subscriberHandler());
    list.add(watcherHandler());
    list.add(clientNodeConnectionHandler());
    list.add(cancelAddressRequestHandler());
    list.add(syncConfigHandler());
    return list;
}

serverHandlers is the set of response handlers of the Bolt Server.

@Bean
@ConditionalOnMissingBean(name = "sessionRegistry")
public Registry sessionRegistry() {
    return new SessionRegistry();
}

From the Bean perspective, the current logic is as shown below; note that the Strategy introduces one level of decoupling here:

Beans


+-----------------------------------+
| Bolt Server(in openSessionServer) |        +---------------------------------+
|                                   |    +-> | DefaultPublisherHandlerStrategy |
|    +----------------------+       |    |   +---------+-----------------------+
|    |    serverHandlers    |       |    |             |
|    |                      |       |    |             |
|    | +------------------+ |       |    |             |
|    | | PublisherHandle+----------------+             v
|    | |                  | |       |          +-------+-------+
|    | | watcherHandler   | |       |          |SessionRegistry|
|    | |                  | |       |          +---------------+
|    | |     ......       | |       |
|    | +------------------+ |       |
|    +----------------------+       |
+-----------------------------------+

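Service publishers and the Session Server should normally sit in the same Data Center; this is the single-unit concept practiced at Alibaba and elsewhere.

3.2 Entry Point

PublisherHandler is the Session Server's interface to the Client and a response handler of the Bolt Server.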

public class PublisherHandler extends AbstractServerHandler {
    @Autowired
    private ExecutorManager          executorManager;

    @Autowired
    private PublisherHandlerStrategy publisherHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) throws RemotingException {

        RegisterResponse result = new RegisterResponse();
        PublisherRegister publisherRegister = (PublisherRegister) message;
        publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
        return result;
    }
}

The logic is shown in the figure below:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |
                              |    |                      |       |
+--------+  PublisherRegister |    | +------------------+ |       |
| Client +---------------------------> PublisherHandler | |       |
+--------+          1         |    | |                  | |       |
               +              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
               |              +-----------------------------------+
               |

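3.3 Strategy

Overall, processing here follows the Handler – Task & Strategy – Listener approach.

What is the Strategy Pattern?

In software development we often find that a feature can be implemented by many algorithms or strategies, and we choose among them according to the environment or conditions. If these algorithms or strategies are abstracted behind a common interface, with a separate implementation class for each, the client code can swap them dynamically by injecting a different implementation object. The result is more extensible and maintainable, and this is the Strategy pattern.

Definition of the Strategy Pattern

  • Strategy pattern: defines a family of algorithms, encapsulates each one, and makes them interchangeable, so that the algorithm can vary independently of the clients that use it.

  • Put simply: define a series of algorithms, encapsulate each of them, and let them replace one another without affecting the clients that use them. It is a behavioral pattern.

In the Strategy pattern, a class's behavior or its algorithm can be changed at run time.

In the Strategy pattern, we create objects that represent the various strategies and a context object whose behavior changes as the strategy object changes. The strategy object changes the algorithm that the context object executes.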
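
The definition above can be illustrated with a minimal sketch. Every name in it is hypothetical and chosen only to echo the handler-plus-strategy shape used in this article; it is not code from SOFARegistry.

```java
/** Minimal Strategy-pattern sketch; every name here is hypothetical. */
class StrategySketch {

    /** The common interface that all strategies implement. */
    interface RegisterStrategy {
        String handle(String dataId);
    }

    /** One concrete strategy. */
    static class DefaultRegisterStrategy implements RegisterStrategy {
        @Override
        public String handle(String dataId) {
            return "registered:" + dataId;
        }
    }

    /** Another concrete strategy with different behavior behind the same interface. */
    static class AuditingRegisterStrategy implements RegisterStrategy {
        @Override
        public String handle(String dataId) {
            return "audited+registered:" + dataId;
        }
    }

    /** The context: it delegates to whichever strategy was injected. */
    static class Handler {
        private final RegisterStrategy strategy;

        Handler(RegisterStrategy strategy) {
            this.strategy = strategy;
        }

        String reply(String dataId) {
            return strategy.handle(dataId);
        }
    }

    public static void main(String[] args) {
        // Same context class, different behavior depending on the injected strategy.
        System.out.println(new Handler(new DefaultRegisterStrategy()).reply("demo"));
        System.out.println(new Handler(new AuditingRegisterStrategy()).reply("demo"));
    }
}
```

The Handler never changes when a new strategy is added, which is the property the Session Server relies on when it injects a strategy into each handler.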

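3.3.1 Directory Structure

The directory structure contains many Strategy definitions and implementations. Presumably Ant intends to apply different strategies in different situations; some of them are currently just reserved extension points.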

com/alipay/sofa/registry/server/session/strategy

.
├── DataChangeRequestHandlerStrategy.java
├── PublisherHandlerStrategy.java
├── ReceivedConfigDataPushTaskStrategy.java
├── ReceivedDataMultiPushTaskStrategy.java
├── SessionRegistryStrategy.java
├── SubscriberHandlerStrategy.java
├── SubscriberMultiFetchTaskStrategy.java
├── SubscriberRegisterFetchTaskStrategy.java
├── SyncConfigHandlerStrategy.java
├── TaskMergeProcessorStrategy.java
├── WatcherHandlerStrategy.java
└── impl
    ├── DefaultDataChangeRequestHandlerStrategy.java
    ├── DefaultPublisherHandlerStrategy.java
    ├── DefaultPushTaskMergeProcessor.java
    ├── DefaultReceivedConfigDataPushTaskStrategy.java
    ├── DefaultReceivedDataMultiPushTaskStrategy.java
    ├── DefaultSessionRegistryStrategy.java
    ├── DefaultSubscriberHandlerStrategy.java
    ├── DefaultSubscriberMultiFetchTaskStrategy.java
    ├── DefaultSubscriberRegisterFetchTaskStrategy.java
    ├── DefaultSyncConfigHandlerStrategy.java
    └── DefaultWatcherHandlerStrategy.java 

3.3.2 DefaultPublisherHandlerStrategy

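Judging from the current code, the strategy only sets defaults, classifies, and forwards: it fills in the Publisher's default information and calls register or unRegister depending on the event type.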

public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy {
    @Autowired
    private Registry            sessionRegistry;

    @Override
    public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister, RegisterResponse registerResponse) {
        try {
            String ip = channel.getRemoteAddress().getAddress().getHostAddress();
            int port = channel.getRemoteAddress().getPort();
            publisherRegister.setIp(ip);
            publisherRegister.setPort(port);

            if (StringUtils.isBlank(publisherRegister.getZone())) {
                publisherRegister.setZone(ValueConstants.DEFAULT_ZONE);
            }

            if (StringUtils.isBlank(publisherRegister.getInstanceId())) {
                publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID);
            }

            Publisher publisher = PublisherConverter.convert(publisherRegister);
            publisher.setProcessId(ip + ":" + port);
            publisher.setSourceAddress(new URL(channel.getRemoteAddress()));
            if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.register(publisher);
            } else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.unRegister(publisher);
            }
            registerResponse.setSuccess(true);
            registerResponse.setVersion(publisher.getVersion());
            registerResponse.setRegistId(publisherRegister.getRegistId());
            registerResponse.setMessage("Publisher register success!");
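        } catch (Exception e) {
            // Error handling is not shown in this excerpt; marking the response as failed is an assumption.
            registerResponse.setSuccess(false);
        }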
    }
}

The logic is shown in the figure below:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |                          |                               |
               +              |    | |  watcherHandler  | |       |                          +-------------------------------+
               |              |    | |                  | |       |
               |              |    | |     ......       | |       |
               |              |    | +------------------+ |       |
               |              |    +----------------------+       |
                              +-----------------------------------+

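3.4 Core Logic Component

In the code above, the strategy calls sessionRegistry.register(publisher), which performs the registration.

The member variables of SessionRegistry make it clear that this class is where the Session Server's core logic lives.

It mainly provides the following functions:

  • register(StoreData data): register new publisher or subscriber data

  • cancel(List connectIds): cancel publisher or subscriber data

  • remove(List connectIds): remove publisher or subscriber data

  • unRegister(StoreData data): unregister publisher or subscriber data

  • …..

The member variables are as follows: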

public class SessionRegistry implements Registry {

    /**
     * store subscribers
     */
    @Autowired
    private Interests                 sessionInterests;

    /**
     * store watchers
     */
    @Autowired
    private Watchers                  sessionWatchers;

    /**
     * store publishers
     */
    @Autowired
    private DataStore                 sessionDataStore;

    /**
     * transfer data to DataNode
     */
    @Autowired
    private DataNodeService           dataNodeService;

    /**
     * trigger task com.alipay.sofa.registry.server.meta.listener process
     */
    @Autowired
    private TaskListenerManager       taskListenerManager;

    /**
     * calculate data node url
     */
    @Autowired
    private NodeManager               dataNodeManager;

    @Autowired
    private SessionServerConfig       sessionServerConfig;

    @Autowired
    private Exchange                  boltExchange;

    @Autowired
    private SessionRegistryStrategy   sessionRegistryStrategy;

    @Autowired
    private WrapperInterceptorManager wrapperInterceptorManager;

    @Autowired
    private DataIdMatchStrategy       dataIdMatchStrategy;

    @Autowired
    private RenewService              renewService;

    @Autowired
    private WriteDataAcceptor         writeDataAcceptor;

    private volatile boolean          enableDataRenewSnapshot = true;
}

For a Publisher, the register function first stores it in sessionDataStore, then builds a WriteDataRequest and hands it to writeDataAcceptor.accept for processing.

@Override
public void register(StoreData storeData) {

    WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation(
            new Wrapper<StoreData, Boolean>() {
                @Override
                public Boolean call() {

                    switch (storeData.getDataType()) {
                        case PUBLISHER:
                            Publisher publisher = (Publisher) storeData;

                            sessionDataStore.add(publisher);

                            // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
                            // are handed over to WriteDataAcceptor
                            writeDataAcceptor.accept(new WriteDataRequest() {
                                @Override
                                public Object getRequestBody() {
                                    return publisher;
                                }

                                @Override
                                public WriteDataRequestType getRequestType() {
                                    return WriteDataRequestType.PUBLISHER;
                                }

                                @Override
                                public String getConnectId() {
                                    return publisher.getSourceAddress().getAddressString();
                                }

                                @Override
                                public String getDataServerIP() {
                                    Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
                                    return dataNode.getNodeUrl().getIpAddress();
                                }
                            });

                            sessionRegistryStrategy.afterPublisherRegister(publisher);
                            break;
                        case SUBSCRIBER:
                            Subscriber subscriber = (Subscriber) storeData;

                            sessionInterests.add(subscriber);

                            sessionRegistryStrategy.afterSubscriberRegister(subscriber);
                            break;
                        case WATCHER:
                            Watcher watcher = (Watcher) storeData;

                            sessionWatchers.add(watcher);

                            sessionRegistryStrategy.afterWatcherRegister(watcher);
                            break;
                        default:
                            break;
                    }
                    return null;
                }

                @Override
                public Supplier<StoreData> getParameterSupplier() {
                    return () -> storeData;
                }

            }, wrapperInterceptorManager);

    try {
        wrapperInvocation.proceed();
    } catch (Exception e) {
        throw new RuntimeException("Proceed register error!", e);
    }

}

The logic so far is shown in the figure below:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                                    3  | register
               |              |    +----------------------+       |                                       |
                              +-----------------------------------+                                       |
                                                                                                          v
                                                                                      +-------------------+-------------------+
                                                                                      |           SessionRegistry             |
                                                                                      |                                       |
                                                                                      |                                       |
                                                                                      |  storeData.getDataType() == PUBLISHER |
                                                                                      +---------------------------------------+

3.4.1 SessionRegistryStrategy

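Here another strategy appears. It too has only one implementation for now, presumably so that it can be swapped out in the future; at present the hook is simply left empty.

This shows how Alibaba tries to decouple at every turn.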

public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy {
    @Override
    public void afterPublisherRegister(Publisher publisher) {

    }
}

3.4.2 Storage Module

The registration code above contains:

sessionDataStore.add(publisher);

This is the Session's data storage module, which is also the core of the system.

public class SessionDataStore implements DataStore {
    /**
     * publisher store
     */
    private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry      = new ConcurrentHashMap<>();

    /*** index */
    private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>>  connectIndex  = new ConcurrentHashMap<>();
}

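Two indexes are kept here: one keyed by dataInfoId and one keyed by connectId.

When storing, the new entry is compared with the existing one along two dimensions: version number and timestamp.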

@Override
public void add(Publisher publisher) {
    Publisher.internPublisher(publisher);

    write.lock();
    try {
        Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId());

        if (publishers == null) {
            ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>();
            publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap);
            if (publishers == null) {
                publishers = newmap;
            }
        }

        Publisher existingPublisher = publishers.get(publisher.getRegisterId());

        if (existingPublisher != null) {

            if (existingPublisher.getVersion() != null) {
                long oldVersion = existingPublisher.getVersion();
                Long newVersion = publisher.getVersion();
                if (newVersion == null) {
                    return;
                } else if (oldVersion > newVersion) {
                    return;
                } else if (oldVersion == newVersion) {
                    Long newTime = publisher.getRegisterTimestamp();
                    long oldTime = existingPublisher.getRegisterTimestamp();
                    if (newTime == null) {
                        return;
                    }
                    if (oldTime > newTime) {
                        return;
                    }
                }
            }
        }
        publishers.put(publisher.getRegisterId(), publisher);
        addToConnectIndex(publisher);

    } finally {
        write.unlock();
    }
}
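
The ordering rule inside add can be isolated as a pure function, which makes the cases easier to see. This is a restatement of the branches shown above, written as a sketch; the class and method names are hypothetical.

```java
/** Hypothetical restatement of the version/timestamp rule used when storing a Publisher. */
class PublisherOrderingSketch {

    /**
     * Decides whether an incoming publisher should overwrite the existing one.
     * Mirrors the branches of the add method: version first, then timestamp on a tie.
     */
    static boolean shouldReplace(Long oldVersion, long oldTime, Long newVersion, Long newTime) {
        if (oldVersion == null) {
            return true; // no version recorded on the existing entry: accept the new one
        }
        if (newVersion == null) {
            return false; // incoming entry has no version: keep the existing one
        }
        if (oldVersion > newVersion) {
            return false; // incoming entry is older
        }
        if (oldVersion.longValue() == newVersion.longValue()) {
            // Same version: fall back to the register timestamp.
            return newTime != null && oldTime <= newTime;
        }
        return true; // incoming entry has a newer version
    }

    public static void main(String[] args) {
        System.out.println(shouldReplace(2L, 10, 3L, null)); // newer version wins
        System.out.println(shouldReplace(2L, 10, 2L, 9L));   // same version, older timestamp
        System.out.println(shouldReplace(2L, 10, 1L, 99L));  // older version loses
    }
}
```

Note that on a version tie an equal timestamp still replaces the existing entry, because the original code rejects only a strictly older timestamp.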

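3.5 Acceptor Module

Once SessionServer has stored the data locally, the next step is to notify the Data Server.

3.5.1 Overall Acceptor

WriteDataAcceptorImpl handles the actual Publisher writes. First, the write requests need to be brought together in one place.

It uses private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap(); to hold one processor per connection, keyed by connectId.

Write requests are dispatched by Connection, so each connection's requests are handled by its own processor.

Details: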

public class WriteDataAcceptorImpl implements WriteDataAcceptor {

    @Autowired
    private TaskListenerManager             taskListenerManager;

    @Autowired
    private SessionServerConfig             sessionServerConfig;

    @Autowired
    private RenewService                    renewService;

    /**
     * acceptor for all write data request
     * key:connectId
     * value:writeRequest processor
     *
     */
    private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap();

    public void accept(WriteDataRequest request) {
        String connectId = request.getConnectId();
        WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
                key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));

        writeDataProcessor.process(request);
    }
  
    public void remove(String connectId) {
        writeDataProcessors.remove(connectId);
    }
}

The logic so far is shown in the figure below:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +-----------------------------------------------------+                    +-------------+-------------------------+
               | |           WriteDataAcceptorImpl                     |  WriteDataRequest  |           SessionRegistry             |
               | |                                                     | <------------------+                                       |
               | |                                                     |                    |                                       |
               | | Map<String, WriteDataProcessor> writeDataProcessors |                    |  storeData.getDataType() == PUBLISHER |
               | |                                                     |                    +---------------------------------------+
               + +-----------------------------------------------------+

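3.5.2 Per-Connection Processing

With all requests brought together, each connection's writes now need further processing.

The key data structure is the queue below, which buffers a connection's write requests while the processor is locked.

ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue

Each request type is handled differently.

For our example, the handling is:

case PUBLISHER: {
    doPublishAsync(request);
}

In the end, a TaskType.PUBLISH_DATA_TASK event is sent to taskListenerManager; PublishDataTaskListener picks it up and runs a PublishDataTask to handle it.

A listener provides the decoupling here; it is explained in a later section.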

private void doPublishAsync(WriteDataRequest request) {
    sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}

private void sendEvent(Object eventObj, TaskType taskType) {
    TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
    taskListenerManager.sendTaskEvent(taskEvent);
}

The full code is as follows:

public class WriteDataProcessor {
    private final TaskListenerManager               taskListenerManager;

    private final SessionServerConfig               sessionServerConfig;

    private final RenewService                      renewService;

    private final String                            connectId;

    private Map<String, AtomicLong>                 lastUpdateTimestampMap = new ConcurrentHashMap<>();

    private AtomicBoolean                           writeDataLock          = new AtomicBoolean(
                                                                               false);

    private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue          = new ConcurrentLinkedQueue();

    private AtomicInteger                           acceptorQueueSize      = new AtomicInteger(0);

    public void process(WriteDataRequest request) {
        // record the last update time by pub/unpub
        if (isWriteRequest(request)) {
            refreshUpdateTime(request.getDataServerIP());
        }

        if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
            // snapshot has high priority, so handle directly
            doHandle(request);
        } else {
            // If locked, insert the queue;
            // otherwise, try emptying the queue (to avoid residue) before processing the request.
            if (writeDataLock.get()) {
                addQueue(request);
            } else {
                flushQueue();
                doHandle(request);
            }
        }

    }

    private void doHandle(WriteDataRequest request) {
        switch (request.getRequestType()) {
            case PUBLISHER: {
                doPublishAsync(request);
            }
                break;
            case UN_PUBLISHER: {
                doUnPublishAsync(request);
            }
                break;
            case CLIENT_OFF: {
                doClientOffAsync(request);
            }
                break;
            case RENEW_DATUM: {
                if (renewAndSnapshotInSilence(request.getDataServerIP())) {
                    return;
                }
                doRenewAsync(request);
            }
                break;
            case DATUM_SNAPSHOT: {
                if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
                    return;
                }
                halt();
                try {
                    doSnapshotAsync(request);
                } finally {
                    resume();
                }
            }
                break;
        }
    }
      
    private void doPublishAsync(WriteDataRequest request) {
        sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
    }
      
    private void sendEvent(Object eventObj, TaskType taskType) {
        TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}
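
The lock-and-queue behavior of process can be sketched in isolation. This is a simplified, single-threaded illustration with hypothetical names; in particular, having resume drain the queue before releasing the lock is an assumption, since the excerpt above does not show halt, resume, or flushQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of "queue while locked, otherwise flush then handle". */
class GatedQueueSketch {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    // Stands in for the side effects of doHandle; not thread-safe, demo only.
    private final List<String> handled = new ArrayList<>();

    void process(String request) {
        if (locked.get()) {
            queue.add(request);   // locked: park the request
        } else {
            flushQueue();         // unlocked: drain leftovers first to preserve order
            handled.add(request);
        }
    }

    void halt() {
        locked.set(true);
    }

    /** Assumption: queued requests are drained before the lock is released. */
    void resume() {
        flushQueue();
        locked.set(false);
    }

    private void flushQueue() {
        String request;
        while ((request = queue.poll()) != null) {
            handled.add(request);
        }
    }

    List<String> handled() {
        return handled;
    }

    public static void main(String[] args) {
        GatedQueueSketch processor = new GatedQueueSketch();
        processor.process("pub-1");
        processor.halt();                 // e.g. while a snapshot is being sent
        processor.process("pub-2");       // parked in the queue
        processor.resume();               // queued requests are handled now
        processor.process("pub-3");
        System.out.println(processor.handled());
    }
}
```

Parking ordinary writes while a snapshot is in flight keeps a snapshot from being interleaved with individual pub/unpub requests on the same connection.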

As shown in the figure below:

 Publisher     +   Session Server Scope
 Scope         |
               |              +-----------------------------------+
               |              | Bolt Server(in openSessionServer) |
               |              |                                   |
               |              |    +----------------------+       |
               +              |    |    serverHandlers    |       |                          +-------------------------------+
                              |    |                      |       |                          |DefaultPublisherHandlerStrategy|
+--------+  PublisherRegister |    | +------------------+ |       |  handlePublisherRegister |                               |
| Client +---------------------------> PublisherHandler+------------------------------------>+    EventType == REGISTER      |
+--------+          1         |    | |                  | |       |          2               |                               |
               +              |    | |  watcherHandler  | |       |                          +------------+------------------+
               |              |    | |                  | |       |                                       |
               |              |    | |     ......       | |       |                                       |
               |              |    | +------------------+ |       |                              register | 3
               |              |    +----------------------+       |                                       |
               |              +-----------------------------------+                                       |
               |                                                                                          v
               | +---------------------------------------------------------+                    +---------+-----------------------------+
               | |           WriteDataAcceptorImpl                         |  WriteDataRequest  |           SessionRegistry             |
               | |                                                         | <------------------+                                       |
               | |                                                         |       4            |   sessionDataStore.add(publisher)     |
               | | Map<connectId , WriteDataProcessor> writeDataProcessors |                    |                                       |
               | |                                                         |                    |  storeData.getDataType() == PUBLISHER |
               | +----------------------+----------------------------------+                    |                                       |
               |                process | 5                                                     +---------------------------------------+
               |                        v
               |    +-------------------+---------------------+                     +--------------------------+
               |    |          WriteDataProcessor             |                     |  PublishDataTaskListener |
               |    |                                         |  PUBLISH_DATA_TASK  |                          |
               |    | ConcurrentLinkedQueue<WriteDataRequest> +-------------------> |      PublishDataTask     |
               |    |                                         |      6              +--------------------------+
               +    +-----------------------------------------+

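3.6 Listener Decoupling

Up to this point the logic has been a single direct call chain; here it is decoupled.

3.6.1 Decoupling Engine

DefaultTaskListenerManager is the decoupling mechanism. Listeners are added to it, and when sendTaskEvent is called it looks up the listeners registered for the event's task type and invokes each of them.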

public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}
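To make the dispatch pattern concrete, here is a minimal sketch that uses only the JDK. The class ListenerDispatchSketch, its nested types, and the String payload are hypothetical stand-ins for illustration. They are not the SOFARegistry classes, and a plain Map of lists replaces the Guava Multimap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the registry types (JDK only).
class ListenerDispatchSketch {

    enum TaskType { PUBLISH_DATA_TASK, OTHER_TASK }

    interface TaskListener {
        TaskType support();

        void handleEvent(String payload);
    }

    // One task type may have several listeners, which is what a Multimap models.
    private final Map<TaskType, List<TaskListener>> listeners = new EnumMap<>(TaskType.class);

    void addTaskListener(TaskListener listener) {
        listeners.computeIfAbsent(listener.support(), k -> new ArrayList<>()).add(listener);
    }

    // Only listeners registered for this task type are called.
    void sendTaskEvent(TaskType type, String payload) {
        for (TaskListener listener : listeners.getOrDefault(type, Collections.emptyList())) {
            listener.handleEvent(payload);
        }
    }

    // Registers one listener, then sends one matching and one non-matching event.
    static List<String> demo() {
        List<String> handled = new ArrayList<>();
        ListenerDispatchSketch manager = new ListenerDispatchSketch();
        manager.addTaskListener(new TaskListener() {
            @Override
            public TaskType support() {
                return TaskType.PUBLISH_DATA_TASK;
            }

            @Override
            public void handleEvent(String payload) {
                handled.add("publish:" + payload);
            }
        });
        manager.sendTaskEvent(TaskType.PUBLISH_DATA_TASK, "dataInfoId-1");
        manager.sendTaskEvent(TaskType.OTHER_TASK, "ignored");
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The sender only knows the task type. Adding a second handler for the same type is a matter of registering another listener, with no change to the sending code.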

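3.6.2 Listener

PublishDataTaskListener is the matching handler. Its support method declares that it handles PUBLISH_DATA_TASK, so the code that sends the event never has to reference the listener directly. That completes the decoupling.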

public class PublishDataTaskListener implements TaskListener {

    @Autowired
    private DataNodeService dataNodeService;

    @Autowired
    private TaskProcessor   dataNodeSingleTaskProcessor;

    @Autowired
    private ExecutorManager executorManager;

    @Override
    public TaskType support() {
        return TaskType.PUBLISH_DATA_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {

        SessionTask publishDataTask = new PublishDataTask(dataNodeService);

        publishDataTask.setTaskEvent(event);

        executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
    }
}

3.7 Task scheduling

Having found the listener, we can see that it starts the business task with the line below. The mechanism behind that line deserves a closer look.

executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));

3.7.1 ExecutorManager

ExecutorManager starts and shuts down the thread pools in one place. publishDataExecutor is one of them.

The relevant parts of ExecutorManager are excerpted below:

public class ExecutorManager {

    private final ScheduledThreadPoolExecutor scheduler;

    private final ThreadPoolExecutor          publishDataExecutor;

    private static final String               PUBLISH_DATA_EXECUTOR                      = "PublishDataExecutor";

    public ExecutorManager(SessionServerConfig sessionServerConfig) {
      
        publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR,
                k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR,
                        sessionServerConfig.getPublishDataExecutorMinPoolSize(),
                        sessionServerConfig.getPublishDataExecutorMaxPoolSize(),
                        sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()),
                        new NamedThreadFactory("PublishData-executor", true)));
    }
  
    public ThreadPoolExecutor getPublishDataExecutor() {
        return publishDataExecutor;
    }
}

The ExecutorManager bean is defined as follows:

@Bean
public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) {
    return new ExecutorManager(sessionServerConfig);
}
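The excerpt relies on project classes (SessionThreadPoolExecutor, NamedThreadFactory) whose internals are not shown. As a rough sketch of the same idea with plain JDK classes, the following builds a pool with a bounded queue and named daemon threads. BoundedPoolSketch and the pool sizes are assumptions chosen for illustration, not the project's configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only sketch; sizes and names are illustrative.
class BoundedPoolSketch {

    // Builds a pool with a bounded work queue and named daemon threads.
    static ThreadPoolExecutor newPool(String name, int core, int max,
                                      long keepAliveSeconds, int queueSize) {
        AtomicInteger sequence = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, name + "-" + sequence.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        return new ThreadPoolExecutor(core, max, keepAliveSeconds, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize), factory);
    }

    // Submits one task and returns the name of the worker thread that ran it.
    static String runOne() {
        ThreadPoolExecutor pool = newPool("PublishData-executor", 2, 4, 60, 100);
        String[] threadName = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(() -> {
            threadName[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return threadName[0];
    }

    public static void main(String[] args) {
        System.out.println(runOne());
    }
}
```

With the plain JDK ThreadPoolExecutor and its default handler, a task submitted while all threads are busy and the queue is full is rejected with RejectedExecutionException. Whether SessionThreadPoolExecutor changes that behavior is not visible in the excerpt.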

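3.7.2 Processor

The Processor defines how a task is run. It wraps the call to task.execute() and maps the outcome to a ProcessingResult: Success when the task completes, TransientError when a Retryable task fails but still has retries left, and PermanentError otherwise.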

public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> {

    @Override
    public ProcessingResult process(SessionTask task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            if (task instanceof Retryable) {
                Retryable retryAbleTask = (Retryable) task;
                if (retryAbleTask.checkRetryTimes()) {
                    return ProcessingResult.TransientError;
                }
            }
            return ProcessingResult.PermanentError;
        }
    }

    @Override
    public ProcessingResult process(List<SessionTask> tasks) {
        return null;
    }
}
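The classification logic can be tried in isolation. The sketch below mirrors the shape of the excerpt with hypothetical, simplified types (ProcessorSketch, Task, FlakyTask). It assumes that checkRetryTimes consumes one unit of retry budget per call, which the excerpt does not show.

```java
// Hypothetical, simplified stand-ins for the processor and task types.
class ProcessorSketch {

    enum ProcessingResult { Success, TransientError, PermanentError }

    interface Task {
        void execute() throws Exception;
    }

    interface Retryable {
        boolean checkRetryTimes();
    }

    // Runs the task and classifies the outcome the same way the excerpt does.
    static ProcessingResult process(Task task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            if (task instanceof Retryable && ((Retryable) task).checkRetryTimes()) {
                return ProcessingResult.TransientError;
            }
            return ProcessingResult.PermanentError;
        }
    }

    // A task that always fails; assumption: each check uses up one retry.
    static class FlakyTask implements Task, Retryable {
        private int remainingRetries;

        FlakyTask(int retries) {
            this.remainingRetries = retries;
        }

        @Override
        public void execute() {
            throw new IllegalStateException("data server unreachable");
        }

        @Override
        public boolean checkRetryTimes() {
            return remainingRetries-- > 0;
        }
    }

    public static void main(String[] args) {
        FlakyTask task = new FlakyTask(1);
        System.out.println(process(task)); // retry budget left
        System.out.println(process(task)); // retry budget exhausted
    }
}
```

A task that does not implement Retryable goes straight to PermanentError on failure, so retry behavior is opt-in per task type.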

3.7.3 Business task

The execute method of PublishDataTask calls dataNodeService.register(publisher) to perform the registration.

public class PublishDataTask extends AbstractSessionTask {

    private final DataNodeService dataNodeService;

    private Publisher             publisher;

    public PublishDataTask(DataNodeService dataNodeService) {
        this.dataNodeService = dataNodeService;
    }

    @Override
    public void execute() {
        dataNodeService.register(publisher);
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        // take the taskId from the event, if it carries one
        if (taskEvent.getTaskId() != null) {
            setTaskId(taskEvent.getTaskId());
        }

        Object obj = taskEvent.getEventObj();
        if (obj instanceof Publisher) {
            this.publisher = (Publisher) obj;
        } 
    }
}

The flow looks like this:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
                       |
     PUBLISH_DATA_TASK |
                       |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       |
                       v
              +--------+--------+
              | PublishDataTask |
              +-----------------+

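3.8 Forwarding service information

After the listener decoupling, PublishDataTask calls dataNodeService.register(publisher), so the next step is to forward the service information to the Data Server.

This is done by the register method of DataNodeServiceImpl, which forwards the request to the Data Server.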

public class DataNodeServiceImpl implements DataNodeService {
    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private NodeManager           dataNodeManager;

    @Autowired
    private SessionServerConfig   sessionServerConfig;

    private AsyncHashedWheelTimer asyncHashedWheelTimer;
}

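As the code shows, register builds a PublishDataRequest and sends it to the Data Server through the Bolt client. If the send fails with a RequestException, it retries asynchronously using the retry count and delays from SessionServerConfig.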

@Override
public void register(final Publisher publisher) {
    String bizName = "PublishData";
    Request<PublishDataRequest> request = buildPublishDataRequest(publisher);
    try {
        sendRequest(bizName, request);
    } catch (RequestException e) {
        doRetryAsync(bizName, request, e, sessionServerConfig.getPublishDataTaskRetryTimes(),
            sessionServerConfig.getPublishDataTaskRetryFirstDelay(),
            sessionServerConfig.getPublishDataTaskRetryIncrementDelay());
    }
}

private CommonResponse sendRequest(String bizName, Request request) throws RequestException {
    Response response = dataNodeExchanger.request(request);
    Object result = response.getResult();
    CommonResponse commonResponse = (CommonResponse) result;
    return commonResponse;
}
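The body of doRetryAsync is not shown here, so the exact backoff policy is unknown from this excerpt. Given the two parameters named "first delay" and "increment delay", one plausible reading is a linearly growing delay. The sketch below illustrates that assumption only. RetryDelaySketch and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a linear backoff schedule; not the project's code.
class RetryDelaySketch {

    // Assumed policy: the first retry waits firstDelay, and each later retry
    // waits incrementDelay longer than the previous one.
    static long delayMillis(int attempt, long firstDelay, long incrementDelay) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt starts at 1");
        }
        return firstDelay + incrementDelay * (attempt - 1);
    }

    // Lists the wait that would precede each of the retryTimes attempts.
    static List<Long> schedule(int retryTimes, long firstDelay, long incrementDelay) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
            delays.add(delayMillis(attempt, firstDelay, incrementDelay));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Sample values only; the real defaults live in SessionServerConfig.
        System.out.println(schedule(3, 3000L, 5000L));
    }
}
```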

As a diagram:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |
                       v
              +--------+--------+
              | PublishDataTask |
              +--------+--------+
              register |
                       |
            +----------v----------+
            | DataNodeServiceImpl |
            +----------+----------+
    PublishDataRequest |
                       v
            +----------+----------+  Client.sendSync   +------------+
            |  DataNodeExchanger  +------------------> | Data Server|
            +---------------------+ PublishDataRequest +------------+

How does it know which Data Server to send to? DataNodeExchanger contains:

@Override
public Response request(Request request) throws RequestException {

    Response response;
    URL url = request.getRequestUrl();
    try {
        Client sessionClient = getClient(url);

        final Object result = sessionClient
                .sendSync(url, request.getRequestBody(), request.getTimeout() != null ? request.getTimeout() : sessionServerConfig.getDataNodeExchangeTimeOut());

        response = () -> result;
    } // catch block omitted in this excerpt

    return response;
}

The URL comes from the request itself, so we look at how DataNodeServiceImpl builds the request:

private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) {
    return new Request<PublishDataRequest>() {
        private AtomicInteger retryTimes = new AtomicInteger();

        @Override
        public PublishDataRequest getRequestBody() {
            PublishDataRequest publishDataRequest = new PublishDataRequest();
            publishDataRequest.setPublisher(publisher);
            publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator
                .getSessionProcessId());
            return publishDataRequest;
        }

        @Override
        public URL getRequestUrl() {
            return getUrl(publisher.getDataInfoId());
        }

        @Override
        public AtomicInteger getRetryTimes() {
            return retryTimes;
        }
    };
}

private URL getUrl(String dataInfoId) {
    Node dataNode = dataNodeManager.getNode(dataInfoId);
    // data nodes pushed by meta carry no port, so use the configured data server port
    String dataIp = dataNode.getNodeUrl().getIpAddress();
    return new URL(dataIp, sessionServerConfig.getDataServerPort());
}

DataNodeManager contains:

@Override
public DataNode getNode(String dataInfoId) {
    DataNode dataNode = consistentHash.getNodeFor(dataInfoId);
    return dataNode;
}

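So the target is chosen by consistent hashing on dataInfoId: DataNodeManager maps the dataInfoId to a DataNode, and the request URL is built from that node's IP address and the configured Data Server port.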
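The internals of consistentHash are not shown in this article. As a generic illustration of the technique, here is a small hash ring built on a TreeMap with virtual nodes. ConsistentHashSketch, the MD5-based hash, and the virtual node count are assumptions for the sketch and may differ from what SOFARegistry actually does.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical generic hash ring; not the SOFARegistry implementation.
class ConsistentHashSketch {

    private final TreeMap<Long, String> ring = new TreeMap<>();

    // Places virtualNodes points on the ring for every physical node.
    ConsistentHashSketch(Collection<String> nodes, int virtualNodes) {
        for (String node : nodes) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    // Returns the owner of the first ring point at or after the key's hash,
    // wrapping around to the first point when the key hashes past the last one.
    String getNodeFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Uses the first 8 bytes of an MD5 digest as the ring position.
    static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long result = 0;
            for (int i = 0; i < 8; i++) {
                result = (result << 8) | (digest[i] & 0xFF);
            }
            return result;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConsistentHashSketch ring =
                new ConsistentHashSketch(Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3"), 100);
        System.out.println(ring.getNodeFor("com.example.DemoService#DEFAULT"));
    }
}
```

The useful property is stability: removing a node that does not own a given key leaves that key's owner unchanged, so only the keys of the removed node move.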

The diagram above therefore expands to:

+-------------------------------------------------+
|          DefaultTaskListenerManager             |
|                                                 |
|  Multimap<TaskType, TaskListener> taskListeners |
|                                                 |
+----------------------+--------------------------+
                       |
     PUBLISH_DATA_TASK |  1
                       v
          +------------+--------------+
          |  PublishDataTaskListener  |
          +------------+--------------+
                       |
          setTaskEvent |  2
                       v
              +--------+--------+        4     +---------------+
              | PublishDataTask |     +------> |DataNodeManager|
              +--------+--------+     |        +---------------+
              register |  3           |  consistentHash|
                       |              |                | 5
            +----------v----------+---+                v
            | DataNodeServiceImpl |       6      +-----+----+
            +----------+----------+ <------------+ DataNode |
    PublishDataRequest | 7              url      +----------+
                       v
            +----------+----------+
            |  DataNodeExchanger  |
            +----------+----------+
                       |
       Client.sendSync | PublishDataRequest
                       |
                       v 8
                 +-----+------+
                 | Data Server|
                 +------------+
