[從源碼學設計]螞蟻金服SOFARegistry之存儲結構

[從源碼學設計]螞蟻金服SOFARegistry之存儲結構

0x00 摘要

SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。

本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓大家藉以學習阿里如何設計。

本文為第六篇,介紹SOFARegistry的存儲結構,本文與業務聯繫密切。

0x01 業務範疇

首先,我們從 Data Server 角度出發,看看本身可能涉及的存儲結構。

哪些需要存儲。

  • 本身服務器狀態;
  • 其他服務器節點狀態,比如其他DataServer,SessionServer,MetaServer;
  • 註冊的服務狀態;

所以我們得到如下問題需要思考:

  • 問題:DataServer如何知道/保存其他 DataServer?
  • 問題:考慮 其他DataServer 需要保存什麼:ip,端口,狀態,如何hash,裏面存儲的數據怎麼對應hash?
  • 問題:幾個DataServer都從MetaServer獲取數據變化,互相發送同步消息,怎麼處理?
  • 問題:與其他 Data Server 怎麼合作,新加入一個DataServer,數據需要切換?
  • 問題:為什麼要把 DataServerNodeFactory 和 DataServerCache 分開。因為 Node 的結構不同導致的?
  • 問題:DataServerCache內部成員分成幾類模塊?需要保存哪些信息?為什麼這樣保存?

可能有些問題脫離了本文研究範疇,但是我們也一起羅列在這裡。

1.1 緩存

時間和空間之間的平衡關係可以說是計算機系統中最為本質的關係之一。

時間和空間這一對矛盾關係在推薦系統中的典型表現,主要體現在對緩存的使用上。

緩存通常用來存儲一些計算代價較高以及相對靜態變化較少的數據,常常在生產者和消費者之間起到緩衝的作用,使得二者可以解耦,各自異步進行。

利用緩存來解耦系統,帶來性能上的提升以及開發的便利,是在系統架構設計中需要掌握的一種通用的思路。

1.2 DataServer 分片機制

在大部分的服務註冊中心系統中,每台服務器都存儲着全量的服務註冊數據,服務器之間通過一致性協議(paxos、Raft 等)實現數據的複製,或者採用只保障最終一致性的算法,來實現異步數據複製。這樣的設計對於一般業務規模的系統來說沒有問題,而當應用於有着海量服務的龐大的業務系統來說,就會遇到性能瓶頸

為解決這一問題,SOFARegistry 採用了數據分片的方法。全量服務註冊數據不再保存在單機里,而是分佈於每個節點中,每台服務器保存一定量的服務註冊數據,同時進行多副本備份,從理論上實現了服務無限擴容,且實現了高可用,最終達到支撐海量數據的目的。

在各種數據分片算法中,SOFARegistry 採用了業界主流的一致性 Hash 算法做數據分片,當節點動態擴縮容時,數據仍能均勻分佈,維持數據的平衡。

在數據同步時,沒有採用與 Dynamo、Casandra、Tair、Codis、Redis cluster 等項目中類似的預分片機制,而是在 DataServer 內存里以 dataInfoId 為粒度進行操作日誌記錄,這種實現方式在某種程度上也實現了「預分片」,從而保障了數據同步的有效性

1.3 服務模型

為了更好的說明數據類型,我們只能從SOFA博客中大段摘取文字

1.3.1 服務發佈模型(PublisherRegister)

  • dataInfoId:服務唯一標識,由“<分組 group><租戶 instanceId>構成,例如在 SOFARPC 的場景下,一個 dataInfoId 通常是 com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名稱,00001 是租戶 id。group 和 instance 主要是方便對服務數據做邏輯上的切分,使不同 group 和 instance 的服務數據在邏輯上完全獨立。模型里有 group 和 instanceId 字段,但這裡不額外列出來,讀者只要理解 dataInfoId 的含義即可。
  • zone:是一種單元化架構下的概念,代表一個機房內的邏輯單元,通常一個物理機房(Datacenter)包含多個邏輯單元(zone)。在服務發現場景下,發佈服務時需指定邏輯單元(zone),而訂閱服務者可以訂閱邏輯單元(zone)維度的服務數據,也可以訂閱物理機房(datacenter)維度的服務數據,即訂閱該 datacenter 下的所有 zone 的服務數據。
  • dataList:服務註冊數據,通常包含「協議」、「地址」和「額外的配置參數」,例如 SOFARPC 所發佈的數據類似bolt://192.168.1.100:8080?timeout=2000」。這裡使用 dataList,表示一個 PublisherRegister 可以允許同時發佈多個服務數據(但是通常只會發佈一個)。

