[Learning Design from Source Code] Ant Financial SOFARegistry: Service Registration and Operation Logs

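0x00 Abstract

SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.

This series focuses on design and architecture. Across multiple articles and from multiple angles, it works backward from the code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba approaches design.

This is the fourteenth article. It covers service publication and operation logs in SOFARegistry. The previous article looked at them from the Session Server side; this one looks at them from the Data Server side.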

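0x01 Overall Business Flow

We first review the overall business flow. This part concerns data sharding.

1.1 Service Registration Process

Let us review how service data flows internally during a single service registration.

  1. The Client calls publisher.register to register a service with the SessionServer.
  2. After receiving the service data (PublisherRegister), the SessionServer writes it to memory (the SessionServer keeps the Client's data in memory so that it can later run periodic checks against the DataServer). It then uses consistent hashing on the dataInfoId to find the corresponding DataServer and sends the PublisherRegister to it.
  3. When the DataServer receives the PublisherRegister, it also writes the data to memory first. The DataServer aggregates all PublisherRegister entries by dataInfoId. At the same time, it notifies all SessionServers of the change event for that dataInfoId; the event carries the dataInfoId and the version number.
  4. In parallel, the DataServer asynchronously and incrementally synchronizes the data to the other replicas, by dataInfoId. On top of consistent-hash sharding, the DataServer keeps multiple replicas of each shard (3 by default).
  5. After receiving the change event, the SessionServer compares the version it holds in memory for that dataInfoId. If its version is lower than the one sent by the DataServer, it actively fetches the complete data for the dataInfoId from the DataServer, that is, the full list of PublisherRegister entries for that dataInfoId.
  6. Finally, the SessionServer pushes the data to the relevant Clients, which then receive the latest service list after this registration.

For reasons of space, the previous article covered the first two steps; this article covers steps 3 and 4.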

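1.2 Data Sharding

When a service goes online, the hash of the new service's dataInfoId is computed to decide its shard. The nearest node on the hash ring is then located, and the data is stored on that node.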
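The lookup described above can be sketched as a small consistent-hash ring. This is only an illustration under stated assumptions: the class name ConsistentHashRing, the use of CRC32 as the hash function, and the virtual-node scheme are all hypothetical and are not taken from the SOFARegistry source.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Hypothetical consistent-hash ring; not the SOFARegistry implementation. */
class ConsistentHashRing {
    // Ring position -> node address, kept sorted so we can search clockwise.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    ConsistentHashRing(List<String> nodes, int virtualNodesPerNode) {
        for (String node : nodes) {
            // Several virtual nodes per physical node spread the load more evenly.
            for (int i = 0; i < virtualNodesPerNode; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    /** Returns the first node at or after the key's hash, wrapping around the ring. */
    String nodeFor(String dataInfoId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(dataInfoId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** CRC32 is an assumption chosen for brevity; any stable hash would do. */
    static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

The same dataInfoId always maps to the same node as long as the node set is unchanged, which is what lets the SessionServer and the DataServer agree on where a service's data lives.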

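At startup, the DataServer registers a publishDataProcessor to handle publish requests from service publishers. This publishDataProcessor is PublishDataHandler. Whenever a new publisher goes online, the DataServer's PublishDataHandler is triggered.

The handler first checks the state of the current node. If the node is not in the working state, it returns a failure response. If it is working, the handler calls the onChange method of the data change event center, DataChangeEventCenter.

DataChangeEventCenter maintains an array of DataChangeEventQueue instances; each element of the array is an event queue. When onChange is triggered, it computes the hash of the changed service's dataInfoId to determine the index of the queue that owns this registration data, wraps the changed data in a data change object, and puts it into that queue.

DataChangeEventQueue#start is called on a new thread when DataChangeEventCenter is initialized. It continuously takes new events from the queue and dispatches them. New data is added to the node this way, which is how sharding is realized.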
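The routing step can be sketched as follows. The class EventQueueRouter and the use of String.hashCode with Math.floorMod are assumptions made for illustration; the article does not show the hash function used by DataChangeEventCenter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: route change events to one of N queues by dataInfoId. */
class EventQueueRouter {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    EventQueueRouter(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    /** floorMod keeps the index non-negative even when hashCode() is negative. */
    int indexFor(String dataInfoId) {
        return Math.floorMod(dataInfoId.hashCode(), queues.size());
    }

    /** Events for the same dataInfoId always land in the same queue, preserving their order. */
    void onChange(String dataInfoId) {
        queues.get(indexFor(dataInfoId)).add(dataInfoId);
    }

    int size(int index) {
        return queues.get(index).size();
    }
}
```

Because one dataInfoId always maps to one queue, a single consumer thread per queue sees that service's changes in order without extra locking.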

Meanwhile, DataChangeHandler publishes the change event through ChangeNotifier, notifying other nodes to synchronize the data.

0x02 Basic Data Structures

We first need to explain a few related data structures.

2.1 Publisher

Publisher holds the information about a data publisher.

public class Publisher extends BaseInfo {
    private List<ServerDataBox> dataList;
    private PublishType         publishType      = PublishType.NORMAL;
}

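2.2 Datum

Datum is the publisher information aggregated from SOFARegistry's own point of view. Its core fields are:

  • dataInfoId: the unique identifier of a service, composed of the dataId, the group, and the tenant instanceId. For example, in the SOFARPC scenario a dataInfoId is typically com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001, where SOFA is the group name and 00001 is the tenant id. group and instanceId exist mainly to partition service data logically, so that the service data of different groups and instances is completely independent. The model also has group and instanceId fields, which are not discussed separately here; it is enough to understand what dataInfoId means.
  • dataCenter: a physical data center containing multiple logical units (zones). A zone is a concept from unitized architecture and represents a logical unit within a data center. In service discovery, a zone must be specified when publishing a service, while a subscriber can subscribe either to the service data of a zone or to that of a physical data center, that is, to the service data of all zones under that data center.
  • pubMap: the Publishers it contains.
  • version: the corresponding version.

The code is as follows: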

public class Datum implements Serializable {
    private String                                dataInfoId;
    private String                                dataCenter;
    private String                                dataId;
    private String                                instanceId;
    private String                                group;
    private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
    private long                                  version;
    private boolean                               containsUnPub    = false;
}

2.3 DatumCache

DatumCache holds the latest Datum.

public class DatumCache {
    @Autowired
    private DatumStorage localDatumStorage;
}

The actual storage is done in LocalDatumStorage.

public class LocalDatumStorage implements DatumStorage {
    /**
     * row:     dataCenter
     * column:  dataInfoId
     * value:   datum
     */
    protected final Map<String, Map<String, Datum>>     DATUM_MAP            = new ConcurrentHashMap<>();

    /**
     * all datum index
     *
     * row:     ip:port
     * column:  registerId
     * value:   publisher
     */
    protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();

    @Autowired
    private DataServerConfig                            dataServerConfig;
}

2.4 Operator

An Operator is the operation record for each change to a Datum.

public class Operator {
    private Long               version;
    private Long               sourceVersion;
    private Datum              datum;
    private DataSourceTypeEnum sourceType;
}

2.5 Acceptor

Acceptor records all operations on a Datum. In it:

  • logOperatorsOrder records the order of the operations;
  • logOperators holds all the operations.

public class Acceptor {
    private final String                    dataInfoId;
    private final String                    dataCenter;
    private int                             maxBufferSize;
    static final int                        DEFAULT_DURATION_SECS = 30;
    private final Deque<Long/*version*/>   logOperatorsOrder = new ConcurrentLinkedDeque<>();
    private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
    private final DatumCache                datumCache;
}

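2.6 Summary

To summarize how these data structures relate:

  • Publisher holds the information about a data publisher.
  • Datum is the publisher information aggregated from SOFARegistry's own point of view.
  • DatumCache holds the latest Datum.
  • Operator is the operation record for each change to a Datum.
  • Acceptor records all operations on a Datum.

0x03 The Life Cycle of a Datum

Let us first review where a Datum comes from and where it goes.

3.1 Inside the Session Server

First, how the Session Server obtains a Datum internally.

Inside the Session Server, Datum objects are stored in SessionCacheService.

For example, DataChangeFetchCloudTask obtains them like this: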

private Map<String, Datum> getDatumsCache() {
    Map<String, Datum> map = new HashMap<>();
    NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
    Collection<String> dataCenters = nodeManager.getDataCenters();
    if (dataCenters != null) {
        Collection<Key> keys = dataCenters.stream().
                map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                        new DatumKey(fetchDataInfoId, dataCenter))).
                collect(Collectors.toList());

        Map<Key, Value> values = null;
        values = sessionCacheService.getValues(keys);

        if (values != null) {
            values.forEach((key, value) -> {
                if (value != null && value.getPayload() != null) {
                    map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
                }
            });
        }
    }
    return map;
}

The Session Server sends a PublishDataRequest to the Data Server.

3.2 PublishDataHandler

Inside the DataServer, PublishDataHandler handles PublishDataRequest.

public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}

3.3 DataChangeEventCenter

The onChange function of DataChangeEventCenter places the event into the queue selected by the dataInfoId hash.

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

3.4 DataChangeEventQueue

DataChangeEventQueue calls handleDatum to process the event. This is where the Datum is stored.

3.5 DataChangeHandler

DataChangeHandler takes ChangeData from the queues and then performs the notification.

public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}

The flow so far is shown below:

                                           +
                           Session Server  |  Data Server
                                           |
                                           |
                                           |
                                           |
+--------------------------+  PublishDataRequest   +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+               |       +------+-------------+
            ^                              |              |
            | getValues                    |              |  onChange(Publisher)
            |                              |              v
            |                              |     +--------+--------------+
  +---------+----------+                   |     | DataChangeEventCenter |
  |sessionCacheService |                   |     +--------+--------------+
  +--------------------+                   |              |
                                           |              |  Datum
                                           |              |
                                           |              v
                                           |     +--------+-------------+
                                           |     | DataChangeEventQueue |
                                           |     +--------+-------------+
                                           |              |
                                           |              |
                                           |              | ChangeData
                                           |              v
                                           |      +-------+-----------+
                                           |      | DataChangeHandler |
                                           +      +-------------------+

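0x04 DataChangeHandler Processing

We now continue with DataChangeHandler. As mentioned in the overview, DataChangeHandler does two things with a change event:

  1. It turns the change event into an Operator and puts it into AbstractAcceptorStore.
  2. It publishes the event through ChangeNotifier, notifying other nodes to synchronize the data.

Below we start from the first part, turning the change event into an Operator and putting it into AbstractAcceptorStore, and explain how the operation log works.

This is shown in the following figure: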

                                           +
                           Session Server  |  Data Server
                                           |
                                           |
                                           |
                                           +
+--------------------------+  PublishDataRequest   +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+               |       +------+-------------+
            ^                              |              |
            | getValues                    |              |  onChange(Publisher)
            |                              |              v
            |                              |     +--------+--------------+
  +---------+----------+                   |     | DataChangeEventCenter |
  |sessionCacheService |                   |     +--------+--------------+
  +--------------------+                   |              |
                                           |              |  Datum
                                           |              |
                                           |              v
                                           |     +--------+-------------+
                                           |     | DataChangeEventQueue |
                                           |     +--------+-------------+
                                           |              |
                                           |              |
                                           |              | ChangeData
                                           |              v
                                           |      +-------+-----------+
                                           |      | DataChangeHandler |
                                           |      +-------+-----------+
                                           |              |
                                           |              |
                                           |              v
                                           |      +-------+---------+
                                           |      |  ChangeNotifier |
                                           |      +-------+---------+
                                           |              |
                                           |              |
                                           |              v
                                           |   +----------+------------+
                                           |   | AbstractAcceptorStore |
                                           |   +-----------------------+
                                           +

Who calls Acceptor's appendOperator? The notifiers do, for example:

public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

And another one:

public class SnapshotBackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

0x05 AbstractAcceptorStore Storage

AbstractAcceptorStore is the log store. We analyze it in detail below.

5.1 Bean

A Bean is provided to store the operation information.

@Bean
public AcceptorStore localAcceptorStore() {
    return new LocalAcceptorStore();
}

5.2 StoreServiceFactory

Its role is to keep the various AcceptorStore implementations in storeServiceMap, keyed by type. At present there is only one, LocalAcceptorStore.

public class StoreServiceFactory implements ApplicationContextAware {

    private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();

    /**
     * get AcceptorStore by storeType
     * @param storeType
     * @return
     */
    public static AcceptorStore getStoreService(String storeType) {
        return storeServiceMap.get(storeType);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);

        map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
    }
}

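5.3 AbstractAcceptorStore

AbstractAcceptorStore is the base implementation of the store. Its main members are:

  • acceptors: a two-level map keyed by dataCenter and dataInfoId that stores the Acceptor for each key. In other words, every (dataCenter, dataInfoId) combination has one Acceptor, which stores the Operators under it.

  • notifyAcceptorsCache: a two-level map keyed by dataCenter and dataInfoId that caches the Acceptors awaiting further processing.

  • delayQueue: works together with notifyAcceptorsCache. For each new Acceptor placed in notifyAcceptorsCache, the system adds an item to the queue. When the item's delay expires, it is taken out of the queue, and the corresponding Acceptor is removed from notifyAcceptorsCache and processed.

In principle, the cache holds an entry and that entry is taken out when its delay item is dequeued. If the same Acceptor is put into the cache several more times in the meantime, putIfAbsent leaves the existing entry in place and no extra queue item is added, so the Acceptor is still processed once when the delay expires.

notifyAcceptorsCache is also organized by data center, and entries leave it only through removeCache when their delay items are consumed.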

public abstract class AbstractAcceptorStore implements AcceptorStore {

    private static final int               DEFAULT_MAX_BUFFER_SIZE = 30;

    @Autowired
    protected IMetaServerService           metaServerService;

    @Autowired
    private Exchange                       boltExchange;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DataServerConnectionFactory    dataServerConnectionFactory;

    @Autowired
    private DatumCache                     datumCache;

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors               = new ConcurrentHashMap<>();

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache    = new ConcurrentHashMap<>();

    private DelayQueue<DelayItem<Acceptor>>     delayQueue              = new DelayQueue<>();
}

The structure is shown in the following figure:

+-----------------------------+                      +--> dataInfoId +--->  Acceptor +--> Map<version, Operator>
|[AbstractAcceptorStore]      |                      |
|                             |   +-> dataCenter +---+
|                             |   |                  |
|     acceptors  +--------------->+                  +--> dataInfoId +--->  Acceptor +--> Map<version, Operator>
|                             |   |
|     notifyAcceptorsCache    |   |                  +--> dataInfoId +--->  Acceptor +--> Map<version, Operator>
|           +                 |   +-> dataCenter +-->+
+-----------------------------+                      |
            |                                        +--> dataInfoId +--->  Acceptor +--> Map<version, Operator>
            |
            |
            |                                        +--> dataInfoId +--->  Acceptor +--> Map<version, Operator>
            |                     +-> dataCenter +-->+
            |                     |                  +--> dataInfoId +--->  Acceptor +--> Map<version, Operator>
            +-------------------->+
                                  |                  +--> dataInfoId +--->  Acceptor +--> Map<version, Operator>
                                  +-> dataCenter +---+
                                                     +--> dataInfoId +--->  Acceptor +--> Map<version, Operator>

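One point needs explaining: why delayQueue is a delay queue. The reason is SOFARegistry's "second-level service online/offline notification" feature.

Implementing this feature raises a connection-sensitivity issue. In SOFARegistry, every Client keeps a long-lived connection to a SessionServer, and each connection has a bolt-based heartbeat. If the connection drops, the Client reconnects immediately, so that there is always a reliable connection between Client and SessionServer.

Because of this strong connection sensitivity, if a connection drops only because of a network problem and the process itself has not crashed, the Client immediately reconnects to the SessionServer and re-registers all of its service data. A large number of such brief offline-then-online transitions would confuse and inconvenience users.

The DataServer therefore implements delayed merging of data internally, which is the DelayQueue here.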
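The delayed-merge idea can be sketched with java.util.concurrent.DelayQueue. This is a minimal illustration, not the SOFARegistry code: the class DelayedMergeDemo, its DelayItem, and the string keys are hypothetical, and the real store keeps Acceptor objects rather than a Boolean marker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: merge bursts of changes for one key into a single delayed notification. */
class DelayedMergeDemo {
    static final class DelayItem implements Delayed {
        final String key;
        final long dueNanos;

        DelayItem(String key, long delayMillis) {
            this.key = key;
            this.dueNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final DelayQueue<DelayItem> queue = new DelayQueue<>();
    private final long delayMillis;

    DelayedMergeDemo(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Schedules a notification only for the first change in a burst; returns true if one was scheduled. */
    boolean markChanged(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) == null) {
            queue.add(new DelayItem(key, delayMillis));
            return true;
        }
        return false; // already pending: this change is merged into the scheduled notification
    }

    /** Blocks until the next item is due, clears its pending mark, and returns its key (null if interrupted). */
    String takeNext() {
        try {
            String key = queue.take().key;
            pending.remove(key);
            return key;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    int queued() {
        return queue.size();
    }
}
```

A quick offline-then-online flap within the delay window therefore produces one notification instead of two.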

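5.4 Adding an Operator

The basic logic of addOperator is:

  • Extract dataCenter and dataInfoId from the Operator's Datum.
  • Get the Map<dataInfoId, Acceptor> acceptorMap for that dataCenter from acceptors.
  • Get the existAcceptor for that dataInfoId from acceptorMap.
  • If the new operator is a SnapshotOperator, clear the previous operator queue.
  • Otherwise, append the new operator.
  • Call putCache(existAcceptor) to add the current Acceptor to the cache, where the scheduled task will process it.

All of these steps use putIfAbsent. If the same key is inserted several times within a short period, the existing value is not replaced, which has a merging effect.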

@Override
public void addOperator(Operator operator) {

    Datum datum = operator.getDatum();
    String dataCenter = datum.getDataCenter();
    String dataInfoId = datum.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }

        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        if (existAcceptor == null) {
            Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId,
                dataCenter, datumCache);
            existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
            if (existAcceptor == null) {
                existAcceptor = newAcceptor;
            }
        }

        if (operator instanceof SnapshotOperator) {
            //snapshot: clear the queue, Make other data retrieve the latest memory data
            existAcceptor.clearBefore();
        } else {
            existAcceptor.appendOperator(operator);
        }

        //put cache
        putCache(existAcceptor);
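    } catch (Exception e) {
        // The original excerpt elides the catch block; logging here is an assumption.
        LOGGER.error(e.getMessage(), e);
    }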
}

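putCache does the following:

  • Extract dataCenter and dataInfoId from the acceptor.
  • Get the Map<dataInfoId, Acceptor> acceptorMap for that dataCenter from notifyAcceptorsCache.
  • Put the acceptor into acceptorMap under its dataInfoId.
  • If acceptorMap had no value for that key before, add the acceptor to delayQueue.

putIfAbsent is used here as well. If the same key is inserted several times within a short period, the existing value is not replaced, which has a merging effect.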

private void putCache(Acceptor acceptor) {

    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();

    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap == null) {
            Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }
        Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
        if (existAcceptor == null) {
            addQueue(acceptor);
        }
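    } catch (Exception e) {
        // The original excerpt elides the catch block; logging here is an assumption.
        LOGGER.error(e.getMessage(), e);
    }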
}

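5.5 Consumption

The log is consumed by scheduled tasks. The purpose of consuming the log is to synchronize the logged operations to other DataServers.

5.5.1 Scheduler

The Scheduler class runs the scheduled tasks. It creates two thread pools that call methods of AcceptorStore: expireCheckExecutor runs checkAcceptorsChangAndExpired periodically, and versionCheckExecutor runs changeDataCheck.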

public class Scheduler {
    private final ScheduledExecutorService scheduler;
  
    public final ExecutorService           versionCheckExecutor;

    private final ThreadPoolExecutor       expireCheckExecutor;

    @Autowired
    private AcceptorStore                  localAcceptorStore;

   public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory(
                "SyncDataScheduler-versionChangeCheck"));

    }
  
    public void startScheduler() {
        scheduler.schedule(
                new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                        TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
                30, TimeUnit.SECONDS);

        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());

    }
}

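The corresponding methods in AbstractAcceptorStore are described below.

5.5.2 changeDataCheck

changeDataCheck contains a while (true) loop, so it only needs to be submitted to the executor once and does not need to be scheduled repeatedly.

changeDataCheck blocks on delayQueue. When an item becomes due, it takes the Acceptor from the queue, removes the same Acceptor from notifyAcceptorsCache, and calls notifyChange(acceptor) to process it.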

@Override
public void changeDataCheck() {

    while (true) {
        try {
            DelayItem<Acceptor> delayItem = delayQueue.take();
            Acceptor acceptor = delayItem.getItem();
            removeCache(acceptor); // compare and remove
        } catch (InterruptedException e) {
            break;
        } catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

}

The cache is consumed through removeCache.

private void removeCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap != null) {
            boolean result = acceptorMap.remove(dataInfoId, acceptor);
            if (result) {
                //data change notify
                notifyChange(acceptor);
            }
        }
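    } catch (Exception e) {
        // The original excerpt elides the catch block; logging here is an assumption.
        LOGGER.error(e.getMessage(), e);
    }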
}
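
5.5.2.1 Sending NotifyDataSyncRequest

removeCache also calls notifyChange to send the notification. The logic is:

  • Extract the dataInfoId from the acceptor.
  • Use the dataInfoId to get the IPs of the dataServerNodes from the meta service.
  • Iterate over the IPs, skipping the local one, and notify each through the bolt server with syncServer.sendSync, that is, send a NotifyDataSyncRequest to the data server at that IP.
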
private void notifyChange(Acceptor acceptor) {

    Long lastVersion = acceptor.getLastVersion();

    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());

    List<String> targetDataIps = getTargetDataIp(acceptor.getDataInfoId());
    for (String targetDataIp : targetDataIps) {

        if (DataServerConfig.IP.equals(targetDataIp)) {
            continue;
        }

        Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());

        for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
            try {
                Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
                if (connection == null) {
                    TimeUtil.randomDelay(1000);
                    continue;
                }
                syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
                    request, 1000);
                break;
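            } catch (Exception e) {
                // The original excerpt elides the catch block; logging before the next retry is an assumption.
                LOGGER.error(e.getMessage(), e);
            }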
        }
    }
}
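The retry loop in notifyChange follows a common bounded-retry pattern: try up to a configured number of times and stop at the first success. A minimal sketch of that pattern is shown below; BoundedRetry and its run method are hypothetical names, and the sketch omits the random delay between attempts that the original applies when no connection is available.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a bounded retry loop; not part of SOFARegistry. */
class BoundedRetry {
    /**
     * Runs the attempt up to maxAttempts times.
     * Returns the number of attempts used on success, or -1 if every attempt failed.
     */
    static int run(int maxAttempts, BooleanSupplier attempt) {
        for (int tryCount = 0; tryCount < maxAttempts; tryCount++) {
            try {
                if (attempt.getAsBoolean()) {
                    return tryCount + 1; // success: stop retrying, like the break in notifyChange
                }
            } catch (RuntimeException e) {
                // A failed attempt is treated like an unsuccessful one; fall through to the next try.
            }
        }
        return -1;
    }
}
```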

The call chain for this part is: versionCheckExecutor.execute -> localAcceptorStore.changeDataCheck -> removeCache -> notifyChange -> NotifyDataSyncRequest

This is shown in the following figure:

+--------------------------+
|                          |     +----------------------------------------------------------------------+
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |
|                          |     |                                                                      |
+--------+-----------------+     |                                                                      |
         |                       |                                                                      |
         |                       |                                                                      |
         |                       |                                                                      |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors            |
         |   changeDataCheck     |                                                                      |
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                    +            +----------------------------------------------------------------------+
                    |
                    |
                    |
                    | NotifyDataSyncRequest
                    |
                    v
             +------+-----------+
             | Other DataServer |
             +------------------+

5.5.3 checkAcceptorsChangAndExpired

checkAcceptorsChangAndExpired iterates over every acceptor in acceptors and calls checkExpired on it to remove expired log entries.

@Override
public void checkAcceptorsChangAndExpired() {
    acceptors.forEach((dataCenter, acceptorMap) -> {
        if (acceptorMap != null && !acceptorMap.isEmpty()) {
            acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
        }
    });
}

At this point, the logic looks like this:

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                    +            +----------------------------------------------------------------------+
                    |
                    |
                    |
                    | NotifyDataSyncRequest
                    |
                    v
             +------+-----------+
             | Other DataServer |
             +------------------+

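0x06 Acceptor Log Operations

The Acceptor keeps the log, that is, it records every operation on a Datum.

The operation log is stored as a queue. To fetch log entries, the receiver's current version number is located in the queue, and all operations after that version are synchronized over and applied.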

public class Acceptor {
    private final String                    dataInfoId;
    private final String                    dataCenter;
    private int                             maxBufferSize;
    static final int                        DEFAULT_DURATION_SECS = 30;
    private final Deque<Long/*version*/>   logOperatorsOrder     = new ConcurrentLinkedDeque<>();
    private Map<Long/*version*/, Operator> logOperators          = new ConcurrentHashMap<>();
    private final DatumCache                datumCache;
}

The key variables are:

  • logOperators: a map keyed by version number that stores all the Operators.
  • logOperatorsOrder: because the map has no ordering, this queue stores the version numbers in order.
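How the two structures combine to answer "give me everything after version X" can be sketched as follows. VersionedLogReader, its String payloads, and the after method are hypothetical; the article does not show the Acceptor method that performs this lookup.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical sketch: an ordered version deque plus a version-keyed map. */
class VersionedLogReader {
    private final Deque<Long> order = new ConcurrentLinkedDeque<>();
    private final Map<Long, String> ops = new ConcurrentHashMap<>();

    void append(long version, String op) {
        // Only record the version in the order deque the first time it is seen.
        if (ops.put(version, op) == null) {
            order.add(version);
        }
    }

    /**
     * Returns the operations recorded after fromVersion, oldest first.
     * An empty Optional means fromVersion is not in the log, so an incremental
     * sync is impossible and the caller would need the full data instead.
     */
    Optional<List<String>> after(long fromVersion) {
        if (!ops.containsKey(fromVersion)) {
            return Optional.empty();
        }
        List<String> result = new ArrayList<>();
        boolean found = false;
        for (Long version : order) {
            if (found) {
                result.add(ops.get(version));
            } else if (version == fromVersion) {
                found = true;
            }
        }
        return Optional.of(result);
    }
}
```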

An Operator is the operation record for each change to a Datum.

public class Operator {
    private Long               version;
    private Long               sourceVersion;
    private Datum              datum;
    private DataSourceTypeEnum sourceType;
}

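6.1 appendOperator

This function appends an operation log entry.

  • If the queue is full, remove the first (oldest) entry to make room for the new one at the tail.
  • If the Operator's sourceVersion is null, set it to 0L.
  • If the Operator's sourceVersion does not match the version at the tail of the queue, the version chain is broken, so clear both the map and the queue.
  • Put the Operator into the map.
  • If the map had no entry for this version, add the version to the queue.

The code is as follows: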

/**
 * append operator to queue,if queue is full poll the first element and append.
 * Process will check version sequence,it must append with a consequent increase in version,
 * otherwise queue will be clean
 *
 * @param operator
 */
public void appendOperator(Operator operator) {
    write.lock();
    try {
        if (isFull()) {
            logOperators.remove(logOperatorsOrder.poll());
        }
        if (operator.getSourceVersion() == null) {
            operator.setSourceVersion(0L);
        }
        Long tailVersion = logOperatorsOrder.peekLast();
        if (tailVersion != null) {
            //operation add not by solid sequence
            if (tailVersion.longValue() != operator.getSourceVersion().longValue()) {
                clearBefore();
            }
        }

        Operator previousOperator = logOperators.put(operator.getVersion(), operator);
        if (previousOperator == null) {
            logOperatorsOrder.add(operator.getVersion());
        } 
    } finally {
        write.unlock();
    }
}
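
To see the version-chain rule in action, here is a minimal, single-threaded sketch of the same bookkeeping. It is not the SOFARegistry class: OperatorLogSketch, its String payload, and the small accessors are hypothetical names introduced for illustration, and the locking is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the Acceptor operation log (illustrative only, no locking). */
class OperatorLogSketch {
    private final int maxBufferSize;
    private final Deque<Long> order = new ArrayDeque<>();        // versions in arrival order
    private final Map<Long, String> operators = new HashMap<>(); // version -> payload

    OperatorLogSketch(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    /** Appends one entry; sourceVersion is the version this entry was derived from. */
    void append(long version, Long sourceVersion, String payload) {
        if (order.size() >= maxBufferSize) {
            operators.remove(order.poll());   // full: evict the oldest entry
        }
        long source = (sourceVersion == null) ? 0L : sourceVersion;
        Long tail = order.peekLast();
        if (tail != null && tail != source) {
            // chain is broken, so the log can no longer serve incremental sync
            order.clear();
            operators.clear();
        }
        if (operators.put(version, payload) == null) {
            order.add(version);               // only versions not seen before enter the queue
        }
    }

    int size() { return order.size(); }
    Long oldest() { return order.peek(); }
    Long newest() { return order.peekLast(); }
}
```

Appending versions 1, 2, 3 with matching source versions keeps the chain intact; appending an entry whose source version does not match the tail resets the log so that only the new entry remains.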

Who calls appendOperator? The Notifier implementations do, for example:

public class BackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

and:

public class SnapshotBackUpNotifier implements IDataChangeNotifier {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}

6.2 checkExpired

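This method removes expired log entries. A version is a timestamp, so the log can be checked periodically and entries that have expired are removed from the head of the queue.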

public void checkExpired(int durationSEC) {
    write.lock();
    try {
        //check all expired
        Long peekVersion = logOperatorsOrder.peek();
        if (peekVersion != null && isExpired(durationSEC, peekVersion)) {
            logOperators.remove(logOperatorsOrder.poll());
            checkExpired(durationSEC);
        }
    } finally {
        write.unlock();
    }
}
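
The recursion above can also be written as a loop. The sketch below relies on the statement that a version is a timestamp and assumes it is in milliseconds; it also assumes an expiry rule of "older than durationSec seconds", since the real isExpired is not shown in this article. ExpirySketch and the explicit nowMillis parameter are illustrative names.

```java
import java.util.Deque;

class ExpirySketch {
    /** Assumed rule: a version (millisecond timestamp) expires after durationSec seconds. */
    static boolean isExpired(int durationSec, long version, long nowMillis) {
        return durationSec > 0 && nowMillis > version + durationSec * 1000L;
    }

    /** Iterative equivalent of the recursive checkExpired: drop expired versions from the head. */
    static int removeExpired(Deque<Long> versions, int durationSec, long nowMillis) {
        int removed = 0;
        while (!versions.isEmpty() && isExpired(durationSec, versions.peek(), nowMillis)) {
            versions.poll();
            removed++;
        }
        return removed;
    }
}
```

Because versions are appended in increasing order, the scan can stop at the first entry that has not expired.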

0x07 NotifyDataSyncRequest: notifying data synchronization

This request tells the receiving side to synchronize data.

Recall the call chain for this part: versionCheckExecutor.execute -> localAcceptorStore.changeDataCheck -> removeCache -> notifyChange -> NotifyDataSyncRequest

7.1 NotifyDataSyncHandler

The receiving DataServer processes the request with NotifyDataSyncHandler.

public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements
                                                                                       AfterWorkingProcess {

    @Autowired
    private DataServerConfig                                      dataServerConfig;

    @Autowired
    private GetSyncDataHandler                                    getSyncDataHandler;

    @Autowired
    private DataChangeEventCenter                                 dataChangeEventCenter;

    private Executor                                              executor    = ExecutorFactory
                                                                                  .newFixedThreadPool(
                                                                                      10,
                                                                                      NotifyDataSyncHandler.class
                                                                                          .getSimpleName());

    private ThreadPoolExecutor                                    notifyExecutor;

    @Autowired
    private DataNodeStatus                                        dataNodeStatus;

    @Autowired
    private DatumCache                                            datumCache;
} 

7.1.1 doHandle

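The doHandle method continues the processing. If the node is not yet in the WORKING state, the request is parked in noWorkQueue; otherwise it is executed right away. In both cases a success response is returned.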

@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
    final Connection connection = ((BoltChannel) channel).getConnection();
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
        noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
        return CommonResponse.buildSuccessResponse();
    }
    executorRequest(connection, request);
    return CommonResponse.buildSuccessResponse();
}
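
The "park now, replay later" idea can be sketched as follows. This is a simplified model rather than the real handler: DeferredHandlerSketch, the String requests, and the afterWorking hook are hypothetical stand-ins, and how the real AfterWorkingProcess drains noWorkQueue is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class DeferredHandlerSketch {
    private volatile boolean working = false;
    private final Queue<String> noWorkQueue = new ConcurrentLinkedQueue<>();
    final List<String> processed = new ArrayList<>();

    /** Always acknowledges; defers the real work while the node is not WORKING. */
    String handle(String request) {
        if (!working) {
            noWorkQueue.add(request);
        } else {
            processed.add(request);
        }
        return "SUCCESS";
    }

    /** Hypothetical hook, called once the node becomes WORKING: replay what was parked. */
    void afterWorking() {
        working = true;
        String pending;
        while ((pending = noWorkQueue.poll()) != null) {
            processed.add(pending);
        }
    }
}
```

The sender gets an immediate acknowledgement, while the receiver does not act on a notification until it is able to serve data.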

7.1.2 executorRequest

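Having received the NotifyDataSyncRequest from the initiating DataServer, the receiving DataServer actively pulls the data: it uses GetSyncDataHandler to send a SyncDataRequest. A pull is issued only when the local datum is missing, the notified version is 0, or the local version is older than the notified one.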

private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
    executor.execute(() -> {
        fetchSyncData(connection, request);
    });
}

protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
    String dataInfoId = request.getDataInfoId();
    String dataCenter = request.getDataCenter();
    Datum datum = datumCache.get(dataCenter, dataInfoId);
    Long version = (datum == null) ? null : datum.getVersion();
    Long requestVersion = request.getVersion();

    if (version == null || requestVersion == 0L || version < requestVersion) {
        getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
            new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
            dataChangeEventCenter));
    } 
}
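
The condition in fetchSyncData can be isolated into a small predicate, which makes the three trigger cases easy to check. PullDecisionSketch and shouldPull are illustrative names, not part of SOFARegistry.

```java
class PullDecisionSketch {
    /**
     * Mirrors the condition in fetchSyncData.
     * localVersion is null when the dataInfoId is absent from the local cache.
     */
    static boolean shouldPull(Long localVersion, long notifiedVersion) {
        return localVersion == null          // nothing stored locally yet
            || notifiedVersion == 0L         // notifier lost its last version (for example, expired)
            || localVersion < notifiedVersion; // local copy is older
    }
}
```

When the local version is equal to or newer than the notified one, the notification is ignored and no SyncDataRequest is sent.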

7.1.3 GetSyncDataHandler

GetSyncDataHandler works together with SyncDataCallback: GetSyncDataHandler sends the SyncDataRequest, and SyncDataCallback receives the synchronization result.

├── remoting
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   └── task

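The location of these two helper classes, GetSyncDataHandler and SyncDataCallback, is a little odd. They probably sit in the dataserver directory because they are utility classes; in my opinion a separate directory might suit them better.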

public class GetSyncDataHandler {
    @Autowired
    private DataNodeExchanger   dataNodeExchanger;

    public void syncData(SyncDataCallback callback) {
        int tryCount = callback.getRetryCount();
        if (tryCount > 0) {
            try {
                callback.setRetryCount(--tryCount);
                dataNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return callback.getRequest();
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(callback.getConnection().getRemoteIP(), callback
                            .getConnection().getRemotePort());
                    }

                    @Override
                    public CallbackHandler getCallBackHandler() {
                        return new CallbackHandler() {
                            @Override
                            public void onCallback(Channel channel, Object message) {
                                callback.onResponse(message);
                            }

                            @Override
                            public void onException(Channel channel, Throwable exception) {
                                callback.onException(exception);
                            }

                            @Override
                            public Executor getExecutor() {
                                return callback.getExecutor();
                            }
                        };
                    }
                });
            } catch (Exception e) {
                // exception handling is omitted in this excerpt
            }
        }
    }

}
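
The retry budget travels with the callback: every call to syncData spends one attempt, and a failed response re-enters syncData with the same callback. The sketch below models that loop synchronously. RetrySketch, its Callback class, and the transport function (which returns null to signal failure) are assumptions made for illustration; the real exchange is asynchronous.

```java
import java.util.function.Function;

class RetrySketch {
    /** Carries the remaining retry budget, in the spirit of SyncDataCallback. */
    static class Callback {
        int retryCount = 3;
        int attempts = 0;
        String result;   // set on success
    }

    /** transport maps the attempt number to a response; null means the attempt failed. */
    static void syncData(Callback cb, Function<Integer, String> transport) {
        if (cb.retryCount <= 0) {
            return;                           // budget exhausted: give up
        }
        cb.retryCount--;
        cb.attempts++;
        String response = transport.apply(cb.attempts);
        if (response == null) {
            syncData(cb, transport);          // failure path re-enters with a smaller budget
        } else {
            cb.result = response;
        }
    }
}
```

With a budget of 3, a transport that fails twice still succeeds on the third attempt, while one that always fails stops after three attempts.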

7.1.4 SyncDataCallback

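This class receives the synchronization result. On failure it retries through GetSyncDataHandler; on success it applies the whole datum with COVER, or each incremental datum with MERGE.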

public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR    = ExecutorFactory.newFixedThreadPool(5,
                                                  SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT = 3;

    private Connection            connection;

    private SyncDataRequest       request;

    private GetSyncDataHandler    getSyncDataHandler;

    private int                   retryCount;

    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                //handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                //handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                } 
            }
        }
    }
}

The logic at this point is:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +----------------------------------------------------------------------+
                     |
NotifyDataSyncRequest| 1         ^ 2
                     |           |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest
                     v           |
             +-------+-----------------------------------+
             |[Other DataServer] |                       |
             |                   |                       |
             |                   |                       |
             |                   +                       |
             |  GetSyncDataHandler      SyncDataCallback |
             |                                           |
             |                                           |
             |                                           |
             |                                           |
             +-------------------------------------------+

0x08 SyncDataRequest: pulling from the notifier

The SyncDataRequest is sent back to the node that issued the notification, so here the other DataServer sends it to the sender DataServer.

8.1 SyncDataRequest

public class SyncDataRequest implements Serializable {

    private String            dataInfoId;

    private String            dataCenter;

    private String            dataSourceType;

    /**
     * be null when dataInfoId not exist in local datumCache
     */
    private Long              version;
}

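8.1.1 Where SyncDataRequest comes from

Recall where the SyncDataRequest comes from: it is created in the response path of NotifyDataSyncHandler, which looks up the version for the dataInfoId in the local cache based on the notification and then sends the request.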

public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess {

    protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
        String dataInfoId = request.getDataInfoId();
        String dataCenter = request.getDataCenter();
        Datum datum = datumCache.get(dataCenter, dataInfoId);
        Long version = (datum == null) ? null : datum.getVersion();
        Long requestVersion = request.getVersion();

        if (version == null || requestVersion == 0L || version < requestVersion) {
            getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
                new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
                dataChangeEventCenter));
        } 
    }
}

Going one step further back, the NotifyDataSyncRequest that triggers this is built in AbstractAcceptorStore:

private void notifyChange(Acceptor acceptor) {

    Long lastVersion = acceptor.getLastVersion();

    //may be delete by expired
    if (lastVersion == null) {
        lastVersion = 0L;
    }

    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());
    
    syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
                        request, 1000);
}

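8.2 SyncDataHandler

The node that issued the notification handles the SyncDataRequest with SyncDataHandler.

SyncDataHandler is the handler for data synchronization between nodes. When it is triggered, it compares version numbers: if the versions stored on the current DataServer include the requested version, it returns all data with a version greater than the requested one, so that the nodes can synchronize.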

public class SyncDataHandler extends AbstractServerHandler<SyncDataRequest> {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public Object doHandle(Channel channel, SyncDataRequest request) {
        SyncData syncData = syncDataService.getSyncDataChange(request);
        return new GenericResponse<SyncData>().fillSucceed(syncData);
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return SyncDataRequest.class;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}

8.3 SyncDataServiceImpl

The actual business service is SyncDataServiceImpl. Its getSyncDataChange method fetches the data from the acceptorStore.

public class SyncDataServiceImpl implements SyncDataService {

    @Override
    public void appendOperator(Operator operator) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(operator.getSourceType()
            .toString());
        if (acceptorStore != null) {
            acceptorStore.addOperator(operator);
        } 
    }

    @Override
    public SyncData getSyncDataChange(SyncDataRequest syncDataRequest) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(syncDataRequest
            .getDataSourceType());
        if (acceptorStore != null) {
            return acceptorStore.getSyncData(syncDataRequest);
        } 
    }
}

How appendOperator gets called was described earlier.

SyncDataServiceImpl then calls into AbstractAcceptorStore.

8.4 AbstractAcceptorStore

It looks up the Acceptor by dataCenter and dataInfoId, then returns the result of its process method.

@Override
public SyncData getSyncData(SyncDataRequest syncDataRequest) {

    String dataCenter = syncDataRequest.getDataCenter();
    String dataInfoId = syncDataRequest.getDataInfoId();

    Long currentVersion = syncDataRequest.getVersion();
    try {
        Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        return existAcceptor.process(currentVersion);
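    } catch (Exception e) {
        // assumed handling: the original catch block is omitted in this excerpt
        throw new RuntimeException(e);
    }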
}

8.5 Acceptor

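Next comes the processing in Acceptor.

It handles the current version number sent by the requester. If that version exists in the queue, it returns the Datum of every Operator with a greater version. Otherwise, or when there is nothing newer to return, it returns the whole Datum from datumCache and sets wholeDataTag to true.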

public SyncData process(Long currentVersion) {
    read.lock();
    try {
        Collection<Operator> operators = acceptOperator(currentVersion);
        List<Datum> retList = new LinkedList<>();
        SyncData syncData;
        boolean wholeDataTag = false;
        if (operators != null) {
            //first get all data
            if (operators.isEmpty()) {
                wholeDataTag = true;
                retList.add(datumCache.get(dataCenter, dataInfoId));
            } else {
                for (Operator operator : operators) {
                    retList.add(operator.getDatum());
                }
            }
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        } else {
            //no match get all data
            wholeDataTag = true;
            retList.add(datumCache.get(dataCenter, dataInfoId));
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        }

        return syncData;
    } finally {
        read.unlock();
    }
}
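
The choice between incremental and whole-datum sync can be modelled with a list of versions. Note that acceptOperator is not shown in this article, so the lookup below is a guess at its behavior based on how process uses the result; IncrementalSyncSketch, Plan, and plan are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class IncrementalSyncSketch {
    /** Outcome of one lookup: either "send the whole datum" or "send these versions". */
    static final class Plan {
        final boolean wholeData;
        final List<Long> versions;

        Plan(boolean wholeData, List<Long> versions) {
            this.wholeData = wholeData;
            this.versions = versions;
        }
    }

    /** log holds the recorded versions, oldest first. */
    static Plan plan(List<Long> log, Long currentVersion) {
        int index = (currentVersion == null) ? -1 : log.indexOf(currentVersion);
        if (index < 0 || index == log.size() - 1) {
            // unknown version, or nothing newer in the log: fall back to the whole datum
            return new Plan(true, new ArrayList<>());
        }
        // known version: only the entries recorded after it are needed
        return new Plan(false, new ArrayList<>(log.subList(index + 1, log.size())));
    }
}
```

A requester that already holds an older logged version receives only the later entries; a requester with no version, or one the log no longer knows, receives the full datum.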

The synchronized data structure is:

public class SyncData implements Serializable {

    private String            dataInfoId;

    private String            dataCenter;

    private Collection<Datum> datums;

    private boolean           wholeDataTag;
}

The picture at this point is:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +------------------------------------------------+-----+---------------+
                     |                                                            ^     |
NotifyDataSyncRequest| 1   +-----------------+  3     +--------------------+   4  |     |
                     |     | syncDataHandler +------> | SyncDataServiceImpl+------+     |
                     |     +-----+-----------+        +--------------------+            |
                     |           ^ 2                                                    |
                     |           |                                                      |  5
                     |           |                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest                                     |
                     v           |                                                      |
             +-------+-----------------------------------+                              |
             |[Other DataServer] |                       |                              |
             |                   |                       |                              |
             |                   |                       |                              |
             |                   +                       |                              |
             |  GetSyncDataHandler      SyncDataCallback |  <---------------------------+
             |                                           |
             |                                           |
             |                                           |
             |                                           |
             +-------------------------------------------+

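0x09 SyncDataCallback: callback on the receiver

Back on the receiver, the callback walks through the received Datum entries.

If the response carries the whole datum, it calls

dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);

once, for the first entry. Otherwise it calls

dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE, dataSourceTypeEnum, datum);

for each non-null entry. The details are as follows: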

public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR    = ExecutorFactory.newFixedThreadPool(5,
                                                  SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT = 3;

    private Connection            connection;

    private SyncDataRequest       request;

    private GetSyncDataHandler    getSyncDataHandler;

    private int                   retryCount;

    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                //handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                //handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                } 
            }
        }
    }
}
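
To make the difference between COVER and MERGE concrete, the sketch below applies both to a tiny publisher map. It is a simplification under stated assumptions: a Datum is reduced to a map from registerId to address, MERGE only adds or overwrites entries (removal through unpublish is ignored), and CoverMergeSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

class CoverMergeSketch {
    enum ChangeType { COVER, MERGE }

    /** cache and incoming map registerId -> publisher address (a stand-in for Publisher). */
    static Map<String, String> apply(ChangeType type, Map<String, String> cache,
                                     Map<String, String> incoming) {
        Map<String, String> result = new HashMap<>();
        if (type == ChangeType.MERGE) {
            result.putAll(cache);   // keep existing publishers, then overlay the delta
        }
        result.putAll(incoming);    // for COVER, incoming alone becomes the new state
        return result;
    }
}
```

This matches the comments in onResponse: whole data replaces the cached datum directly, while incremental data is folded into what is already there.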

DataChangeEventCenter is invoked as follows:

public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}
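
Routing by a hash of dataInfoId means that all events for one dataInfoId land in the same queue and keep their order. The sketch below illustrates this. The hash function is an assumption (a non-negative modulo of String.hashCode(); the real hash method is not shown here), and QueueRoutingSketch and the "dataInfoId@sequence" event format are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

class QueueRoutingSketch {
    /** Assumed hash: non-negative modulo of String.hashCode(). */
    static int queueIndex(String dataInfoId, int queueCount) {
        return Math.floorMod(dataInfoId.hashCode(), queueCount);
    }

    /** Each event must be formatted as "dataInfoId@sequence" (illustrative format). */
    static List<List<String>> route(List<String> events, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String event : events) {
            String dataInfoId = event.substring(0, event.indexOf('@'));
            queues.get(queueIndex(dataInfoId, queueCount)).add(event);
        }
        return queues;
    }
}
```

Since each queue is drained in order, two updates to the same dataInfoId are not reordered relative to each other, while different dataInfoIds can be processed in parallel.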

DataChangeEventQueue then processes the event in handleDatum. This part is covered in other articles of this series, so only the code is shown here.

@Override
public void run() {
    if (changeData instanceof SnapshotData) {
        SnapshotData snapshotData = (SnapshotData) changeData;
        String dataInfoId = snapshotData.getDataInfoId();
        Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
        Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
        Datum oldDatum = datumCache.get(dataServerConfig.getLocalDataCenter(), dataInfoId);
        long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0l;
        Datum datum = datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
        long version = datum != null ? datum.getVersion() : 0l;
        notify(datum, changeData.getSourceType(), null);
    } else {
        Datum datum = changeData.getDatum();
        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        DataSourceTypeEnum sourceType = changeData.getSourceType();
        DataChangeTypeEnum changeType = changeData.getChangeType();

        if (changeType == DataChangeTypeEnum.MERGE
            && sourceType != DataSourceTypeEnum.BACKUP
            && sourceType != DataSourceTypeEnum.SYNC) {
            //update version for pub or unPub merge to cache
            //if the version product before merge to cache,it may be cause small version override big one
            datum.updateVersion();
        }

        long version = datum.getVersion();

        try {
            if (sourceType == DataSourceTypeEnum.CLEAN) {
                if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                }
            } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                notifyTempPub(datum, sourceType, changeType);
            } else {
                MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                Long lastVersion = mergeResult.getLastVersion();

                if (lastVersion != null
                    && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                    return;
                }

                //lastVersion null means first add datum
                if (lastVersion == null || version != lastVersion) {
                    if (mergeResult.isChangeFlag()) {
                        notify(datum, sourceType, lastVersion);
                    }
                }
            }
        } catch (Exception e) {
            // exception handling is omitted in this excerpt
        }
    }

}

9.1 DataChangeHandler

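DataChangeHandler periodically takes events from DataChangeEventCenter and processes them.

The ChangeNotifier stores the Datum. Because the version number has already been updated at this point, no further notification is triggered, and the flow ends here.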

MergeResult mergeResult = datumCache.putDatum(changeType, datum);


//lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
    if (mergeResult.isChangeFlag()) {
        notify(datum, sourceType, lastVersion);
     }
}
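
The guard above is what stops the synchronization from bouncing back and forth. Extracted into a predicate (NotifyGuardSketch and shouldNotify are illustrative names), it reads:

```java
class NotifyGuardSketch {
    /**
     * Mirrors the guard in the snippet above.
     * lastVersion is the version held before the merge; null means the datum is new.
     */
    static boolean shouldNotify(Long lastVersion, long version, boolean changeFlag) {
        boolean versionMoved = (lastVersion == null) || (version != lastVersion);
        return versionMoved && changeFlag;
    }
}
```

A datum that arrives with the version the cache already holds, or that changes nothing, produces no notification.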

The logic at this point is:

[Sender DataServer]

+--------------------------+                                                                                     +------------------------+
|                          |     +----------------------------------------------------------------------+        |                        |
|   versionCheckExecutor   |     | [AbstractAcceptorStore]                                              |        |   expireCheckExecutor  |
|                          |     |                                                                      |        |                        |
+--------+-----------------+     |                                                                      |        +--------------+---------+
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |                                                                      |                       |
         |                       |     Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors  <---------------------------------+
         |   changeDataCheck     |                                                                      |     checkAcceptorsChangAndExpired
         +---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
  removeCache / notifyChange     |                                                                      |
                     +           +------------------------------------------------+-----+---------------+
                     |                                                            ^     |
NotifyDataSyncRequest| 1   +-----------------+  3     +--------------------+   4  |     |
                     |     | syncDataHandler +------> | SyncDataServiceImpl+------+     |
                     |     +-----+-----------+        +--------------------+            |
                     |           ^ 2                                                    |
                     |           |                                                      |  5
                     |           |                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------+
                     |           |  SyncDataRequest                                     |
[Other DataServer]   |           |                                                      |
                     |           |                                                      |
                     |           |                                                      |
                     |           |              +---------------------------------------+
                     |           |              |
                     |           |              |
                     v           |              v
              +------+-----------++ +-----------+-------+  6    +-----------------------+  7   +--------------------+  8   +-----------------+
              | GetSyncDataHandler| |  SyncDataCallback +-----> | DataChangeEventCenter | +--> |DataChangeEventQueue| +--> |DataChangeHandler|
              +-------------------+ +-------------------+       +-----------------------+      +--------------------+      +-----------------+

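0x10 Summary

Let us review how service data flows internally during one service registration.

  1. The Client calls publisher.register to register a service with the SessionServer.
  2. After receiving the service data (PublisherRegister), the SessionServer writes it to memory (the SessionServer keeps the Client's data in memory so that it can later be checked periodically against the DataServer), then uses consistent hashing on the dataInfoId to find the corresponding DataServer and sends the PublisherRegister to it.
  3. The DataServer receives the PublisherRegister and likewise first writes it to memory, aggregating all PublisherRegister entries by dataInfoId. At the same time, the DataServer notifies all SessionServers of the change event for that dataInfoId; the event carries the dataInfoId and the version number.
  4. Meanwhile, asynchronously, the DataServer incrementally synchronizes data to the other replicas, per dataInfoId. This is because, on top of consistent-hash sharding, the DataServer keeps multiple replicas of each shard (3 by default).
  5. After receiving the change notification, the SessionServer compares it with the version of the dataInfoId stored in its own memory. If its version is smaller than the one sent by the DataServer, it actively fetches the complete data for that dataInfoId from the DataServer, that is, the full list of PublisherRegister entries.
  6. Finally, the SessionServer pushes the data to the relevant Clients, which then receive the latest service list after this registration.

Due to space limits, the previous article covered the first two steps and this article covers the third and fourth. If time permits, a later article will cover the last two.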
