[從源碼學設計]螞蟻金服SOFARegistry 之 服務註冊和操作日誌
- 2021 年 1 月 9 日
- 筆記
- 008_微服務, 210_SOFAStack
[從源碼學設計]螞蟻金服SOFARegistry之服務註冊和操作日誌
0x00 摘要
SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。
本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓大家藉以學習阿里如何設計。
本文為第十四篇,介紹SOFARegistry服務上線和操作日誌。上文是從Session Server角度,本文從 Data Server 角度介紹。
0x01 整體業務流程
我們首先回顧總體業務流程,這部分屬於數據分片。
1.1 服務註冊過程
回顧下「一次服務註冊過程」的服務數據在內部流轉過程。
- Client 調用 publisher.register 向 SessionServer 註冊服務。
- SessionServer 收到服務數據 (PublisherRegister) 後,將其寫入記憶體 (SessionServer 會存儲 Client 的數據到記憶體,用於後續可以跟 DataServer 做定期檢查),再根據 dataInfoId 的一致性 Hash 尋找對應的 DataServer,將 PublisherRegister 發給 DataServer。
- DataServer 接收到 PublisherRegister 數據,首先也是將數據寫入記憶體 ,DataServer 會以 dataInfoId 的維度匯總所有 PublisherRegister。同時,DataServer 將該 dataInfoId 的變更事件通知給所有 SessionServer,變更事件的內容是 dataInfoId 和版本號資訊 version。
- 同時,非同步地,DataServer 以 dataInfoId 維度增量地同步數據給其他副本。因為 DataServer 在一致性 Hash 分片的基礎上,對每個分片保存了多個副本(默認是3個副本)。
- SessionServer 接收到變更事件通知後,對比 SessionServer 記憶體中存儲的 dataInfoId 的 version,若發現比 DataServer 發過來的小,則主動向 DataServer 獲取 dataInfoId 的完整數據,即包含了所有該 dataInfoId 具體的 PublisherRegister 列表。
- 最後,SessionServer 將數據推送給相應的 Client,Client 就接收到這一次服務註冊之後的最新的服務列表數據。
因為篇幅所限,上文討論的是前兩點,本文介紹第三,第四點。
1.2 數據分片
當服務上線時,會計算新增服務的 dataInfoId Hash 值,從而對該服務進行分片,最後尋找最近的一個節點,存儲到相應的節點上。
DataServer 服務在啟動時添加了 publishDataProcessor 來處理相應的服務發布者數據發布請求,該 publishDataProcessor 就是 PublishDataHandler。當有新的服務發布者上線,DataServer 的 PublishDataHandler 將會被觸發。
該 Handler 首先會判斷當前節點的狀態,若是非工作狀態則返回請求失敗。若是工作狀態,則觸發數據變化事件中心 DataChangeEventCenter 的 onChange 方法。
DataChangeEventQueue 中維護著一個 DataChangeEventQueue 隊列數組,數組中的每個元素是一個事件隊列。當上文中的 onChange 方法被觸發時,會計算該變化服務的 dataInfoId 的 Hash 值,從而進一步確定出該服務註冊數據所在的隊列編號,進而把該變化的數據封裝成一個數據變化對象,傳入到隊列中。
DataChangeEventQueue#start 方法在 DataChangeEventCenter 初始化的時候被一個新的執行緒調用,該方法會源源不斷地從隊列中獲取新增事件,並且進行分發。新增數據會由此添加進節點內,實現分片。
與此同時,DataChangeHandler 會把這個事件變更資訊通過 ChangeNotifier 對外發布,通知其他節點進行數據同步。
0x02 基礎數據結構
這裡需要首先講解幾個相關數據結構。
2.1 Publisher
Publisher是數據發布者資訊。
public class Publisher extends BaseInfo {
private List<ServerDataBox> dataList;
private PublishType publishType = PublishType.NORMAL;
}
2.2 Datum
是從SOFARegistry本身出發而彙集的數據發布者資訊,裡面核心是 :
- dataInfoId:服務唯一標識,由“<分組 group>
和
<租戶 instanceId>構成,例如在 SOFARPC 的場景下,一個 dataInfoId 通常是
com.alipay.sofa.rpc.example.HelloService#@#SOFA#@#00001`,其中SOFA 是 group 名稱,00001 是租戶 id。group 和 instance 主要是方便對服務數據做邏輯上的切分,使不同 group 和 instance 的服務數據在邏輯上完全獨立。模型里有 group 和 instanceId 欄位,但這裡不額外列出來,讀者只要理解 dataInfoId 的含義即可; - dataCenter:一個物理機房,包含多個邏輯單元(zone)。zone:是一種單元化架構下的概念,代表一個機房內的邏輯單元。在服務發現場景下,發布服務時需指定邏輯單元(zone),而訂閱服務者可以訂閱邏輯單元(zone)維度的服務數據,也可以訂閱物理機房(datacenter)維度的服務數據,即訂閱該 datacenter 下的所有 zone 的服務數據。;
- pubMap:包括的Publisher;
- version:對應的版本
具體程式碼如下:
public class Datum implements Serializable {
private String dataInfoId;
private String dataCenter;
private String dataId;
private String instanceId;
private String group;
private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();
private long version;
private boolean containsUnPub = false;
}
2.3 DatumCache
DatumCache 是最新的Datum。
public class DatumCache {
@Autowired
private DatumStorage localDatumStorage;
}
具體存儲是在LocalDatumStorage中完成。
public class LocalDatumStorage implements DatumStorage {
/**
* row: dataCenter
* column: dataInfoId
* value: datum
*/
protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>();
/**
* all datum index
*
* row: ip:port
* column: registerId
* value: publisher
*/
protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();
@Autowired
private DataServerConfig dataServerConfig;
}
2.4 Operator
Operator 是每一步Datum對應的操作。
public class Operator {
private Long version;
private Long sourceVersion;
private Datum datum;
private DataSourceTypeEnum sourceType;
}
2.5 Acceptor
記錄了所有的Datum操作。其中:
- logOperatorsOrder記錄了操作的順序;
- logOperators是所有的操作;
public class Acceptor {
private final String dataInfoId;
private final String dataCenter;
private int maxBufferSize;
static final int DEFAULT_DURATION_SECS = 30;
private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();
private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
private final DatumCache datumCache;
}
2.6 總結
總結下這幾個數據結構的聯繫:
- Publisher是數據發布者資訊。
- Datum是從SOFARegistry本身出發而彙集的數據發布者資訊。
- DatumCache 是最新的Datum。
- Operator 是每一步Datum對應的操作。
- Acceptor記錄了所有的Datum操作。
0x03 Datum的來龍去脈
我們先回顧下 Datum 的來龍去脈。
3.1 Session Server 內部
首先,我們講講Session Server 內部如何獲取Datum
在 Session Server 內部,Datum存儲在 SessionCacheService 之中。
比如在 DataChangeFetchCloudTask 內部,可以這樣獲取 Datum。
private Map<String, Datum> getDatumsCache() {
Map<String, Datum> map = new HashMap<>();
NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
Collection<String> dataCenters = nodeManager.getDataCenters();
if (dataCenters != null) {
Collection<Key> keys = dataCenters.stream().
map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
new DatumKey(fetchDataInfoId, dataCenter))).
collect(Collectors.toList());
Map<Key, Value> values = null;
values = sessionCacheService.getValues(keys);
if (values != null) {
values.forEach((key, value) -> {
if (value != null && value.getPayload() != null) {
map.put(((DatumKey) key.getEntityType()).getDataCenter(), (Datum) value.getPayload());
}
});
}
}
return map;
}
Session Server 會向 Data Server 發送 PublishDataRequest 請求。
3.2 PublishDataHandler
在DataServer內部,PublishDataHandler 是用來處理 PublishDataRequest。
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
@Autowired
private ForwardService forwardService;
@Autowired
private SessionServerConnectionFactory sessionServerConnectionFactory;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DatumLeaseManager datumLeaseManager;
@Autowired
private ThreadPoolExecutor publishProcessorExecutor;
@Override
public Object doHandle(Channel channel, PublishDataRequest request) {
Publisher publisher = Publisher.internPublisher(request.getPublisher());
if (forwardService.needForward()) {
CommonResponse response = new CommonResponse();
response.setSuccess(false);
response.setMessage("Request refused, Server status is not working");
return response;
}
dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());
if (publisher.getPublishType() != PublishType.TEMPORARY) {
String connectId = WordCache.getInstance().getWordCache(
publisher.getSourceAddress().getAddressString());
sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
connectId);
// record the renew timestamp
datumLeaseManager.renew(connectId);
}
return CommonResponse.buildSuccessResponse();
}
}
3.3 DataChangeEventCenter
在 DataChangeEventCenter 的 onChange 函數中,會進行投放。
public void onChange(Publisher publisher, String dataCenter) {
int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
if (publisher instanceof UnPublisher) {
datum.setContainsUnPub(true);
}
if (publisher.getPublishType() != PublishType.TEMPORARY) {
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB, datum));
} else {
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
DataSourceTypeEnum.PUB_TEMP, datum));
}
}
3.4 DataChangeEventQueue
在DataChangeEventQueue之中,會調用 handleDatum 來處理。在這裡對Datum進行存儲。
3.5 DataChangeHandler
在 DataChangeHandler 之中,會提取ChangeData,然後進行Notify。
public void start() {
DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
int queueCount = queues.length;
Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
Executor notifyExecutor = ExecutorFactory
.newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
for (int idx = 0; idx < queueCount; idx++) {
final DataChangeEventQueue dataChangeEventQueue = queues[idx];
final String name = dataChangeEventQueue.getName();
executor.execute(() -> {
while (true) {
final ChangeData changeData = dataChangeEventQueue.take();
notifyExecutor.execute(new ChangeNotifier(changeData, name));
}
});
}
}
具體如下:
+
Session Server | Data Server
|
|
|
|
+--------------------------+ PublishDataRequest +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+ | +------+-------------+
^ | |
| getValues | | onChange(Publisher)
| | v
| | +--------+--------------+
+---------+----------+ | | DataChangeEventCenter |
|sessionCacheService | | +--------+--------------+
+--------------------+ | |
| | Datum
| |
| v
| +--------+-------------+
| | DataChangeEventQueue |
| +--------+-------------+
| |
| |
| | ChangeData
| v
| +-------+-----------+
| | DataChangeHandler |
+ +-------------------+
0x04 DataChangeHandler處理
於是我們接著進行 DataChangeHandler 處理。即總述中提到的:DataChangeHandler 會把這個事件變更資訊:
- 把這個事件變更資訊變成Operator,放到AbstractAcceptorStore;
- 通過 ChangeNotifier 對外發布,通知其他節點進行數據同步;
下面我們從第一部分 :把這個事件變更資訊變成Operator,放到AbstractAcceptorStore 出發,進行講解日誌操作。
即如圖所示:
+
Session Server | Data Server
|
|
|
+
+--------------------------+ PublishDataRequest +--------------------+
| DataChangeFetchCloudTask +---------------+-----> | PublishDataHandler |
+-----------+--------------+ | +------+-------------+
^ | |
| getValues | | onChange(Publisher)
| | v
| | +--------+--------------+
+---------+----------+ | | DataChangeEventCenter |
|sessionCacheService | | +--------+--------------+
+--------------------+ | |
| | Datum
| |
| v
| +--------+-------------+
| | DataChangeEventQueue |
| +--------+-------------+
| |
| |
| | ChangeData
| v
| +-------+-----------+
| | DataChangeHandler |
| +-------+-----------+
| |
| |
| v
| +-------+---------+
| | ChangeNotifier |
| +-------+---------+
| |
| |
| v
| +----------+------------+
| | AbstractAcceptorStore |
| +-----------------------+
+
Acceptor的appendOperator誰來調用?在Notifier 裡面有,比如:
public class BackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
以及另一個:
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
0x05 AbstractAcceptorStore存儲
AbstractAcceptorStore是日誌存儲,我們下面詳細分析。
5.1 Bean
對於操作資訊,提供了一個Bean來存儲。
@Bean
public AcceptorStore localAcceptorStore() {
return new LocalAcceptorStore();
}
5.2 StoreServiceFactory
作用是在 storeServiceMap 中存放各種 AcceptorStore,目前只有LocalAcceptorStore 這一個。
public class StoreServiceFactory implements ApplicationContextAware {
private static Map<String/*supportType*/, AcceptorStore> storeServiceMap = new HashMap<>();
/**
* get AcceptorStore by storeType
* @param storeType
* @return
*/
public static AcceptorStore getStoreService(String storeType) {
return storeServiceMap.get(storeType);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);
map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
}
}
5.3 AbstractAcceptorStore
AbstractAcceptorStore 是存儲的基本實現類,幾個基本成員是。
-
acceptors :是一個矩陣,按照dataCenter,dataInfoId維度來分類,存儲了此維度下的Acceptor;就是說,針對每一個dataCenter,dataInfoId的組合,都有一個Acceptor,用來存儲這下面的Operator。
-
notifyAcceptorsCache :是一個矩陣,按照dataCente,dataInfoId維度來分類,快取了此維度下需要進一步處理的Acceptor;
-
delayQueue :配合notifyAcceptorsCache使用,針對notifyAcceptorsCache的每一個新acceptor,系統會添加一個消息進入queue,這個queue等延時到了,就會取出,並且從notifyAcceptorsCache取出對應的新acceptor進行相應處理;
按說應該是 cache 有東西,所以dequeue 時候就會取出來,但是如果這期間多放入了幾個進入 Cache,原有cache 的 value 只是被替換而已,等時間到了,也會取出來。
notifyAcceptorsCache 也是按照 data center 來控制的,只有定期 removeCache。
public abstract class AbstractAcceptorStore implements AcceptorStore {
private static final int DEFAULT_MAX_BUFFER_SIZE = 30;
@Autowired
protected IMetaServerService metaServerService;
@Autowired
private Exchange boltExchange;
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private DataServerConnectionFactory dataServerConnectionFactory;
@Autowired
private DatumCache datumCache;
private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors = new ConcurrentHashMap<>();
private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache = new ConcurrentHashMap<>();
private DelayQueue<DelayItem<Acceptor>> delayQueue
}
具體如下圖:
+-----------------------------+ +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
|[AbstractAcceptorStore] | |
| | +-> dataCenter +---+
| | | |
| acceptors +--------------->+ +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
| | |
| notifyAcceptorsCache | | +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
| + | +-> dataCenter +-->+
+-----------------------------+ |
| +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
|
|
| +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
| +-> dataCenter +-->+
| | +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
+-------------------->+
| +--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
+-> dataCenter +---+
+--> dataInfoId +---> Acceptor +--> Map<>ersion, Operator>
手機如圖:
有一點需要說明,就是delayQueue 為何要延遲隊列。這是由於SOFA的「秒級服務上下線通知「特性造成的。
因為要實現此特性,所以涉及到了一個連接敏感性問題,即在 SOFARegistry 里,所有 Client 都與 SessionServer 保持長連接,每條長連接都會有基於 bolt 的連接心跳,如果連接斷開,Client 會馬上重新建連,時刻保證 Client 與 SessionServer 之間有可靠的連接。
因為強烈的連接敏感性,所以導致如果只是網路問題導致連接斷開,實際的進程並沒有宕機,那麼 Client 會馬上重連 SessionServer 並重新註冊所有服務數據。這種大量的短暫的服務下線後又重新上線會給用戶帶來困擾和麻煩。
因此在 DataServer 內部實現了數據延遲合併的功能,就是這裡的DelayQueue。
5.4 加入
addOperator的基本邏輯是:
- 從Operator的Datum中提取dataCenter和dataInfoId;
- 從acceptors取出dataCenter對應的Map<dataInfoId, Acceptor> acceptorMap;
- 從acceptorMap中提取dataInfoId對應的existAcceptor;
- 如果新operator是SnapshotOperator類型,則清除之前的 opeator queue。
- 否則加入新operator;
- 使用putCache(existAcceptor);把目前的Acceptor加入Cache,定時任務會處理;
在操作中,都是使用putIfAbsent,這樣短期內若有多個同樣value插入,則不會替換原有的value,這樣 起到了歸併作用。
@Override
public void addOperator(Operator operator) {
Datum datum = operator.getDatum();
String dataCenter = datum.getDataCenter();
String dataInfoId = datum.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
if (acceptorMap == null) {
Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
if (acceptorMap == null) {
acceptorMap = newMap;
}
}
Acceptor existAcceptor = acceptorMap.get(dataInfoId);
if (existAcceptor == null) {
Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId,
dataCenter, datumCache);
existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
if (existAcceptor == null) {
existAcceptor = newAcceptor;
}
}
if (operator instanceof SnapshotOperator) {
//snapshot: clear the queue, Make other data retrieve the latest memory data
existAcceptor.clearBefore();
} else {
existAcceptor.appendOperator(operator);
}
//put cache
putCache(existAcceptor);
}
}
putCache的作用是:
- 從acceptor中提取dataCenter和dataInfoId;
- 從notifyAcceptorsCache中取出dataCenter對應的Map<dataInfoId, Acceptor> acceptorMap;
- 向acceptorMap中放入dataInfoId對應的acceptor;
- 如果acceptorMap中之前沒有對應的value,則把acceptor放入delayQueue;
這裡也使用putIfAbsent,這樣短期內若有多個同樣value插入,則不會替換原有的value,這樣 起到了歸併作用。
private void putCache(Acceptor acceptor) {
String dataCenter = acceptor.getDataCenter();
String dataInfoId = acceptor.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
if (acceptorMap == null) {
Map<String/*dataInfoId*/, Acceptor> newMap = new ConcurrentHashMap<>();
acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
if (acceptorMap == null) {
acceptorMap = newMap;
}
}
Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
if (existAcceptor == null) {
addQueue(acceptor);
}
}
}
5.5 使用
具體消費是在定期任務中完成。消費日誌的目的就是同步日誌操作給其他 DataServer。
5.5.1 Scheduler
Scheduler類是定期任務,會啟動兩個執行緒池定期調用AcceptorStore的函數。
public class Scheduler {
private final ScheduledExecutorService scheduler;
public final ExecutorService versionCheckExecutor;
private final ThreadPoolExecutor expireCheckExecutor;
@Autowired
private AcceptorStore localAcceptorStore;
public Scheduler() {
scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));
expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
}
public void startScheduler() {
scheduler.schedule(
new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
30, TimeUnit.SECONDS);
versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
}
}
AbstractAcceptorStore中函數如下:
5.5.2 changeDataCheck
changeDataCheck 內部是一個while true,所以不需要再使用執行緒池。
changeDataCheck綁定在delayQueue上,如果有新消息,則會取出Acceptor,也從notifyAcceptorsCache取出Acceptor,調用notifyChange(acceptor);進行處理 。
@Override
public void changeDataCheck() {
while (true) {
try {
DelayItem<Acceptor> delayItem = delayQueue.take();
Acceptor acceptor = delayItem.getItem();
removeCache(acceptor); // compare and remove
} catch (InterruptedException e) {
break;
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
}
消費Cache用到的是removeCache。
private void removeCache(Acceptor acceptor) {
String dataCenter = acceptor.getDataCenter();
String dataInfoId = acceptor.getDataInfoId();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
if (acceptorMap != null) {
boolean result = acceptorMap.remove(dataInfoId, acceptor);
if (result) {
//data change notify
notifyChange(acceptor);
}
}
}
}
5.5.2.1 通知NotifyDataSyncRequest
在removeCache中,也使用notifyChange進行了通知,邏輯如下:
- 從acceptor中提取 DataInfoId;
- 根據DataInfoId從meta service中獲取dataServerNodes的ip;
- 遍歷ip,通過bolt server進行通知syncServer.sendSync,就是給ip對應的data center發送 NotifyDataSyncRequest;
private void notifyChange(Acceptor acceptor) {
Long lastVersion = acceptor.getLastVersion();
NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
acceptor.getDataCenter(), lastVersion, getType());
List<String> targetDataIps = getTargetDataIp(acceptor.getDataInfoId());
for (String targetDataIp : targetDataIps) {
if (DataServerConfig.IP.equals(targetDataIp)) {
continue;
}
Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());
for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
try {
Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
if (connection == null) {
TimeUtil.randomDelay(1000);
continue;
}
syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
request, 1000);
break;
}
}
}
}
這部分的調用邏輯為:versionCheckExecutor.execute ------- localAcceptorStore.changeDataChheck ------ removeCache ----- notifyChange ------ NotifyDataSyncRequest
。
具體如下圖:
+--------------------------+
| | +----------------------------------------------------------------------+
| versionCheckExecutor | | [AbstractAcceptorStore] |
| | | |
+--------+-----------------+ | |
| | |
| | |
| | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors |
| changeDataCheck | |
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +----------------------------------------------------------------------+
|
|
|
| NotifyDataSyncRequest
|
v
+------+-----------+
| Other DataServer |
+------------------+
手機如下圖:
5.5.3 checkAcceptorsChangAndExpired
checkAcceptorsChangAndExpired作用是遍歷acceptors每個acceptor,看看是否expired,進行處理。
@Override
public void checkAcceptorsChangAndExpired() {
acceptors.forEach((dataCenter, acceptorMap) -> {
if (acceptorMap != null && !acceptorMap.isEmpty()) {
acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
}
});
}
此時,邏輯如下:
+--------------------------+ +------------------------+
| | +----------------------------------------------------------------------+ | |
| versionCheckExecutor | | [AbstractAcceptorStore] | | expireCheckExecutor |
| | | | | |
+--------+-----------------+ | | +--------------+---------+
| | | |
| | | |
| | | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors <---------------------------------+
| changeDataCheck | | checkAcceptorsChangAndExpired
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +----------------------------------------------------------------------+
|
|
|
| NotifyDataSyncRequest
|
v
+------+-----------+
| Other DataServer |
+------------------+
手機如下:
0x06 Acceptor日誌操作
這裡記錄了日誌,即記錄了所有的Datum操作。
操作日誌存儲採用Queue方式,獲取日誌時候通過當前版本號在堆棧內所在位置,把所有版本之後的操作日誌同步過來執行。
public class Acceptor {
private final String dataInfoId;
private final String dataCenter;
private int maxBufferSize;
static final int DEFAULT_DURATION_SECS = 30;
private final Deque<Long/*version*/> logOperatorsOrder = new ConcurrentLinkedDeque<>();
private Map<Long/*version*/, Operator> logOperators = new ConcurrentHashMap<>();
private final DatumCache datumCache;
}
關鍵變數是:
- logOperators:按照版本號為key存儲的map,用來存儲所有的Operator;
- logOperatorsOrder:因為map沒有辦法排序,所以設置此queue來存儲版本號;
Operator 就是每一步操作對應的Datum。
public class Operator {
private Long version;
private Long sourceVersion;
private Datum datum;
private DataSourceTypeEnum sourceType;
}
6.1 appendOperator
此函數作用是:添加一個操作日誌。
- 如果queue已經滿了,則取出第一個消息,為了向後段插入一個新的 。
- 如果Operator版本號為空,則設置為0L;
- 如果Operator的前一個版本號與queue尾部Operator版本號不一致,說明queue裡面不對了,需要清空map和queue。
- 向map中加入Operator;
- 如果是新版本的Operator,則把版本加入queue;
具體程式碼如下:
/**
* append operator to queue,if queue is full poll the first element and append.
* Process will check version sequence,it must append with a consequent increase in version,
* otherwise queue will be clean
*
* @param operator
*/
public void appendOperator(Operator operator) {
write.lock();
try {
if (isFull()) {
logOperators.remove(logOperatorsOrder.poll());
}
if (operator.getSourceVersion() == null) {
operator.setSourceVersion(0L);
}
Long tailVersion = logOperatorsOrder.peekLast();
if (tailVersion != null) {
//operation add not by solid sequence
if (tailVersion.longValue() != operator.getSourceVersion().longValue()) {
clearBefore();
}
}
Operator previousOperator = logOperators.put(operator.getVersion(), operator);
if (previousOperator == null) {
logOperatorsOrder.add(operator.getVersion());
}
} finally {
write.unlock();
}
}
appendOperator誰來調用?在Notifier 裡面有,比如:
public class BackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
以及
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
@Autowired
private SyncDataService syncDataService;
@Override
public void notify(Datum datum, Long lastVersion) {
syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion, datum,
DataSourceTypeEnum.BACKUP));
}
}
6.2 checkExpired
此方法作用是去除過期日誌。version是時間戳,所以可以定期check,如果過期,就清除。
public void checkExpired(int durationSEC) {
write.lock();
try {
//check all expired
Long peekVersion = logOperatorsOrder.peek();
if (peekVersion != null && isExpired(durationSEC, peekVersion)) {
logOperators.remove(logOperatorsOrder.poll());
checkExpired(durationSEC);
}
} finally {
write.unlock();
}
}
0x07 NotifyDataSyncRequest通知數據同步
此請求作用是通知接收端進行數據同步。
回憶下這部分的調用邏輯為:versionCheckExecutor.execute ------- localAcceptorStore.changeDataChheck ------ removeCache ----- notifyChange ------ NotifyDataSyncRequest
。
7.1 NotifyDataSyncHandler
接收端data server通過NotifyDataSyncHandler處理
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements
AfterWorkingProcess {
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private GetSyncDataHandler getSyncDataHandler;
@Autowired
private DataChangeEventCenter dataChangeEventCenter;
private Executor executor = ExecutorFactory
.newFixedThreadPool(
10,
NotifyDataSyncHandler.class
.getSimpleName());
private ThreadPoolExecutor notifyExecutor;
@Autowired
private DataNodeStatus dataNodeStatus;
@Autowired
private DatumCache datumCache;
}
7.1.1 doHandle
doHandle方法用來繼續處理。
@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
final Connection connection = ((BoltChannel) channel).getConnection();
if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
return CommonResponse.buildSuccessResponse();
}
executorRequest(connection, request);
return CommonResponse.buildSuccessResponse();
}
7.1.2 executorRequest
因為接到了發起端DataServer的同步通知NotifyDataSyncRequest,所以接收端DataServer主動發起拉取,進行同步數據。即調用GetSyncDataHandler來發送SyncDataRequest
private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
executor.execute(() -> {
fetchSyncData(connection, request);
});
}
protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
String dataInfoId = request.getDataInfoId();
String dataCenter = request.getDataCenter();
Datum datum = datumCache.get(dataCenter, dataInfoId);
Long version = (datum == null) ? null : datum.getVersion();
Long requestVersion = request.getVersion();
if (version == null || requestVersion == 0L || version < requestVersion) {
getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
dataChangeEventCenter));
}
}
7.1.3 GetSyncDataHandler
GetSyncDataHandler和SyncDataCallback配合。
即調用GetSyncDataHandler來發送SyncDataRequest,用SyncDataCallback接收同步結果。
├── remoting
│ ├── dataserver
│ │ ├── DataServerConnectionFactory.java
│ │ ├── DataServerNodeFactory.java
│ │ ├── GetSyncDataHandler.java
│ │ ├── SyncDataCallback.java
│ │ ├── handler
│ │ └── task
GetSyncDataHandler 和 SyncDataCallback 這兩個輔助類的位置比較奇怪,大概因為是功能類,所以放在dataserver目錄下,個人認為也許單獨設置一個目錄存放更好。
public class GetSyncDataHandler {
@Autowired
private DataNodeExchanger dataNodeExchanger;
public void syncData(SyncDataCallback callback) {
int tryCount = callback.getRetryCount();
if (tryCount > 0) {
try {
callback.setRetryCount(--tryCount);
dataNodeExchanger.request(new Request() {
@Override
public Object getRequestBody() {
return callback.getRequest();
}
@Override
public URL getRequestUrl() {
return new URL(callback.getConnection().getRemoteIP(), callback
.getConnection().getRemotePort());
}
@Override
public CallbackHandler getCallBackHandler() {
return new CallbackHandler() {
@Override
public void onCallback(Channel channel, Object message) {
callback.onResponse(message);
}
@Override
public void onException(Channel channel, Throwable exception) {
callback.onException(exception);
}
@Override
public Executor getExecutor() {
return callback.getExecutor();
}
};
}
});
}
}
}
}
7.1.4 SyncDataCallback
這裡接收同步結果。
public class SyncDataCallback implements InvokeCallback {
private static final Executor EXECUTOR = ExecutorFactory.newFixedThreadPool(5,
SyncDataCallback.class.getSimpleName());
private static final int RETRY_COUNT = 3;
private Connection connection;
private SyncDataRequest request;
private GetSyncDataHandler getSyncDataHandler;
private int retryCount;
private DataChangeEventCenter dataChangeEventCenter;
@Override
public void onResponse(Object obj) {
GenericResponse<SyncData> response = (GenericResponse) obj;
if (!response.isSuccess()) {
getSyncDataHandler.syncData(this);
} else {
SyncData syncData = response.getData();
Collection<Datum> datums = syncData.getDatums();
DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
.getDataSourceType());
if (syncData.getWholeDataTag()) {
//handle all data, replace cache with these datum directly
for (Datum datum : datums) {
if (datum == null) {
datum = new Datum();
datum.setDataInfoId(syncData.getDataInfoId());
datum.setDataCenter(syncData.getDataCenter());
}
Datum.internDatum(datum);
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
break;
}
} else {
//handle incremental data one by one
if (!CollectionUtils.isEmpty(datums)) {
for (Datum datum : datums) {
if (datum != null) {
Datum.internDatum(datum);
dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
dataSourceTypeEnum, datum);
}
}
}
}
}
}
}
此時邏輯如下:
[Sender DataServer]
+--------------------------+ +------------------------+
| | +----------------------------------------------------------------------+ | |
| versionCheckExecutor | | [AbstractAcceptorStore] | | expireCheckExecutor |
| | | | | |
+--------+-----------------+ | | +--------------+---------+
| | | |
| | | |
| | | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors <---------------------------------+
| changeDataCheck | | checkAcceptorsChangAndExpired
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +----------------------------------------------------------------------+
|
NotifyDataSyncRequest| 1 ^ 2
| |
+-------------------------------------------------------------------------------------------------------------------------------------------+
| | SyncDataRequest
v |
+-------+-----------------------------------+
|[Other DataServer] | |
| | |
| | |
| + |
| GetSyncDataHandler SyncDataCallback |
| |
| |
| |
| |
+-------------------------------------------+
手機如圖:
0x08 SyncDataRequest回送通知
SyncDataRequest發送回通知發送者。所以這裡是other DataServer 發送給 Sender DataServer。
8.1 SyncDataRequest
public class SyncDataRequest implements Serializable {
private String dataInfoId;
private String dataCenter;
private String dataSourceType;
/**
* be null when dataInfoId not exist in local datumCache
*/
private Long version;
}
8.1.1 SyncDataRequest 從哪裡來
我們回憶下,SyncDataRequest 從哪裡來?在 NotifyDataSyncHandler 的響應函數中,會產生 SyncDataRequest。這裡會根據請求的資訊,從cache之中獲取infoId對應的version,然後發送請求。
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess {
protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
String dataInfoId = request.getDataInfoId();
String dataCenter = request.getDataCenter();
Datum datum = datumCache.get(dataCenter, dataInfoId);
Long version = (datum == null) ? null : datum.getVersion();
Long requestVersion = request.getVersion();
if (version == null || requestVersion == 0L || version < requestVersion) {
getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
dataChangeEventCenter));
}
}
}
進而在AbstractAcceptorStore之中
private void notifyChange(Acceptor acceptor) {
Long lastVersion = acceptor.getLastVersion();
//may be delete by expired
if (lastVersion == null) {
lastVersion = 0L;
}
NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
acceptor.getDataCenter(), lastVersion, getType());
syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()),
request, 1000);
}
8.2 syncDataHandler
通知發起者使用 SyncDataHandler 來處理。
- syncDataHandler
節點間數據同步 Handler,該 Handler 被觸發時,會通過版本號進行比對,若當前 DataServer 所存儲數據版本號含有當前請求版本號,則會返回所有大於當前請求數據版本號的所有數據,便於節點間進行數據同步。
public class SyncDataHandler extends AbstractServerHandler<SyncDataRequest> {
@Autowired
private SyncDataService syncDataService;
@Override
public Object doHandle(Channel channel, SyncDataRequest request) {
SyncData syncData = syncDataService.getSyncDataChange(request);
return new GenericResponse<SyncData>().fillSucceed(syncData);
}
@Override
public HandlerType getType() {
return HandlerType.PROCESSER;
}
@Override
public Class interest() {
return SyncDataRequest.class;
}
@Override
protected Node.NodeType getConnectNodeType() {
return Node.NodeType.DATA;
}
}
8.3 SyncDataServiceImpl
具體業務服務是SyncDataServiceImpl。會從acceptorStore獲取data,即getSyncDataChange方法。
public class SyncDataServiceImpl implements SyncDataService {
@Override
public void appendOperator(Operator operator) {
AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(operator.getSourceType()
.toString());
if (acceptorStore != null) {
acceptorStore.addOperator(operator);
}
}
@Override
public SyncData getSyncDataChange(SyncDataRequest syncDataRequest) {
AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(syncDataRequest
.getDataSourceType());
if (acceptorStore != null) {
return acceptorStore.getSyncData(syncDataRequest);
}
}
}
關於appendOperator如何調用,前文有描述。
SyncDataServiceImpl會繼續調用到AbstractAcceptorStore。
8.4 AbstractAcceptorStore
根據dataCenter和dataInfoId獲取出Acceptor,然後返回其process後的數據。
@Override
public SyncData getSyncData(SyncDataRequest syncDataRequest) {
String dataCenter = syncDataRequest.getDataCenter();
String dataInfoId = syncDataRequest.getDataInfoId();
Long currentVersion = syncDataRequest.getVersion();
try {
Map<String/*dataInfoId*/, Acceptor> acceptorMap = acceptors.get(dataCenter);
Acceptor existAcceptor = acceptorMap.get(dataInfoId);
return existAcceptor.process(currentVersion);
}
}
8.5 Acceptor
然後是Acceptor的處理。
處理髮送數據的當前版本號,如果當前版本號存在於當前queue中,返回所有版本號大於當前版本號的Operator,否則所有Operator。
public SyncData process(Long currentVersion) {
read.lock();
try {
Collection<Operator> operators = acceptOperator(currentVersion);
List<Datum> retList = new LinkedList<>();
SyncData syncData;
boolean wholeDataTag = false;
if (operators != null) {
//first get all data
if (operators.isEmpty()) {
wholeDataTag = true;
retList.add(datumCache.get(dataCenter, dataInfoId));
} else {
for (Operator operator : operators) {
retList.add(operator.getDatum());
}
}
syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
} else {
//no match get all data
wholeDataTag = true;
retList.add(datumCache.get(dataCenter, dataInfoId));
syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
}
return syncData;
} finally {
read.unlock();
}
}
同步數據結構如下:
public class SyncData implements Serializable {
private String dataInfoId;
private String dataCenter;
private Collection<Datum> datums;
private boolean wholeDataTag;
}
此時圖示如下:
[Sender DataServer]
+--------------------------+ +------------------------+
| | +----------------------------------------------------------------------+ | |
| versionCheckExecutor | | [AbstractAcceptorStore] | | expireCheckExecutor |
| | | | | |
+--------+-----------------+ | | +--------------+---------+
| | | |
| | | |
| | | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors <---------------------------------+
| changeDataCheck | | checkAcceptorsChangAndExpired
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +------------------------------------------------+-----+---------------+
| ^ |
NotifyDataSyncRequest| 1 +-----------------+ 3 +--------------------+ 4 | |
| | syncDataHandler +------> | SyncDataServiceImpl+------+ |
| +-----+-----------+ +--------------------+ |
| ^ 2 |
| | | 5
| | |
+-------------------------------------------------------------------------------------------------------------------------------------------+
| | SyncDataRequest |
v | |
+-------+-----------------------------------+ |
|[Other DataServer] | | |
| | | |
| | | |
| + | |
| GetSyncDataHandler SyncDataCallback | <---------------------------+
| |
| |
| |
| |
+-------------------------------------------+
手機如下:
0x09 SyncDataCallback接受者回調
回到接受者,遍歷接受到的所有Datum,逐一調用:
如果是全部datum,調用
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
否則調用
dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,dataSourceTypeEnum, datum)
具體如下:
public class SyncDataCallback implements InvokeCallback {
private static final Executor EXECUTOR = ExecutorFactory.newFixedThreadPool(5,
SyncDataCallback.class.getSimpleName());
private static final int RETRY_COUNT = 3;
private Connection connection;
private SyncDataRequest request;
private GetSyncDataHandler getSyncDataHandler;
private int retryCount;
private DataChangeEventCenter dataChangeEventCenter;
@Override
public void onResponse(Object obj) {
GenericResponse<SyncData> response = (GenericResponse) obj;
if (!response.isSuccess()) {
getSyncDataHandler.syncData(this);
} else {
SyncData syncData = response.getData();
Collection<Datum> datums = syncData.getDatums();
DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
.getDataSourceType());
if (syncData.getWholeDataTag()) {
//handle all data, replace cache with these datum directly
for (Datum datum : datums) {
if (datum == null) {
datum = new Datum();
datum.setDataInfoId(syncData.getDataInfoId());
datum.setDataCenter(syncData.getDataCenter());
}
Datum.internDatum(datum);
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
break;
}
} else {
//handle incremental data one by one
if (!CollectionUtils.isEmpty(datums)) {
for (Datum datum : datums) {
if (datum != null) {
Datum.internDatum(datum);
dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
dataSourceTypeEnum, datum);
}
}
}
}
}
}
}
DataChangeEventCenter調用如下:
public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
int idx = hash(datum.getDataInfoId());
DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
dataChangeEventQueues[idx].onChange(event);
}
DataChangeEventQueue調用handleDatum處理,這部分在其他文章中已經講述。這裡只是貼出程式碼。
@Override
public void run() {
if (changeData instanceof SnapshotData) {
SnapshotData snapshotData = (SnapshotData) changeData;
String dataInfoId = snapshotData.getDataInfoId();
Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
Datum oldDatum = datumCache.get(dataServerConfig.getLocalDataCenter(), dataInfoId);
long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0l;
Datum datum = datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
long version = datum != null ? datum.getVersion() : 0l;
notify(datum, changeData.getSourceType(), null);
} else {
Datum datum = changeData.getDatum();
String dataCenter = datum.getDataCenter();
String dataInfoId = datum.getDataInfoId();
DataSourceTypeEnum sourceType = changeData.getSourceType();
DataChangeTypeEnum changeType = changeData.getChangeType();
if (changeType == DataChangeTypeEnum.MERGE
&& sourceType != DataSourceTypeEnum.BACKUP
&& sourceType != DataSourceTypeEnum.SYNC) {
//update version for pub or unPub merge to cache
//if the version product before merge to cache,it may be cause small version override big one
datum.updateVersion();
}
long version = datum.getVersion();
try {
if (sourceType == DataSourceTypeEnum.CLEAN) {
if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
}
} else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
notifyTempPub(datum, sourceType, changeType);
} else {
MergeResult mergeResult = datumCache.putDatum(changeType, datum);
Long lastVersion = mergeResult.getLastVersion();
if (lastVersion != null
&& lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
return;
}
//lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
if (mergeResult.isChangeFlag()) {
notify(datum, sourceType, lastVersion);
}
}
}
}
}
}
9.1 DataChangeHandler
DataChangeHandler 會定期提取DataChangeEventCenter中的消息,然後進行處理。
ChangeNotifier存儲了Datum。因為此時版本號已經更新,所以不會再次通知,至此流程結束。
MergeResult mergeResult = datumCache.putDatum(changeType, datum);
//lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
if (mergeResult.isChangeFlag()) {
notify(datum, sourceType, lastVersion);
}
}
此時邏輯如下:
[Sender DataServer]
+--------------------------+ +------------------------+
| | +----------------------------------------------------------------------+ | |
| versionCheckExecutor | | [AbstractAcceptorStore] | | expireCheckExecutor |
| | | | | |
+--------+-----------------+ | | +--------------+---------+
| | | |
| | | |
| | | |
| | Map<dataCenter, Map<dataInfoId, Acceptor> > acceptors <---------------------------------+
| changeDataCheck | | checkAcceptorsChangAndExpired
+---------------------------> Map<dataCenter, Map<dataInfoId, Acceptor> > notifyAcceptorsCache |
removeCache / notifyChange | |
+ +------------------------------------------------+-----+---------------+
| ^ |
NotifyDataSyncRequest| 1 +-----------------+ 3 +--------------------+ 4 | |
| | syncDataHandler +------> | SyncDataServiceImpl+------+ |
| +-----+-----------+ +--------------------+ |
| ^ 2 |
| | | 5
| | |
+-------------------------------------------------------------------------------------------------------------------------------------------+
| | SyncDataRequest |
[Other DataServer] | | |
| | |
| | |
| | +---------------------------------------+
| | |
| | |
v | v
+------+-----------++ +-----------+-------+ 6 +-----------------------+ 7 +--------------------+ 8 +-----------------+
| GetSyncDataHandler| | SyncDataCallback +-----> | DataChangeEventCenter | +--> |DataChangeEventQueue| +--> |DataChangeHandler|
+-------------------+ +-------------------+ +-----------------------+ +--------------------+ +-----------------+
手機上如下:
0x10 總結
回顧下「一次服務註冊過程」的服務數據在內部流轉過程。
- Client 調用 publisher.register 向 SessionServer 註冊服務。
- SessionServer 收到服務數據 (PublisherRegister) 後,將其寫入記憶體 (SessionServer 會存儲 Client 的數據到記憶體,用於後續可以跟 DataServer 做定期檢查),再根據 dataInfoId 的一致性 Hash 尋找對應的 DataServer,將 PublisherRegister 發給 DataServer。
- DataServer 接收到 PublisherRegister 數據,首先也是將數據寫入記憶體 ,DataServer 會以 dataInfoId 的維度匯總所有 PublisherRegister。同時,DataServer 將該 dataInfoId 的變更事件通知給所有 SessionServer,變更事件的內容是 dataInfoId 和版本號資訊 version。
- 同時,非同步地,DataServer 以 dataInfoId 維度增量地同步數據給其他副本。因為 DataServer 在一致性 Hash 分片的基礎上,對每個分片保存了多個副本(默認是3個副本)。
- SessionServer 接收到變更事件通知後,對比 SessionServer 記憶體中存儲的 dataInfoId 的 version,若發現比 DataServer 發過來的小,則主動向 DataServer 獲取 dataInfoId 的完整數據,即包含了所有該 dataInfoId 具體的 PublisherRegister 列表。
- 最後,SessionServer 將數據推送給相應的 Client,Client 就接收到這一次服務註冊之後的最新的服務列表數據。
因為篇幅所限,上文討論的是前兩點,本文介紹第三,第四點。如果以後有時間,會介紹最後兩點。
0xFF 參考
Eureka系列(六) TimedSupervisorTask類解析
Eureka的TimedSupervisorTask類(自動調節間隔的周期性任務)
java執行緒池ThreadPoolExecutor類使用詳解
Java執行緒池ThreadPoolExecutor實現原理剖析
深入理解Java執行緒池:ThreadPoolExecutor
深入理解Java執行緒池:ThreadPoolExecutor
Java中執行緒池ThreadPoolExecutor原理探究
螞蟻金服服務註冊中心如何實現 DataServer 平滑擴縮容
螞蟻金服服務註冊中心 SOFARegistry 解析 | 服務發現優化之路
服務註冊中心 Session 存儲策略 | SOFARegistry 解析
海量數據下的註冊中心 – SOFARegistry 架構介紹
服務註冊中心數據分片和同步方案詳解 | SOFARegistry 解析
螞蟻金服開源通訊框架SOFABolt解析之超時控制機制及心跳機制
螞蟻金服服務註冊中心數據一致性方案分析 | SOFARegistry 解析
SOFABolt 源碼分析9 – UserProcessor 自定義處理器的設計
SOFABolt 源碼分析13 – Connection 事件處理機制的設計