1.3.2 服務訂閱模型(SubscriberRegister)

  • dataInfoId:服務唯一標識,上面已經解釋過了。
  • scope: 訂閱維度,共有 3 種訂閱維度:zone、dataCenter 和 global。zone 和 datacenter 的意義,在上述有關「zone」的介紹里已經解釋。global 維度涉及到機房間數據同步的特性,目前暫未開源。

關於「zone」和「scope」的概念理解,這裡再舉個例子。如下圖所示,物理機房內有 ZoneA 和 ZoneB 兩個單元,PublisherA 處於 ZoneA 里,所以發佈服務時指定了 zone=ZoneA,PublisherB 處於 ZoneB 里,所以發佈服務時指定了 zone=ZoneB;此時 Subscriber 訂閱時指定了 scope=datacenter 級別,因此它可以獲取到 PublisherA 和 PublisherB (如果 Subscriber 訂閱時指定了 scope=zone 級別,那麼它只能獲取到 PublisherA)。

1.3.3 dataInfoId

Data 層是數據服務器集群。Data 層通過分片存儲的方式保存着所用應用的服務註冊數據。數據按照 dataInfoId(每一份服務數據的唯一標識)進行一致性 Hash 分片,多副本備份,保證數據的高可用

SOFARegistry 最早選擇了一致性哈希分片,所以同樣遇到了數據分佈不固定帶來的數據同步難題。我們如何解決的呢?我們通過在 DataServer 內存中以 dataInfoId 的粒度記錄操作日誌,並且在 DataServer 之間也是以 dataInfoId 的粒度去做數據同步(一個服務就由一個 dataInfoId 唯標識)。其實這種日誌記錄的思想和虛擬桶是一致的,只是每個 datainfoId 就相當於一個 slot 了,這是一種因歷史原因而採取的妥協方案。在服務註冊中心的場景下,datainfoId 往往對應着一個發佈的服務,所以總量還是比較有限的,以螞蟻金服目前的規模,每台 DataServer 中承載的 dataInfoId 數量也僅在數萬的級別,勉強實現了 dataInfoId 作為 slot 的數據多副本同步方案。

最終一致性

SOFARegistry 在數據存儲層面採用了類似 Eureka 的最終一致性的過程,但是存儲內容上和 Eureka 在每個節點存儲相同內容特性不同,採用每個節點上的內容按照一致性 Hash 數據分片來達到數據容量無限水平擴展能力。

SOFARegistry 是一個 AP 分佈式系統,表明了在已有條件 P 的前提下,選擇了 A 可用性。當數據進行同步時,獲取到的數據與實際數據不一致。但因為存儲的信息為服務的註冊節點,儘管會有短暫的不一致產生,但對於客戶端來說,大概率還是能從這部分數據中找到可用的節點,不會因為數據暫時的不一致對業務系統帶來致命性的傷害。

集群內部數據遷移過程

SOFARegistry 的 DataServer 選擇了「一致性 Hash分片」來存儲數據。在「一致性 Hash分片」的基礎上,為了避免「分片數據不固定」這個問題,SOFARegistry 選擇了在 DataServer 內存里以 dataInfoId 的粒度記錄操作日誌,並且在 DataServer 之間也是以 dataInfoId 的粒度去做數據同步。

img

圖 DataServer 之間進行異步數據同步

數據和副本分別分佈在不同的節點上,進行一致性 Hash 分片,當時對主副本進行寫操作之後,主副本會把數據異步地更新到其他副本中,實現了集群內部不同副本之間的數據遷移工作。

1.3.4 版本號

