[從源碼學設計]螞蟻金服SOFARegistry之網絡封裝和操作
- 2020 年 11 月 24 日
- 筆記
- 005_源碼分析, 008_微服務, 010_業界方案, 210_SOFAStack
[從源碼學設計]螞蟻金服SOFARegistry之網絡封裝和操作
0x00 摘要
SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。
本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓大家藉以學習阿里如何設計。
本文為第二篇,介紹SOFARegistry的網絡封裝和操作。
0x01 業務領域
1.1 SOFARegistry 總體架構
因為有的兄弟可能沒有讀過前面MetaServer的文章,所以這裡回憶下SOFARegistry 總體架構。
- Client 層
應用服務器集群。Client 層是應用層,每個應用系統通過依賴註冊中心相關的客戶端 jar 包,通過編程方式來使用服務註冊中心的服務發佈和服務訂閱能力。
- Session 層
Session 服務器集群。顧名思義,Session 層是會話層,通過長連接和 Client 層的應用服務器保持通訊,負責接收 Client 的服務發佈和服務訂閱請求。該層只在內存中保存各個服務的發佈訂閱關係,對於具體的服務信息,只在 Client 層和 Data 層之間透傳轉發。Session 層是無狀態的,可以隨着 Client 層應用規模的增長而擴容。
- Data 層
數據服務器集群。Data 層通過分片存儲的方式保存着所用應用的服務註冊數據。數據按照 dataInfoId(每一份服務數據的唯一標識)進行一致性 Hash 分片,多副本備份,保證數據的高可用。下文的重點也在於隨着數據規模的增長,Data 層如何在不影響業務的前提下實現平滑的擴縮容。
- Meta 層
元數據服務器集群。這個集群管轄的範圍是 Session 服務器集群和 Data 服務器集群的服務器信息,其角色就相當於 SOFARegistry 架構內部的服務註冊中心,只不過 SOFARegistry 作為服務註冊中心是服務於廣大應用服務層,而 Meta 集群是服務於 SOFARegistry 內部的 Session 集群和 Data 集群,Meta 層能夠感知到 Session 節點和 Data 節點的變化,並通知集群的其它節點。
1.2 應用場景
DataServer,SessionServer,MetaServer 本質上都是網絡應用程序。這就決定了網絡封裝和操作是本系統的基礎模塊及功能,下面我們講講其應用場景。
1.2.1 單元化狀態
SOFARegistry 的應用場景是單元化狀態下。
在單元化狀態下,一個單元,是一個五臟俱全的縮小版整站,它是全能的,因為部署了所有應用;但它不是全量的,因為只能操作一部分數據。能夠單元化的系統,很容易在多機房中部署,因為可以輕易的把幾個單元部署在一個機房,而把另外幾個部署在其他機房。藉由在業務入口處設置一個流量調配器,可以調整業務流量在單元之間的比例。
1.2.2 內網通訊
所以 SOFARegistry 考慮的就是在 IDC 私網環境中如何進行節點間通信。高吞吐、高並發的通信,數量眾多的連接管理(C10K 問題),便捷的升級機制,兼容性保障,靈活的線程池模型運用,細緻的異常處理與日誌埋點等,這些功能都需要在通信協議和實現框架上做文章。
1.2.3 Http協議
服務器也有若干配置需求,這用簡單的http協議即可。
1.3 問題點
在這種內網單元化場景下,能夠想到的問題點如下:
- 如何制定私有協議。對於私網環境,如果所有應用的節點間,全部通過標準協議來通信,會有很多問題:比如研發效率方面的影響,升級兼容性,無用字段的傳輸,功能定製也可能不那麼靈活。
- 如何進行鏈接管理(無鎖建連、定時斷連、自動重連);
- 如何進行精細的線程模型的設計;
- 如何進行超時控制;
- 如何進行批量解包和批量提交處理;
- 如何進行心跳控制;
- 如何支持通信模型(oneway、sync、callback、future);
- 如何實現長連接;
- 如何實現推拉模式;
- 如何進行節點判活;
1.4 解決方案
對於這種高性能,高並發的場景,在Java體系下,必然選擇非阻塞IO復用,那麼自然選擇基於Netty進行開發。
1.5 阿里方案
阿里就是藉助 SOFABolt 通信框架,實現基於TCP長連接的節點判活與推模式的變更推送,服務上下線通知時效性在秒級以內。
sofa-bolt是螞蟻開源的一款基於Netty的網絡通信框架。在Netty的基礎上對網絡編程常見問題進行了一層簡單封裝,讓中間件開發者更關注於中間件產品本身。
大體功能為:
- 連接管理
- 請求處理
SOFABolt可以理解為Netty的最佳實踐,並額外進行了一些優化工作。
- 基於Netty的高效的網絡IO於線程模型的應用
- 鏈接管理(無鎖建連、定時斷連、自動重連)
- 通信模型(oneway、sync、callback、future)
- 超時控制
- 批量解包和批量提交處理
- 心跳於IDLE機制
SOFABolt框架我們後續可能會專門有系列進行分析,目前認為基於SOFABolt此可以滿足我們需求,所以我們會簡單介紹SOFABolt,重點在於如何使用以及業務實現。
1.6 實現問題
在確定了採用SOFABolt之後,之前提到的問題點就基本被SOFABolt解決了,所以我們暫時能想到的其他問題大致如下:
- 對於網絡中的各種功能模塊,是否需要封裝,如果封裝,到什麼程度比較恰當;
- 具體封裝,縱向橫向分別需要細化到什麼程度;
- Data Server 既要對外提供服務,也會作為客戶端向其他模塊發送請求,這兩個功能是否需要抽象出來;
- 是否需要按照具體業務功能進行分類封裝;
1.7 總述
我們提前劇透,即從邏輯上看,阿里提供了兩個層級的封裝
從連接角度看,阿里實現了基於 netty.channel.Channel 的封裝,從下往上看是:
- 因為SOFABolt基於Netty,所以封裝的核心是netty.channel.Channel。
- 在此基礎上, SOFABolt封裝了com.alipay.remoting.Connection
- 然後 SOFARegistry 基於SOFABolt 封裝了 BoltChannel
從應用角度看,阿里實現了Server,Client層次的封裝,從下往上看是:
- SOFABolt構建了 RpcServer,RpcClient。
- SOFARegisty 基於 RpcServer,RpcClient 構建了BoltClient 和 BoltServer。
- 然後 SOFARegistry 基於此構建了 BoltExchange。作為 Client / Server 連接的抽象,負責節點之間的連接。
- 最後構建了 XXXNodeExchanger,在 BoltExchange 基礎上,把所有 Data Server 相關的 非 「Server,Client概念 強相關」 的網絡操作統一集中在這裡,用戶可以直接使用。以DataServer業務模塊為例,其內部按照業務不同,實現了 DataNodeExchanger 和 MetaNodeExchanger。用以讓:
- DataServer 內部直接使用 DataNodeExchanger 與其他 DataServer 交互,
- DataServer 內部直接使用 MetaNodeExchanger 與 MetaServer 交互。
具體邏輯大致如下:
+---------------------+ +---------------------+
| | | |
| DataNodeExchanger | | MetaNodeExchanger |
| | | |
+----------------+----+ +--------+------------+
| |
+-----------+ +-------------+
| |
v v
+---------+---------------------+--------+
| BoltExchange |
| +------------------------------------+ |
| | | |
| | Map<String, Client> | |
| | | |
| | ConcurrentHashMap<Integer, Server> | |
| | | |
| +------------------------------------+ |
+----------------------------------------+
| |
| |
| |
v v
+-----------------------+----------+ +---------+---------------------------------+
| BoltClient | | BoltServer |
| | | |
| +------------------------------+ | | +---------------------------------------+ |
| | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | |
| | +-------------------------+ | | | | | |
| | | ConnectionEventHandler | | | | | Map<String, Channel> channels | |
| | | | | | | | | |
| | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | |
| | +-------------------------+ | | | | | |
| +------------------------------+ | | +---------------------------------------+ |
+----------------------------------+ +-------------------------------------------+
| | | | |
| | +---------------------------+ |
| v | | v
| +---+------------+ <--------------+ +-----v--------------+--------------+
| | ChannelHandler | | BoltChannel |
| +----------------+ | |
| | +------------------------------+ |
| | |com.alipay.remoting.Connection| |
| | +------------------------------+ |
| +-----------------------------------+
| |
v v
+---+-------------+ +---------+------------+
| CallbackHandler | | netty.channel.Channel|
+-----------------+ +----------------------+
0x02 基礎封裝
SOFARegistry 對網絡基礎功能做了封裝,也對外提供了API。以下是封裝模塊以及對外接口 registry-remoting-api。
├── CallbackHandler.java
├── Channel.java
├── ChannelHandler.java
├── Client.java
├── Endpoint.java
├── RemotingException.java
├── Server.java
└── exchange
├── Exchange.java
├── NodeExchanger.java
├── RequestException.java
└── message
├── Request.java
└── Response.java
其中比較關鍵的是四個接口:Server,Client,Exchange,Channel,因此這些就是網絡封裝的最基本概念。
2.1 Channel
Channel 這個概念比較普遍,代表了IO源與目標打開的連接。我們先以Java的Channel為例來進行說明。
2.1.1 Java Channel
Java 的Channel 由java.nio.channels包定義的,Channel表示IO源與目標打開的連接,Channel類似於傳統的「流」,只不過Channel本身不能直接訪問數據,Channel只能與Buffer進行交互。
Channel用於在位元組緩衝區和位於通道另一側的實體(通常是一個文件或套接字)之間有效地傳輸數據。通道是訪問IO服務的導管,通過通道,我們可以以最小的開銷來訪問操作系統的I/O服務;順便說下,緩衝區是通道內部發送數據和接收數據的端點。
由java.nio.channels包定義的,Channel表示IO源與目標打開的連接,Channel類似於傳統的「流」,只不過Channel本身不能直接訪問數據,Channel只能與Buffer進行交互。通道主要用於傳輸數據,從緩衝區的一側傳到另一側的實體(如文件、套接字…),反之亦然;通道是訪問IO服務的導管,通過通道,我們可以以最小的開銷來訪問操作系統的I/O服務;順便說下,緩衝區是通道內部發送數據和接收數據的端點。
2.1.2 SOFA Channel
從SOFARegistry的Channel定義可以看出其基本功能主要是屬性相關功能。
public interface Channel {
InetSocketAddress getRemoteAddress();
InetSocketAddress getLocalAddress();
boolean isConnected();
Object getAttribute(String key);
void setAttribute(String key, Object value);
WebTarget getWebTarget();
void close();
}
2.2 Server
Server是服務器對應的封裝,其基本功能由定義可知,主要是基於Channel發送功能。
public interface Server extends Endpoint {
boolean isOpen();
Collection<Channel> getChannels();
Channel getChannel(InetSocketAddress remoteAddress);
Channel getChannel(URL url);
void close(Channel channel);
int getChannelCount();
Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler,final int timeoutMillis);
}
2.3 Client
Client是客戶端對應的封裝,其基本功能也是基於Channel進行交互。
public interface Client extends Endpoint {
Channel getChannel(URL url);
Channel connect(URL url);
Object sendSync(final URL url, final Object message, final int timeoutMillis);
Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler,
final int timeoutMillis);
}
2.4 Exchange
Exchange 作為 Client / Server 連接的進一步抽象,負責同類型server之間的連接。
public interface Exchange<T> {
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
/**
* connect same type server,one server ip one connection
* such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers,
* so we must connect by serverType,and get Client instance by serverType
* @param serverType
* @param serverUrl
* @param channelHandlers
*/
Client connect(String serverType, URL serverUrl, T... channelHandlers);
/**
* connect same type server,one server ip one connection
* such as different server on data server,serverOne and serverTwo,different type server must match different channelHandlers,
* so we must connect by serverType,and get Client instance by serverType
* @param serverType
* @param connNum connection number per serverUrl
* @param serverUrl
* @param channelHandlers
*/
Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers);
/**
* bind server by server port in url parameter,one port must by same server type
* @param url
* @param channelHandlers
*/
Server open(URL url, T... channelHandlers);
Client getClient(String serverType);
Server getServer(Integer port);
}
2.5 ChannelHandler
在建立連接中,可以設置一系列應對不同任務的 handler (稱之為 ChannelHandler)。
這些 ChannelHandler 有的作為 Listener 用來處理連接事件,有的作為 Processor 用來處理各種指定的事件,比如服務信息數據變化、Subscriber 註冊等事件。
public interface ChannelHandler<T> {
enum HandlerType {
LISENTER,
PROCESSER
}
enum InvokeType {
SYNC,
ASYNC
}
/**
* on channel connected.
* @param channel
*/
void connected(Channel channel) throws RemotingException;
/**
* on channel disconnected.
*
* @param channel channel.
*/
void disconnected(Channel channel) throws RemotingException;
/**
* on message received.
* @param channel channel.
* @param message message.
*/
void received(Channel channel, T message) throws RemotingException;
/**
* on message reply.
*
* @param channel
* @param message
*/
Object reply(Channel channel, T message) throws RemotingException;
/**
* on exception caught.
* @param channel channel.
* @param message message.
* @param exception exception.
* @throws RemotingException
*/
void caught(Channel channel, T message, Throwable exception) throws RemotingException;
HandlerType getType();
/**
* return processor request class name
*/
Class interest();
/**
* Select Sync process by reply or Async process by received
*/
default InvokeType getInvokeType() {
return InvokeType.SYNC;
}
/**
* specify executor for processor handler
*/
default Executor getExecutor() {
return null;
}
}
因此,網絡基本對外接口如下:
+-------------------------------------------------------------------------+
|[registry+remoting+api] |
| |
| +----------+ +-------------+ |
| | Exchange | |NodeExchanger| |
| ++-----+--++ +----+--------+ |
| | | | | |
| | | +----------------------+ | |
| | | | | |
| +------+ v v v |
| | +--+-----+ +-+-----++ |
| | | Server | | Client | |
| | +-----+--+ +-+----+-+ |
| | | | | |
| | +--------+ +-------+ | |
| | | | | |
| v v v v |
| +----+-----------+ +-----+-+ ++--------------+ |
| | ChannelHandler | |Channel| |CallbackHandler| |
| +----------------+ +-------+ +---------------+ |
+-------------------------------------------------------------------------+
0x03 SOFABolt
因為SOFARegistry主要是基於SOFABolt,沒法繞開,所以我們需要首先簡單介紹SOFABolt。
Bolt是基於Netty,所以要先說明Netty Channel。
3.1 Netty Channel
在Netty框架中,Channel是其中核心概念之一,是Netty網絡通信的主體,由它負責同對端進行網絡通信、註冊和數據操作等功能。
Netty對Jdk原生的ServerSocketChannel
進行了封裝和增強封裝成了NioXXXChannel
, 相對於原生的JdkChannel, Netty的Channel增加了如下的組件。
- id 標識唯一身份信息
- 可能存在的parent Channel
- 管道 Pipeline
- 用於數據讀寫的unsafe內部類
- 關聯上相伴終生的NioEventLoop
根據服務端和客戶端,Channel可以分成兩類:
- 服務端:
NioServerSocketChannel
- 客戶端:
NioSocketChannel
其實inbound和outbound分別用於標識 Context 所對應的handler的類型, 在Netty中事件可以分為Inbound和Outbound事件,在ChannelPipeline的類注釋中,有如下圖示:
*
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
3.2 Connection
Connection其刪減版定義如下,可以看到其主要成員就是 Netty channel 實例:
public class Connection {
private Channel channel;
private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);
/** Attribute key for connection */
public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection");
/** Attribute key for heartbeat count */
public static final AttributeKey<Integer> HEARTBEAT_COUNT = AttributeKey.valueOf("heartbeatCount");
/** Attribute key for heartbeat switch for each connection */
public static final AttributeKey<Boolean> HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");
/** Attribute key for protocol */
public static final AttributeKey<ProtocolCode> PROTOCOL = AttributeKey.valueOf("protocol");
/** Attribute key for version */
public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");
private Url url;
private final ConcurrentHashMap<Integer/* id */, String/* poolKey */> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);
private Set<String> poolKeys = new ConcurrentHashSet<String>();
private final ConcurrentHashMap<String/* attr key*/, Object /*attr value*/> attributes = new ConcurrentHashMap<String, Object>();
}
Connection的輔助類很多,摘錄如下:
- ConnectionFactory 連接工廠:創建連接、檢測連接等
- ConnectionPool 連接池:存儲 { uniqueKey, List
} ,uniqueKey 默認為 ip:port;包含 ConnectionSelectStrategy,從 pool 中選擇 Connection - ConnectionEventHandler 和 ConnectionEventListener:事件處理器和監聽器
- ConnectionManager 連接管理器:是對外的門面,包含所有與 Connection 相關的對外的接口操作
- Scanner 掃描器:Bolt 提供的一個統一的掃描器,用於執行一些後台任務
另外,需要注意的點如下:
- 服務端創建 Connection 只有一個時機:netty 連接剛剛建立時
- 客戶端真正創建連接的時候,是在發起第一次調用的時候。
3.3 Connection消息處理
不論是服務端還是客戶端,其實本質都在做一件事情:創建 ConnectionEventHandler 實例並添加到 Netty 的 pipeline 中,基本原理是:
- ConnectionEventListener:Connection 事件監聽器,存儲處理對應 ConnectionEventType 的 ConnectionEventProcessor 列表;
- ConnectionEventProcessor:真正的 Connection 事件處理器接口;可以繼承 ConnectionEventProcessor,編寫自定義的事件處理類
- 將自定義的事件處理類添加到 ConnectionEventListener 中;
ConnectionEventHandler處理兩類事件
- Netty 定義的事件:例如 connect,channelActive 等;
- SOFABolt 定義的事件:事件類型 ConnectionEventType;
之後當有 ConnectionEvent 觸發時(無論是 Netty 定義的事件被觸發,還是 SOFABolt 定義的事件被觸發),ConnectionEventHandler 會通過異步線程執行器通知 ConnectionEventListener,ConnectionEventListener 將消息派發給具體的 ConnectionEventProcessor 實現類。
3.4 RpcServer
RpcServer實現了一個Server所必須的基本機制,可以直接使用,比如:
- 編解碼,協議版本,地址處理;
- workerGroup(static類變量,實現多個 RpcServer 實例共享 workerGroup)與 bossGroup;
- 請求消息處理器;
- 響應消息處理器;
- 心跳消息處理器;
- 用戶處理器 UserProcessor;
- 連接Manager;
- RpcServerRemoting (發起底層調用實現類) 實例,因為SOFABolt 可以進行雙向調用,server 端也可以調用 client 端,所以此處構建了 RpcServerRemoting 實例;
- ServerBootstrap 實例用以設置一系列 netty 服務端配置;
其中,需要說明的是:
- 請求鏈
RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
- 響應鏈
RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
- 心跳鏈
RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor
具體定義如下:
public class RpcServer extends AbstractRemotingServer {
/** server bootstrap */
private ServerBootstrap bootstrap;
/** channelFuture */
private ChannelFuture channelFuture;
/** connection event handler */
private ConnectionEventHandler connectionEventHandler;
/** connection event listener */
private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
/** user processors of rpc server */
private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>(
4);
/** boss event loop group, boss group should not be daemon, need shutdown manually*/
private final EventLoopGroup bossGroup = NettyEventLoopUtil.newEventLoopGroup(NamedThreadFactory("Rpc-netty-server-boss",false));
/** worker event loop group. Reuse I/O worker threads between rpc servers. */
private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2,new NamedThreadFactory("Rpc-netty-server-worker",true));
/** address parser to get custom args */
private RemotingAddressParser addressParser;
/** connection manager */
private DefaultConnectionManager connectionManager;
/** rpc remoting */
protected RpcRemoting rpcRemoting;
/** rpc codec */
private Codec codec = new RpcCodec();
}
3.5 RpcClient
RpcClient主要機制如下:
- 用戶處理器 UserProcessor ;RpcServer 可以主動向 RpcClient 發起請求,所以RpcClient 也需要創建 UserProcessor 來處理這些請求;
- RpcConnectionFactory 工廠;
- workerGroup(static類變量,實現多個 RpcClient 實例共享 workerGroup);
- Codec 的實現類 RpcCodec 實例,用於創建 netty 的編解碼器,實質上是一個工廠類;
- HeartbeatHandler 心跳處理器;
- RpcHandler 實例作為 netty 的業務邏輯處理器;
- ConnectionSelectStrategy 連接選擇器;
- DefaultConnectionManager 連接管理器(是
整個 Connection 設計的核心
); - Bootstrap 實例用以設置一系列 netty 客戶端配置;
- Remote 層請求和響應封裝實體的創建工廠 RpcCommandFactory 實例;
- RpcClientRemoting (發起底層調用實現類) 實例;這是向 RpcServer 發起調用的工具類;
具體代碼如下:
public class RpcClient extends AbstractConfigurableInstance {
private ConcurrentHashMap<String, UserProcessor<?>> userProcessors = new ConcurrentHashMap<String, UserProcessor<?>>();
/** connection factory */
private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this);
/** connection event handler */
private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches());
/** reconnect manager */
private ReconnectManager reconnectManager;
/** connection event listener */
private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
/** address parser to get custom args */
private RemotingAddressParser addressParser;
/** connection select strategy */
private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(
switches());
/** connection manager */
private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy,connectionFactory,connectionEventHandler,connectionEventListener,switches());
/** rpc remoting */
protected RpcRemoting rpcRemoting;
/** task scanner */
private RpcTaskScanner taskScanner = new RpcTaskScanner();
/** connection monitor */
private DefaultConnectionMonitor connectionMonitor;
/** connection monitor strategy */
private ConnectionMonitorStrategy monitorStrategy;
}
0x04 Bolt封裝
針對上述提到的基礎封裝,系統針對Bolt和Http都進行了實現。以下是SOFABolt的對應封裝 registry-remoting-bolt。
├── AsyncUserProcessorAdapter.java
├── BoltChannel.java
├── BoltChannelUtil.java
├── BoltClient.java
├── BoltServer.java
├── ConnectionEventAdapter.java
├── SyncUserProcessorAdapter.java
└── exchange
└── BoltExchange.java
4.1 BoltChannel
BoltChannel 主要是封裝了com.alipay.remoting.Connection,而com.alipay.remoting.Connection又封裝了io.netty.channel.Channel。
感覺Channel封裝的不夠徹底,還是把Connection暴露出來了,以此得到本地IP,port,遠端IP,port等。
public class BoltChannel implements Channel {
private Connection connection;
private AsyncContext asyncContext;
private BizContext bizContext;
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
}
4.2 BoltServer
BoltServer 封裝了 com.alipay.remoting.rpc.RpcServer。
在初始化的時候,調用 addConnectionEventProcessor,registerUserProcessor等把 handler 註冊到 RpcServer。
用 ConcurrentHashMap 記錄了所有連接到本Server 的Channel,key是IP:port。
public class BoltServer implements Server {
/**
* accoding server port
* can not be null
*/
private final URL url;
private final List<ChannelHandler> channelHandlers;
/**
* bolt server
*/
private RpcServer boltServer;
/**
* started status
*/
private AtomicBoolean isStarted = new AtomicBoolean(false);
private Map<String, Channel> channels = new ConcurrentHashMap<>();
private AtomicBoolean initHandler = new AtomicBoolean(false);
}
其主要功能如下,基本就是調用Bolt的功能:
@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
if (channel != null && channel.isConnected()) {
Url boltUrl = null;
try {
boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
.getRemoteAddress().getPort());
return boltServer.invokeSync(boltUrl, message, timeoutMillis);
}
}
@Override
public void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler,
int timeoutMillis) {
if (channel != null && channel.isConnected()) {
Url boltUrl = null;
try {
boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(), channel
.getRemoteAddress().getPort());
boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() {
@Override
public void onResponse(Object result) {
callbackHandler.onCallback(channel, result);
}
@Override
public void onException(Throwable e) {
callbackHandler.onException(channel, e);
}
@Override
public Executor getExecutor() {
return callbackHandler.getExecutor();
}
}, timeoutMillis);
return;
}
}
4.3 BoltClient
主要就是封裝了 com.alipay.remoting.rpc.RpcClient。
在初始化的時候,調用 addConnectionEventProcessor,registerUserProcessor 等把 handler 註冊到 RpcClient。
public class BoltClient implements Client {
private RpcClient rpcClient;
private AtomicBoolean closed = new AtomicBoolean(false);
private int connectTimeout = 2000;
private final int connNum;
}
主要函數如下:
@Override
public Channel connect(URL url) {
try {
Connection connection = getBoltConnection(rpcClient, url);
BoltChannel channel = new BoltChannel();
channel.setConnection(connection);
return channel;
}
}
protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException {
Url boltUrl = createBoltUrl(url);
try {
Connection connection = rpcClient.getConnection(boltUrl, connectTimeout);
if (connection == null || !connection.isFine()) {
if (connection != null) {
connection.close();
}
}
return connection;
}
}
@Override
public Object sendSync(URL url, Object message, int timeoutMillis) {
return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis);
}
@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
if (channel != null && channel.isConnected()) {
BoltChannel boltChannel = (BoltChannel) channel;
return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis);
}
}
@Override
public void sendCallback(URL url, Object message, CallbackHandler callbackHandler,
int timeoutMillis) {
try {
Connection connection = getBoltConnection(rpcClient, url);
BoltChannel channel = new BoltChannel();
channel.setConnection(connection);
rpcClient.invokeWithCallback(connection, message, new InvokeCallback() {
@Override
public void onResponse(Object result) {
callbackHandler.onCallback(channel, result);
}
@Override
public void onException(Throwable e) {
callbackHandler.onException(channel, e);
}
@Override
public Executor getExecutor() {
return callbackHandler.getExecutor();
}
}, timeoutMillis);
return;
}
}
4.4 BoltExchange
BoltExchange 的主要作用是維護了Client和Server兩個ConcurrentHashMap。就是所有的Clients和Servers。
這裡進行了第一層連接維護。
Map<String, Client> clients 是依據String對Client做了區分,String包括如下:
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
就是說,假如 Data Server 使用了BoltExchange,則其內部只有兩個BoltClient,這兩個Client分別被 同 dataServer 和 metaServer 的交互 所復用。
ConcurrentHashMap<Integer, Server> serverMap 是依據本身端口對 本身啟動的Server 做了區分。
public class BoltExchange implements Exchange<ChannelHandler> {
private Map<String, Client> clients = new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>();
@Override
public Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) {
return this.connect(serverType, 1, serverUrl, channelHandlers);
}
@Override
public Client connect(String serverType, int connNum, URL serverUrl, ChannelHandler... channelHandlers) {
Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
client.connect(serverUrl);
return client;
}
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
BoltServer server = createBoltServer(url, channelHandlers);
setServer(server, url);
server.startServer();
return server;
}
@Override
public Client getClient(String serverType) {
return clients.get(serverType);
}
@Override
public Server getServer(Integer port) {
return serverMap.get(port);
}
/**
* add server into serverMap
* @param server
* @param url
*/
public void setServer(Server server, URL url) {
serverMap.putIfAbsent(url.getPort(), server);
}
private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) {
BoltClient boltClient = createBoltClient(connNum);
boltClient.initHandlers(Arrays.asList(channelHandlers));
return boltClient;
}
protected BoltClient createBoltClient(int connNum) {
return new BoltClient(connNum);
}
protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
return new BoltServer(url, Arrays.asList(channelHandlers));
}
}
4.5 BoltExchange 獲取Bolt Client
內部會根據不同的server type從 boltExchange 取出對應Bolt Client。
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
用如下方法put Client。
Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
Bolt用如下辦法獲取Client
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
得到對應的Client之後,然後分別根據參數的url建立Channel,或者發送請求。
Channel channel = client.getChannel(url);
client.sendCallback(request.getRequestUrl()....;
此時大致邏輯如下:
+----------------------------------------+
| BoltExchange |
| +------------------------------------+ |
| | | |
| | Map<String, Client> | |
| | | |
| | ConcurrentHashMap<Integer, Server> | |
| | | |
| +------------------------------------+ |
+----------------------------------------+
| |
| |
| |
v v
+-----------------------+----------+ +---------+---------------------------------+
| BoltClient | | BoltServer |
| | | |
| +------------------------------+ | | +---------------------------------------+ |
| | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | |
| | +-------------------------+ | | | | | |
| | | ConnectionEventHandler | | | | | Map<String, Channel> channels | |
| | | | | | | | | |
| | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | |
| | +-------------------------+ | | | | | |
| +------------------------------+ | | +---------------------------------------+ |
+----------------------------------+ +-------------------------------------------+
| | | | |
| | +---------------------------+ |
| v | | v
| +---+------------+ <--------------+ +-----v--------------+--------------+
| | ChannelHandler | | BoltChannel |
| +----------------+ | |
| | +------------------------------+ |
| | |com.alipay.remoting.Connection| |
| | +------------------------------+ |
| +-----------------------------------+
| |
v v
+---+-------------+ +---------+------------+
| CallbackHandler | | netty.channel.Channel|
+-----------------+ +----------------------+
0x04 Http封裝
以下是基於Jetty封裝的Http模塊 registry-remoting-http。
├── JerseyChannel.java
├── JerseyClient.java
├── JerseyJettyServer.java
├── exchange
│ └── JerseyExchange.java
└── jetty
└── server
├── HttpChannelOverHttpCustom.java
├── HttpConnectionCustom.java
└── HttpConnectionCustomFactory.java
因為 Http 服務不是SOFTRegistry主要功能,所以此處略去。
0x05 功能模塊
我們從目錄結構可以大致看出功能模塊劃分。
├── remoting
│ ├── DataNodeExchanger.java
│ ├── MetaNodeExchanger.java
│ ├── dataserver
│ │ ├── DataServerConnectionFactory.java
│ │ ├── DataServerNodeFactory.java
│ │ ├── GetSyncDataHandler.java
│ │ ├── SyncDataCallback.java
│ │ ├── handler
│ │ │ ├── DataSyncServerConnectionHandler.java
│ │ │ ├── FetchDataHandler.java
│ │ │ ├── NotifyDataSyncHandler.java
│ │ │ ├── NotifyFetchDatumHandler.java
│ │ │ ├── NotifyOnlineHandler.java
│ │ │ └── SyncDataHandler.java
│ │ └── task
│ │ ├── AbstractTask.java
│ │ ├── ConnectionRefreshTask.java
│ │ └── RenewNodeTask.java
│ ├── handler
│ │ ├── AbstractClientHandler.java
│ │ └── AbstractServerHandler.java
│ ├── metaserver
│ │ ├── handler
│ │ ├── provideData
│ │ └── task
│ └── sessionserver
│ ├── disconnect
│ ├── forward
│ └── handler
因為每個大功能模塊大同小異,所以我們下面主要以 dataserver 目錄下為主,兼顧 metaserver 和 sessionserver目錄下的特殊部分。
Data Server 比較複雜,即是服務器也是客戶端,所以分別做了不同的組件來抽象這兩個概念。
5.1 Server組件
DataServerBootstrap#start 方法,用於啟動一系列的初始化服務。在此函數中,啟動了若干網絡服務,用來提供 對外接口。
public void start() {
try {
openDataServer();
openDataSyncServer();
openHttpServer();
startRaftClient();
}
}
各 Handler 具體作用如圖所示:
5.2 Bolt Server
DataServer 和 DataSyncServer 是 Bolt Server,是節點間的 bolt 通信組件,其中:
- boltExchange。bolt組件通訊組件,用來給server和dataSyncServer提供通訊服務;
- DataServer。dataServer 則負責數據相關服務,比如數據服務,獲取數據的推送,服務上下線通知等;
- dataSyncServer。dataSyncServer 主要是處理一些數據同步相關的服務;
具體代碼如下:
private void openDataServer() {
try {
if (serverForSessionStarted.compareAndSet(false, true)) {
server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
dataServerConfig.getPort()), serverHandlers
.toArray(new ChannelHandler[serverHandlers.size()]));
}
}
}
private void openDataSyncServer() {
try {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
}
這兩個server的handlers有部分重複,懷疑開發者在做功能遷移。
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(publishDataProcessor());
list.add(unPublishDataHandler());
list.add(notifyFetchDatumHandler());
list.add(notifyOnlineHandler());
list.add(syncDataHandler());
list.add(dataSyncServerConnectionHandler());
return list;
}
@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(clientOffHandler());
list.add(getDataVersionsHandler());
list.add(publishDataProcessor());
list.add(sessionServerRegisterHandler());
list.add(unPublishDataHandler());
list.add(dataServerConnectionHandler());
list.add(renewDatumHandler());
list.add(datumSnapshotHandler());
return list;
}
5.2.1 DataSyncServer
這裡用DataSyncServer做具體說明。
啟動 DataSyncServer 時,註冊了如下幾個 handler 用於處理 bolt 請求 :
DayaSyncServer 註冊的 Handler 如下:
- getDataHandler
該 Handler 主要用於數據的獲取,當一個請求過來時,會通過請求中的 DataCenter 和 DataInfoId 獲取當前 DataServer 節點存儲的相應數據。
- publishDataProcessor \ unPublishDataHandler
當有數據發佈者 publisher 上下線時,會分別觸發 publishDataProcessor 或 unPublishDataHandler ,Handler 會往 dataChangeEventCenter 中添加一個數據變更事件,用於異步地通知事件變更中心數據的變更。事件變更中心收到該事件之後,會往隊列中加入事件。此時 dataChangeEventCenter 會根據不同的事件類型異步地對上下線數據進行相應的處理。
與此同時,DataChangeHandler 會把這個事件變更信息通過 ChangeNotifier 對外發佈,通知其他節點進行數據同步。
- notifyFetchDatumHandler
這是一個數據拉取請求,當該 Handler 被觸發時,通知當前 DataServer 節點進行版本號對比,若請求中數據的版本號高於當前節點緩存中的版本號,則會進行數據同步操作,保證數據是最新的。
- notifyOnlineHandler
這是一個 DataServer 上線通知請求 Handler,當其他節點上線時,會觸發該 Handler,從而當前節點在緩存中存儲新增的節點信息。用於管理節點狀態,究竟是 INITIAL 還是 WORKING 。
- syncDataHandler
節點間數據同步 Handler,該 Handler 被觸發時,會通過版本號進行比對,若當前 DataServer 所存儲數據版本號含有當前請求版本號,則會返回所有大於當前請求數據版本號的所有數據,便於節點間進行數據同步。
- dataSyncServerConnectionHandler
連接管理 Handler,當其他 DataServer 節點與當前 DataServer 節點連接時,會觸發 connect 方法,從而在本地緩存中註冊連接信息,而當其他 DataServer 節點與當前節點斷連時,則會觸發 disconnect 方法,從而刪除緩存信息,進而保證當前 DataServer 節點存儲有所有與之連接的 DataServer 節點。
5.2.2 調用鏈
dataSyncServer 調用鏈如下:
在 DataServerBootstrap 中有
private void openDataSyncServer() {
try {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
}
然後有
public class BoltExchange implements Exchange<ChannelHandler> {
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
BoltServer server = createBoltServer(url, channelHandlers);
setServer(server, url);
server.startServer();
return server;
}
protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
return new BoltServer(url, Arrays.asList(channelHandlers));
}
}
BoltServer啟動如下,並且用戶自定義了UserProcessor。
public class BoltServer implements Server {
public BoltServer(URL url, List<ChannelHandler> channelHandlers) {
this.channelHandlers = channelHandlers;
this.url = url;
}
public void startServer() {
if (isStarted.compareAndSet(false, true)) {
boltServer = new RpcServer(url.getPort(), true);
initHandler();
boltServer.start();
}
}
private void initHandler() {
if (initHandler.compareAndSet(false, true)) {
boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
new ConnectionEventAdapter(ConnectionEventType.CONNECT,
getConnectionEventHandler(), this));
boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),
this));
boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
getConnectionEventHandler(), this));
registerUserProcessorHandler();
}
}
//這裡分了同步和異步
private void registerUserProcessorHandler() {
if (channelHandlers != null) {
for (ChannelHandler channelHandler : channelHandlers) {
if (HandlerType.PROCESSER.equals(channelHandler.getType())) {
if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) {
boltServer.registerUserProcessor(new SyncUserProcessorAdapter(
channelHandler));
} else {
boltServer.registerUserProcessor(new AsyncUserProcessorAdapter(
channelHandler));
}
}
}
}
}
}
5.3 HttpServer
HttpServer 是用於控制的Http 通信組件以及其配置,提供一系列 REST 接口,用於 dashboard 管理、數據查詢等;
- jerseyExchange。jersey組件通訊組件,提供服務;
- httpServer 主要提供一系列 http 接口,用於 dashboard 管理、數據查詢等;
private void openHttpServer() {
try {
if (httpServerStarted.compareAndSet(false, true)) {
bindResourceConfig();
httpServer = jerseyExchange.open(
new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
.getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
}
}
}
5.4 RaftClient
RaftClient 是基於Raft協議的客戶端,用來基於raft協議獲取meta server leader信息。
private void startRaftClient() {
metaServerService.startRaftClient();
eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
具體DefaultMetaServiceImpl中實現代碼如下:
@Override
public void startRaftClient() {
try {
if (clientStart.compareAndSet(false, true)) {
String serverConf = getServerConfig();
raftClient = new RaftClient(getGroup(), serverConf);
raftClient.start();
}
}
}
功能模塊最後邏輯大致如下:
+-> getDataHandler
|
+-> publishDataProcessor
|
+--> unPublishDataHandler
|
+---------------+ +--> notifyFetchDatumHandler
+----> | DataSyncServer+->+
| +---------------+ +--> notifyOnlineHandler
| |
| +--> syncDataHandler
| +-------------+ |
+-------------------+ +----> | HttpServer | +-> dataSyncServerConnectionHandler
|DataServerBootstrap+-->+ +-------------+
+-------------------+ |
|
| +------------+ +-> getDataHandler
+-----> | RaftClient | |
| +------------+ +--> clientOffHandler
| |
| +-------------+ +--> getDataVersionsHandler
+-----> | DataServer +-->+
+-------------+ +--> publishDataProcessor
|
+--> sessionServerRegisterHandler
|
+--> unPublishDataHandler
|
+--> dataServerConnectionHandler
|
+--> renewDatumHandler
|
+-> datumSnapshotHandler
0x06 連接抽象 Exchange
Exchange 作為 Client / Server 連接的抽象,負責節點之間的連接。
Data Server 這裡主要是 DataNodeExchanger 和 MetaNodeExchanger,用來:
-
封裝 BoltExchange
-
把 Bolt Client 和 Bolt Channel 進行抽象。
-
提供可以直接使用的網絡API,比如ForwardServiceImpl,GetSyncDataHandler這些散落的Bean可以直接使用DataNodeExchanger來做網絡交互。
從對外接口中可以看出來,
- connect 函數用來創建連接,返回Channel,這個設置了handler,用來處理服務器推送;
- request 函數用來發起請求;
這裡有兩個問題:
- 為什麼沒有 SessionNodeExchanger?
- 同樣是網絡資源的管理,這裡和 ConnectionFactory有什麼區別?
可能是因為Session Server可能會很多,沒必要保存Bolt client和Server,但是Session對應的有ConnectionFactory。ConectionFactory 是較低層次的封裝,下文會講解。
6.1 DataNodeExchanger
我們以 DataNodeExchanger 為例。
把所有 Data Server 相關的 非 「Server,Client概念 直接強相關」 的網絡操作統一集中在這裡。
6.1.1 具體實現
可以看到,主要是對 boltExchange 進行了更高層次的封裝。
具體代碼如下:
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.Client;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.remoting.exchange.NodeExchanger;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.remoting.exchange.message.Response;
public class DataNodeExchanger implements NodeExchanger {
@Autowired
private Exchange boltExchange;
@Autowired
private DataServerConfig dataServerConfig;
@Resource(name = "dataClientHandlers")
private Collection<AbstractClientHandler> dataClientHandlers;
@Override
public Response request(Request request) {
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
if (null != request.getCallBackHandler()) {
client.sendCallback(request.getRequestUrl(), request.getRequestBody(),
request.getCallBackHandler(),
request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
return () -> Response.ResultStatus.SUCCESSFUL;
} else {
final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
dataServerConfig.getRpcTimeout());
return () -> result;
}
}
public Channel connect(URL url) {
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
if (client == null) {
synchronized (this) {
client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
if (client == null) {
client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url,
dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()]));
}
}
}
Channel channel = client.getChannel(url);
if (channel == null) {
synchronized (this) {
channel = client.getChannel(url);
if (channel == null) {
channel = client.connect(url);
}
}
}
return channel;
}
}
6.1.2 推送Handler相關Bean
上面代碼中使用了 dataClientHandlers,其是BoltClient所使用的,Server會對此進行推送,這兩個Handler會處理。
@Bean(name = "dataClientHandlers")
public Collection<AbstractClientHandler> dataClientHandlers() {
Collection<AbstractClientHandler> list = new ArrayList<>();
list.add(notifyDataSyncHandler());
list.add(fetchDataHandler());
return list;
}
此時具體網絡概念如下:
+-------------------+
| ForwardServiceImpl| +-----------------+
+-------------------+ |
|
+--------------------+ | +-------------------+
| GetSyncDataHandler | +-----------------------> | DataNodeExchanger |
+--------------------+ | +-------+-----------+
| |
+----------------------------------+ | |
| LocalDataServerChangeEventHandler| +--+ |
+----------------------------------+ | |
| |
+------------------------------+ | v
| DataServerChangeEventHandler +--------+ +-----+--------+
+------------------------------+ | BoltExchange |
+-----+--------+
|
+-----------------+ |
| JerseyExchange | |
+-------+---------+ +-------+-------+
| | |
| | |
v v v
+--------+----------+ +------+-----+ +-----+------+
| JerseyJettyServer | | BoltServer | | BoltServer |
+-------------------+ +------------+ +------------+
httpServer server dataSyncServer
0x07 Handler
AbstractServerHandler 和 AbstractClientHandler 對 com.alipay.sofa.registry.remoting.ChannelHandler
進行了實現。
這裡需要結合SOFABolt來講解。
7.1 SOFABolt
以 RpcServer 為例,SOFABolt在這裡的使用是兩種處理器:
- 用戶請求處理器 (UserProcessor) :SOFABolt 提供了兩種用戶請求處理器,SyncUserProcessor 與 AsyncUserProcessor。 二者的區別在於,前者需要在當前處理線程以return返回值的形式返回處理結果;而後者,有一個 AsyncContext 存根,可以在當前線程,也可以在異步線程,調用
sendResponse
方法返回處理結果; - 連接事件處理器 (ConnectionEventProcessor) :SOFABolt 提供了兩種事件監聽,建連事件(ConnectionEventType.CONNECT)與斷連事件(ConnectionEventType.CLOSE),用戶可以創建自己的事件處理器,並註冊到客戶端或者服務端。客戶端與服務端,都可以監聽到各自的建連與斷連事件;
7.2 定義
Handler主要代碼分別如下:
public abstract class AbstractServerHandler<T> implements ChannelHandler<T> {
protected NodeType getConnectNodeType() {
return NodeType.DATA;
}
@Override
public Object reply(Channel channel, T request) {
try {
logRequest(channel, request);
checkParam(request);
return doHandle(channel, request);
} catch (Exception e) {
return buildFailedResponse(e.getMessage());
}
}
}
public abstract class AbstractClientHandler<T> implements ChannelHandler<T> {
@Override
public Object reply(Channel channel, T request) {
try {
logRequest(channel, request);
checkParam(request);
return doHandle(channel, request);
} catch (Exception e) {
return buildFailedResponse(e.getMessage());
}
}
}
系統可以據此實現各種派生類。
這裡需要注意的是:ChannelHandler之中分成兩類,分別如下,分別對應了RpcServer中的listener和userProcessor。
enum HandlerType {
LISENTER,
PROCESSER
}
以serverSyncHandlers為例,只有dataSyncServerConnectionHandler是Listener,其餘都是Processor。
這也符合常理,因為消息響應函數就是應該只有一個。
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(getDataHandler());
list.add(publishDataProcessor());
list.add(unPublishDataHandler());
list.add(notifyFetchDatumHandler());
list.add(notifyOnlineHandler());
list.add(syncDataHandler());
list.add(dataSyncServerConnectionHandler()); 只有這個是Listener,其餘都是Processor。
return list;
}
總結如下:
+-> getDataHandler
|
+-> publishDataProcessor
|
+--> unPublishDataHandler
+-------------------+ |
| serverSyncHandlers+-------> notifyFetchDatumHandler
+-------------------+ |
+--> notifyOnlineHandler
|
+--> syncDataHandler
|
+-> dataSyncServerConnectionHandler(Listener)
7.3 使用
在啟動時,會使用serverSyncHandlers完成BoltServer的啟動。
private void openDataSyncServer() {
try {
if (serverForDataSyncStarted.compareAndSet(false, true)) {
dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
.getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
.toArray(new ChannelHandler[serverSyncHandlers.size()]));
}
}
}
BoltExchange中有:
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
BoltServer server = createBoltServer(url, channelHandlers);
setServer(server, url);
server.startServer();
return server;
}
protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
return new BoltServer(url, Arrays.asList(channelHandlers));
}
在BoltServer中有如下代碼,主要是設置Handler。
public void startServer() {
if (isStarted.compareAndSet(false, true)) {
try {
boltServer = new RpcServer(url.getPort(), true);
initHandler();
boltServer.start();
}
}
}
private void initHandler() {
if (initHandler.compareAndSet(false, true)) {
boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
new ConnectionEventAdapter(ConnectionEventType.CONNECT,
getConnectionEventHandler(), this));
boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(),
this));
boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
new ConnectionEventAdapter(ConnectionEventType.EXCEPTION,
getConnectionEventHandler(), this));
registerUserProcessorHandler();
}
}
最終則是調用到RpcServer之中,註冊了連接響應函數和用戶定義函數。
/**
* Add processor to process connection event.
*/
public void addConnectionEventProcessor(ConnectionEventType type,
ConnectionEventProcessor processor) {
this.connectionEventListener.addConnectionEventProcessor(type, processor);
}
/**
* Use UserProcessorRegisterHelper{@link UserProcessorRegisterHelper} to help register user processor for server side.
*/
@Override
public void registerUserProcessor(UserProcessor<?> processor) {
UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}
此時與SOFABolt邏輯如下:
+----------------------------+
| [RpcServer] |
| |
| | +--> EXCEPTION +--> DataSyncServerConnectionHandler
| | |
| connectionEventListener--->--->-CONNECT +----> DataSyncServerConnectionHandler
| | |
| | +--> CLOSE +-----> DataSyncServerConnectionHandler
| |
| |
| | +--> EXCEPTION +--> DataSyncServerConnectionHandler
| | |
| connectionEventHandler +-->---> CONNECT +----> DataSyncServerConnectionHandler
| | |
| | +--> CLOSE +-----> DataSyncServerConnectionHandler
| |
| |
| | +----> GetDataRequest +--------> getDataHandler
| | |
| | +----> PublishDataRequest +-----> publishDataProcessor
| | |
| userProcessors +----------------> UnPublishDataRequest +----> unPublishDataHandler
| | |
| | +----> NotifyOnlineRequest-----> notifyFetchDatumHandler
| | |
| | +----> NotifyOnlineRequest +-----> notifyOnlineHandler
+----------------------------+ |
+----> SyncDataRequest +-------> syncDataHandler
0x08 總結
至此,我們把SOFARegistry網絡封裝和操作大致梳理了下。
從邏輯上看,阿里提供了兩個層級的封裝:
從連接角度看,阿里實現了基於 netty.channel.Channel 的封裝,從下往上看是:
- 因為SOFABolt基於Netty,所以封裝的核心是netty.channel.Channel。
- 在此基礎上, SOFABolt封裝了com.alipay.remoting.Connection
- 然後 SOFARegistry 基於SOFABolt 封裝了 BoltChannel
從應用角度看,阿里實現了Server,Client層次的封裝,從下往上看是:
- SOFABolt構建了 RpcServer,RpcClient。
- SOFARegisty 基於 RpcServer,RpcClient 構建了BoltClient 和 BoltServer。
- 然後 SOFARegistry 基於此構建了 BoltExchange。作為 Client / Server 連接的抽象,負責節點之間的連接。
- 最後構建了 XXXNodeExchanger,在 BoltExchange 基礎上,把所有 Data Server 內部的 非 「Server,Client概念 強相關」 的網絡操作統一集中在這裡,用戶可以直接使用。以DataServer業務模塊為例,其內部按照業務不同,實現了 DataNodeExchanger 和 MetaNodeExchanger。用以讓:
- DataServer 內部直接使用 DataNodeExchanger 與其他 DataServer 交互,
- DataServer 內部直接使用 MetaNodeExchanger 與 MetaServer 交互。
具體邏輯大致如下:
+---------------------+ +---------------------+
| | | |
| DataNodeExchanger | | MetaNodeExchanger |
| | | |
+----------------+----+ +--------+------------+
| |
+-----------+ +-------------+
| |
v v
+---------+---------------------+--------+
| BoltExchange |
| +------------------------------------+ |
| | | |
| | Map<String, Client> | |
| | | |
| | ConcurrentHashMap<Integer, Server> | |
| | | |
| +------------------------------------+ |
+----------------------------------------+
| |
| |
| |
v v
+-----------------------+----------+ +---------+---------------------------------+
| BoltClient | | BoltServer |
| | | |
| +------------------------------+ | | +---------------------------------------+ |
| | remoting.rpc.RpcClient | | | | remoting.rpc.RpcServer | |
| | +-------------------------+ | | | | | |
| | | ConnectionEventHandler | | | | | Map<String, Channel> channels | |
| | | | | | | | | |
| | | ConnectionFactory | | | | | List<ChannelHandler> channelHandlers | |
| | +-------------------------+ | | | | | |
| +------------------------------+ | | +---------------------------------------+ |
+----------------------------------+ +-------------------------------------------+
| | | | |
| | +---------------------------+ |
| v | | v
| +---+------------+ <--------------+ +-----v--------------+--------------+
| | ChannelHandler | | BoltChannel |
| +----------------+ | |
| | +------------------------------+ |
| | |com.alipay.remoting.Connection| |
| | +------------------------------+ |
| +-----------------------------------+
| |
v v
+---+-------------+ +---------+------------+
| CallbackHandler | | netty.channel.Channel|
+-----------------+ +----------------------+
阿里這裡封裝的非常細緻。因為SOFARegistry是比較繁雜的系統,所以把網絡概念,功能做封裝是相當有必要的。大家在日常開發中可能不用這麼細緻的封裝,可以參考阿里的思路,自己做選擇和裁剪即可。
0xFF 參考
//timyang.net/architecture/cell-distributed-system/
SOFABolt 源碼分析12 – Connection 連接管理設計
SOFABolt 源碼分析2 – RpcServer 服務端啟動的設計
SOFABolt 源碼分析3 – RpcClient 客戶端啟動的設計
SOFABolt 源碼分析9 – UserProcessor 自定義處理器的設計
SOFABolt 源碼分析13 – Connection 事件處理機制的設計