[Source Code Reading] Alibaba SOFA Service Registry MetaServer (2)
- October 11, 2020
- Notes
- 005_Source Code Analysis, 008_Microservices, 010_Industry Solutions, 210_SOFAStack
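0x00 Abstract
SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial. This series analyzes the implementation of its MetaServer. This article, the second in the series, covers MetaServer's basic functions: registration, storage, notification, and renewal.
0x01 MetaServer Registration
1.1 Leader Entry Point
MetaServer's business logic starts from setLeaderProcessListener.
As mentioned earlier, the MetaServer cluster uses the Raft protocol internally for election and replication. As long as fewer than half of the nodes are down, the cluster can keep serving requests.
The Raft protocol consists of three parts:
- Leader Election
- Log Replication
- Safety
To use JRaft, you must implement its state machine. In MetaServer, the concrete jraft FSM implementation is the ServiceStateMachine class (a good deal of Raft-related material follows later).
Once Raft has elected a MetaServer leader, ServiceStateMachine invokes the listener that was registered through setLeaderProcessListener. Its startProcess method calls registerCurrentNode, which registers the current Meta node in the MetaServer.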
raftServer.setLeaderProcessListener(new LeaderProcessListener() {
@Override
public void startProcess() {
executorManager.startScheduler();
PeerId leader = new PeerId(NetUtil.getLocalAddress().getHostAddress(),
metaServerConfig.getRaftServerPort());
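registerCurrentNode(); // register the current Meta node in the MetaServer
raftServer.sendNotify(leader, "leader");
}
// remaining listener callbacks omitted in this excerpt
});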
The call stack is as follows:
register:51, MetaServerRegistry (com.alipay.sofa.registry.server.meta.registry)
registerCurrentNode:203, RaftExchanger (com.alipay.sofa.registry.server.meta.remoting)
access$200:56, RaftExchanger (com.alipay.sofa.registry.server.meta.remoting)
startProcess:105, RaftExchanger$1 (com.alipay.sofa.registry.server.meta.remoting)
lambda$onLeaderStart$2:234, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
run:-1, 318295790 (com.alipay.sofa.registry.jraft.bootstrap.ServiceStateMachine$$Lambda$191)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
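The callback wiring described in this section can be illustrated with a minimal sketch. All types below are hypothetical, simplified stand-ins (they are not the SOFARegistry or JRaft classes), and the callback runs synchronously here, whereas the stack above shows the real one running on an executor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; not the SOFARegistry or JRaft classes.
class LeaderListenerSketch {
    interface LeaderProcessListener {
        void startProcess();
    }

    // Minimal state machine: it only remembers a listener and fires it on election.
    static final class StateMachine {
        private LeaderProcessListener listener;

        void setLeaderProcessListener(LeaderProcessListener listener) {
            this.listener = listener;
        }

        // Called by the (imaginary) Raft layer once this node wins an election.
        void onLeaderStart() {
            if (listener != null) {
                listener.startProcess();
            }
        }
    }

    // Wires a listener that "registers" the local node when leadership starts.
    static List<String> electAndRegister(String localIp) {
        List<String> registeredNodes = new ArrayList<>();
        StateMachine fsm = new StateMachine();
        fsm.setLeaderProcessListener(() -> registeredNodes.add(localIp));
        fsm.onLeaderStart();
        return registeredNodes;
    }

    public static void main(String[] args) {
        System.out.println(electAndRegister("192.168.1.2"));
    }
}
```

The point of the indirection is that the state machine does not need to know what "becoming leader" means for the business layer; it only fires whatever listener was installed.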
1.2 Registration
Registration is carried out by Registry. The registration interface is defined as:
public interface Registry<T extends Node> {
NodeChangeResult setNodes(List<T> nodes);
NodeChangeResult register(T node);
void cancel(String connectId, NodeType nodeType);
void evict();
void renew(T node, int duration);
void getOtherDataCenterNodeAndUpdate(NodeType nodeType);
DataCenterNodes getDataCenterNodes(NodeType nodeType);
NodeChangeResult getAllNodes(NodeType nodeType);
void pushNodeListChange(NodeType nodeType);
}
A concrete implementation, for example:
public class MetaServerRegistry implements Registry<Node> {
@Override
public NodeChangeResult register(Node node) {
StoreService storeService = ServiceFactory.getStoreService(node.getNodeType());
return storeService.addNode(node);
}
}
Registry obtains the StoreService that matches the node type and uses it to add the node. Here the node type is "META".
node = {MetaNode@6342} "MetaNode{ip=192.168.1.2}"
nodeType = {Node$NodeType@6369} "META"
nodeUrl = {URL@6370} "URL{address='192.168.1.2:0'}"
dataCenter = "DefaultDataCenter"
name = "192.168.1.2"
regionId = null
nodeStatus = {Node$NodeStatus@6373} "INIT"
storeService is taken from the storeServiceMap described with ServiceFactory in the previous article. For MetaNode, for example, the corresponding storeService implementation is MetaStoreService, so MetaStoreService performs the storage.
storeServiceMap = {HashMap@6394} size = 3
{Node$NodeType@6454} "SESSION" -> {SessionStoreService@6455}
{Node$NodeType@6369} "META" -> {MetaStoreService@6456}
{Node$NodeType@6457} "DATA" -> {DataStoreService@6458}
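The lookup by node type can be sketched as follows. This is a minimal illustration under stated assumptions: Node, StoreService, and InMemoryStore here are hypothetical, stripped-down stand-ins, and the string return value replaces NodeChangeResult purely for readability.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for the types named in the article.
class RegistryDispatchSketch {
    enum NodeType { META, DATA, SESSION }

    static final class Node {
        final NodeType nodeType;
        final String ip;

        Node(NodeType nodeType, String ip) {
            this.nodeType = nodeType;
            this.ip = ip;
        }
    }

    interface StoreService {
        String addNode(Node node);
    }

    // One in-memory store per node type; the real services do much more.
    static final class InMemoryStore implements StoreService {
        private final String name;
        private final Map<String, Node> nodes = new ConcurrentHashMap<>();

        InMemoryStore(String name) {
            this.name = name;
        }

        @Override
        public String addNode(Node node) {
            nodes.put(node.ip, node);
            return name + " stored " + node.ip;
        }
    }

    // Plays the role of storeServiceMap: node type -> store service.
    private static final Map<NodeType, StoreService> STORE_SERVICE_MAP =
        new EnumMap<>(NodeType.class);

    static {
        STORE_SERVICE_MAP.put(NodeType.META, new InMemoryStore("MetaStoreService"));
        STORE_SERVICE_MAP.put(NodeType.DATA, new InMemoryStore("DataStoreService"));
        STORE_SERVICE_MAP.put(NodeType.SESSION, new InMemoryStore("SessionStoreService"));
    }

    // Mirrors the shape of Registry#register: pick the store, then delegate.
    static String register(Node node) {
        StoreService storeService = STORE_SERVICE_MAP.get(node.nodeType);
        if (storeService == null) {
            throw new IllegalArgumentException("no store for " + node.nodeType);
        }
        return storeService.addNode(node);
    }

    public static void main(String[] args) {
        System.out.println(register(new Node(NodeType.META, "192.168.1.2")));
    }
}
```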
1.3 Store Service
The Node is then stored into the Repository by the StoreService. Taking MetaStoreService as the example, the implementation is:
public class MetaStoreService implements StoreService<MetaNode> {
@RaftReference(uniqueId = "metaServer")
private RepositoryService<String, RenewDecorate<MetaNode>> metaRepositoryService;
@Override
public NodeChangeResult addNode(MetaNode metaNode) {
NodeChangeResult nodeChangeResult;
String ipAddress = metaNode.getNodeUrl().getIpAddress();
write.lock();
try {
// store in the repository (automatically synchronized to the cluster through jraft)
metaRepositoryService.put(ipAddress, new RenewDecorate(metaNode,
RenewDecorate.DEFAULT_DURATION_SECS));
// fire notifications (data/session nodes need to be notified)
nodeChangeResult = getNodeChangeResult();
firePushDataListTask(nodeChangeResult, "addMetaNode");
firePushSessionListTask(nodeChangeResult, "addMetaNode");
} finally {
write.unlock();
}
return nodeChangeResult;
}
}
The storage flow, summarized in advance, looks like this:
+------------------------------+
+-----------------------+ | Map(String, NodeRepository) |
+--->+ metaRepositoryService +------->+ registry |
| +-----------------------+ +------------------------------+
|
|
|
Register +-------------------+ addNode +-----+-----------+
+------> | MetaServerRegistry| +--------> | MetaStoreService|
+-------------------+ +-----+-----------+
|
|
| +-------------------+ +--------------+ +----------------------+
+----------> |TaskListenerManager+---> |TaskDispatcher| +---> |DataNodeChangePushTask|
sendTaskEvent +-------------------+ +--------------+ +----------------------+
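1.4 Repository Service
Repository is a fairly classic concept: it encapsulates data query and storage logic. The definition, from Martin Fowler's Patterns of Enterprise Application Architecture, is: "Mediates between the domain and data mapping layers using a collection-like interface for accessing domain objects."
A Repository is a separate layer that sits between the domain layer and the data mapping layer (the data access layer). It hides the data access layer from the domain layer by offering a collection-like interface through which the domain layer accesses domain objects.
A Repository can be thought of as a warehouse keeper: the domain layer just tells the keeper what it needs, and the keeper hands it over. The domain layer never needs to know where things are actually stored.
Here the Node is not persisted; it is kept in memory. This raises the point that deserves the most attention: storing a Node in a Repository is simple, but in a distributed setting, how does the state of a newly stored Node in one Repository propagate to the other nodes in the cluster, and how is data consistency guaranteed?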
1.4.1 MetaRepositoryService
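The data stored inside MetaRepositoryService is kept consistent across the cluster by the Raft protocol.
The backend Repository can be viewed as the SOFAJRaft state machine: every operation on the Map is handed to the Raft protocol for synchronization within the cluster, which keeps the cluster consistent. The code shows that MetaRepositoryService carries the RaftService annotation, which is part of the Raft integration.
To make this clearer, here is an early introduction to the two Raft annotations: RaftReference and RaftService.
These two annotations can be seen as the interface that wraps Raft and exposes it to Registry. RaftReference corresponds to the client-side proxy, and RaftService corresponds to the server-side implementation. Why do it this way? Because data consistency has to be maintained, a plain local call must be turned into an asynchronous network call; only then can the Raft protocol guarantee consistency.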
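- Every concrete implementation of RepositoryService carries the @RaftService annotation, which marks it as the server side;
- Every place that references RepositoryService carries @RaftReference, so calling a RepositoryService method there amounts to a client calling the server;
- Every field annotated with @RaftReference is replaced by a dynamic proxy. The proxy implementation is in the ProxyHandler class: it wraps the method call in a ProcessRequest and sends it through RaftClient to RaftServer.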
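The proxy idea in the list above can be sketched with the JDK's java.lang.reflect.Proxy. This is only an illustration under stated assumptions: KeyValueRepository, CallRequest, and FakeRaftLog are hypothetical names, and FakeRaftLog merely records each request and applies it locally, where the real system would replicate it through Raft before applying it.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these types are SOFARegistry classes.
class RaftProxySketch {
    interface KeyValueRepository {
        String put(String key, String value);
        String get(String key);
    }

    // Stand-in for ProcessRequest: a method call captured as plain data.
    static final class CallRequest {
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] args;

        CallRequest(Method method, Object[] args) {
            this.methodName = method.getName();
            this.parameterTypes = method.getParameterTypes();
            this.args = args;
        }
    }

    // "Server side": the implementation that actually holds the data.
    static final class LocalRepository implements KeyValueRepository {
        private final Map<String, String> map = new HashMap<>();

        @Override
        public String put(String key, String value) {
            return map.put(key, value);
        }

        @Override
        public String get(String key) {
            return map.get(key);
        }
    }

    // Stand-in for the Raft log plus state machine: record, then apply.
    static final class FakeRaftLog {
        final List<CallRequest> entries = new ArrayList<>();
        private final KeyValueRepository target;

        FakeRaftLog(KeyValueRepository target) {
            this.target = target;
        }

        Object submit(CallRequest request) throws Exception {
            entries.add(request); // a real log would replicate this entry first
            Method method = KeyValueRepository.class
                .getMethod(request.methodName, request.parameterTypes);
            return method.invoke(target, request.args);
        }
    }

    // "Client side": every interface call becomes a CallRequest.
    static KeyValueRepository proxyFor(FakeRaftLog raftLog) {
        return (KeyValueRepository) Proxy.newProxyInstance(
            KeyValueRepository.class.getClassLoader(),
            new Class<?>[] { KeyValueRepository.class },
            (proxy, method, args) -> raftLog.submit(new CallRequest(method, args)));
    }

    public static void main(String[] args) {
        FakeRaftLog raftLog = new FakeRaftLog(new LocalRepository());
        KeyValueRepository repository = proxyFor(raftLog);
        repository.put("192.168.1.2", "MetaNode");
        System.out.println(repository.get("192.168.1.2") + ", log entries: " + raftLog.entries.size());
    }
}
```

The caller only sees the interface, so swapping a direct reference for a proxy changes how a call travels without changing the calling code.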
Returning to the MetaRepositoryService code:
@RaftService(uniqueId = "metaServer")
public class MetaRepositoryService extends AbstractSnapshotProcess
implements RepositoryService<String, RenewDecorate<MetaNode>> {
@Autowired
private NodeConfig nodeConfig;
/**
* meta node store and version
*/
private Map<String/*dataCenter*/, NodeRepository> registry = new ConcurrentHashMap<>();
private Set<String> snapShotFileNames = new HashSet<>();
}
1.4.2 The put Operation
When a new node arrives, MetaRepositoryService performs a put. The put call is in fact wrapped in a ProcessRequest and sent through RaftClient to RaftServer, so the Raft protocol guarantees data consistency.
@Override
public RenewDecorate<MetaNode> put(String ipAddress, RenewDecorate<MetaNode> metaNode,
Long currentTimeMillis) {
try {
String dataCenter = metaNode.getRenewal().getDataCenter();
NodeRepository<MetaNode> metaNodeRepository = registry.get(dataCenter);
if (metaNodeRepository == null) {
NodeRepository<MetaNode> nodeRepository = new NodeRepository<>(dataCenter,
new ConcurrentHashMap<>(), currentTimeMillis);
// this put is what actually executes on the server side
metaNodeRepository = registry.put(dataCenter, nodeRepository);
if (metaNodeRepository == null) {
metaNodeRepository = nodeRepository;
}
}
metaNodeRepository.setVersion(currentTimeMillis);
Map<String/*ipAddress*/, RenewDecorate<MetaNode>> metaNodes = metaNodeRepository
.getNodeMap();
RenewDecorate oldRenewDecorate = metaNodes.get(ipAddress);
metaNodes.put(ipAddress, metaNode);
}
return metaNode;
}
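1.4.3 Node Data Storage
Node data is essentially stored in in-memory hash tables with the following structure:
// underlying storage of RepositoryService
Map<String/*dataCenter*/, NodeRepository> registry;
// underlying storage of NodeRepository
Map<String/*ipAddress*/, RenewDecorate<T>> nodeMap;
Once the RenewDecorate is stored in this Map, the node registration flow is complete. How this is combined with the Raft protocol and how the data is synchronized will be covered later.
Node removal follows similar logic: the node information is deleted from this Map, and a change event is also stored in a queue.
The final result is as follows: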
this = {MetaRepositoryService@6905}
registry = {ConcurrentHashMap@6907} size = 1
"DefaultDataCenter" -> {NodeRepository@7251}
snapShotFileNames = {HashSet@6904} size = 1
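The two-level layout described above (data center, then IP address) can be sketched as follows. This is a simplified illustration: the node is represented by a plain String, and computeIfAbsent is used in place of the get/put/null-check sequence in the quoted put method.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified sketch of the nested in-memory storage.
class NodeStorageSketch {
    static final class NodeRepository {
        final String dataCenter;
        final Map<String, String> nodeMap = new ConcurrentHashMap<>();
        volatile long version;

        NodeRepository(String dataCenter, long version) {
            this.dataCenter = dataCenter;
            this.version = version;
        }
    }

    // dataCenter -> repository holding that data center's nodes
    private final Map<String, NodeRepository> registry = new ConcurrentHashMap<>();

    void put(String dataCenter, String ipAddress, String node, long now) {
        NodeRepository repository =
            registry.computeIfAbsent(dataCenter, dc -> new NodeRepository(dc, now));
        repository.version = now; // every change bumps the version
        repository.nodeMap.put(ipAddress, node);
    }

    String remove(String dataCenter, String ipAddress, long now) {
        NodeRepository repository = registry.get(dataCenter);
        if (repository == null) {
            return null;
        }
        String removed = repository.nodeMap.remove(ipAddress);
        if (removed != null) {
            repository.version = now;
        }
        return removed;
    }

    int size(String dataCenter) {
        NodeRepository repository = registry.get(dataCenter);
        return repository == null ? 0 : repository.nodeMap.size();
    }

    long version(String dataCenter) {
        NodeRepository repository = registry.get(dataCenter);
        return repository == null ? -1L : repository.version;
    }

    public static void main(String[] args) {
        NodeStorageSketch storage = new NodeStorageSketch();
        storage.put("DefaultDataCenter", "192.168.1.2", "MetaNode{ip=192.168.1.2}", 1L);
        System.out.println(storage.size("DefaultDataCenter"));
    }
}
```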
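1.5 Data Consistency Through jraft
The member list is stored in the Repository. The Repository is wrapped by the consistency protocol layer and serves as the SOFAJRaft state machine implementation; every operation on the Repository is synchronized to the other nodes, and the storage layer is operated through Registry.
During synchronization, jraft can call MetaRepositoryService directly, which achieves internal data consistency.
This describes the synchronization performed on the other nodes and differs from the previous section: there the node was stored actively, whereas here it is synchronized passively.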
put:45, MetaRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
put:31, RepositoryService (com.alipay.sofa.registry.server.meta.repository)
invokeInterface_L3_L:-1, 1412712349 (java.lang.invoke.LambdaForm$DMH)
reinvoke:-1, 1139092036 (java.lang.invoke.LambdaForm$BMH)
invoker:-1, 403463237 (java.lang.invoke.LambdaForm$MH)
invokeExact_MT:-1, 1568507411 (java.lang.invoke.LambdaForm$MH)
invokeWithArguments:627, MethodHandle (java.lang.invoke)
process:123, Processor (com.alipay.sofa.registry.jraft.processor)
onApply:133, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
doApplyTasks:534, FSMCallerImpl (com.alipay.sofa.jraft.core)
doCommitted:503, FSMCallerImpl (com.alipay.sofa.jraft.core)
runApplyTask:431, FSMCallerImpl (com.alipay.sofa.jraft.core)
access$100:72, FSMCallerImpl (com.alipay.sofa.jraft.core)
onEvent:147, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
onEvent:141, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
run:137, BatchEventProcessor (com.lmax.disruptor)
run:748, Thread (java.lang)
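1.6 Notifying Data and Session Nodes
1.6.1 The Notification Operation
Section 1.3 above showed firePushDataListTask and firePushSessionListTask, whose ultimate purpose is to notify all data/session nodes that the set of MetaServers has changed. The code is repeated here as a refresher.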
public class MetaStoreService implements StoreService<MetaNode> {
@Override
public NodeChangeResult addNode(MetaNode metaNode) {
// store in the repository (automatically synchronized to the cluster through jraft)
metaRepositoryService.put(ipAddress, new RenewDecorate(metaNode,
RenewDecorate.DEFAULT_DURATION_SECS));
// fire notifications (data/session nodes need to be notified); this is the part under discussion
nodeChangeResult = getNodeChangeResult();
firePushDataListTask(nodeChangeResult, "addMetaNode");
firePushSessionListTask(nodeChangeResult, "addMetaNode");
}
}
firePushDataListTask and firePushSessionListTask in turn send events to taskListenerManager:
private void firePushDataListTask(NodeChangeResult nodeChangeResult, String nodeOperate) {
TaskEvent taskEvent = new TaskEvent(nodeChangeResult, TaskType.DATA_NODE_CHANGE_PUSH_TASK);
taskEvent.setAttribute(Constant.PUSH_NEED_CONFIRM_KEY, false);
taskEvent.setAttribute(Constant.PUSH_TARGET_TYPE, NodeType.DATA);
taskEvent.setAttribute(Constant.PUSH_TARGET_OPERATOR_TYPE, nodeOperate);
taskListenerManager.sendTaskEvent(taskEvent);
}
private void firePushSessionListTask(NodeChangeResult nodeChangeResult, String nodeOperate) {
//notify all session node
TaskEvent taskEvent = new TaskEvent(nodeChangeResult, TaskType.DATA_NODE_CHANGE_PUSH_TASK);
taskEvent.setAttribute(Constant.PUSH_TARGET_TYPE, NodeType.SESSION);
taskEvent.setAttribute(Constant.PUSH_TARGET_OPERATOR_TYPE, nodeOperate);
taskListenerManager.sendTaskEvent(taskEvent);
}
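This is where the notification is fired, so the data nodes and session nodes get notified.
How are the data and session nodes notified? A listener mechanism is used: DefaultTaskListenerManager#sendTaskEvent is called.
The call stack is as follows: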
sendTaskEvent:45, DefaultTaskListenerManager (com.alipay.sofa.registry.task.listener)
firePushDataListTask:355, MetaStoreService (com.alipay.sofa.registry.server.meta.store)
addNode:127, MetaStoreService (com.alipay.sofa.registry.server.meta.store)
addNode:54, MetaStoreService (com.alipay.sofa.registry.server.meta.store)
register:52, MetaServerRegistry (com.alipay.sofa.registry.server.meta.registry)
registerCurrentNode:203, RaftExchanger (com.alipay.sofa.registry.server.meta.remoting)
access$200:56, RaftExchanger (com.alipay.sofa.registry.server.meta.remoting)
startProcess:105, RaftExchanger$1 (com.alipay.sofa.registry.server.meta.remoting)
lambda$onLeaderStart$2:234, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
run:-1, 50941601 (com.alipay.sofa.registry.jraft.bootstrap.ServiceStateMachine$$Lambda$192)
1.6.2 Dispatching Notification Messages
TaskListenerManager is the class that dispatches the various notification messages.
public class DefaultTaskListenerManager implements TaskListenerManager {
@Override
public void sendTaskEvent(TaskEvent taskEvent) {
Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
for (TaskListener taskListener : taskListeners) {
taskListener.handleEvent(taskEvent);
}
}
}
Many message handlers are registered through taskListenerManager.addTaskListener(taskListener); their logical roles can be read from the names in the runtime variables below.
this.taskListeners = {ArrayListMultimap@7221}
expectedValuesPerKey = 3
map = {HashMap@7227} size = 4
{TaskEvent$TaskType@7235} "RECEIVE_STATUS_CONFIRM_NOTIFY_TASK" -> {ArrayList@7236} size = 1
key = {TaskEvent$TaskType@7235} "RECEIVE_STATUS_CONFIRM_NOTIFY_TASK"
value = {ArrayList@7236} size = 1
0 = {ReceiveStatusConfirmNotifyTaskListener@7248}
{TaskEvent$TaskType@7237} "PERSISTENCE_DATA_CHANGE_NOTIFY_TASK" -> {ArrayList@7238} size = 1
key = {TaskEvent$TaskType@7237} "PERSISTENCE_DATA_CHANGE_NOTIFY_TASK"
value = {ArrayList@7238} size = 1
0 = {PersistenceDataChangeNotifyTaskListener@7254}
{TaskEvent$TaskType@7239} "SESSION_NODE_CHANGE_PUSH_TASK" -> {ArrayList@7240} size = 1
key = {TaskEvent$TaskType@7239} "SESSION_NODE_CHANGE_PUSH_TASK"
value = {ArrayList@7240} size = 1
0 = {SessionNodeChangePushTaskListener@7252}
{TaskEvent$TaskType@7241} "DATA_NODE_CHANGE_PUSH_TASK" -> {ArrayList@7242} size = 1
key = {TaskEvent$TaskType@7241} "DATA_NODE_CHANGE_PUSH_TASK"
value = {ArrayList@7242} size = 1
0 = {DataNodeChangePushTaskListener@7250}
As a result, sendTaskEvent invokes the handlers registered for the event's task type.
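The dispatch-by-task-type mechanism can be sketched as below. This is a minimal illustration, assuming a listener that declares the single task type it supports; the TaskListener shape, RecordingListener, and the plain Map of lists (standing in for the ArrayListMultimap seen in the debugger output) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of listener dispatch keyed by task type.
class TaskDispatchSketch {
    enum TaskType { DATA_NODE_CHANGE_PUSH_TASK, SESSION_NODE_CHANGE_PUSH_TASK }

    static final class TaskEvent {
        final TaskType taskType;
        final String payload;

        TaskEvent(TaskType taskType, String payload) {
            this.taskType = taskType;
            this.payload = payload;
        }
    }

    interface TaskListener {
        TaskType support();

        void handleEvent(TaskEvent event);
    }

    // Test-friendly listener that just remembers the payloads it handled.
    static final class RecordingListener implements TaskListener {
        final List<String> handled = new ArrayList<>();
        private final TaskType taskType;

        RecordingListener(TaskType taskType) {
            this.taskType = taskType;
        }

        @Override
        public TaskType support() {
            return taskType;
        }

        @Override
        public void handleEvent(TaskEvent event) {
            handled.add(event.payload);
        }
    }

    static final class ListenerManager {
        private final Map<TaskType, List<TaskListener>> taskListeners =
            new EnumMap<>(TaskType.class);

        void addTaskListener(TaskListener listener) {
            taskListeners
                .computeIfAbsent(listener.support(), type -> new ArrayList<>())
                .add(listener);
        }

        // Delivers the event to every listener of its type; returns how many ran.
        int sendTaskEvent(TaskEvent event) {
            List<TaskListener> listeners = taskListeners.getOrDefault(
                event.taskType, Collections.<TaskListener>emptyList());
            for (TaskListener listener : listeners) {
                listener.handleEvent(event);
            }
            return listeners.size();
        }
    }

    public static void main(String[] args) {
        ListenerManager manager = new ListenerManager();
        RecordingListener listener = new RecordingListener(TaskType.DATA_NODE_CHANGE_PUSH_TASK);
        manager.addTaskListener(listener);
        manager.sendTaskEvent(new TaskEvent(TaskType.DATA_NODE_CHANGE_PUSH_TASK, "addMetaNode"));
        System.out.println(listener.handled);
    }
}
```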
1.6.3 Handling Messages Asynchronously
As a concrete example of message handling, DataNodeChangePushTaskListener is the class that handles DataNode-related messages.
public class DataNodeChangePushTaskListener implements TaskListener {
private TaskDispatcher<String, MetaServerTask> dataSingleTaskDispatcher;
private TaskDispatcher<String, MetaServerTask> sessionSingleTaskDispatcher;
@Override
public void handleEvent(TaskEvent event) {
NodeType nodeType = (NodeType) event.getAttribute(Constant.PUSH_TARGET_TYPE);
switch (nodeType) {
case SESSION:
MetaServerTask sessionNodeChangePushTask = new DataNodeChangePushTask(
NodeType.SESSION, metaServerConfig);
sessionNodeChangePushTask.setTaskEvent(event);
sessionSingleTaskDispatcher.dispatch(sessionNodeChangePushTask.getTaskId(),
sessionNodeChangePushTask, sessionNodeChangePushTask.getExpiryTime());
break;
case DATA:
MetaServerTask dataNodeChangePushTask = new DataNodeChangePushTask(NodeType.DATA,
metaServerConfig);
dataNodeChangePushTask.setTaskEvent(event);
dataSingleTaskDispatcher.dispatch(dataNodeChangePushTask.getTaskId(),
dataNodeChangePushTask, dataNodeChangePushTask.getExpiryTime());
break;
default:
break;
}
}
}
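As the code shows, TaskDispatcher dispatches messages asynchronously, and the work is then carried out asynchronously by TaskExecutors.
For a DataNode, the call eventually reaches DataNodeChangePushTask, which is executed by DataNodeSingleTaskProcessor.
The call stack is as follows: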
pushDataNodes:73, DataNodeServiceImpl (com.alipay.sofa.registry.server.meta.node.impl)
execute:86, DataNodeChangePushTask (com.alipay.sofa.registry.server.meta.task)
process:41, DataNodeSingleTaskProcessor (com.alipay.sofa.registry.server.meta.task.processor)
process:32, DataNodeSingleTaskProcessor (com.alipay.sofa.registry.server.meta.task.processor)
run:136, TaskExecutors$WorkerRunnable (com.alipay.sofa.registry.task.batcher)
run:748, Thread (java.lang)
DataNodeChangePushTask is where the actual interaction with each DataNode takes place.
public class DataNodeChangePushTask extends AbstractMetaServerTask {
private final SessionNodeService sessionNodeService;
private final DataNodeService dataNodeService;
final private MetaServerConfig metaServerConfig;
final private NodeType nodeType;
private NodeChangeResult nodeChangeResult;
private Boolean confirm;
private String confirmNodeIp;
private Map<String, DataNode> targetNodes;
@Override
public void execute() {
switch (nodeType) {
case SESSION:
sessionNodeService.pushDataNodes(nodeChangeResult);
break;
case DATA:
dataNodeService
.pushDataNodes(nodeChangeResult, targetNodes, confirm, confirmNodeIp);
break;
default:
break;
}
}
}
Taking DataNodeServiceImpl as the example, the communication between nodes is ultimately performed through dataNodeExchanger.
public class DataNodeServiceImpl implements DataNodeService {
@Autowired
private NodeExchanger dataNodeExchanger;
@Autowired
private StoreService dataStoreService;
@Autowired
private AbstractServerHandler dataConnectionHandler;
@Override
public void pushDataNodes(NodeChangeResult nodeChangeResult, Map<String, DataNode> targetNodes,
boolean confirm, String confirmNodeIp) {
if (nodeChangeResult != null) {
List<Throwable> exceptions = new ArrayList<>();
NodeConnectManager nodeConnectManager = getNodeConnectManager();
Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);
// add register confirm
StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
DataCenterNodes dataCenterNodes = storeService.getDataCenterNodes();
Map<String, DataNode> registeredNodes = dataCenterNodes.getNodes();
for (InetSocketAddress address : connections) {
try {
if (targetNodes != null && !targetNodes.isEmpty()) {
if (!targetNodes.keySet().contains(address.getAddress().getHostAddress())) {
continue;
}
} else {
if (!registeredNodes.keySet().contains(
address.getAddress().getHostAddress())) {
continue;
}
}
Request<NodeChangeResult> nodeChangeRequestRequest = new Request<NodeChangeResult>() {
@Override
public NodeChangeResult getRequestBody() {
return nodeChangeResult;
}
@Override
public URL getRequestUrl() {
return new URL(address);
}
};
// communication between nodes
Response response = dataNodeExchanger.request(nodeChangeRequestRequest);
if (confirm) {
Object result = response.getResult();
if (result instanceof CommonResponse) {
CommonResponse genericResponse = (CommonResponse) result;
if (genericResponse.isSuccess()) {
confirmStatus(address, confirmNodeIp);
}
}
}
}
}
}
}
}
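1.6.4 Another Way Notifications Are Generated
ExecutorManager periodically invokes pushNodeListChange, which checks whether a notification is needed and generates one if so.
This corresponds to the DataConfirmStatusService node change events discussed later.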
scheduler.schedule(
new TimedSupervisorTask("CheckDataNodeListChangePush", scheduler, checkNodeListChangePushExecutor,
metaServerConfig.getSchedulerCheckNodeListChangePushTimeout(), TimeUnit.SECONDS,
metaServerConfig.getSchedulerCheckNodeListChangePushExpBackOffBound(),
() -> metaServerRegistry.pushNodeListChange(NodeType.DATA)),
metaServerConfig.getSchedulerCheckNodeListChangePushFirstDelay(), TimeUnit.SECONDS);
The call stack is:
pushNodeListChange:278, DataStoreService (com.alipay.sofa.registry.server.meta.store)
pushNodeListChange:103, MetaServerRegistry (com.alipay.sofa.registry.server.meta.registry)
lambda$startScheduler$4:158, ExecutorManager (com.alipay.sofa.registry.server.meta.executor)
run:-1, 758751909 (com.alipay.sofa.registry.server.meta.executor.ExecutorManager$$Lambda$202)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
For example:
public class DataStoreService implements StoreService<DataNode> {
@Override
public void pushNodeListChange() {
NodeOperator<DataNode> fireNode;
if ((fireNode = dataConfirmStatusService.peekConfirmNode()) != null) {
NodeChangeResult nodeChangeResult = getNodeChangeResult();
Map<String, Map<String, DataNode>> map = nodeChangeResult.getNodes();
Map<String, DataNode> addNodes = map.get(nodeConfig.getLocalDataCenter());
if (addNodes != null) {
Map<String, DataNode> previousNodes = dataConfirmStatusService.putExpectNodes(
fireNode.getNode(), addNodes);
if (!previousNodes.isEmpty()) {
// generate a notification
firePushDataListTask(fireNode, nodeChangeResult, previousNodes, true);
}
}
// generate a notification
firePushSessionListTask(nodeChangeResult, fireNode.getNodeOperate().toString());
}
}
}
0x02 Node Registration
2.1 DataApplication
2.1.1 DataConnectionHandler
When a DataApplication starts, DataConnectionHandler is the first to respond.
connected:40, DataConnectionHandler (com.alipay.sofa.registry.server.meta.remoting.connection)
onEvent:69, ConnectionEventAdapter (com.alipay.sofa.registry.remoting.bolt)
onEvent:44, ConnectionEventListener (com.alipay.remoting)
run:201, ConnectionEventHandler$1 (com.alipay.remoting)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)
It appears to do little more than record the connection.
public class DataConnectionHandler extends AbstractServerHandler implements NodeConnectManager {
private Map<String/*connectId*/, InetSocketAddress> connections = new ConcurrentHashMap<>();
@Override
public void connected(Channel channel) throws RemotingException {
super.connected(channel);
addConnection(channel);
}
}
2.1.2 DataNodeHandler
Next, DataNodeHandler responds.
reply:43, DataNodeHandler (com.alipay.sofa.registry.server.meta.remoting.handler)
handleRequest:54, SyncUserProcessorAdapter (com.alipay.sofa.registry.remoting.bolt)
dispatchToUserProcessor:239, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
doProcess:145, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
run:366, RpcRequestProcessor$ProcessTask (com.alipay.remoting.rpc.protocol)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)
The code is as follows.
public class DataNodeHandler extends AbstractServerHandler<DataNode> {
@Autowired
private Registry metaServerRegistry;
@Override
public Object reply(Channel channel, DataNode dataNode) {
NodeChangeResult nodeChangeResult;
nodeChangeResult = metaServerRegistry.register(dataNode);
return nodeChangeResult;
}
}
Next, the Store service is called to add the node.
@Override
public NodeChangeResult register(Node node) {
StoreService storeService = ServiceFactory.getStoreService(node.getNodeType());
return storeService.addNode(node);
}
The code then reaches DataStoreService, which again calls RepositoryService to register and store the node.
public class DataStoreService implements StoreService<DataNode> {
@Autowired
private TaskListenerManager taskListenerManager;
@RaftReference(uniqueId = "dataServer")
private RepositoryService<String, RenewDecorate<DataNode>> dataRepositoryService;
@RaftReference(uniqueId = "dataServer")
private NodeConfirmStatusService<DataNode> dataConfirmStatusService;
@Override
public NodeChangeResult addNode(DataNode dataNode) {
NodeChangeResult nodeChangeResult;
String ipAddress = dataNode.getNodeUrl().getIpAddress();
write.lock();
try {
dataRepositoryService.put(ipAddress, new RenewDecorate(dataNode,
RenewDecorate.DEFAULT_DURATION_SECS));
renew(dataNode, 30);
nodeChangeResult = getNodeChangeResult();
dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.ADD);
} finally {
write.unlock();
}
return nodeChangeResult;
}
}
dataConfirmStatusService.putConfirmNode also stores a change event in a queue, which is mainly used for data push and consumption.
@Override
public void putConfirmNode(DataNode node, DataOperator nodeOperate) {
expectNodesOrders.put(new NodeOperator(node, nodeOperate));
}
2.1.3 DataConfirmStatusService
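DataConfirmStatusService is also annotated with RaftService, which means it is a store synchronized through the Raft protocol.
The following storage structures are synchronized:
- expectNodesOrders stores node change events;
- expectNodes stores the nodes that still have to confirm a change event; in other words, a NodeOperator is removed from expectNodesOrders only after the other nodes have confirmed it;
- snapShotFileNames holds the snapshot file names.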
@RaftService(uniqueId = "dataServer")
public class DataConfirmStatusService extends AbstractSnapshotProcess
implements NodeConfirmStatusService<DataNode> {
private ConcurrentHashMap<DataNode/*node*/, Map<String/*ipAddress*/, DataNode>> expectNodes = new ConcurrentHashMap<>();
private BlockingQueue<NodeOperator> expectNodesOrders = new LinkedBlockingQueue();
private Set<String> snapShotFileNames = new HashSet<>();
@Override
public void putConfirmNode(DataNode node, DataOperator nodeOperate) {
expectNodesOrders.put(new NodeOperator(node, nodeOperate));
}
public NodeOperator<DataNode> peekConfirmNode() {
return expectNodesOrders.peek();
}
}
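2.1.4 Consumption
Events are stored in the BlockingQueue expectNodesOrders, so where are they consumed? The source shows that, contrary to what one might expect, no thread performs a blocking read.
Instead, ExecutorManager starts a scheduled task that polls the queue for data: it periodically calls Registry#pushNodeListChange, which takes the head of the queue and consumes it. Data and Session each have their own task.
The flow is as follows:
- First, get the head of the queue (expectNodesOrders); if it is null, return immediately;
- Get the node list of the current data center and store it in the confirmation table (expectNodes);
- Submit the node change push task (firePushXxListTask);
- Process the task, that is, call the pushXxxNode method of XxNodeService, which obtains all node connections through the ConnectionHandler and sends them the node list;
- After a reply arrives, if confirmation is required, call StoreService#confirmNodeStatus to remove that node from expectNodes;
- Once all nodes have been removed from expectNodes, remove this operation from expectNodesOrders; processing is complete.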
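The steps above can be sketched as a small queue-plus-confirmation-table model. This is a hypothetical simplification: nodes are identified by IP strings, the network push is left out, and the set of peers that must confirm is assumed to be every current node except the one that changed.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of expectNodesOrders and expectNodes.
class ConfirmQueueSketch {
    // pending change events, identified here by the IP of the changed node
    private final Queue<String> expectNodesOrders = new LinkedBlockingQueue<>();
    // change event -> IPs of the peers that still have to confirm it
    private final Map<String, Set<String>> expectNodes = new ConcurrentHashMap<>();

    void putConfirmNode(String changedIp) {
        expectNodesOrders.add(changedIp);
    }

    // Steps 1-3: peek the head, record who must confirm, return the push targets.
    Set<String> pushNodeListChange(Set<String> currentNodes) {
        String head = expectNodesOrders.peek();
        if (head == null) {
            return Collections.emptySet();
        }
        Set<String> waiting = ConcurrentHashMap.newKeySet();
        waiting.addAll(currentNodes);
        waiting.remove(head); // assumption: the changed node does not confirm itself
        expectNodes.put(head, waiting);
        return new HashSet<>(waiting);
    }

    // Steps 5-6: one peer confirms; returns true once the head event is finished.
    boolean confirm(String peerIp) {
        String head = expectNodesOrders.peek();
        if (head == null) {
            return false;
        }
        Set<String> waiting = expectNodes.get(head);
        if (waiting == null) {
            expectNodesOrders.poll(); // nothing to wait for, drop the event
            return true;
        }
        waiting.remove(peerIp);
        if (waiting.isEmpty()) {
            expectNodes.remove(head);
            expectNodesOrders.poll();
            return true;
        }
        return false;
    }

    int pendingEvents() {
        return expectNodesOrders.size();
    }

    public static void main(String[] args) {
        ConfirmQueueSketch sketch = new ConfirmQueueSketch();
        sketch.putConfirmNode("10.0.0.3");
        Set<String> nodes = new HashSet<>();
        Collections.addAll(nodes, "10.0.0.1", "10.0.0.2", "10.0.0.3");
        for (String target : sketch.pushNodeListChange(nodes)) {
            sketch.confirm(target);
        }
        System.out.println("pending events: " + sketch.pendingEvents());
    }
}
```

Because the head is only peeked until every peer has confirmed, a failed push leaves the event in place and the next scheduled round retries it.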
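The details are as follows.
2.1.4.1 Periodically Submitting Node Change Push Tasks
This subsection corresponds to the first three steps above.
For Data, for example, the scheduled task is shown below; it periodically calls pushNodeListChange on MetaServerRegistry to process tasks: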
scheduler.schedule(
new TimedSupervisorTask("CheckDataNodeListChangePush", scheduler, checkNodeListChangePushExecutor,
metaServerConfig.getSchedulerCheckNodeListChangePushTimeout(), TimeUnit.SECONDS,
metaServerConfig.getSchedulerCheckNodeListChangePushExpBackOffBound(),
() -> metaServerRegistry.pushNodeListChange(NodeType.DATA)),
metaServerConfig.getSchedulerCheckNodeListChangePushFirstDelay(), TimeUnit.SECONDS);
pushNodeListChange in MetaServerRegistry is defined as follows:
@Override
public void pushNodeListChange(NodeType nodeType) {
StoreService storeService = ServiceFactory.getStoreService(nodeType);
if (storeService != null) {
storeService.pushNodeListChange();
}
}
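DataStoreService does the following here:
- First, get the head of the queue (expectNodesOrders); if it is null, return immediately;
- Get the node list of the current data center and store it in the confirmation table (expectNodes);
- Submit the node change push task (firePushXxListTask);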
@Override
public void pushNodeListChange() {
NodeOperator<DataNode> fireNode;
// first, get the head of the queue (expectNodesOrders); if it is null, return immediately
if ((fireNode = dataConfirmStatusService.peekConfirmNode()) != null) {
NodeChangeResult nodeChangeResult = getNodeChangeResult();
Map<String, Map<String, DataNode>> map = nodeChangeResult.getNodes();
Map<String, DataNode> addNodes = map.get(nodeConfig.getLocalDataCenter());
if (addNodes != null) {
// get the node list of the current data center and store it in the confirmation table (expectNodes)
Map<String, DataNode> previousNodes = dataConfirmStatusService.putExpectNodes(
fireNode.getNode(), addNodes);
// submit the node change push task (firePushXxListTask)
if (!previousNodes.isEmpty()) {
firePushDataListTask(fireNode, nodeChangeResult, previousNodes, true);
}
}
// submit the node change push task (firePushXxListTask)
firePushSessionListTask(nodeChangeResult, fireNode.getNodeOperate().toString());
}
}
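At this point the task has been handed over to the task framework.
2.1.4.2 Periodic Asynchronous Task Processing
This subsection corresponds to the last three steps above.
The tasks are then processed asynchronously:
- Process the task, that is, call the pushXxxNode method of XxNodeService, which obtains all node connections through the ConnectionHandler and sends them the node list;
- After a reply arrives, if confirmation is required, call StoreService#confirmNodeStatus to remove that node from expectNodes;
- Once all nodes have been removed from expectNodes, remove this operation from expectNodesOrders; processing is complete.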
public class DataNodeServiceImpl implements DataNodeService {
@Override
public void pushDataNodes(NodeChangeResult nodeChangeResult, Map<String, DataNode> targetNodes,
boolean confirm, String confirmNodeIp) {
if (nodeChangeResult != null) {
NodeConnectManager nodeConnectManager = getNodeConnectManager();
Collection<InetSocketAddress> connections = nodeConnectManager.getConnections(null);
// add register confirm
StoreService storeService = ServiceFactory.getStoreService(NodeType.DATA);
DataCenterNodes dataCenterNodes = storeService.getDataCenterNodes();
Map<String, DataNode> registeredNodes = dataCenterNodes.getNodes();
// obtain all node connections through the ConnectionHandler and send the node list
for (InetSocketAddress address : connections) {
try {
Request<NodeChangeResult> nodeChangeRequestRequest = new Request<NodeChangeResult>() {
@Override
public NodeChangeResult getRequestBody() {
return nodeChangeResult;
}
@Override
public URL getRequestUrl() {
return new URL(address);
}
};
// send the node list
Response response = dataNodeExchanger.request(nodeChangeRequestRequest);
if (confirm) {
Object result = response.getResult();
if (result instanceof CommonResponse) {
CommonResponse genericResponse = (CommonResponse) result;
if (genericResponse.isSuccess()) {
// after the reply arrives, if confirmation is required, call StoreService#confirmNodeStatus to remove this node from expectNodes
confirmStatus(address, confirmNodeIp);
}
}
}
}
}
}
}
}
The call stack is as follows:
pushDataNodes:73, DataNodeServiceImpl (com.alipay.sofa.registry.server.meta.node.impl)
execute:86, DataNodeChangePushTask (com.alipay.sofa.registry.server.meta.task)
process:41, DataNodeSingleTaskProcessor (com.alipay.sofa.registry.server.meta.task.processor)
process:32, DataNodeSingleTaskProcessor (com.alipay.sofa.registry.server.meta.task.processor)
run:136, TaskExecutors$WorkerRunnable (com.alipay.sofa.registry.task.batcher)
run:748, Thread (java.lang)
2.1.4.3 The Confirmation Step
This subsection looks more closely at the confirmation step mentioned above.
private void confirmStatus(InetSocketAddress address, String confirmNodeIp) {
String ipAddress = address.getAddress().getHostAddress();
dataStoreService.confirmNodeStatus(ipAddress, confirmNodeIp);
}
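Next comes DataStoreService:
- After a reply arrives, if confirmation is required, call StoreService#confirmNodeStatus to remove that node from expectNodes;
- Once all nodes have been removed from expectNodes, remove this operation from expectNodesOrders; processing is complete.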
@Override
public void confirmNodeStatus(String ipAddress, String confirmNodeIp) {
NodeOperator<DataNode> fireNode = dataConfirmStatusService.peekConfirmNode();
if (fireNode != null) {
String fireNodeIp = fireNode.getNode().getIp();
Map<String/*ipAddress*/, DataNode> waitNotifyNodes = dataConfirmStatusService
.getExpectNodes(fireNode.getNode());
if (waitNotifyNodes != null) {
Set<String> removeIp = getRemoveIp(waitNotifyNodes.keySet());
removeIp.add(ipAddress);
// remove this node from expectNodes
waitNotifyNodes = dataConfirmStatusService.removeExpectConfirmNodes(
fireNode.getNode(), removeIp);
if (waitNotifyNodes.isEmpty()) {
//all node be notified,or some disconnect node be evict
try {
// once all nodes have been removed from expectNodes, remove this operation from expectNodesOrders; processing is complete
if (null != dataConfirmStatusService
.removeExpectNodes((dataConfirmStatusService.pollConfirmNode())
.getNode())) {
//add init status must notify
if (fireNode.getNodeOperate() == DataOperator.ADD) {
notifyConfirm(fireNode.getNode());
}
}
}
}
} else {
try {
//wait node not exist,
dataConfirmStatusService.pollConfirmNode();
}
}
}
}
2.1.4.4 Summary of the Consumption Flow
The consumption flow is summarized in the diagram below:
+-------------------+ +-----------+ +------------------------+ +----------------+
|ServiceStateMachine+------> | Processor | +-----> |DataConfirmStatusService| | loop |
+-------------------+ +-----------+ +-----+------------------+ v |
| +------+------+ |
| put |TaskExecutors| |
v +------+------+ |
                                  +--------+--------+ remove                                                  |                |
+------------------> |expectNodesOrders| <-------+ | |
| loop +--------+--------+ | v |
| | | +------------+--------------+ |
| | peekConfirmNode | |DataNodeSingleTaskProcessor| |
| | | +------------+--------------+ |
| | | | |
+---------------------------+--------------------+ v | | |
| ExecutorManager | +----+---+ | v |
| +--------------------------------------------+ | |fireNode| | +----------+-----------+ |
| | TimedSupervisorTask                        | |     +----+---+        |                       |DataNodeChangePushTask|       |
| | +---------------------------------------+ | | | | +----------+-----------+ |
| | | metaServerRegistry.pushNodeListChange | | | | putExpectNodes | | |
| | +---------------------------------------+ | | | | | |
| +--------------------------------------------+ | v | v |
+------------------------------------------------+  +------+------+ remove  |                          +---------+---------+        |
^ | expectNodes | <---------+------------------<--------+ |DataNodeServiceImpl| |
| +------+------+ | +---------+---------+ |
| | | | |
| v | | |
| +-------------+--------+ | v |
| | firePushDataListTask +-----------------------------------> | +------+-------+ |
| +-------------+--------+ taskListenerManager.sendTaskEvent | | StoreService | |
| | | +------+-------+ ^
| v | | |
| +-------------+-----------+ | v |
^<--------------+ firePushSessionListTask +--------------------------------> | +-----+-------+ |
+-------------------------+taskListenerManager.sendTaskEvent +-----+confirmStatus| |
+-----+-------+ |
| |
>----------------+
2.1.5 Summary
The data node registration flow is summarized in the diagram below.
+----------------------------+
put +----------------------+ | Map(String, NodeRepository)|
+----------> |dataRepositoryService +----> | registry |
| +----------------------+ +----------------------------+
|
register addNode |
+----------------+ +------------------+ +-------+--------+
| DataNodeHandler+----->+metaServerRegistry+------->+DataStoreService| TimedSupervisorTask
+----------------+ +------------------+ +-------+--------+
| +------------------------+ +------------------+
+----------> |dataConfirmStatusService| +------> |pushNodeListChange|
putConfirmNode +------------------------+ +------------------+
2.2 SessionNodeHandler
Session node registration is almost identical to Data node registration.
SessionNodeHandler is defined as follows:
public class SessionNodeHandler extends AbstractServerHandler<SessionNode> {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionNodeHandler.class);
@Autowired
private Registry metaServerRegistry;
@Override
public Object reply(Channel channel, SessionNode sessionNode) {
NodeChangeResult nodeChangeResult;
try {
nodeChangeResult = metaServerRegistry.register(sessionNode);
}
return nodeChangeResult;
}
}
Next comes SessionStoreService, which handles the business logic.
public class SessionStoreService implements StoreService<SessionNode> {
@Override
public NodeChangeResult addNode(SessionNode sessionNode) {
write.lock();
try {
String ipAddress = sessionNode.getNodeUrl().getIpAddress();
sessionRepositoryService.put(ipAddress, new RenewDecorate(sessionNode,
RenewDecorate.DEFAULT_DURATION_SECS));
sessionVersionRepositoryService.checkAndUpdateVersions(nodeConfig.getLocalDataCenter(),
System.currentTimeMillis());
renew(sessionNode, 30);
sessionConfirmStatusService.putConfirmNode(sessionNode, DataOperator.ADD);
} finally {
write.unlock();
}
return dataStoreService.getNodeChangeResult();
}
}
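Then comes SessionRepositoryService.
Note that the map here is keyed directly by IP address. This differs from the Data case, where the registry is first keyed by data center and only then by IP address.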
@RaftService(uniqueId = "sessionServer")
public class SessionRepositoryService extends AbstractSnapshotProcess
implements
RepositoryService<String, RenewDecorate<SessionNode>> {
/**
* session node store
*/
private ConcurrentHashMap<String/*ipAddress*/, RenewDecorate<SessionNode>> registry = new ConcurrentHashMap<>();
@Override
public RenewDecorate<SessionNode> put(String ipAddress, RenewDecorate<SessionNode> sessionNode,
Long currentTimeMillis) {
try {
RenewDecorate oldRenewDecorate = registry.get(ipAddress);
registry.put(ipAddress, sessionNode);
}
return sessionNode;
}
}
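To make the difference in keying concrete, here is a minimal sketch of the two layouts. It is not SOFARegistry code: the class name RegistryLayouts and both helper methods are hypothetical, and nodes are reduced to plain strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of the two map layouts; not SOFARegistry code.
class RegistryLayouts {
    // Session-style layout: one flat map, ipAddress -> node.
    final Map<String, String> sessionRegistry = new ConcurrentHashMap<>();

    // Data-style layout: dataCenter -> (ipAddress -> node).
    final Map<String, Map<String, String>> dataRegistry = new ConcurrentHashMap<>();

    void putSession(String ipAddress, String node) {
        sessionRegistry.put(ipAddress, node);
    }

    void putData(String dataCenter, String ipAddress, String node) {
        // Create the per-data-center map on first use, then store the node by IP.
        dataRegistry
            .computeIfAbsent(dataCenter, dc -> new ConcurrentHashMap<>())
            .put(ipAddress, node);
    }
}
```

With the flat layout a lookup needs only the IP address, while the nested layout needs the data center first.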
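As the previous section shows, both DataServer and SessionServer have a Handler that processes node registration requests, and the registration itself is carried out by Registry.
For example, metaServerHandlers: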
@Bean(name = "metaServerHandlers")
public Collection<AbstractServerHandler> metaServerHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(metaConnectionHandler());
list.add(getNodesRequestHandler());
return list;
}
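0x03 Registration renewal
3.1 Key class: RenewDecorate
When a node registers, its information is wrapped in a RenewDecorate. This wrapper is the key to both renewal and eviction of registration information. The class is defined as follows: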
public class RenewDecorate<T> implements Serializable {
public static final int DEFAULT_DURATION_SECS = 15;
private T renewal; // the wrapped node object
private long beginTimestamp; // registration time
private volatile long lastUpdateTimestamp; // last renewal time
private long duration; // timeout duration
public boolean isExpired() {
return System.currentTimeMillis() > lastUpdateTimestamp + duration;
}
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
public void renew(long durationSECS) {
lastUpdateTimestamp = System.currentTimeMillis() + durationSECS * 1000;
}
}
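This object holds the registered node information and adds the registration time, the last renewal time, and the timeout duration.
A renewal simply updates lastUpdateTimestamp. Expiry is decided by checking whether System.currentTimeMillis() - lastUpdateTimestamp > duration holds; if it does, the node is considered timed out and is evicted.
Note that, in the excerpt above, renew() stores the current time plus the duration in lastUpdateTimestamp, so a renewed node is only treated as expired once roughly twice the duration has passed since that renewal.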
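The arithmetic above is easier to check with an explicit clock. The following is a minimal sketch, not the SOFARegistry class: LeaseSketch is a hypothetical name, time is passed in as a parameter instead of being read from System.currentTimeMillis(), and the constructor's initial timestamp is an assumption because the excerpt does not show the constructor.

```java
// Hypothetical lease sketch with an explicit clock; it mirrors the arithmetic of the excerpt.
class LeaseSketch {
    private long lastUpdateTimestamp;     // milliseconds
    private final long durationMillis;    // timeout in milliseconds

    LeaseSketch(long nowMillis, long durationSecs) {
        this.lastUpdateTimestamp = nowMillis;       // assumption: starts at registration time
        this.durationMillis = durationSecs * 1000;
    }

    // Same comparison as isExpired() in the excerpt.
    boolean isExpired(long nowMillis) {
        return nowMillis > lastUpdateTimestamp + durationMillis;
    }

    // Same assignment as renew() in the excerpt: the duration is added to the timestamp.
    void renew(long nowMillis) {
        lastUpdateTimestamp = nowMillis + durationMillis;
    }
}
```

Under these assumptions, a fresh 15-second lease created at time 0 expires just after 15000 ms, while the same lease renewed at time 0 expires just after 30000 ms.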
3.2 Execution paths
renew can be reached through more than one execution path.
Path 1:
renew:70, RenewDecorate (com.alipay.sofa.registry.server.meta.store)
replace:176, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
replace:45, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
replace:39, RepositoryService (com.alipay.sofa.registry.server.meta.repository)
invokeInterface_L3_L:-1, 1889671331 (java.lang.invoke.LambdaForm$DMH)
reinvoke:-1, 171310548 (java.lang.invoke.LambdaForm$BMH)
invoker:-1, 559310456 (java.lang.invoke.LambdaForm$MH)
invokeExact_MT:-1, 426124479 (java.lang.invoke.LambdaForm$MH)
invokeWithArguments:627, MethodHandle (java.lang.invoke)
process:123, Processor (com.alipay.sofa.registry.jraft.processor)
onApply:133, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
doApplyTasks:534, FSMCallerImpl (com.alipay.sofa.jraft.core)
doCommitted:503, FSMCallerImpl (com.alipay.sofa.jraft.core)
runApplyTask:431, FSMCallerImpl (com.alipay.sofa.jraft.core)
access$100:72, FSMCallerImpl (com.alipay.sofa.jraft.core)
onEvent:147, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
onEvent:141, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
run:137, BatchEventProcessor (com.lmax.disruptor)
run:748, Thread (java.lang)
Path 2:
renew:171, DataStoreService (com.alipay.sofa.registry.server.meta.store)
addNode:111, DataStoreService (com.alipay.sofa.registry.server.meta.store)
addNode:60, DataStoreService (com.alipay.sofa.registry.server.meta.store)
register:52, MetaServerRegistry (com.alipay.sofa.registry.server.meta.registry)
reply:43, DataNodeHandler (com.alipay.sofa.registry.server.meta.remoting.handler)
reply:32, DataNodeHandler (com.alipay.sofa.registry.server.meta.remoting.handler)
handleRequest:54, SyncUserProcessorAdapter (com.alipay.sofa.registry.remoting.bolt)
dispatchToUserProcessor:239, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
doProcess:145, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
run:366, RpcRequestProcessor$ProcessTask (com.alipay.remoting.rpc.protocol)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)
For example, in DataStoreService:
@Override
public NodeChangeResult addNode(DataNode dataNode) {
NodeChangeResult nodeChangeResult;
String ipAddress = dataNode.getNodeUrl().getIpAddress();
write.lock();
try {
dataRepositoryService.put(ipAddress, new RenewDecorate(dataNode,
RenewDecorate.DEFAULT_DURATION_SECS));
renew(dataNode, 30); // renew
nodeChangeResult = getNodeChangeResult();
dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.ADD);
} finally {
write.unlock();
}
return nodeChangeResult;
}
This in turn calls renew:
@Override
public void renew(DataNode dataNode, int duration) {
write.lock();
try {
String ipAddress = dataNode.getNodeUrl().getIpAddress();
RenewDecorate renewer = dataRepositoryService.get(ipAddress);
if (renewer == null) {
addNode(dataNode); // not found: add as a new node
} else {
// renew
if (duration > 0) {
dataRepositoryService.replace(ipAddress, new RenewDecorate(dataNode, duration));
} else {
dataRepositoryService.replace(ipAddress, new RenewDecorate(dataNode,
RenewDecorate.DEFAULT_DURATION_SECS));
}
}
} finally {
write.unlock();
}
}
Because Raft maintains data consistency for the Repository, the call to dataRepositoryService.replace is intercepted by a proxy and ends up in ProxyHandler.
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
try {
ProcessRequest request = new ProcessRequest();
request.setMethodArgSigs(createParamSignature(method.getParameterTypes()));
request.setMethodName(method.getName());
request.setMethodArgs(args);
request.setServiceName(serviceId);
if (Processor.getInstance().isLeaderReadMethod(method)) {
return doInvokeMethod(request);
}
return client.sendRequest(request);
}
}
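The interception pattern used by ProxyHandler can be sketched with the JDK's dynamic proxy. This is only an illustration under stated assumptions: ProxySketch, Repository, and CapturedCall are hypothetical stand-ins, and the captured call is appended to a list instead of being sent through a Raft client.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical sketch of proxy-based call capture; not the SOFARegistry classes.
class ProxySketch {
    // Stand-in for a repository interface whose calls must go through Raft.
    interface Repository {
        String replace(String key, String value);
    }

    // Stand-in for a request object: just the method name and its arguments.
    static final class CapturedCall {
        final String methodName;
        final Object[] args;

        CapturedCall(String methodName, Object[] args) {
            this.methodName = methodName;
            this.args = args;
        }
    }

    // Returns a Repository whose calls are recorded in 'outbox' instead of executed locally.
    static Repository capturing(List<CapturedCall> outbox) {
        InvocationHandler handler = (proxy, method, args) -> {
            outbox.add(new CapturedCall(method.getName(), args));
            return null; // a real client would return the remote response here
        };
        return (Repository) Proxy.newProxyInstance(
            Repository.class.getClassLoader(),
            new Class<?>[] { Repository.class },
            handler);
    }
}
```

The caller keeps programming against the interface, and the handler decides whether a call runs locally or is shipped elsewhere, which is the role the isLeaderReadMethod check plays in the excerpt.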
When the Raft request reaches the server side, the stack is as follows:
replace:165, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
replace:45, DataRepositoryService (com.alipay.sofa.registry.server.meta.repository.service)
replace:39, RepositoryService (com.alipay.sofa.registry.server.meta.repository)
invokeInterface_L3_L:-1, 1177311202 (java.lang.invoke.LambdaForm$DMH)
reinvoke:-1, 960369282 (java.lang.invoke.LambdaForm$BMH)
invoker:-1, 198860519 (java.lang.invoke.LambdaForm$MH)
invokeExact_MT:-1, 2035225037 (java.lang.invoke.LambdaForm$MH)
invokeWithArguments:627, MethodHandle (java.lang.invoke)
process:123, Processor (com.alipay.sofa.registry.jraft.processor)
onApply:133, ServiceStateMachine (com.alipay.sofa.registry.jraft.bootstrap)
doApplyTasks:534, FSMCallerImpl (com.alipay.sofa.jraft.core)
doCommitted:503, FSMCallerImpl (com.alipay.sofa.jraft.core)
runApplyTask:431, FSMCallerImpl (com.alipay.sofa.jraft.core)
access$100:72, FSMCallerImpl (com.alipay.sofa.jraft.core)
onEvent:147, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
onEvent:141, FSMCallerImpl$ApplyTaskHandler (com.alipay.sofa.jraft.core)
run:137, BatchEventProcessor (com.lmax.disruptor)
run:748, Thread (java.lang)
The replace code itself is shown below; this is where the update is finally applied:
@Override
public RenewDecorate<DataNode> replace(String ipAddress, RenewDecorate<DataNode> dataNode,
Long currentTimeMillis) {
try {
String dataCenter = dataNode.getRenewal().getDataCenter();
NodeRepository<DataNode> dataNodeRepository = registry.get(dataCenter);
if (dataNodeRepository != null) {
Map<String/*ipAddress*/, RenewDecorate<DataNode>> dataNodes = dataNodeRepository
.getNodeMap();
RenewDecorate<DataNode> oldRenewDecorate = dataNodes.get(ipAddress);
if (oldRenewDecorate != null && oldRenewDecorate.getRenewal() != null) {
oldRenewDecorate.setRenewal(dataNode.getRenewal());
oldRenewDecorate.renew();
}
}
return dataNode;
}
}
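The server-side stack passes through MethodHandle.invokeWithArguments before reaching replace. A minimal sketch of that kind of by-name dispatch follows. ApplySketch and Store are hypothetical, and the method signature is fixed to (String, String) -> String for simplicity, whereas the real Processor presumably resolves signatures from the request.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of applying a captured call on the server side.
class ApplySketch {
    // Stand-in for the real repository service.
    public static class Store {
        private final Map<String, String> data = new HashMap<>();

        public String replace(String key, String value) {
            data.put(key, value);
            return value;
        }

        public String get(String key) {
            return data.get(key);
        }
    }

    // Looks up a (String, String) -> String method by name and invokes it with the given arguments.
    static Object apply(Store target, String methodName, Object... args) {
        try {
            MethodType type = MethodType.methodType(String.class, String.class, String.class);
            MethodHandle handle = MethodHandles.lookup().findVirtual(Store.class, methodName, type);
            return handle.bindTo(target).invokeWithArguments(args);
        } catch (Throwable t) {
            // Lookup failures and exceptions from the target both end up here.
            throw new IllegalStateException("apply failed: " + methodName, t);
        }
    }
}
```

For example, ApplySketch.apply(store, "replace", "10.0.0.1", "node") runs the same code path as calling store.replace directly, but driven by data that could have arrived over the network.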
The overall flow is shown in the diagram below:
+---------------------------------------+ +---------------------------------------------+
| +------------------------------+ | | +----------------------------------+ |
| | +----------------+ registry |Client| | Server| +----------------------+registry | |
| | |DataStoreService| | | | | | DataRepositoryService| | |
| | +-----+----------+ | | | | +---------+------------+ | |
| | | replace | | | | ^ replace | |
| | | | | | | | | |
| | v | | | | +------+----+ | |
| | +-----+--------------------+ | | | | | Processor | | |
| | |DataRepositoryService stub| | | | | +------+----+ | |
| | +-----+--------------------+ | | | | ^ onApply | |
| | | | | | | | | |
| | v | | | | +-------+------+ | |
| | +-+---+ | | | | | StateMachine | | |
| | |Proxy| | | | | +-------+------+ | |
| | +-+---+ | | | | ^ process | |
| | | invoke | | | | | | |
| | v | | | | | | |
| | +----+-------+ | | | | +------+------+ | |
| | |ProxyHandler| | | | | |FSMCallerImpl| | |
| | +----+-------+ | | | | +------+------+ | |
| | | sendRequest | | | | ^ | |
| | v | | | | | received | |
| | +---+------+ | | | | | | |
| | |RaftClient| | | | | +-----------------+ | |
| | +----------+ | | network| | |RaftServerHandler| | |
| | | +-------------------> | +-----------------+ | |
| +------------------------------+ | | +----------------------------------+ |
| | | |
+---------------------------------------+ +---------------------------------------------+
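3.3 RenewNodesRequestHandler
As with registration, renewal requests are processed by a dedicated Handler, RenewNodesRequestHandler, which ultimately hands the renewal to StoreService. One more point: if the node is not found among the registered nodes at renewal time, the renewal triggers a node registration.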
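The renew-or-register behaviour described above can be sketched as follows. This is a simplified illustration, not the SOFARegistry implementation: RenewOrRegisterSketch is a hypothetical class, nodes are identified only by IP address, and a plain deadline replaces the RenewDecorate wrapper.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical renew-or-register sketch; not the SOFARegistry classes.
class RenewOrRegisterSketch {
    // ipAddress -> expiry deadline in milliseconds
    private final Map<String, Long> deadlines = new HashMap<>();

    // Counts how many renewals had to be treated as registrations.
    int registrations = 0;

    synchronized void renew(String ipAddress, long nowMillis, long durationMillis) {
        if (!deadlines.containsKey(ipAddress)) {
            registrations++; // unknown node: the renewal doubles as a registration
        }
        deadlines.put(ipAddress, nowMillis + durationMillis);
    }

    synchronized boolean isExpired(String ipAddress, long nowMillis) {
        Long deadline = deadlines.get(ipAddress);
        return deadline == null || nowMillis > deadline;
    }
}
```

A practical consequence of this design is that a node evicted by mistake, for example after a long pause, re-enters the registry on its next heartbeat without a separate registration call.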
The renew handler is set up during initialization:
@Bean(name = "sessionServerHandlers")
public Collection<AbstractServerHandler> sessionServerHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(sessionConnectionHandler());
list.add(sessionNodeHandler());
list.add(renewNodesRequestHandler()); // added to the server handlers
list.add(getNodesRequestHandler());
list.add(fetchProvideDataRequestHandler());
return list;
}
@Bean(name = "dataServerHandlers")
public Collection<AbstractServerHandler> dataServerHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(dataConnectionHandler());
list.add(getNodesRequestHandler());
list.add(dataNodeHandler());
list.add(renewNodesRequestHandler()); // added to the server handlers
list.add(fetchProvideDataRequestHandler());
return list;
}
It is defined as follows:
public class RenewNodesRequestHandler extends AbstractServerHandler<RenewNodesRequest> {
@Autowired
private Registry metaServerRegistry;
@Override
public Object reply(Channel channel, RenewNodesRequest renewNodesRequest) {
Node renewNode = null;
renewNode = renewNodesRequest.getNode();
metaServerRegistry.renew(renewNode, renewNodesRequest.getDuration());
return null;
}
@Override
public Class interest() {
return RenewNodesRequest.class;
}
@Override
public HandlerType getType() {
return HandlerType.PROCESSER;
}
}
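The interest() method above returns the request class a handler cares about. One plausible way a server could use it is to route each incoming request by its class. The sketch below illustrates that idea only: DispatchSketch and its Handler interface are hypothetical, and this is not a description of how Bolt or SOFARegistry actually wires handlers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical dispatcher keyed by request class; not the Bolt or SOFARegistry API.
class DispatchSketch {
    interface Handler<T> {
        Class<T> interest();       // which request type this handler accepts
        Object reply(T request);   // handle one request
    }

    private final Map<Class<?>, Handler<?>> handlers = new HashMap<>();

    void register(Handler<?> handler) {
        handlers.put(handler.interest(), handler);
    }

    @SuppressWarnings("unchecked")
    Object dispatch(Object request) {
        // Exact class match only; subclasses of a registered type are not considered.
        Handler<Object> handler = (Handler<Object>) handlers.get(request.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("no handler for " + request.getClass());
        }
        return handler.reply(request);
    }
}
```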
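0x04 Eviction
Eviction is performed by a scheduled task. MetaServer starts several scheduled tasks at startup (see ExecutorManager#startScheduler). One of them calls Registry#evict, which walks the stored Map, collects the expired entries, and calls StoreService#removeNodes to remove them from the Repository. This removal also triggers a change notification. By default the task runs every 3 seconds.
4.1 Configuration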
public void startScheduler() {
scheduler.schedule(new TimedSupervisorTask("HeartbeatCheck", scheduler, heartbeatCheckExecutor,
metaServerConfig.getSchedulerHeartbeatTimeout(), TimeUnit.SECONDS,
metaServerConfig.getSchedulerHeartbeatExpBackOffBound(), () -> metaServerRegistry.evict()),
metaServerConfig.getSchedulerHeartbeatFirstDelay(), TimeUnit.SECONDS);
}
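The parameter names getSchedulerHeartbeatTimeout and getSchedulerHeartbeatExpBackOffBound suggest a supervisor that backs off exponentially when the task times out. The excerpt does not show TimedSupervisorTask itself, so the rule below is an assumption, and BackoffSketch and nextDelay are hypothetical names.

```java
// Hypothetical backoff rule; the real TimedSupervisorTask may behave differently.
class BackoffSketch {
    // Returns the next delay: reset to the base on success, double (capped) on timeout.
    static long nextDelay(long currentDelay, long baseDelay, int expBackOffBound, boolean timedOut) {
        long maxDelay = baseDelay * expBackOffBound; // upper bound for the backoff
        if (!timedOut) {
            return baseDelay;
        }
        return Math.min(maxDelay, currentDelay * 2);
    }
}
```

Under this assumed rule, a base delay of 3 seconds with a bound of 10 would grow as 3, 6, 12, 24, 30 seconds across consecutive timeouts and fall back to 3 seconds after the first success.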
4.2 Eviction
The code iterates over every kind of StoreService, collects the expired nodes from each, and evicts them.
public class MetaServerRegistry implements Registry<Node> {
@Override
public void evict() {
for (NodeType nodeType : NodeType.values()) {
StoreService storeService = ServiceFactory.getStoreService(nodeType);
if (storeService != null) {
Collection<Node> expiredNodes = storeService.getExpired();
if (expiredNodes != null && !expiredNodes.isEmpty()) {
storeService.removeNodes(expiredNodes);
}
}
}
}
}
This in turn calls DataStoreService:
@Override
public void removeNodes(Collection<DataNode> nodes) {
write.lock();
try {
if (nodes != null && !nodes.isEmpty()) {
for (DataNode dataNode : nodes) {
String ipAddress = dataNode.getNodeUrl().getIpAddress();
RenewDecorate<DataNode> dataNodeRemove = dataRepositoryService
.remove(ipAddress);
if (dataNodeRemove != null) {
dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.REMOVE);
}
}
}
} finally {
write.unlock();
}
}
Finally, DataRepositoryService is called:
@Override
public RenewDecorate<DataNode> remove(Object key, Long currentTimeMillis) {
try {
String ipAddress = (String) key;
String dataCenter = nodeConfig.getLocalDataCenter();
NodeRepository<DataNode> dataNodeRepository = registry.get(dataCenter);
if (dataNodeRepository != null) {
Map<String/*ipAddress*/, RenewDecorate<DataNode>> dataNodes = dataNodeRepository
.getNodeMap();
if (dataNodes != null) {
RenewDecorate<DataNode> oldRenewDecorate = dataNodes.remove(ipAddress);
dataNodeRepository.setVersion(currentTimeMillis);
return oldRenewDecorate;
}
}
}
}
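Taken together, the eviction steps in this section amount to a scan-and-remove pass over the stored leases. The following is a minimal sketch under simplifying assumptions: EvictionSketch is a hypothetical class, each node is reduced to an IP address with an expiry deadline, and the confirm-status bookkeeping and version update are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical eviction pass over a lease map; not the SOFARegistry classes.
class EvictionSketch {
    // Removes every entry whose deadline has passed and returns the evicted keys.
    static List<String> evict(Map<String, Long> deadlines, long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis > entry.getValue()) {
                evicted.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return evicted;
    }
}
```

A pass like this could be run periodically with ScheduledExecutorService.scheduleWithFixedDelay, and the returned list is what a change notification would be built from.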
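0x05 Node list queries
The Data, Meta, and Session servers all provide getNodesRequestHandler, which handles requests for the current node list. In essence it reads the data from the underlying Repository store and returns it. The structure of the result is given by the NodeChangeResult class, which contains the node list of each data center together with the version numbers.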
public class NodeChangeResult<T extends Node> implements Serializable {
private final NodeType nodeType;
private Map<String/*dataCenter id*/, Map<String /*ipAddress*/, T>> nodes;
private Long version;
private Map<String/*dataCenter*/, Long /*version*/> dataCenterListVersions;
/** local dataCenter id */
private String localDataCenter;
}
5.1 Configuration
The Data, Meta, and Session servers all provide getNodesRequestHandler, as shown below:
@Bean(name = "sessionServerHandlers")
public Collection<AbstractServerHandler> sessionServerHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(sessionConnectionHandler());
list.add(sessionNodeHandler());
list.add(renewNodesRequestHandler());
list.add(getNodesRequestHandler()); // configured here
list.add(fetchProvideDataRequestHandler());
return list;
}
@Bean(name = "dataServerHandlers")
public Collection<AbstractServerHandler> dataServerHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(dataConnectionHandler());
list.add(getNodesRequestHandler()); // configured here
list.add(dataNodeHandler());
list.add(renewNodesRequestHandler());
list.add(fetchProvideDataRequestHandler());
return list;
}
@Bean(name = "metaServerHandlers")
public Collection<AbstractServerHandler> metaServerHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(metaConnectionHandler());
list.add(getNodesRequestHandler()); // configured here
return list;
}
The getNodesRequestHandler bean is created as follows:
@Configuration
public static class MetaServerRemotingConfiguration {
@Bean
public AbstractServerHandler getNodesRequestHandler() {
return new GetNodesRequestHandler();
}
}
5.2 Response handler
GetNodesRequest messages are answered over Bolt.
public class GetNodesRequestHandler extends AbstractServerHandler<GetNodesRequest> {
@Autowired
private Registry metaServerRegistry;
@Override
public Object reply(Channel channel, GetNodesRequest getNodesRequest) {
NodeChangeResult nodeChangeResult;
try {
nodeChangeResult = metaServerRegistry.getAllNodes(getNodesRequest.getNodeType());
}
return nodeChangeResult;
}
}
The stack is:
getNodeChangeResult:188, MetaStoreService (com.alipay.sofa.registry.server.meta.store)
getAllNodes:96, MetaServerRegistry (com.alipay.sofa.registry.server.meta.registry)
reply:44, GetNodesRequestHandler (com.alipay.sofa.registry.server.meta.remoting.handler)
reply:33, GetNodesRequestHandler (com.alipay.sofa.registry.server.meta.remoting.handler)
handleRequest:54, SyncUserProcessorAdapter (com.alipay.sofa.registry.remoting.bolt)
dispatchToUserProcessor:239, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
doProcess:145, RpcRequestProcessor (com.alipay.remoting.rpc.protocol)
run:366, RpcRequestProcessor$ProcessTask (com.alipay.remoting.rpc.protocol)
runWorker:1149, ThreadPoolExecutor (java.util.concurrent)
run:624, ThreadPoolExecutor$Worker (java.util.concurrent)
run:748, Thread (java.lang)
This is shown in the diagram below:
+-------------------------+
| SyncUserProcessorAdapter|
+-----------+-------------+
|
|
v handleRequest
+----------+-----------+
|GetNodesRequestHandler|
+----------+-----------+
|
|
v getAllNodes
+--------+---------+
|MetaServerRegistry|
+--------+---------+
|
|
v getNodeChangeResult
+-------+--------+
|MetaStoreService|
+----+------+----+
| |
+-----------+ +------------+
| |
getNodeMap v v getNodeMap
+------------------+-----------+ +----------+-------------------+
|dataCenter, metaNodeRepository| ... |dataCenter, metaNodeRepository|
+------------------------------+ +------------------------------+
Let's look at this in more detail.
5.3 Registry operation
Registry simply delegates to StoreService.
public class MetaServerRegistry implements Registry<Node> {
@Override
public NodeChangeResult getAllNodes(NodeType nodeType) {
StoreService storeService = ServiceFactory.getStoreService(nodeType);
return storeService.getNodeChangeResult();
}
}
5.4 StoreService operation
The service iterates over the data centers, collects the node list and version for each one, and returns the result.
public class MetaStoreService implements StoreService<MetaNode> {
@Override
public NodeChangeResult getNodeChangeResult() {
NodeChangeResult nodeChangeResult = new NodeChangeResult(NodeType.META);
String localDataCenter = nodeConfig.getLocalDataCenter();
Map<String/*dataCenter*/, NodeRepository> metaRepositoryMap = metaRepositoryService.getNodeRepositories();
ConcurrentHashMap<String/*dataCenter*/, Map<String/*ipAddress*/, MetaNode>> pushNodes = new ConcurrentHashMap<>();
Map<String/*dataCenter*/, Long> versionMap = new ConcurrentHashMap<>();
metaRepositoryMap.forEach((dataCenter, metaNodeRepository) -> {
if (localDataCenter.equalsIgnoreCase(dataCenter)) {
nodeChangeResult.setVersion(metaNodeRepository.getVersion());
}
versionMap.put(dataCenter, metaNodeRepository.getVersion());
Map<String, RenewDecorate<MetaNode>> dataMap = metaNodeRepository.getNodeMap();
Map<String, MetaNode> newMap = new ConcurrentHashMap<>();
dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal()));
pushNodes.put(dataCenter, newMap);
});
nodeChangeResult.setLocalDataCenter(localDataCenter);
nodeChangeResult.setNodes(pushNodes);
nodeChangeResult.setDataCenterListVersions(versionMap);
return nodeChangeResult;
}
}
The final result looks like this (this particular dump is for node type DATA):
nodeChangeResult =
nodeType = {Node$NodeType@7190} "DATA"
nodes = {ConcurrentHashMap@8267} size = 1
"DefaultDataCenter" -> {ConcurrentHashMap@8276} size = 0
key = "DefaultDataCenter"
value = {ConcurrentHashMap@8276} size = 0
version = {Long@8268} 1601126414990
dataCenterListVersions = {ConcurrentHashMap@8269} size = 1
"DefaultDataCenter" -> {Long@8268} 1601126414990
key = "DefaultDataCenter"
value = {Long@8268} 1601126414990
localDataCenter = "DefaultDataCenter"
value = {char[17]@8280}
hash = 761435552
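The traversal in getNodeChangeResult can be reduced to the following sketch. It is an illustration only: SnapshotSketch, Repo, and Snapshot are hypothetical stand-ins for the service, NodeRepository, and NodeChangeResult, and nodes are plain strings with the RenewDecorate wrapper omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical flattening of per-data-center repositories; not the SOFARegistry classes.
class SnapshotSketch {
    static final class Repo {
        final long version;
        final Map<String, String> nodesByIp; // ipAddress -> node

        Repo(long version, Map<String, String> nodesByIp) {
            this.version = version;
            this.nodesByIp = nodesByIp;
        }
    }

    static final class Snapshot {
        final Map<String, Map<String, String>> nodes = new HashMap<>();
        final Map<String, Long> dataCenterVersions = new HashMap<>();
        Long localVersion; // version of the local data center, null if it has no repository
    }

    static Snapshot snapshot(Map<String, Repo> repos, String localDataCenter) {
        Snapshot out = new Snapshot();
        for (Map.Entry<String, Repo> e : repos.entrySet()) {
            if (localDataCenter.equalsIgnoreCase(e.getKey())) {
                out.localVersion = e.getValue().version;
            }
            out.dataCenterVersions.put(e.getKey(), e.getValue().version);
            // Copy the node map so later repository changes do not leak into the snapshot.
            out.nodes.put(e.getKey(), new HashMap<>(e.getValue().nodesByIp));
        }
        return out;
    }
}
```

The top-level version is taken from the local data center only, while the per-data-center map carries every version, which matches the shape of the dump above.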
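5.5 Data synchronization on node changes
MetaServer detects nodes coming online or going offline through its network connections. Every DataServer runs a scheduled connection-refresh task, ConnectionRefreshTask, which periodically polls MetaServer for data node information. Note that, besides DataServer actively pulling node information from MetaServer, MetaServer also actively sends NodeChangeResult requests to each node to notify it that node information has changed. Push and pull lead to the same final result.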
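One common way to make push and pull converge is to gate updates on the version carried in the result. The sketch below shows that idea under stated assumptions: NodeListReceiver is a hypothetical class, and whether the receiving servers compare versions in exactly this way is not shown in the excerpts above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical receiver that applies node lists idempotently by version.
class NodeListReceiver {
    private long appliedVersion = Long.MIN_VALUE;
    private List<String> nodes = Collections.emptyList();

    // Applies the list only if it is newer; returns true when the local view changed.
    synchronized boolean onNodeList(long version, List<String> newNodes) {
        if (version <= appliedVersion) {
            return false; // stale or duplicate update, whether pushed or pulled
        }
        appliedVersion = version;
        nodes = new ArrayList<>(newNodes);
        return true;
    }

    synchronized List<String> currentNodes() {
        return Collections.unmodifiableList(nodes);
    }
}
```

With this rule it does not matter whether a given version arrives first through a push or through the periodic poll, because the second arrival is ignored.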
This article has mentioned Raft many times; the next article looks at exactly how MetaServer uses Raft.