為了確定服務發佈數據的變更,SOFA對於一個服務不僅定義了服務 ID,還對一個服務 ID 定義了對應的版本信息。

服務發佈數據變更主動通知到 Session 時,Session 對服務 ID 版本變更比較,高版本覆蓋低版本數據,然後進行推送。

因為有了服務 ID 的版本號,Session 可以定期發起版本號比較,如果Session 存儲的的服務 ID 版本號高於dataServer存儲的 ,Session再次拉取新版本數據進行推送,這樣避免了某次變更通知沒有通知到所有訂閱方的情況。

0x02 基本概念

首先,我們講講一些基本概念。

2.1 物理機房DataCenter

DataCenter代表一個物理機房。一個數據中心包括多個DataNode,這些DataNode就是同機房數據節點。

nodeList.add(new DataNode(new URL("192.168.0.1", 9632), "DefaultDataCenter"));
nodeList.add(new DataNode(new URL("192.168.0.2", 9632), "DefaultDataCenter"));
nodeList.add(new DataNode(new URL("192.168.0.3", 9632), "DefaultDataCenter"));

2.2 Server節點DataNode

DataNode是Server節點,可以代表任意類型的Server,無論是meta,data,session。

public class DataNode implements Node, HashNode {
    private final URL    nodeUrl;
    private final String nodeName;
    private final String dataCenter;
    private String       regionId;
    private NodeStatus   nodeStatus;
    private long         registrationTimestamp;
}

2.3 數據節點DataServerNode

這是 Data Server 概念。

為什麼要有DataNode和DataServerNode兩個類似的數據結構類型

原來這是分屬於不同的包,或者模塊。

  • DataNode 是從 MetaServer 傳來的,被 DataServerCache 使用,而 DataServerCache 放在 cache 包。
  • DataServerNode 是 DataServer 本身自己依據信息構建的,被DataServerNodeFactory 使用,放在 remoting 包。

具體定義如下:

public class DataServerNode implements HashNode {

    private String     ip;

    private String     dataCenter;

    private Connection connection;
}

2.4 服務Publisher

Publisher 是服務概念,具體如下。

public class Publisher extends BaseInfo {
    private List<ServerDataBox> dataList;
    private PublishType         publishType      = PublishType.NORMAL;
}

2.5 服務聚合Datum

Datum類定義如下,可以看到裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。

Publisher 只是代表發佈者自己業務服務器。Datum則是從SOFARegistry整體角度做了整理,就是一個Session Server包括的服務聚合

public class Datum implements Serializable {
    private String                                dataInfoId;
    private String                                dataCenter;
    private String                                dataId;
    private String                                instanceId;
    private String                                group;
    private Map<String/*registerId*/, Publisher> pubMap           = new ConcurrentHashMap<>();
    private long                                  version;
    private boolean                               containsUnPub    = false;
}

0x03 本機 Data Server

以下是關於本Data Server服務器的數據結構。

3.1 本身Data Server狀態

DataNodeStatus代表本身Data Server的狀態

com.alipay.sofa.registry.server.data.node.DataNodeStatus

public enum LocalServerStatusEnum {
    INITIAL,
    WORKING
}

public class DataNodeStatus {
    private volatile LocalServerStatusEnum status = LocalServerStatusEnum.INITIAL;

    public LocalServerStatusEnum getStatus() {
        return status;
    }

    public void setStatus(LocalServerStatusEnum status) {
        LocalServerStatusEnum originStatus = this.status;
        this.status = status;
    }
}

3.1.1 設置狀態

DataServerCache . updateDataServerStatus 中有設置DataNodeStatus的狀態,比如:dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING)。

private void updateDataServerStatus() {
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {

        dataNodeStatus.setStatus(LocalServerStatusEnum.WORKING);

        //after working update current dataCenter list to old DataServerChangeItem
        updateItem(
            newDataServerChangeItem.getServerMap().get(dataServerConfig.getLocalDataCenter()),
            newVersion, dataServerConfig.getLocalDataCenter());
    }
}

另外 addNotWorkingServer 有 addStatus 操作。

