[从源码学设计]蚂蚁金服SOFARegistry之网络封装和操作
- 2020 年 11 月 24 日
- 筆記
- 005_源码分析, 008_微服务, 010_业界方案, 210_SOFAStack
[从源码学设计]蚂蚁金服SOFARegistry之网络封装和操作
0x00 摘要
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第二篇,介绍SOFARegistry的网络封装和操作。
0x01 业务领域
1.1 SOFARegistry 总体架构
因为有的兄弟可能没有读过前面MetaServer的文章,所以这里回忆下SOFARegistry 总体架构。
- Client 层
应用服务器集群。Client 层是应用层,每个应用系统通过依赖注册中心相关的客户端 jar 包,通过编程方式来使用服务注册中心的服务发布和服务订阅能力。
- Session 层
Session 服务器集群。顾名思义,Session 层是会话层,通过长连接和 Client 层的应用服务器保持通讯,负责接收 Client 的服务发布和服务订阅请求。该层只在内存中保存各个服务的发布订阅关系,对于具体的服务信息,只在 Client 层和 Data 层之间透传转发。Session 层是无状态的,可以随着 Client 层应用规模的增长而扩容。
- Data 层
数据服务器集群。Data 层通过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的唯一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。下文的重点也在于随着数据规模的增长,Data 层如何在不影响业务的前提下实现平滑的扩缩容。
- Meta 层
元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就相当于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 作为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层能够感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。
1.2 应用场景
DataServer,SessionServer,MetaServer 本质上都是网络应用程序。这就决定了网络封装和操作是本系统的基础模块及功能,下面我们讲讲其应用场景。
1.2.1 单元化状态
SOFARegistry 的应用场景是单元化状态下。
在单元化状态下,一个单元,是一个五脏俱全的缩小版整站,它是全能的,因为部署了所有应用;但它不是全量的,因为只能操作一部分数据。能够单元化的系统,很容易在多机房中部署,因为可以轻易的把几个单元部署在一个机房,而把另外几个部署在其他机房。借由在业务入口处设置一个流量调配器,可以调整业务流量在单元之间的比例。
1.2.2 内网通讯
所以 SOFARegistry 考虑的就是在 IDC 私网环境中如何进行节点间通信。高吞吐、高并发的通信,数量众多的连接管理(C10K 问题),便捷的升级机制,兼容性保障,灵活的线程池模型运用,细致的异常处理与日志埋点等,这些功能都需要在通信协议和实现框架上做文章。
1.2.3 Http协议
服务器也有若干配置需求,这用简单的http协议即可。
1.3 问题点
在这种内网单元化场景下,能够想到的问题点如下:
- 如何制定私有协议。对于私网环境,如果所有应用的节点间,全部通过标准协议来通信,会有很多问题:比如研发效率方面的影响,升级兼容性,无用字段的传输,功能定制也可能不那么灵活。
- 如何进行链接管理(无锁建连、定时断连、自动重连);
- 如何进行精细的线程模型的设计;
- 如何进行超时控制;
- 如何进行批量解包和批量提交处理;
- 如何进行心跳控制;
- 如何支持通信模型(oneway、sync、callback、future);
- 如何实现长连接;
- 如何实现推拉模式;
- 如何进行节点判活;
1.4 解决方案
对于这种高性能,高并发的场景,在Java体系下,必然选择非阻塞IO复用,那么自然选择基于Netty进行开发。
1.5 阿里方案
阿里就是借助 SOFABolt 通信框架,实现基于TCP长连接的节点判活与推模式的变更推送,服务上下线通知时效性在秒级以内。
sofa-bolt是蚂蚁开源的一款基于Netty的网络通信框架。在Netty的基础上对网络编程常见问题进行了一层简单封装,让中间件开发者更关注于中间件产品本身。
大体功能为:
- 连接管理
- 请求处理
SOFABolt可以理解为Netty的最佳实践,并额外进行了一些优化工作。
- 基于Netty的高效的网络IO于线程模型的应用
- 链接管理(无锁建连、定时断连、自动重连)
- 通信模型(oneway、sync、callback、future)
- 超时控制
- 批量解包和批量提交处理
- 心跳于IDLE机制
SOFABolt框架我们后续可能会专门有系列进行分析,目前认为基于SOFABolt此可以满足我们需求,所以我们会简单介绍SOFABolt,重点在于如何使用以及业务实现。
1.6 实现问题
在确定了采用SOFABolt之后,之前提到的问题点就基本被SOFABolt解决了,所以我们暂时能想到的其他问题大致如下:
- 对于网络中的各种功能模块,是否需要封装,如果封装,到什么程度比较恰当;
- 具体封装,纵向横向分别需要细化到什么程度;
- Data Server 既要对外提供服务,也会作为客户端向其他模块发送请求,这两个功能是否需要抽象出来;
- 是否需要按照具体业务功能进行分类封装;
1.7 总述
我们提前剧透,即从逻辑上看,阿里提供了两个层级的封装
从连接角度看,阿里实现了基于 netty.channel.Channel 的封装,从下往上看是:
- 因为SOFABolt基于Netty,所以封装的核心是netty.channel.Channel。
- 在此基础上, SOFABolt封装了com.alipay.remoting.Connection
- 然后 SOFARegistry 基于SOFABolt 封装了 BoltChannel
从应用角度看,阿里实现了Server,Client层次的封装,从下往上看是:
- SOFABolt构建了 RpcServer,RpcClient。
- SOFARegisty 基于 RpcServer,RpcClient 构建了BoltClient 和 BoltServer。
- 然后 SOFARegistry 基于此构建了 BoltExchange。作为 Client / Server 连接的抽象,负责节点之间的连接。
- 最后构建了 XXXNodeExchanger,在 BoltExchange 基础上,把所有 Data Server 相关的 非 “Server,Client概念 强相关” 的网络操作统一集中在这里,用户可以直接使用。以DataServer业务模块为例,其内部按照业务不同,实现了 DataNodeExchanger 和 MetaNodeExchanger。用以让:
- DataServer 内部直接使用 DataNodeExchanger 与其他 DataServer 交互,
- DataServer 内部直接使用 MetaNodeExchanger 与 MetaServer 交互。
具体逻辑大致如下:
+---------------------+ +---------------------+
| | | |
| DataNodeExchanger | | MetaNodeExchanger |
| | | |
+----------------+----+ +--------+------------+
| |
+-----------+ +-------------+
| |
v v
+---------+---------------------+--------+
| BoltExchange |
| +------------------------------------+ |
| | | |
| | Map<String, Client> | |
| | | |
| | ConcurrentHashMap<Integer, Server> | |
| | | |
| +------------------------------------+ |
+----------------------------------------+
| |
| |
| |
v v
+-----------------------+----------+ +---------+---------------------------------+
| BoltClient | | BoltServer |
| | | |
| +------------------------------+ | | +---------------------------------------+ |
| | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | |
| | +-------------------------+ | | | | | |
| | | ConnectionEventHandler | | | | | Map<String, Channel> channels | |
| | | | | | | | | |
| | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | |
| | +-------------------------+ | | | | | |
| +------------------------------+ | | +---------------------------------------+ |
+----------------------------------+ +-------------------------------------------+
| | | | |
| | +---------------------------+ |
| v | | v
| +---+------------+ <--------------+ +-----v--------------+--------------+
| | ChannelHandler | | BoltChannel |
| +----------------+ | |
| | +------------------------------+ |
| | |com.alipay.remoting.Connection| |
| | +------------------------------+ |
| +-----------------------------------+
| |
v v
+---+-------------+ +---------+------------+
| CallbackHandler | | netty.channel.Channel|
+-----------------+ +----------------------+
0x02 基础封装
SOFARegistry 对网络基础功能做了封装,也对外提供了API。以下是封装模块以及对外接口 registry-remoting-api。
├── CallbackHandler.java
├── Channel.java
├── ChannelHandler.java
├── Client.java
├── Endpoint.java
├── RemotingException.java
├── Server.java
└── exchange
├── Exchange.java
├── NodeExchanger.java
├── RequestException.java
└── message
├── Request.java
└── Response.java
其中比较关键的是四个接口:Server,Client,Exchange,Channel,因此这些就是网络封装的最基本概念。
2.1 Channel
Channel 这个概念比较普遍,代表了IO源与目标打开的连接。我们先以Java的Channel为例来进行说明。
2.1.1 Java Channel
Java 的Channel 由java.nio.channels包定义的,Channel表示IO源与目标打开的连接,Channel类似于传统的“流”,只不过Channel本身不能直接访问数据,Channel只能与Buffer进行交互。
Channel用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。通道是访问IO服务的导管,通过通道,我们可以以最小的开销来访问操作系统的I/O服务;顺便说下,缓冲区是通道内部发送数据和接收数据的端点。
由java.nio.channels包定义的,Channel表示IO源与目标打开的连接,Channel类似于传统的“流”,只不过Channel本身不能直接访问数据,Channel只能与Buffer进行交互。通道主要用于传输数据,从缓冲区的一侧传到另一侧的实体(如文件、套接字…),反之亦然;通道是访问IO服务的导管,通过通道,我们可以以最小的开销来访问操作系统的I/O服务;顺便说下,缓冲区是通道内部发送数据和接收数据的端点。
2.1.2 SOFA Channel
从SOFARegistry的Channel定义可以看出其基本功能主要是属性相关功能。
public interface Channel {
InetSocketAddress getRemoteAddress();
InetSocketAddress getLocalAddress();
boolean isConnected();
Object getAttribute(String key);
void setAttribute(String key, Object value);
WebTarget getWebTarget();
void close();
}
2.2 Server
Server是服务器对应的封装,其基本功能由定义可知,主要是基于Channel发送功能。
public interface Server extends Endpoint {
boolean isOpen();
Collection<Channel> getChannels();
Channel getChannel(InetSocketAddress remoteAddress);
Channel getChannel(URL url);
void close(Channel channel);
int getChannelCount();
Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler,final int timeoutMillis);
}
2.3 Client
Client是客户端对应的封装,其基本功能也是基于Channel进行交互。
public interface Client extends Endpoint {
Channel getChannel(URL url);
Channel connect(URL url);
Object sendSync(final URL url, final Object message, final int timeoutMillis);
Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler,
final int timeoutMillis);
}
2.4 Exchange
Exchange 作为 Client / Server 连接的进一步抽象,负责同类型server之间的连接。
public interface Exchange<T> {
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
/**
* connect same type server,one server ip one connection
* such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers,
* so we must connect by serverType,and get Client instance by serverType
* @param serverType
* @param serverUrl
* @param channelHandlers
*/
Client connect(String serverType, URL serverUrl, T... channelHandlers);
/**
* connect same type server,one server ip one connection
* such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers,
* so we must connect by serverType,and get Client instance by serverType
* @param serverType
* @param connNum connection number per serverUrl
* @param serverUrl
* @param channelHandlers
*/
Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers);
/**
* bind server by server port in url parameter,one port must by same server type
* @param url
* @param channelHandlers
*/
Server open(URL url, T... channelHandlers);
Client getClient(String serverType);
Server getServer(Integer port);
}
2.5 ChannelHandler
在建立连接中,可以设置一系列应对不同任务的 handler (称之为 ChannelHandler)。
这些 ChannelHandler 有的作为 Listener 用来处理连接事件,有的作为 Processor 用来处理各种指定的事件,比如服务信息数据变化、Subscriber 注册等事件。
public interface ChannelHandler<T> {
enum HandlerType {
LISENTER,
PROCESSER
}
enum InvokeType {
SYNC,
ASYNC
}
/**
* on channel connected.
* @param channel
*/
void connected(Channel channel) throws RemotingException;
/**
* on channel disconnected.
*
* @param channel channel.
*/
void disconnected(Channel channel) throws RemotingException;
/**
* on message received.
* @param channel channel.
* @param message message.
*/
void received(Channel channel, T message) throws RemotingException;
/**
* on message reply.
*
* @param channel
* @param message
*/
Object reply(Channel channel, T message) throws RemotingException;
/**
* on exception caught.
* @param channel channel.
* @param message message.
* @param exception exception.
* @throws RemotingException
*/
void caught(Channel channel, T message, Throwable exception) throws RemotingException;
HandlerType getType();
/**
* return processor request class name
*/
Class interest();
/**
* Select Sync process by reply or Async process by received
*/
default InvokeType getInvokeType() {
return InvokeType.SYNC;
}
/**
* specify executor for processor handler
*/
default Executor getExecutor() {
return null;
}
}
因此,网络基本对外接口如下:
+-------------------------------------------------------------------------+
|[registry+remoting+api] |
| |
| +----------+ +-------------+ |
| | Exchange | |NodeExchanger| |
| ++-----+--++ +----+--------+ |
| | | | | |
| | | +----------------------+ | |
| | | | | |
| +------+ v v v |
| | +--+-----+ +-+-----++ |
| | | Server | | Client | |
| | +-----+--+ +-+----+-+ |
| | | | | |
| | +--------+ +-------+ | |
| | | | | |
| v v v v |
| +----+-----------+ +-----+-+ ++--------------+ |
| | ChannelHandler | |Channel| |CallbackHandler| |
| +----------------+ +-------+ +---------------+ |
+-------------------------------------------------------------------------+
0x03 SOFABolt
因为SOFARegistry主要是基于SOFABolt,没法绕开,所以我们需要首先简单介绍SOFABolt。
Bolt是基于Netty,所以要先说明Netty Channel。
3.1 Netty Channel
在Netty框架中,Channel是其中核心概念之一,是Netty网络通信的主体,由它负责同对端进行网络通信、注册和数据操作等功能。
Netty对Jdk原生的ServerSocketChannel
进行了封装和增强封装成了NioXXXChannel
, 相对于原生的JdkChannel, Netty的Channel增加了如下的组件。
- id 标识唯一身份信息
- 可能存在的parent Channel
- 管道 Pipeline
- 用于数据读写的unsafe内部类
- 关联上相伴终生的NioEventLoop
根据服务端和客户端,Channel可以分成两类:
- 服务端:
NioServerSocketChannel
- 客户端:
NioSocketChannel
其实inbound和outbound分别用于标识 Context 所对应的handler的类型, 在Netty中事件可以分为Inbound和Outbound事件,在ChannelPipeline的类注释中,有如下图示:
*
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
3.2 Connection
Connection其删减版定义如下,可以看到其主要成员就是 Netty channel 实例:
public class Connection {
private Channel channel;
private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);
/** Attribute key for connection */
public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection");
/** Attribute key for heartbeat count */
public static final AttributeKey<Integer> HEARTBEAT_COUNT = AttributeKey.valueOf("heartbeatCount");
/** Attribute key for heartbeat switch for each connection */
public static final AttributeKey<Boolean> HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");
/** Attribute key for protocol */
public static final AttributeKey<ProtocolCode> PROTOCOL = AttributeKey.valueOf("protocol");
/** Attribute key for version */
public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");
private Url url;
private final ConcurrentHashMap<Integer/* id */, String/* poolKey */> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);
private Set<String> poolKeys = new ConcurrentHashSet<String>();
private final ConcurrentHashMap<String/* attr key*/, Object /*attr value*/> attributes = new ConcurrentHashMap<String, Object>();
}
Connection的辅助类很多,摘录如下:
- ConnectionFactory 连接工厂:创建连接、检测连接等
- ConnectionPool 连接池:存储 { uniqueKey, List
} ,uniqueKey 默认为 ip:port;包含 ConnectionSelectStrategy,从 pool 中选择 Connection - ConnectionEventHandler 和 ConnectionEventListener:事件处理器和监听器
- ConnectionManager 连接管理器:是对外的门面,包含所有与 Connection 相关的对外的接口操作
- Scanner 扫描器:Bolt 提供的一个统一的扫描器,用于执行一些后台任务
另外,需要注意的点如下:
- 服务端创建 Connection 只有一个时机:netty 连接刚刚建立时
- 客户端真正创建连接的时候,是在发起第一次调用的时候。
3.3 Connection消息处理
不论是服务端还是客户端,其实本质都在做一件事情:创建 ConnectionEventHandler 实例并添加到 Netty 的 pipeline 中,基本原理是:
- ConnectionEventListener:Connection 事件监听器,存储处理对应 ConnectionEventType 的 ConnectionEventProcessor 列表;
- ConnectionEventProcessor:真正的 Connection 事件处理器接口;可以继承 ConnectionEventProcessor,编写自定义的事件处理类
- 将自定义的事件处理类添加到 ConnectionEventListener 中;
ConnectionEventHandler处理两类事件
- Netty 定义的事件:例如 connect,channelActive 等;
- SOFABolt 定义的事件:事件类型 ConnectionEventType;
之后当有 ConnectionEvent 触发时(无论是 Netty 定义的事件被触发,还是 SOFABolt 定义的事件被触发),ConnectionEventHandler 会通过异步线程执行器通知 ConnectionEventListener,ConnectionEventListener 将消息派发给具体的 ConnectionEventProcessor 实现类。
3.4 RpcServer
RpcServer实现了一个Server所必须的基本机制,可以直接使用,比如:
- 编解码,协议版本,地址处理;
- workerGroup(static类变量,实现多个 RpcServer 实例共享 workerGroup)与 bossGroup;
- 请求消息处理器;
- 响应消息处理器;
- 心跳消息处理器;
- 用户处理器 UserProcessor;
- 连接Manager;
- RpcServerRemoting (发起底层调用实现类) 实例,因为SOFABolt 可以进行双向调用,server 端也可以调用 client 端,所以此处构建了 RpcServerRemoting 实例;
- ServerBootstrap 实例用以设置一系列 netty 服务端配置;
其中,需要说明的是:
- 请求链
RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
- 响应链
RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
- 心跳链
RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor
具体定义如下:
public class RpcServer extends AbstractRemotingServer {
/** server bootstrap */
private ServerBootstrap bootstrap;
/** channelFuture */
private ChannelFuture channelFuture;
/** connection event handler */
private ConnectionEventHandler connectionEventHandler;
/** connection event listener */
private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
/** user processors of rpc server */
private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>(
4);
/** boss event loop group, boss group should not be daemon, need shutdown manually*/
private final EventLoopGroup bossGroup = NettyEventLoopUtil.newEventLoopGroup(NamedThreadFactory("Rpc-netty-server-boss",false));
/** worker event loop group. Reuse I/O worker threads between rpc servers. */
private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,new NamedThreadFactory("Rpc-netty-server-worker",true));
/** address parser to get custom args */
private RemotingAddressParser addressParser;
/** connection manager */
private DefaultConnectionManager connectionManager;
/** rpc remoting */
protected RpcRemoting rpcRemoting;
/** rpc codec */
private Codec codec = new RpcCodec();
}
3.5 RpcClient
RpcClient主要机制如下:
- 用户处理器 UserProcessor ;RpcServer 可以主动向 RpcClient 发起请求,所以RpcClient 也需要创建 UserProcessor 来处理这些请求;
- RpcConnectionFactory 工厂;
- workerGroup(static类变量,实现多个 RpcClient 实例共享 workerGroup);
- Codec 的实现类 RpcCodec 实例,用于创建 netty 的编解码器,实质上是一个工厂类;
- HeartbeatHandler 心跳处理器;
- RpcHandler 实例作为 netty 的业务逻辑处理器;
- ConnectionSelectStrategy 连接选择器;
- DefaultConnectionManager 连接管理器(是
整个 Connection 设计的核心
); - Bootstrap 实例用以设置一系列 netty 客户端配置;
- Remote 层请求和响应封装实体的创建工厂 RpcCommandFactory 实例;
- RpcClientRemoting (发起底层调用实现类) 实例;这是向 RpcServer 发起调用的工具类;
具体代码如下:
public class RpcClient extends AbstractConfigurableInstance {
private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>();
/** connection factory */
private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this);
/** connection event handler */
private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches());
/** reconnect manager */
private ReconnectManager reconnectManager;
/** connection event listener */
private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
/** address parser to get custom args */
private RemotingAddressParser addressParser;
/** connection select strategy */
private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(
switches());
/** connection manager */
private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy,connectionFactory,connectionEventHandler,connectionEventListener,switches());
/** rpc remoting */
protected RpcRemoting rpcRemoting;
/** task scanner */
private RpcTaskScanner taskScanner = new RpcTaskScanner();
/** connection monitor */
private DefaultConnectionMonitor connectionMonitor;
/** connection monitor strategy */
private ConnectionMonitorStrategy monitorStrategy;
}
0x04 Bolt封装
针对上述提到的基础封装,系统针对Bolt和Http都进行了实现。以下是SOFABolt的对应封装 registry-remoting-bolt。
├── AsyncUserProcessorAdapter.java
├── BoltChannel.java
├── BoltChannelUtil.java
├── BoltClient.java
├── BoltServer.java
├── ConnectionEventAdapter.java
├── SyncUserProcessorAdapter.java
└── exchange
└── BoltExchange.java
4.1 BoltChannel
BoltChannel 主要是封装了com.alipay.remoting.Connection,而com.alipay.remoting.Connection又封装了io.netty.channel.Channel。
感觉Channel封装的不够彻底,还是把Connection暴露出来了,以此得到本地IP,port,远端IP,port等。
public class BoltChannel implements Channel {
private Connection connection;
private AsyncContext asyncContext;
private BizContext bizContext;
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
}
4.2 BoltServer
BoltServer 封装了 com.alipay.remoting.rpc.RpcServer。
在初始化的时候,调用 addConnectionEventProcessor,registerUserProcessor等把 handler 注册到 RpcServer。
用 ConcurrentHashMap 记录了所有连接到本Server 的Channel,key是IP:port。
public class BoltServer implements Server {
/**
* accoding server port
* can not be null
*/
private final URL url;
private final List<ChannelHandler> channelHandlers;
/**
* bolt server
*/
private RpcServer boltServer;
/**
* started status
*/
private AtomicBoolean isStarted = new AtomicBoolean(false);
private Map<String, Channel> channels = new ConcurrentHashMap<>();
private AtomicBoolean initHandler = new AtomicBoolean(false);
}
其主要功能如下,基本就是调用Bolt的功能:
@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
if (channel != null && channel.isConnected()) {
Url boltUrl = null;
try {
boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
.getRemoteAddress().getPort());
return boltServer.invokeSync(boltUrl, message, timeoutMillis);
}
}
@Override
public void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler,
int timeoutMillis) {
if (channel != null && channel.isConnected()) {
Url boltUrl = null;
try {
boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
.getRemoteAddress().getPort());
boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() {
@Override
public void onResponse(Object result) {
callbackHandler.onCallback(channel, result);
}
@Override
public void onException(Throwable e) {
callbackHandler.onException(channel, e);
}
@Override
public Executor getExecutor() {
return callbackHandler.getExecutor();
}
}, timeoutMillis);
return;
}
}
4.3 BoltClient
主要就是封装了 com.alipay.remoting.rpc.RpcClient。
在初始化的时候,调用 addConnectionEventProcessor,registerUserProcessor 等把 handler 注册到 RpcClient。
public class BoltClient implements Client {
private RpcClient rpcClient;
private AtomicBoolean closed = new AtomicBoolean(false);
private int connectTimeout = 2000;
private final int connNum;
}
主要函数如下:
@Override
public Channel connect(URL url) {
try {
Connection connection = getBoltConnection(rpcClient, url);
BoltChannel channel = new BoltChannel();
channel.setConnection(connection);
return channel;
}
}
protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException {
Url boltUrl = createBoltUrl(url);
try {
Connection connection = rpcClient.getConnection(boltUrl, connectTimeout);
if (connection == null || !connection.isFine()) {
if (connection != null) {
connection.close();
}
}
return connection;
}
}
@Override
public Object sendSync(URL url, Object message, int timeoutMillis) {
return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis);
}
@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
if (channel != null && channel.isConnected()) {
BoltChannel boltChannel = (BoltChannel) channel;
return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis);
}
}
@Override
public void sendCallback(URL url, Object message, CallbackHandler callbackHandler,
int timeoutMillis) {
try {
Connection connection = getBoltConnection(rpcClient, url);
BoltChannel channel = new BoltChannel();
channel.setConnection(connection);
rpcClient.invokeWithCallback(connection, message, new InvokeCallback() {
@Override
public void onResponse(Object result) {
callbackHandler.onCallback(channel, result);
}
@Override
public void onException(Throwable e) {
callbackHandler.onException(channel, e);
}
@Override
public Executor getExecutor() {
return callbackHandler.getExecutor();
}
}, timeoutMillis);
return;
}
}
4.4 BoltExchange
BoltExchange 的主要作用是维护了Client和Server两个ConcurrentHashMap。就是所有的Clients和Servers。
这里进行了第一层连接维护。
Map<String, Client> clients 是依据String对Client做了区分,String包括如下:
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
就是说,假如 Data Server 使用了BoltExchange,则其内部只有两个BoltClient,这两个Client分别被 同 dataServer 和 metaServer 的交互 所复用。
ConcurrentHashMap<Integer, Server> serverMap 是依据本身端口对 本身启动的Server 做了区分。
public class BoltExchange implements Exchange<ChannelHandler> {
private Map<String, Client> clients = new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>();
@Override
public Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) {
return this.connect(serverType, 1, serverUrl, channelHandlers);
}
@Override
public Client connect(String serverType, int connNum, URL serverUrl, ChannelHandler... channelHandlers) {
Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
client.connect(serverUrl);
return client;
}
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
BoltServer server = createBoltServer(url, channelHandlers);
setServer(server, url);
server.startServer();
return server;
}
@Override
public Client getClient(String serverType) {
return clients.get(serverType);
}
@Override
public Server getServer(Integer port) {
return serverMap.get(port);
}
/**
* add server into serverMap
* @param server
* @param url
*/
public void setServer(Server server, URL url) {
serverMap.putIfAbsent(url.getPort(), server);
}
private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) {
BoltClient boltClient = createBoltClient(connNum);
boltClient.initHandlers(Arrays.asList(channelHandlers));
return boltClient;
}
protected BoltClient createBoltClient(int connNum) {
return new BoltClient(connNum);
}
protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
return new BoltServer(url, Arrays.asList(channelHandlers));
}
}
4.5 BoltExchange 获取Bolt Client
内部会根据不同的server type从 boltExchange 取出对应Bolt Client。
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
用如下方法put Client。
Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
Bolt用如下办法获取Client
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
得到对应的Client之后,然后分别根据参数的url建立Channel,或者发送请求。
Channel channel = client.getChannel(url);
client.sendCallback(request.getRequestUrl()....;
此时大致逻辑如下:
+----------------------------------------+
| BoltExchange |
| +------------------------------------+ |
| | | |
| | Map<String, Client> | |
| | | |
| | ConcurrentHashMap<Integer, Server> | |
| | | |
| +------------------------------------+ |
+----------------------------------------+
| |
| |
| |
v v
+-----------------------+----------+ +---------+---------------------------------+
| BoltClient | | BoltServer |
| | | |
| +------------------------------+ | | +---------------------------------------+ |
| | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | |
| | +-------------------------+ | | | | | |
| | | ConnectionEventHandler | | | | | Map<String, Channel> channels | |
| | | | | | | | | |
| | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | |
| | +-------------------------+ | | | | | |
| +------------------------------+ | | +---------------------------------------+ |
+----------------------------------+ +-------------------------------------------+
| | | | |
| | +---------------------------+ |
| v | | v
| +---+------------+ <--------------+ +-----v--------------+--------------+
| | ChannelHandler | | BoltChannel |
| +----------------+ | |
| | +------------------------------+ |
| | |com.alipay.remoting.Connection| |
| | +------------------------------+ |
| +-----------------------------------+
| |
v v
+---+-------------+ +---------+------------+
| CallbackHandler | | netty.channel.Channel|
+-----------------+ +----------------------+
0x04 Http封装
以下是基于Jetty封装的Http模块 registry-remoting-http。
├── JerseyChannel.java
├── JerseyClient.java
├── JerseyJettyServer.java
├── exchange
│ └── JerseyExchange.java
└── jetty
└── server
├── HttpChannelOverHttpCustom.java
├── HttpConnectionCustom.java
└── HttpConnectionCustomFactory.java
因为 Http 服务不是SOFTRegistry主要功能,所以此处略去。
0x05 功能模块
我们从目录结构可以大致看出功能模块划分。
├── remoting
│ ├── DataNodeExchanger.java
│ ├── MetaNodeExchanger.java
│ ├── dataserver
│ │ ├── DataServerConnectionFactory.java
│ │ ├── DataServerNodeFactory.java
│ │ ├── GetSyncDataHandler.java
│ │ ├── SyncDataCallback.java
│ │ ├── handler
│ │ │ ├── DataSyncServerConnectionHandler.java
│ │ │ ├── FetchDataHandler.java
│ │ │ ├── NotifyDataSyncHandler.java
│ │ │ ├── NotifyFetchDatumHandler.java
│ │ │ ├── NotifyOnlineHandler.java
│ │ │ └── SyncDataHandler.java
│ │ └── task
│ │ ├── AbstractTask.java
│ │ ├── ConnectionRefreshTask.java
│ │ └── RenewNodeTask.java
│ ├── handler
│ │ ├── AbstractClientHandler.java
│ │ └── AbstractServerHandler.java
│ ├── metaserver
│ │ ├── handler
│ │ ├── provideData
│ │ └── task
│ └── sessionserver
│ ├── disconnect
│ ├── forward
│ └── handler
因为每个大功能模块大同小异,所以我们下面主要以 dataserver 目录下为主,兼顾 metaserver 和 sessionserver目录下的特殊部分。
Data Server 比较复杂,即是服务器也是客户端,所以分别做了不同的组件来抽象这两个概念。
5.1 Server组件
DataServerBootstrap#start 方法,用于启动一系列的初始化服务。在此函数中,启动了若干网络服务,用来提供 对外接口。
public void start() {
try {
openDataServer();
openDataSyncServer();
openHttpServer();
startRaftClient();
}
}
各 Handler 具体作用如图所示:
5.2 Bolt Server
DataServer 和 DataSyncServer 是 Bolt Server,是节点间的 bolt 通信组件,其中:
- boltExchange。bolt组件通讯组件,用来给server和dataSyncServer提供通讯服务;
- DataServer。dataServer 则负责数据相关服务,比如数据服务,获取数据的推送,服务上下线通知等;
- dataSyncServer。dataSyncServer 主要是处理一些数据同步相关的服务;
具体代码如下:
private void openDataServer() {
try {
if (serverForSessionStarted.compareAndSet(false, true)) {
server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
dataServerConfig.getPort()), serverHandlers
.toArray(new ChannelHandler[serverHandlers.size()]));
}
}
}
private void openDataSyncServer() {
try {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
}
这两个server的handlers有部分重复,怀疑开发者在做功能迁移。
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(publishDataProcessor());
list.add(unPublishDataHandler());
list.add(notifyFetchDatumHandler());
list.add(notifyOnlineHandler());
list.add(syncDataHandler());
list.add(dataSyncServerConnectionHandler());
return list;
}
@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(clientOffHandler());
list.add(getDataVersionsHandler());
list.add(publishDataProcessor());
list.add(sessionServerRegisterHandler());
list.add(unPublishDataHandler());
list.add(dataServerConnectionHandler());
list.add(renewDatumHandler());
list.add(datumSnapshotHandler());
return list;
}
5.2.1 DataSyncServer
这里用DataSyncServer做具体说明。
启动 DataSyncServer 时,注册了如下几个 handler 用于处理 bolt 请求 :
DayaSyncServer 注册的 Handler 如下:
- getDataHandler
该 Handler 主要用于数据的获取,当一个请求过来时,会通过请求中的 DataCenter 和 DataInfoId 获取当前 DataServer 节点存储的相应数据。
- publishDataProcessor \ unPublishDataHandler
当有数据发布者 publisher 上下线时,会分别触发 publishDataProcessor 或 unPublishDataHandler ,Handler 会往 dataChangeEventCenter 中添加一个数据变更事件,用于异步地通知事件变更中心数据的变更。事件变更中心收到该事件之后,会往队列中加入事件。此时 dataChangeEventCenter 会根据不同的事件类型异步地对上下线数据进行相应的处理。
与此同时,DataChangeHandler 会把这个事件变更信息通过 ChangeNotifier 对外发布,通知其他节点进行数据同步。
- notifyFetchDatumHandler
这是一个数据拉取请求,当该 Handler 被触发时,通知当前 DataServer 节点进行版本号对比,若请求中数据的版本号高于当前节点缓存中的版本号,则会进行数据同步操作,保证数据是最新的。
- notifyOnlineHandler
这是一个 DataServer 上线通知请求 Handler,当其他节点上线时,会触发该 Handler,从而当前节点在缓存中存储新增的节点信息。用于管理节点状态,究竟是 INITIAL 还是 WORKING 。
- syncDataHandler
节点间数据同步 Handler,该 Handler 被触发时,会通过版本号进行比对,若当前 DataServer 所存储数据版本号含有当前请求版本号,则会返回所有大于当前请求数据版本号的所有数据,便于节点间进行数据同步。
- dataSyncServerConnectionHandler
连接管理 Handler,当其他 DataServer 节点与当前 DataServer 节点连接时,会触发 connect 方法,从而在本地缓存中注册连接信息,而当其他 DataServer 节点与当前节点断连时,则会触发 disconnect 方法,从而删除缓存信息,进而保证当前 DataServer 节点存储有所有与之连接的 DataServer 节点。
5.2.2 调用链
dataSyncServer 调用链如下:
在 DataServerBootstrap 中有
private void openDataSyncServer() {
try {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
}
然后有
public class BoltExchange implements Exchange<ChannelHandler> {
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
BoltServer server = createBoltServer(url, channelHandlers);
setServer(server, url);
server.startServer();
return server;
}
protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
return new BoltServer(url, Arrays.asList(channelHandlers));
}
}
BoltServer启动如下,并且用户自定义了UserProcessor。
public class BoltServer implements Server {
public BoltServer(URL url, List<ChannelHandler> channelHandlers) {
this.channelHandlers = channelHandlers;
this.url = url;
}
public void startServer() {
if (isStarted.compareAndSet(false, true)) {
boltServer = new RpcServer(url.getPort(), true);
initHandler();
boltServer.start();
}
}
private void initHandler() {
if (initHandler.compareAndSet(false, true)) {
boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
new ConnectionEventAdapter(ConnectionEventType.CONNECT,
getConnectionEventHandler(), this));
boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),
this));
boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
getConnectionEventHandler(), this));
registerUserProcessorHandler();
}
}
//这里分了同步和异步
private void registerUserProcessorHandler() {
if (channelHandlers != null) {
for (ChannelHandler channelHandler : channelHandlers) {
if (HandlerType.PROCESSER.equals(channelHandler.getType())) {
if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) {
boltServer.registerUserProcessor(new SyncUserProcessorAdapter(
channelHandler));
} else {
boltServer.registerUserProcessor(new AsyncUserProcessorAdapter(
channelHandler));
}
}
}
}
}
}
5.3 HttpServer
HttpServer 是用于控制的Http 通信组件以及其配置,提供一系列 REST 接口,用于 dashboard 管理、数据查询等;
- jerseyExchange。jersey组件通讯组件,提供服务;
- httpServer 主要提供一系列 http 接口,用于 dashboard 管理、数据查询等;
private void openHttpServer() {
try {
if (httpServerStarted.compareAndSet(false, true)) {
bindResourceConfig();
httpServer = jerseyExchange.open(
new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
.getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
}
}
}
5.4 RaftClient
RaftClient 是基于Raft协议的客户端,用来基于raft协议获取meta server leader信息。
private void startRaftClient() {
metaServerService.startRaftClient();
eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
具体DefaultMetaServiceImpl中实现代码如下:
@Override
public void startRaftClient() {
try {
if (clientStart.compareAndSet(false, true)) {
String serverConf = getServerConfig();
raftClient = new RaftClient(getGroup(), serverConf);
raftClient.start();
}
}
}
功能模块最后逻辑大致如下:
+-> getDataHandler
|
+-> publishDataProcessor
|
+--> unPublishDataHandler
|
+---------------+ +--> notifyFetchDatumHandler
+----> | DataSyncServer+->+
| +---------------+ +--> notifyOnlineHandler
| |
| +--> syncDataHandler
| +-------------+ |
+-------------------+ +----> | HttpServer | +-> dataSyncServerConnectionHandler
|DataServerBootstrap+-->+ +-------------+
+-------------------+ |
|
| +------------+ +-> getDataHandler
+-----> | RaftClient | |
| +------------+ +--> clientOffHandler
| |
| +-------------+ +--> getDataVersionsHandler
+-----> | DataServer +-->+
+-------------+ +--> publishDataProcessor
|
+--> sessionServerRegisterHandler
|
+--> unPublishDataHandler
|
+--> dataServerConnectionHandler
|
+--> renewDatumHandler
|
+-> datumSnapshotHandler
0x06 连接抽象 Exchange
Exchange 作为 Client / Server 连接的抽象,负责节点之间的连接。
Data Server 这里主要是 DataNodeExchanger 和 MetaNodeExchanger,用来:
-
封装 BoltExchange
-
把 Bolt Client 和 Bolt Channel 进行抽象。
-
提供可以直接使用的网络API,比如ForwardServiceImpl,GetSyncDataHandler这些散落的Bean可以直接使用DataNodeExchanger来做网络交互。
从对外接口中可以看出来,
- connect 函数用来创建连接,返回Channel,这个设置了handler,用来处理服务器推送;
- request 函数用来发起请求;
这里有两个问题:
- 为什么没有 SessionNodeExchanger?
- 同样是网络资源的管理,这里和 ConnectionFactory有什么区别?
可能是因为Session Server可能会很多,没必要保存Bolt client和Server,但是Session对应的有ConnectionFactory。ConectionFactory 是较低层次的封装,下文会讲解。
6.1 DataNodeExchanger
我们以 DataNodeExchanger 为例。
把所有 Data Server 相关的 非 “Server,Client概念 直接强相关” 的网络操作统一集中在这里。
6.1.1 具体实现
可以看到,主要是对 boltExchange 进行了更高层次的封装。
具体代码如下:
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.Client;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.remoting.exchange.NodeExchanger;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.remoting.exchange.message.Response;
public class DataNodeExchanger implements NodeExchanger {
@Autowired
private Exchange boltExchange;
@Autowired
private DataServerConfig dataServerConfig;
@Resource(name = "dataClientHandlers")
private Collection<AbstractClientHandler> dataClientHandlers;
@Override
public Response request(Request request) {
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
if (null != request.getCallBackHandler()) {
client.sendCallback(request.getRequestUrl(), request.getRequestBody(),
request.getCallBackHandler(),
request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
return () -> Response.ResultStatus.SUCCESSFUL;
} else {
final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
dataServerConfig.getRpcTimeout());
return () -> result;
}
}
public Channel connect(URL url) {
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
if (client == null) {
synchronized (this) {
client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
if (client == null) {
client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url,
dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()]));
}
}
}
Channel channel = client.getChannel(url);
if (channel == null) {
synchronized (this) {
channel = client.getChannel(url);
if (channel == null) {
channel = client.connect(url);
}
}
}
return channel;
}
}
6.1.2 推送Handler相关Bean
上面代码中使用了 dataClientHandlers,其是BoltClient所使用的,Server会对此进行推送,这两个Handler会处理。
@Bean(name = "dataClientHandlers")
public Collection<AbstractClientHandler> dataClientHandlers() {
Collection<AbstractClientHandler> list = new ArrayList<>();
list.add(notifyDataSyncHandler());
list.add(fetchDataHandler());
return list;
}
此时具体网络概念如下:
+-------------------+
| ForwardServiceImpl| +-----------------+
+-------------------+ |
|
+--------------------+ | +-------------------+
| GetSyncDataHandler | +-----------------------> | DataNodeExchanger |
+--------------------+ | +-------+-----------+
| |
+----------------------------------+ | |
| LocalDataServerChangeEventHandler| +--+ |
+----------------------------------+ | |
| |
+------------------------------+ | v
| DataServerChangeEventHandler +--------+ +-----+--------+
+------------------------------+ | BoltExchange |
+-----+--------+
|
+-----------------+ |
| JerseyExchange | |
+-------+---------+ +-------+-------+
| | |
| | |
v v v
+--------+----------+ +------+-----+ +-----+------+
| JerseyJettyServer | | BoltServer | | BoltServer |
+-------------------+ +------------+ +------------+
httpServer server dataSyncServer
0x07 Handler
AbstractServerHandler 和 AbstractClientHandler 对 com.alipay.sofa.registry.remoting.ChannelHandler
进行了实现。
这里需要结合SOFABolt来讲解。
7.1 SOFABolt
以 RpcServer 为例,SOFABolt在这里的使用是两种处理器:
- 用户请求处理器 (UserProcessor) :SOFABolt 提供了两种用户请求处理器,SyncUserProcessor 与 AsyncUserProcessor。 二者的区别在于,前者需要在当前处理线程以return返回值的形式返回处理结果;而后者,有一个 AsyncContext 存根,可以在当前线程,也可以在异步线程,调用
sendResponse
方法返回处理结果; - 连接事件处理器 (ConnectionEventProcessor) :SOFABolt 提供了两种事件监听,建连事件(ConnectionEventType.CONNECT)与断连事件(ConnectionEventType.CLOSE),用户可以创建自己的事件处理器,并注册到客户端或者服务端。客户端与服务端,都可以监听到各自的建连与断连事件;
7.2 定义
Handler主要代码分别如下:
public abstract class AbstractServerHandler<T> implements ChannelHandler<T> {
protected NodeType getConnectNodeType() {
return NodeType.DATA;
}
@Override
public Object reply(Channel channel, T request) {
try {
logRequest(channel, request);
checkParam(request);
return doHandle(channel, request);
} catch (Exception e) {
return buildFailedResponse(e.getMessage());
}
}
}
public abstract class AbstractClientHandler<T> implements ChannelHandler<T> {
@Override
public Object reply(Channel channel, T request) {
try {
logRequest(channel, request);
checkParam(request);
return doHandle(channel, request);
} catch (Exception e) {
return buildFailedResponse(e.getMessage());
}
}
}
系统可以据此实现各种派生类。
这里需要注意的是:ChannelHandler之中分成两类,分别如下,分别对应了RpcServer中的listener和userProcessor。
enum HandlerType {
LISENTER,
PROCESSER
}
以serverSyncHandlers为例,只有dataSyncServerConnectionHandler是Listener,其余都是Processor。
这也符合常理,因为消息响应函数就是应该只有一个。
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(publishDataProcessor());
list.add(unPublishDataHandler());
list.add(notifyFetchDatumHandler());
list.add(notifyOnlineHandler());
list.add(syncDataHandler());
list.add(dataSyncServerConnectionHandler()); 只有这个是Listener,其余都是Processor。
return list;
}
总结如下:
+-> getDataHandler
|
+-> publishDataProcessor
|
+--> unPublishDataHandler
+-------------------+ |
| serverSyncHandlers+-------> notifyFetchDatumHandler
+-------------------+ |
+--> notifyOnlineHandler
|
+--> syncDataHandler
|
+-> dataSyncServerConnectionHandler(Listener)
7.3 使用
在启动时,会使用serverSyncHandlers完成BoltServer的启动。
private void openDataSyncServer() {
try {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
}
BoltExchange中有:
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
BoltServer server = createBoltServer(url, channelHandlers);
setServer(server, url);
server.startServer();
return server;
}
protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
return new BoltServer(url, Arrays.asList(channelHandlers));
}
在BoltServer中有如下代码,主要是设置Handler。
public void startServer() {
if (isStarted.compareAndSet(false, true)) {
try {
boltServer = new RpcServer(url.getPort(), true);
initHandler();
boltServer.start();
}
}
}
private void initHandler() {
if (initHandler.compareAndSet(false, true)) {
boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
new ConnectionEventAdapter(ConnectionEventType.CONNECT,
getConnectionEventHandler(), this));
boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),
this));
boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
getConnectionEventHandler(), this));
registerUserProcessorHandler();
}
}
最终则是调用到RpcServer之中,注册了连接响应函数和用户定义函数。
/**
* Add processor to process connection event.
*/
public void addConnectionEventProcessor(ConnectionEventType type,
ConnectionEventProcessor processor) {
this.connectionEventListener.addConnectionEventProcessor(type, processor);
}
/**
* Use UserProcessorRegisterHelper{@link UserProcessorRegisterHelper} to help register user processor for server side.
*/
@Override
public void registerUserProcessor(UserProcessor<?> processor) {
UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}
此时与SOFABolt逻辑如下:
+----------------------------+
| [RpcServer] |
| |
| | +--> EXCEPTION +--> DataSyncServerConnectionHandler
| | |
| connectionEventListener--->--->-CONNECT +----> DataSyncServerConnectionHandler
| | |
| | +--> CLOSE +-----> DataSyncServerConnectionHandler
| |
| |
| | +--> EXCEPTION +--> DataSyncServerConnectionHandler
| | |
| connectionEventHandler +-->---> CONNECT +----> DataSyncServerConnectionHandler
| | |
| | +--> CLOSE +-----> DataSyncServerConnectionHandler
| |
| |
| | +----> GetDataRequest +--------> getDataHandler
| | |
| | +----> PublishDataRequest +-----> publishDataProcessor
| | |
| userProcessors +----------------> UnPublishDataRequest +----> unPublishDataHandler
| | |
| | +----> NotifyOnlineRequest-----> notifyFetchDatumHandler
| | |
| | +----> NotifyOnlineRequest +-----> notifyOnlineHandler
+----------------------------+ |
+----> SyncDataRequest +-------> syncDataHandler
0x08 总结
至此,我们把SOFARegistry网络封装和操作大致梳理了下。
从逻辑上看,阿里提供了两个层级的封装:
从连接角度看,阿里实现了基于 netty.channel.Channel 的封装,从下往上看是:
- 因为SOFABolt基于Netty,所以封装的核心是netty.channel.Channel。
- 在此基础上, SOFABolt封装了com.alipay.remoting.Connection
- 然后 SOFARegistry 基于SOFABolt 封装了 BoltChannel
从应用角度看,阿里实现了Server,Client层次的封装,从下往上看是:
- SOFABolt构建了 RpcServer,RpcClient。
- SOFARegisty 基于 RpcServer,RpcClient 构建了BoltClient 和 BoltServer。
- 然后 SOFARegistry 基于此构建了 BoltExchange。作为 Client / Server 连接的抽象,负责节点之间的连接。
- 最后构建了 XXXNodeExchanger,在 BoltExchange 基础上,把所有 Data Server 内部的 非 “Server,Client概念 强相关” 的网络操作统一集中在这里,用户可以直接使用。以DataServer业务模块为例,其内部按照业务不同,实现了 DataNodeExchanger 和 MetaNodeExchanger。用以让:
- DataServer 内部直接使用 DataNodeExchanger 与其他 DataServer 交互,
- DataServer 内部直接使用 MetaNodeExchanger 与 MetaServer 交互。
具体逻辑大致如下:
+---------------------+ +---------------------+
| | | |
| DataNodeExchanger | | MetaNodeExchanger |
| | | |
+----------------+----+ +--------+------------+
| |
+-----------+ +-------------+
| |
v v
+---------+---------------------+--------+
| BoltExchange |
| +------------------------------------+ |
| | | |
| | Map<String, Client> | |
| | | |
| | ConcurrentHashMap<Integer, Server> | |
| | | |
| +------------------------------------+ |
+----------------------------------------+
| |
| |
| |
v v
+-----------------------+----------+ +---------+---------------------------------+
| BoltClient | | BoltServer |
| | | |
| +------------------------------+ | | +---------------------------------------+ |
| | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | |
| | +-------------------------+ | | | | | |
| | | ConnectionEventHandler | | | | | Map<String, Channel> channels | |
| | | | | | | | | |
| | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | |
| | +-------------------------+ | | | | | |
| +------------------------------+ | | +---------------------------------------+ |
+----------------------------------+ +-------------------------------------------+
| | | | |
| | +---------------------------+ |
| v | | v
| +---+------------+ <--------------+ +-----v--------------+--------------+
| | ChannelHandler | | BoltChannel |
| +----------------+ | |
| | +------------------------------+ |
| | |com.alipay.remoting.Connection| |
| | +------------------------------+ |
| +-----------------------------------+
| |
v v
+---+-------------+ +---------+------------+
| CallbackHandler | | netty.channel.Channel|
+-----------------+ +----------------------+
阿里这里封装的非常细致。因为SOFARegistry是比较繁杂的系统,所以把网络概念,功能做封装是相当有必要的。大家在日常开发中可能不用这么细致的封装,可以参考阿里的思路,自己做选择和裁剪即可。
0xFF 参考
//timyang.net/architecture/cell-distributed-system/
SOFABolt 源码分析12 – Connection 连接管理设计
SOFABolt 源码分析2 – RpcServer 服务端启动的设计
SOFABolt 源码分析3 – RpcClient 客户端启动的设计
SOFABolt 源码分析9 – UserProcessor 自定义处理器的设计
SOFABolt 源码分析13 – Connection 事件处理机制的设计