public void addNotWorkingServer(long version, String ip) {
    synchronized (DataServerCache.class) {
        if (version >= curVersion.get()) {
            addStatus(version, ip, LocalServerStatusEnum.INITIAL);
            if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
                updateDataServerStatus();
            }
        }
    }
}

3.2 本DataServer配置

DataServerConfig包括了本DataServer全部配置,這裡只摘錄了相關信息。

public class DataServerConfig {
    public static final String IP    = NetUtil.getLocalAddress().getHostAddress();
    private CommonConfig       commonConfig; 
    private int                numberOfReplicas = 1000;
  
    public int getNumberOfReplicas() {
        return numberOfReplicas;
    }	
  
    public String getLocalDataCenter() {
        return commonConfig.getLocalDataCenter();
    }  
}

3.3 Beans

對於 DataServerConfig 和 DataNodeStatus,系統做了beans。

@Configuration
protected static class DataServerBootstrapConfigConfiguration {

    ...

    @Bean
    @ConditionalOnMissingBean
    public DataServerConfig dataServerConfig(CommonConfig commonConfig) {
        return new DataServerConfig(commonConfig);
    }

    @Bean
    public DataNodeStatus dataNodeStatus() {
        return new DataNodeStatus();
    }

    ...
}

具體如下:

+-----------------------------------------------+
|                                               |
|  [Data Server]                                |
|                                               |
|                                               |
| +---------------+       +------------------+  |
| | DataNodeStatus|       | DataServerConfig |  |
| +---------------+       +------------------+  |
|                                               |
+-----------------------------------------------+

0x04 Meta Server

因為不涉及到 Meta Server 內部架構,所以從 Data Server 角度看,只要存儲 Meta Server 對應的網絡 Connection 即可

其中邏輯意義是:Map<dataCenter, Map<ip, Connection>>,就是哪些dataCenter中包括哪些Meta Server,對應於哪些ip

public class MetaServerConnectionFactory {
    private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>();
}

具體如下:

+-----------------------------------------+
|       MetaServerConnectionFactory       |
|                                         |
|  Map<dataCenter, Map<ip, Connection> >  |
|                                         |
+-----------------------------------------+

0x05 Session Server

因為不涉及到 Session Server 內部架構,所以從 Data Server 角度看,只要存儲 Session Server 對應的網絡 Connection 即可

其中邏輯含義從注釋即可了解。

public class SessionServerConnectionFactory {
    private static final int               DELAY                       = 30 * 1000;
  
    private static final Map               EMPTY_MAP                   = new HashMap(0);
  
    /**
     * key  :   SessionServer address
     * value:   SessionServer processId
     */
    private final Map<String, String>      SESSION_CONN_PROCESS_ID_MAP = new ConcurrentHashMap<>();

    /**
     * key  :   SessionServer processId
     * value:   ip:port of clients
     */
    private final Map<String, Set<String>> PROCESS_ID_CONNECT_ID_MAP   = new ConcurrentHashMap<>();

    /**
     * key  :   SessionServer processId
     * value:   pair(SessionServer address, SessionServer connection)
     */
    private final Map<String, Pair>        PROCESS_ID_SESSION_CONN_MAP = new ConcurrentHashMap<>();

    @Autowired
    private DisconnectEventHandler         disconnectEventHandler;  
}

具體如下:

+------------------------------------------------------------------------------------+
|[SessionServerConnectionFactory]                                                    |
|                                                                                    |
|                                                                                    |
|                                                                                    |
|EMPTY_MAP                                                                           |
|                                                                                    |
|Map<SessionServer address, SessionServer processId>                                 |
|                                                                                    |
|Map<SessionServer processId, Set<ip:port of clients> >                              |
|                                                                                    |
|Map<SessionServer processId, pair(SessionServer address, SessionServer connection)> |
|                                                                                    |
+------------------------------------------------------------------------------------+

0x06 其他Data Server

因為涉及到與其他 Data Server 的深度交互,所以需要對其他 Data Server 的深度信息作存儲

分為兩個部分:DataServerNodeFactory和DataServerCache。

為什麼要有DataServerNodeFactory和DataServerCache兩個類似的數據結構類型

從注釋來看,是為了把功能分離細化,DataServerNodeFactory專註連接管理,DataServerCache注重dataServer的變化與版本管理

DataServerNodeFactory

the factory to hold other dataservers and connection connected to them

DataServerCache

cache of dataservers

所以也分別在不同的包,或者模塊。

  • DataServerCache 放在 cache 包。
  • DataServerNodeFactory 放在 remoting 包。

6.1 DataServerNodeFactory

DataServerNodeFactory 存儲了其他 Data Server 的 DataServerNode,因為 DataServerNode 本身就包括了 Connection,所以 DataServerNodeFactory 也間接的包含了 Connection,這從其類定義注釋可以看出,而且其定義是在remoting.dataserver包之中。

the factory to hold other dataservers and connection connected to them

DataServerNodeFactory 裏面按照兩個維度存儲同一類東西,就是其他DataServer :

  • Map<dataCenter, Map<ip, DataServerNode>> MAP;
  • Map<dataCenter, ConsistentHash>這裡會計算 DataServerNode 的一致性hash

具體定義如下:

public class DataServerNodeFactory {
    /**
     * row:     dataCenter
     * column:  ip
     * value    dataServerNode
     */
    private static final Map<String, Map<String, DataServerNode>>    MAP                 = new ConcurrentHashMap<>();

    /**
     * key:     dataCenter
     * value:   consistentHash
     */
    private static final Map<String, ConsistentHash<DataServerNode>> CONSISTENT_HASH_MAP = new ConcurrentHashMap<>();

    private static AtomicBoolean  init        = new AtomicBoolean(false);
}

6.1.1 添加

在DataServerChangeEventHandler.doHandle裏面會調用connectDataServer,其又會調用 DataServerNodeFactory.register(new DataServerNode(ip, dataCenter, conn), dataServerConfig); 來添加,其裏面又會生成一致性Hash。

6.1.2 使用

DefaultMetaServiceImpl 會調用 DataServerNodeFactory 來計算 consistentHash 來獲取 DataServerNode。

public class DefaultMetaServiceImpl implements IMetaServerService {
    @Override
    public DataServerNode getDataServer(String dataCenter, String dataInfoId) {
        return DataServerNodeFactory.computeDataServerNode(dataCenter, dataInfoId);
    }  
}

6.2 DataServerCache

由注釋可以知道,這是其他dataservers的緩存

cache of dataservers

幾個關鍵變量:

nodeStatusMap 是本 Data Center 中所有 Data Server 的狀態。

  • dataServerChangeItem 是當前節點列表;
  • newDataServerChangeItem 是新加入的節點列表;有兩個變量的原因是因為有一個變化過程,所以加入一個New….;

具體如下:

com.alipay.sofa.registry.server.data.cache.DataServerCache
  
public class DataServerCache {

    @Autowired
    private DataNodeStatus                                dataNodeStatus;

    @Autowired
    private DataServerConfig                              dataServerConfig;

    @Autowired
    private AfterWorkingProcessHandler                    afterWorkingProcessHandler;

    /** current dataServer list and version */
    private volatile DataServerChangeItem                 dataServerChangeItem    = new DataServerChangeItem();

    /** new input dataServer list and version */
    private volatile DataServerChangeItem                 newDataServerChangeItem = new DataServerChangeItem();

    private final AtomicBoolean                           HAS_NOTIFY_ALL          = new AtomicBoolean(false);

    private AtomicLong                                    curVersion              = new AtomicLong(-1L);

    /** version -> Map(serverIp, serverStatus) */
    private Map<Long, Map<String, LocalServerStatusEnum>> nodeStatusMap           = new ConcurrentHashMap<>();
}  

下面介紹幾個DataServerCache的函數。

6.2.1 updateItem

updateItem會被幾個不同地方調用,進行更新dataServerChangeItem,就是插入一個new DataServer。

比如:LocalDataServerChangeEvent 和 DataServerChangeEvent 的響應函數就會調用。

public void updateItem(Map<String, DataNode> localDataNodes, Long version, String dataCenter) {
    synchronized (DataServerCache.class) {
        Long oldVersion = dataServerChangeItem.getVersionMap().get(dataCenter);
        Map<String, DataNode> oldList = dataServerChangeItem.getServerMap().get(dataCenter);
        Set<String> oldIps = oldList == null ? new HashSet<>() : oldList.keySet();
        Set<String> newIps = localDataNodes == null ? new HashSet<>() : localDataNodes.keySet();
        dataServerChangeItem.getServerMap().put(dataCenter, localDataNodes);
        dataServerChangeItem.getVersionMap().put(dataCenter, version);
   }
}

6.2.2 newDataServerChangeItem

newDataServerChangeItem 用這個來獲取所有的datacenters的所有DataServer

/**
 * get all datacenters
 *
 * @return
 */
public Set<String> getAllDataCenters() {
    return newDataServerChangeItem.getVersionMap().keySet();
}

6.2.3 dataServerChangeItem

dataServerChangeItem 被用來獲取某一個特定 data center的所有DataServer。

public Map<String, DataNode> getDataServers(String dataCenter) {
    return getDataServers(dataCenter, dataServerChangeItem);
}

public Map<String, DataNode> getDataServers(String dataCenter,
                                            DataServerChangeItem dataServerChangeItem) {
    return doGetDataServers(dataCenter, dataServerChangeItem);
}

private Map<String, DataNode> doGetDataServers(String dataCenter,
                                                   DataServerChangeItem dataServerChangeItem) {
        synchronized (DataServerCache.class) {
            Map<String, Map<String, DataNode>> dataserverMap = dataServerChangeItem.getServerMap();
            if (dataserverMap.containsKey(dataCenter)) {
                return dataserverMap.get(dataCenter);
            } else {
                return new HashMap<>();
            }
        }
}

6.2.4 DataServerChangeItem

此數據結構實際只用來獲取local data center的data servers

/**
 * change info of datacenters
 */
public class DataServerChangeItem {

    /** datacenter -> Map<ip, DataNode> */
    private Map<String, Map<String, DataNode>> serverMap;

    /** datacenter -> version */
    private Map<String, Long>                  versionMap;
}

6.2.5 使用

有些類會間接使用dataServerCache。

比如:DefaultMetaServiceImpl . dataServerCache 會被NotifyOnlineHandler調用。

public class NotifyOnlineHandler extends AbstractServerHandler<NotifyOnlineRequest> {
    @Autowired
    private DataServerCache dataServerCache;

    @Override
    public Object doHandle(Channel channel, NotifyOnlineRequest request) {
        long version = request.getVersion();
        if (version >= dataServerCache.getCurVersion()) {
            dataServerCache.addNotWorkingServer(version, request.getIp());
        }
        return CommonResponse.buildSuccessResponse();
    }
}

NotifyOnlineHandler其配置在

@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler()); //在這裡
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());
    return list;
}

屬於 dataSyncServer 響應函數的一部分。

private void openDataSyncServer() {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
}

具體如下:

+---------------------------------------------------+
|                                                   |
| [DataServerNodeFactory]                           |
|                                                   |
|                                                   |
| Map<dataCenter, Map<ip, dataServerNode> >         |
|                                                   |
| Map<dataCenter, ConsistentHash<DataServerNode> >  |
|                                                   |
+---------------------------------------------------+

+---------------------------------------------------+
| [DataServerCache]                                 |
|                                                   |
|                                                   |
| DataServerChangeItem dataServerChangeItem         |
|                                                   |
| DataServerChangeItem newDataServerChangeItem      |
|                                                   |
| Map<>ersion, Map<serverIp, serverStatus> >        |
|                                                   |
+---------------------------------------------------+

6.2.6 DataServer版本

從之前的MetaServer分析以及DataServerChangeItem,可知DataSever也是有版本的。

public class DataServerChangeItem {

    /** datacenter -> Map<ip, DataNode> */
    private Map<String, Map<String, DataNode>> serverMap;

    /** datacenter -> version */
    private Map<String, Long>                  versionMap;
}

在DataServer中,DefaultMetaServiceImpl中有設置版本號。

if (obj instanceof NodeChangeResult) {
    NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
    Map<String, Long> versionMap = result.getDataCenterListVersions();
    versionMap.put(result.getLocalDataCenter(), result.getVersion());
    return new DataServerChangeItem(result.getNodes(), versionMap);
}

其來源是MetaServer的DataStoreService。

dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> {
    if (localDataCenter.equalsIgnoreCase(dataCenter)) {
        nodeChangeResult.setVersion(dataNodeRepository.getVersion());
    }
    versionMap.put(dataCenter, dataNodeRepository.getVersion());
    Map<String, RenewDecorate<DataNode>> dataMap = dataNodeRepository.getNodeMap();
    Map<String, DataNode> newMap = new ConcurrentHashMap<>();
    dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
    pushNodes.put(dataCenter, newMap);
});

以及DataStoreService。

metaRepositoryMap.forEach((dataCenter, metaNodeRepository) -> {
    if (localDataCenter.equalsIgnoreCase(dataCenter)) {
        nodeChangeResult.setVersion(metaNodeRepository.getVersion());
    }
    versionMap.put(dataCenter, metaNodeRepository.getVersion());
    Map<String, RenewDecorate<MetaNode>> dataMap = metaNodeRepository.getNodeMap();
    Map<String, MetaNode> newMap = new ConcurrentHashMap<>();
    dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
    pushNodes.put(dataCenter, newMap);
});

都是提取dataServer的版本號,發送出去。

0x07 服務信息

服務信息包括 Subscriber 和 Publisher,這些信息需要深度存儲,本文僅以 Publisher 為例分析。

前面描述了,Publisher包括在Datum之中,所以我們下面的講解以Datum為主。

7.1 Datum

Datum類定義如下,可以看到裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。

Publisher 只是代表發佈者自己業務服務器。Datum則是從SOFARegistry整體角度做了整理,就是一個Session Server包括的服務聚合

public class Datum implements Serializable {

    private String                                dataInfoId;

    private String                                dataCenter;

    private String                                dataId;

    private String                                instanceId;

    private String                                group;

    private Map<String/*registerId*/, Publisher> pubMap           = new ConcurrentHashMap<>();

    private long                                  version;

    private boolean                               containsUnPub    = false;
}

7.2 DatumCache

DatumCache緩存了所有Datum,就是本DataServer對應所有的SessionServer中所有的服務

public class DatumCache {

    @Autowired
    private DatumStorage localDatumStorage;
}

7.3 LocalDatumStorage

LocalDatumStorage負責Datum具體的存儲。

public class LocalDatumStorage implements DatumStorage {


    /**
     * row:     dataCenter
     * column:  dataInfoId
     * value:   datum
     */
    protected final Map<String, Map<String, Datum>>     DATUM_MAP            = new ConcurrentHashMap<>();

    /**
     * all datum index
     *
     * row:     ip:port
     * column:  registerId
     * value:   publisher
     */
    protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();

    @Autowired
    private DataServerConfig                            dataServerConfig;
}

7.4 Beans

相關的Bean如下:

@Configuration
public static class DataServerStorageConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public DatumCache datumCache() {
        return new DatumCache();
    }

    @Bean
    @ConditionalOnMissingBean
    public LocalDatumStorage localDatumStorage() {
        return new LocalDatumStorage();
    }
}

0x08 Datum的來龍去脈

8.1 Session Server 內部

首先,我們講講Session Server 內部如何獲取Datum

Session Server 內部,Datum存儲在 SessionCacheService 之中。

比如在 DataChangeFetchCloudTask 內部,可以這樣獲取 Datum。

private Map<String, Datum> getDatumsCache() {
    Map<String, Datum> map = new HashMap<>();
    NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
    Collection<String> dataCenters = nodeManager.getDataCenters();
    if (dataCenters != null) {
        Collection<Key> keys = dataCenters.stream().
                map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                        new DatumKey(fetchDataInfoId, dataCenter))).
                collect(Collectors.toList());

        Map<Key, Value> values = null;
        values = sessionCacheService.getValues(keys);

        if (values != null) {
            values.forEach((key, value) -> {
                if (value != null && value.getPayload() != null) {
                    map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
                }
            });
        }
    }
    return map;
}

Session Server 會向 Data Server 發送 PublishDataRequest 請求。

8.2 PublishDataHandler

在DataServer內部,PublishDataHandler 是用來處理 PublishDataRequest。

public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}

8.3 DataChangeEventCenter

在 DataChangeEventCenter 的 onChange 函數中,會進行投放。

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

8.4 DataChangeEventQueue

在DataChangeEventQueue之中,會調用 handleDatum 來處理。在這裡對Datum進行存儲。

8.5 DataChangeHandler

在 DataChangeHandler 之中,會提取ChangeData,然後進行Notify。

public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}

具體如下:

                                           +
                           Session Server  |  Data Server
                                           |
                                           |
                                           |
                                           |
+--------------------------+  PublishDataRequest   +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+               |       +------+-------------+
            ^                              |              |
            | getValues                    |              |  onChange(Publisher)
            |                              |              v
            |                              |     +--------+--------------+
  +---------+----------+                   |     | DataChangeEventCenter |
  |sessionCacheService |                   |     +--------+--------------+
  +--------------------+                   |              |
                                           |              |  Datum
                                           |              |
                                           |              v
                                           |     +--------+-------------+
                                           |     | DataChangeEventQueue |
                                           |     +--------+-------------+
                                           |              |
                                           |              |
                                           |              | ChangeData
                                           |              v
                                           |      +-------+-----------+
                                           |      | DataChangeHandler |
                                           +      +-------------------+

0x09 總結

至此,本文總結基本存儲結構如下:

9.1 基本概念

物理機房DataCenter

DataCenter代表一個物理機房。一個數據中心包括多個DataNode,這些DataNode就是同機房數據節點。

Server節點DataNode

DataNode是Server節點,可以代表任意類型的Server,無論是meta,data,session。

數據節點DataServerNode

這是 Data Server 概念。

服務Publisher

Publisher 是服務概念

服務聚合Datum

Datum裏面有一個ConcurrentHashMap,其中放入了Datum所包括的Publisher。

Publisher 只是代表發佈者自己業務服務器。Datum則是從SOFARegistry整體角度做了整理,就是一個Session Server包括的服務聚合

9.2 本身Data Server狀態

DataNodeStatus代表本身Data Server的狀態DataServerConfig包括了本DataServer全部配置。

9.3 Meta Server

因為不涉及到 Meta Server 內部架構,所以從 Data Server 角度看,只要存儲 Meta Server 對應的網絡 Connection 即可

9.4 Session Server

因為不涉及到 Session Server 內部架構,所以從 Data Server 角度看,只要存儲 Session Server 對應的網絡 Connection 即可

9.5 其他Data Server

因為涉及到與其他 Data Server 的深度交互,所以需要對其他 Data Server 的深度信息作存儲

分為兩個部分:DataServerNodeFactory 和 DataServerCache。

為什麼要有DataServerNodeFactory和DataServerCache兩個類似的數據結構類型

從注釋來看,是為了把功能分離細化,DataServerNodeFactory專註連接管理,DataServerCache注重dataServer的變化與版本管理

0xFF 參考

螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容

螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路

服務註冊中心 Session 存儲策略 | SOFARegistry 解析

海量數據下的註冊中心 – SOFARegistry 架構介紹

服務註冊中心數據分片和同步方案詳解 | SOFARegistry 解析

螞蟻金服開源通信框架SOFABolt解析之連接管理剖析

螞蟻金服開源通信框架SOFABolt解析之超時控制機制及心跳機制

螞蟻金服開源通信框架 SOFABolt 協議框架解析

螞蟻金服服務註冊中心數據一致性方案分析 | SOFARegistry 解析

螞蟻通信框架實踐

sofa-bolt 遠程調用

sofa-bolt學習

SOFABolt 設計總結 – 優雅簡潔的設計之道

SofaBolt源碼分析-服務啟動到消息處理

SOFABolt 源碼分析

SOFABolt 源碼分析9 – UserProcessor 自定義處理器的設計

SOFARegistry 介紹

SOFABolt 源碼分析13 – Connection 事件處理機制的設計