BFT-SMaRt:用Netty做客戶端的可靠信道
- 2020 年 2 月 14 日
- 筆記
關鍵字:Netty BFT-SMaRt Channel findCache KeyLoader Bootstrap NioEventLoopGroup ChannelFuture 視圖
Netty是目前最高效便捷的NIO框架。Netty可提供更加高可用、更好健壯性的穩定大規模連接的IO通道。任何一款區塊鏈早期的技術產品,都是從聯盟鏈開始演進,因為聯盟鏈降低了很多原教旨的難度。回到BFT-SMaRt,它的網絡連接分為節點之間的連接,節點與客戶端之間的連接。節點之間的連接,我們在BFT-SMaRt:用Java做節點間的可靠信道一文中詳細分析了在共識邏輯之前節點之間能夠做到的連接準備。那麼,本文將繼續探索在BFT-SMaRt項目中,節點與客戶端之間的連接是如何實現的。
作為源碼研究的起點,有兩個現成的入口:
- 服務端:ServerCommunicationSystem構造函數的最後一個步驟,即clientsConn的創建。
- 客戶端:CounterClient類的入口命令,將本地作為客戶端對節點發起訪問請求。
一、Netty服務端的構建
首先構建服務端,轉到ServerCommunicationSystem構造函數的最後一行。
clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller);
這裡採用了工廠模式的設計:構建一個controller基類,業務方可有多個實現類,在工廠get方法中傳入實現類對象,通過不同的實現類,返回不同的處理對象。BFT-SMaRt並未有多個實現類,這裡可以在上層業務方進行豐富。
public class CommunicationSystemServerSideFactory { public static CommunicationSystemServerSide getCommunicationSystemServerSide(ServerViewController controller) { return new NettyClientServerCommunicationSystemServerSide(controller); } // 直接返回NettyClientServerCommunicationSystemServerSide對象 }
直接返回NettyClientServerCommunicationSystemServerSide對象,以下稱NettyClientServerCommunicationSystemServerSide類為Netty服務端類。
1. 父類構造函數
直接進入NettyClientServerCommunicationSystemServerSide類的構造函數,函數體內無super指定父類構造函數,因此隱式調用父類SimpleChannelInboundHandler的無參構造函數。
對於不熟悉繼承關係下構造函數的執行順序的朋友,請自行補充上。
protected SimpleChannelInboundHandler() { this(true); }
父類的無參構造函數指定了本地的有參構造。設定了本地屬性autoRelease為true。
protected SimpleChannelInboundHandler(boolean autoRelease) { this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I"); this.autoRelease = autoRelease; }
接下來執行TypeParameterMatcher的find方法。find方法主要維護一個查找緩存,包括構建和使用。
① 查找緩存
該方法首先獲得並配置查找緩存findCache:
Map<Class<?>, Map<String, TypeParameterMatcher>> findCache = InternalThreadLocalMap.get().typeParameterMatcherFindCache(); // InternalThreadLocalMap容器 Class<?> thisClass = object.getClass(); Map<String, TypeParameterMatcher> map = (Map)findCache.get(thisClass); // 類型參數匹配器 if (map == null) { map = new HashMap(); findCache.put(thisClass, map); }
查找緩存會將熱度較高的內容優先緩存,以增進查詢速度。
查找緩存的容器結構是通過InternalThreadLocalMap來構建,注意從SimpleChannelInboundHandler開始,始終帶着泛型<I>進入,而本例中的泛型類為TOMMessage,該類是共識排序消息類,將會在BFT-SMaRt共識部分展開介紹。那麼,find方法會將泛型類放置到查找緩存findCache中。
a) 匹配器
接下來,獲得並配置類型參數匹配器,也是用於增強查找。
TypeParameterMatcher matcher = (TypeParameterMatcher)((Map)map).get(typeParamName); if (matcher == null) { matcher = get(find0(object, parametrizedSuperclass, typeParamName)); ((Map)map).put(typeParamName, matcher); } return matcher;
匹配器使用到Java的反射機制來查找類。
首先通過本地map查找類型參數匹配器,如果沒有查到,則初始構建。使用調用find時傳入的類型參數名,調用find0方法通過反射機製得到泛型類,然後調用get方法通過反射機制獲得對應匹配器,最後填充進匹配器map,共同構成查找緩存findCache的內容。最後回顧一下findCache容器的結構。
Map<Class<?>, Map<String, TypeParameterMatcher>>
因此,一個類可以有多個對應不同類型參數名的匹配器。
② 相關日誌
該緩存的容器結構是InternalThreadLocalMap,類加載進入內存,首先執行static靜態方法。
static { logger.debug("-Dio.netty.threadLocalMap.stringBuilder.initialSize: {}", STRING_BUILDER_INITIAL_SIZE); STRING_BUILDER_MAX_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.maxSize", 4096); logger.debug("-Dio.netty.threadLocalMap.stringBuilder.maxSize: {}", STRING_BUILDER_MAX_SIZE); }
打印出日誌,StringBuilder的初始化長度以及最大長度。日誌輸出如下:
11:18:32.645 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024 11:18:32.645 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
2. 服務端構造
回到NettyClientServerCommunicationSystemServerSide的構造函數,首先是配置讀取及分析。通過配置文件獲得私鑰、IP、端口號、節點總數、節點id等信息。
① 配置讀取
配置讀取可分為三方面:
- 私鑰讀取
- IP加端口號讀取處理
- 配置域信息讀取
a) 私鑰讀取
Netty服務端類有一個私有屬性字段privKey,用於存儲私鑰,以備後續簽名使用。
private PrivateKey privKey;
該字段通過服務端類的構造函數賦值。
privKey = controller.getStaticConf().getPrivateKey();
跳轉到Configuration類,調用getPrivateKey方法。私鑰內容是從配置域controller中獲取。
return keyLoader.loadPrivateKey();
keyLoader對象是在Configuration類構造時傳入。而Configuration類的構造要追蹤到其子類TOMConfiguration的構造函數,繼續TOMConfiguration是在ViewController構造時調用。這部分內容將在CounterClient入口時展開。回到keyLoader,它是KeyLoader的實例,而KeyLoader有三個子類。
- RSAKeyLoader,適用於RSA類非對稱加密算法簇的秘鑰加載。
- ECDSAKeyLoader,適用於ECDSA類非對稱加密算法簇的秘鑰加載,全稱橢圓曲線數字簽名算法。是ECC與DSA的結合。Java原生類庫中在jdk1.7以後已經加入支持。
- SunECKeyLoader,適用於jdk自帶的sunEC加密秘鑰的加載,位於sun.security.ec.SunEC。
下面是他們的類圖關係。
b) IP端口號
接下來是從配置域中讀取節點服務器端的IP端口號。
// 獲取IP、端口號 String myAddress; String confAddress = controller.getStaticConf().getRemoteAddress(controller.getStaticConf().getProcessId()) .getAddress().getHostAddress(); if (InetAddress.getLoopbackAddress().getHostAddress().equals(confAddress)) { myAddress = InetAddress.getLoopbackAddress().getHostAddress(); } else if (controller.getStaticConf().getBindAddress().equals("")) { myAddress = InetAddress.getLocalHost().getHostAddress(); // 如果Netty綁定到環回地址,客戶端將無法連接節點。為了解決這個問題,我們綁定到config/hosts.config中提供的地址。 if (InetAddress.getLoopbackAddress().getHostAddress().equals(myAddress) && !myAddress.equals(confAddress)) { myAddress = confAddress; } } else { myAddress = controller.getStaticConf().getBindAddress(); } int myPort = controller.getStaticConf().getPort(controller.getStaticConf().getProcessId());
這段讀取代碼與上一篇節點間通信如出一轍。但值得注意的是,配置域端口號是由兩項組成,我們再次查看配置域內容。
#server id, address and port (the ids from 0 to n-1 are the service replicas) 0 127.0.0.1 11000 11001 1 127.0.0.1 11010 11011 2 127.0.0.1 11020 11021 3 127.0.0.1 11030 11031
IP後面有兩個端口號,第一列為客戶端通信端口,第二列為節點間通信端口。就拿節點id為0的第一行舉例,本地作為節點服務,其他節點要通過(server <-> server)11001端口進行訪問,而其他客戶端需要通過(client <-> server)11000端口進行訪問。這一段在下面日誌輸出代碼中也有體現。
logger.info("Port (client <-> server) = " + controller.getStaticConf().getPort(controller.getStaticConf().getProcessId())); logger.info("Port (server <-> server) = "
日誌打印:
14:36:19.223 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Port (client <-> server) = 11000 14:38:02.617 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Port (server <-> server) = 11001
c) 配置域信息
最後就是配置中的其他信息了。首先看代碼,然後看日誌輸出。
logger.info("ID = " + controller.getStaticConf().getProcessId()); // 節點id logger.info("N = " + controller.getCurrentViewN()); // 節點總數 logger.info("F = " + controller.getCurrentViewF()); // 節點最大容錯數 logger.info("requestTimeout = " + controller.getStaticConf().getRequestTimeout()); logger.info("maxBatch = " + controller.getStaticConf().getMaxBatchSize()); // 根據配置中是否使用簽名,打印不同的提示信息 if(controller.getStaticConf().getUseSignatures() == 1) logger.info("Using Signatures"); else if (controller.getStaticConf().getUseSignatures() == 2) logger.info("Using benchmark signature verification"); logger.info("Binded replica to IP address " + myAddress); // SSL/TLS 協議版本 logger.info("SSL/TLS enabled, protocol version: {}", controller.getStaticConf().getSSLTLSProtocolVersion());
系統配置中關於是否使用簽名的配置項,用於定義客戶端是否應該對消息認證碼使用簽名。
system.communication.useSignatures
接下來相關日誌輸出內容。
14:36:16.545 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - ID = 0 14:36:16.989 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - N = 4 14:36:17.230 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - F = 1 14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - requestTimeout = 2000 14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - maxBatch = 1024 14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - Binded replica to IP address 127.0.0.1 14:43:47.637 [main] INFO bftsmart.communication.client.netty.NettyClientServerCommunicationSystemServerSide - SSL/TLS enabled, protocol version: TLSv1.2
② 服務端配置
講到這裡,想延伸討論一下Netty的必要性和實現原理。Netty的必要性可以從Socket(Socket之前的網絡傳輸技術發展就不贅述了)說起。
a) Netty的必要性
一個Socket通常是由一個線程來管理,它實現了最初安全可靠的點到點IO通信。但當客戶端較多時,可能會耗盡服務端的線程資源,這是一種阻塞IO的模型。實踐過程中,大量的由線程維護的網絡連接始終在監聽狀態而沒有數據傳輸,這是對於線程資源的浪費。
我們在上一篇關於節點間通信的研究中,使用的就是這種模型,然而那是基於節點數量不多的聯盟鏈場景。4節點的網絡只需要6條線程即可承擔,不用實現複雜邏輯來處理大流量的資源維護,簡單穩定的阻塞IO模型顯然是更加適用的。但是,本文的研究重點轉向了客戶端通信,這就需要一個能夠處理大流量的新模型。
設想一種任務加線程池的方式。線程不再死盯着一條兩方參與的連接,而是被線程池統一管理起來。網絡傳輸的工作會被放到任務中去,線程池通過調度機制領取任務並執行網絡傳輸工作。在任務多的時候,調度邏輯會將任務排到一個隊列中去,然後根據調度機制,啟動對應規模的線程數量來控制處理任務的速度。每條線程執行完任務就會自動回歸到線程池可用資源庫,等待執行新的通信任務。這就是一種非阻塞IO的模型。
我們繼續延伸,所謂的「調度邏輯」是如何接收任務的,這裡引用到linux的多路復用IO模型,即一個select可以通過順序掃描(輪詢)的方式監測多個通道是否有通信就緒的狀態,一旦有,就會啟動一個回調函數將該通信內容封裝到任務容器,並排到隊列中去。回到函數會啟動線程池資源的實例來處理IO的工作,而select將該通信實例交出去以後,就可以釋放資源繼續監聽。
Netty就是對以上內容的封裝框架,更易於使用。
b) Netty的實現原理
Netty是基於事件驅動模式的、Reactor線程模型的。事件驅動是相對於主動輪詢提出的,主動輪詢是說主線程在不斷檢查是否有事件發生,如果有則調用事件處理函數。事件驅動仍舊是主線程去檢查事件的發生,當有事件時將事件放到一個隊列,同時還有一條線程在不斷的消費這個隊列,消費時調用事件的處理函數。事件驅動方式基於主動輪詢,又提出了一個線程專門作為事件消費對象,分擔了原主線程的工作內容,這取自觀察者模式,也更加符合單一職責原則。Reactor線程模型是一種分發機制,首先它會不斷的執行selector.select()方法,用來檢測併產生新的事件,然後分發事件給到適當的線程來處理。Reactor模型良好地實現了事件驅動理念。Netty應用Reactor線程模型,分為主從關係,主線程用於發現事件,從線程用於消費事件。
- bossGroup,線程池在bind一個端口以後返回一條線程作為主線程,接收產生新事件。
- workerGroup,線程池用來消費事件。
Netty有一個Bootstrap概念,BoostStrap是引導程序的含義,通過引導程序,可以快速配置,串聯搭建起來一個Netty項目,其中ServerBootstrap是針對服務端的,Bootstrap是針對客戶端的。所以,包括但不限於以上兩個線程池的內容,全部被包含在Bootstrap的實例中。Netty同時也是一個異步框架,所有的操作包括綁定、IO通信等都會返回一個ChannelFuture對象,該對象可以判斷isDone,isSuccess,getCause,isCanceled,以及通過addListener加入監聽回調。Channel是具體的Netty中用於處理通信的組件,針對不同的通信環境,都會有不同的Channel子類來處理,處理內容包括維持通道、連接配置參數、異步IO處理ChannelHandler、返回ChannelFuture。Selector是Channel的管理器,輪詢器,可以管理Channel。另外,所有Group均為線程池的意思,而NioEventLoop的含義是一個維護隊列的線程。
c) 結合源碼
接下來回到Netty服務端的源碼配置。
ServerBootstrap b = new ServerBootstrap(); // 綜上所述,先構建一個服務端啟動程序。 EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreads); // 構建主線程池,初始容量為8條,用於監聽Accept事件。 EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); // 構建從線程池,初始容量為當前系統中所有的可用線程數量。
我們按流程構建了ServerBootstrap對象,然後構建了主線程池bossGroup,初始設定為8,最後構建了從線程池workerGroup,初始設定為系統當前所有可用線程數量。這也不難理解,因為實踐過程中,我們總會注意到事件的發現相較於事件的處理是更快速的。因此8條主線程可以覆蓋事件發現的工作,而為了更高效使用機器性能,剩餘的線程資源都用來事件的消費了。下面是BFT-SMaRt自定義的編解碼工具工廠。
sessionReplicaToClient = new ConcurrentHashMap<>(); // 並發主流容器,線程安全且高效的HashMap rl = new ReentrantReadWriteLock(); // 可重入讀寫鎖 serverPipelineFactory = new NettyServerPipelineFactory(this, sessionReplicaToClient, controller, rl); // 本地開發的工具工廠,用於編解碼處理。
接下來將以上準備好的資源設定配置到ServerBootstrap。
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_SNDBUF, tcpSendBufferSize) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMsec) .option(ChannelOption.SO_BACKLOG, connectionBacklog) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverPipelineFactory.getDecoder()); ch.pipeline().addLast(serverPipelineFactory.getEncoder()); ch.pipeline().addLast(serverPipelineFactory.getHandler()); } }) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true);
首先通過ServerBootstrap的group方法配置主從線程池,然後配置channel類,接着配置一系列參數option。最後為事件消費線程配置參數和channel初始化initChannel,編輯碼部分不多展開(編解碼也是Netty的功能特色),主要是消費事件的處理類,設定為this,即當前服務端類。接下來給ServerBootstrap綁定IP端口 。
ChannelFuture f = b.bind(new InetSocketAddress(myAddress, myPort)).sync();
ServerBootstrap綁定IP端口會返回一個ChannelFuture對象,通過sync()阻塞等待綁定完成返回狀態給新建ChannelFuture對象f。最後,將f的channel賦值給服務端類的私有屬性Channel對象mainChannel。
mainChannel = f.channel();
3. 服務端功能
NettyClientServerCommunicationSystemServerSide類繼承自SimpleChannelInboundHandler<TOMMessage>,實現了CommunicationSystemServerSide接口。其中CommunicationSystemServerSide接口是BFT-SMaRt自定義的,主要用於描述客戶端通信中服務端的常用功能。
① 通用接口功能
下面進入CommunicationSystemServerSide接口,查看接口函數。
public interface CommunicationSystemServerSide { public void send(int[] targets, TOMMessage sm, boolean serializeClassHeaders); public int[] getClients(); public void setRequestReceiver(RequestReceiver requestReceiver); public void shutdown(); }
其中send方法是網絡通信中,節點給客戶端發送消息的具體方法,消息類型為TOMMessage。getClient方法會遍歷sessionReplicaToClient數據集合,將已建立的節點-客戶端會話連接中的客戶端統計出來並返回一個整形數組,目前該方法未被使用。setRequestReceiver是設置本地屬性requestReceiver。shutdown方法是關閉當前Netty系統。
② Channel處理器
前面介紹了,Bootstrap構建的Netty系統中的Channel處理器就是服務端類本身,我們回到該類的聲明部分,也看到了它是繼承自SimpleChannelInboundHandler<TOMMessage>,繼續追本溯源,它是ChannelHandler的子類,這符合處理器的聲明。
public ServerBootstrap childHandler(ChannelHandler childHandler)
下面,我們查看在服務端類中關於Channel處理器的重寫方法,也是包含4個方法:
- channelActive
- channelInactive
- exceptionCaught
- channelRead0
前三個方法都是來自於祖父類ChannelInboundHandlerAdapter,這三個方法是捕捉了channel的三個生命周期,方法體就是在不同的生命周期需要補充做的事。他們的實現更多像是一種標準。第四個方法來自於父類SimpleChannelInboundHandler,是核心的channel讀取數據的方法。
a) Channel生命周期
進入ChannelInboundHandlerAdapter類,該類完整地展示了Channel生命周期中的所有狀態。
一個Channel從最初註冊到Selector上面,變為活躍狀態,便可以讀取數據,期間可以更改可寫入能力,捕獲異常,觸發用戶事件。接着Channel可能變為不活躍狀態。Channel也可以隨時選擇從Selector上解除註冊。
服務端類對於Channel生命周期的3個實現,是一些常規處理和日誌提醒。
b) 讀取數據
channelRead0方法是Channel讀取信道中數據的核心方法。
protected void channelRead0(ChannelHandlerContext ctx, TOMMessage sm) throws Exception { if (this.closed) { closeChannelAndEventLoop(ctx.channel()); return; } // 交付消息到TOM層 if (requestReceiver == null) logger.warn("Request receiver is still null!"); else requestReceiver.requestReceived(sm); }
closeChannelAndEventLoop是前面生命周期方法中常用的方法,用於清空數據、解除註冊、關閉channel,關閉相關線程。下面核心方法是調用了requestReceiver的requestReceived方法。requestReceiver是前面介紹的通用接口setRequestReceiver設置的,RequestReceiver接口目前只有一個實現類是TOMLayer。這也放在共識階段再研究。
4. 節點通信層已完成
到目前為止,結合前一篇文章,本地節點的服務端通信系統ServerCommunicationSystem就全部構建完成了。
public ServerCommunicationSystem(ServerViewController controller, ServiceReplica replica) throws Exception { super("Server Comm. System"); this.controller = controller; messageHandler = new MessageHandler(); inQueue = new LinkedBlockingQueue<SystemMessage>(controller.getStaticConf().getInQueueSize()); serversConn = new ServersCommunicationLayer(controller, inQueue, replica); clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(controller); }
目前,節點服務部分重新回到bftsmart.tom.ServiceReplica#init方法,
cs = new ServerCommunicationSystem(this.SVController, this);
這一行代碼的內容通過這兩篇文章已經全部介紹完畢,從下一篇開始,繼續向下分析,將initTOMLayer()作為入口介紹節點服務中共識部分的實現原理。
二、CounterClient入口
前文通過一個章節的敘述,分析了BFT-SMaRt在節點客戶端通信過程中Netty服務端的構建。本章介紹另一方,即Netty客戶端的構建,計劃從CounterClient的main函數作為入口開始研究。
public static void main(String[] args) throws IOException { if (args.length < 2) { // 參數個數校驗,最少2個 System.out.println("CounterClient <process id> <increment> [<operations>]"); System.out.println("if <increment> equals 0, read-only"); System.out.println("default <number of operations> equals 1000"); System.exit(-1); } // (1)通過節點id,建立客戶端服務代理 ServiceProxy counterProxy = new ServiceProxy(Integer.parseInt(args[0])); try { int inc = Integer.parseInt(args[1]); // 影響值 // 操作次數,默認1000次 int numberOfOps = (args.length > 2) ? Integer.parseInt(args[2]) : 1000; for (int i = 0; i < numberOfOps; i++) { // 按照操作次數循環 // 將影響值放入輸出流out ByteArrayOutputStream out = new ByteArrayOutputStream(4); new DataOutputStream(out).writeInt(inc); System.out.print("Invocation " + i); // (2)調用實操方法,通過輸出流傳入影響值 byte[] reply = (inc == 0) ? counterProxy.invokeUnordered(out.toByteArray()) : counterProxy.invokeOrdered(out.toByteArray()); //magic happens here if (reply != null) { // 通過輸入流讀取返回值 int newValue = new DataInputStream(new ByteArrayInputStream(reply)).readInt(); System.out.println(", returned value: " + newValue); } else { System.out.println(", ERROR! Exiting."); break; } } } catch (IOException | NumberFormatException e) { counterProxy.close(); // 關閉代理 } }
該方法中比較重要的兩個步驟,
- 其一是為客戶端構建服務代理
- 其二是調用實操方法,消費影響值。
1. 構建服務代理
進入ServiceProxy類的構造函數,
public ServiceProxy(int processId) { this(processId, null, null, null, null); }
跳轉到本地的五參數的構造函數,
public ServiceProxy(int processId, String configHome, Comparator<byte[]> replyComparator, Extractor replyExtractor, KeyLoader loader) { // 代理服務初始化,包括網絡通信、共識視圖、系統配置 if (configHome == null) { init(processId, loader); } else { init(processId, configHome, loader); } // 構建一個TOMMessage數組,大小為節點總數。 replies = new TOMMessage[getViewManager().getCurrentViewN()]; // 比較器,繼承自jdk工具Comparator,重寫方法實現可比較兩個位元組數組是否相等的功能。 comparator = (replyComparator != null) ? replyComparator : new Comparator<byte[]>() { @Override public int compare(byte[] o1, byte[] o2) { return Arrays.equals(o1, o2) ? 0 : -1; } }; // 導出器,繼承自工具Extractor,可構建自定義的響應消息提取器。 extractor = (replyExtractor != null) ? replyExtractor : new Extractor() { @Override public TOMMessage extractResponse(TOMMessage[] replies, int sameContent, int lastReceived) { return replies[lastReceived]; } }; }
主要代碼為初始化,由於當前configHome傳入為null,調用的兩個參數的init方法。由於ServiceProxy類是TOMSender的子類,因此調用的init方法是父類的方法。
public void init(int processId, KeyLoader loader) { // 構建視圖控制器 this.viewController = new ClientViewController(processId, loader); // 啟動Netty通信 startsCS(processId); }
① 視圖控制器
ViewController類是系統的上下文環境,是由系統配置項構建。以下是BFT-SMaRt關於視圖的類圖關係。
視圖層級的根節點是View類,它實現了Serializable接口,所以視圖都是可序列化的。視圖最基本的屬性就是id,容錯數,節點id數組以及連接地址集合。在視圖控制器ViewController中,最終可以得到所有網絡配置屬性及方法。
a) 配置整合
ViewController類是View子類,包含了更全面的屬性字段,這些屬性的值來自於兩個渠道:
- 配置文件包括host.config以及system.config
- 配置類Configuration。
而TOMConfiguration類通過一個map容器Map<String, String>對象configs,有機地將以上兩個渠道的所有配置全部提取出來,在內存中構建了靜態配置對象staticConf。
b) 視圖存儲
繼續ViewController類的研究,視圖除了在內存中使用,也可以被持久化存儲在文件中。這個能力來自於接口ViewStorage,該接口提供了兩個功能,
public interface ViewStorage { public boolean storeView(View view); // 是否存儲成功 public View readView(); // 讀取視圖 }
目前該接口的實現只有DefaultViewStorage類,它可以將視圖對象通過對象輸出流寫入文件保存在磁盤上,同時還可以從磁盤上通過對象輸入流將文件數據恢復成內存中的View對象。
c) 服務端視圖控制器
根據上面的類圖,ServerViewController是ViewController的一個子類。作為共識節點服務端,它主要提供了共識方面的屬性功能。核心屬性如下:
private int quorumBFT; // ((n + f) / 2) replicas private int quorumCFT; // (n / 2) replicas private int[] otherProcesses; private int[] lastJoinStet; private List<TOMMessage> updates = new LinkedList<TOMMessage>(); private TOMLayer tomLayer;
quorumBFT是指在BFT網絡中的有效確認數,同理quorumCFT則是在CFT網絡中的有效確認數。lastJoinStet是用來記錄最後加入加點的,是指那些配置域以外的節點,可以是TTP,或者是陌生節點,需要重新配置reconfigure視圖參數。updates是共識消息TOMMessage的容器,tomLayer是共識層的對象。ServerViewController對象是構建節點通信系統的參數,這是前面所遺漏的部分,在此補充上。
首先在節點服務類的構建中包含:
this.SVController = new ServerViewController(id, configHome, loader);
接着構造函數內繼續執行init方法,會構建節點通信系統,
cs = new ServerCommunicationSystem(this.SVController, this);
第一個參數傳入的就是服務端視圖對象,該對象在構建節點通信系統時發揮了重要作用,例如讀取系統配置,判斷節點來源等等。那麼後續的內容在上一篇博文中就已經非常詳細了,這裡就到此為止。
d) 客戶端視圖控制器
我們回到TOMSender的init方法,構建客戶端視圖控制器。相對來講,ClientViewController的內容就很少了,它只有兩個構造函數和兩個自有方法。
public ClientViewController(int procId, KeyLoader loader) { super(procId, loader); // 初始化系統配置 View cv = getViewStore().readView(); // 從磁盤讀取視圖對象 // 調用reconfigureTo將視圖內容配置View屬性。 if(cv == null){ // 若未讀取成功,則通過配置參數構建新視圖 reconfigureTo(new View(0, getStaticConf().getInitialView(), getStaticConf().getF(), getInitAdddresses())); }else{ reconfigureTo(cv); } }
初始化配置,然後讀取視圖,通過ViewController的reconfigureTo方法(注意區分ServiceProxy也有一個reconfigureTo方法,要比這個方法複雜)替換視圖。
public void reconfigureTo(View newView) { this.lastView = this.currentView; // 將當前視圖變為上一個視圖 this.currentView = newView; // 傳入的新視圖變為當前視圖 }
到此,客戶端的視圖控制器就構建完成了。
② 啟動Netty客戶端
前面第一章節已經詳細介紹了節點Netty服務端的構建,下面就開始啟動相對應的Netty客戶端。通過TOMSender類的startCS方法。可以注意到參數clientId在前面的名字是processId,該參數是用來標識客戶端的,而不是用來指定請求節點的。
private void startsCS(int clientId) { this.cs = CommunicationSystemClientSideFactory.getCommunicationSystemClientSide(clientId, this.viewController); this.cs.setReplyReceiver(this); // This object itself shall be a reply receiver this.me = this.viewController.getStaticConf().getProcessId(); this.useSignatures = this.viewController.getStaticConf().getUseSignatures() == 1; this.session = new Random().nextInt(); }
我們注意到,Netty服務端的構建只傳入了一個服務端視圖控制器ServerViewController對象,
public NettyClientServerCommunicationSystemServerSide(ServerViewController controller)
而Netty客戶端的構建傳入了客戶端id和客戶端視圖控制器ClientViewController對象兩個參數。
public NettyClientServerCommunicationSystemClientSide(int clientId, ClientViewController controller)
- NettyClientServerCommunicationSystemClientSide,後面簡稱為Netty客戶端類。
- NettyClientServerCommunicationSystemServerSide,前面介紹過了,簡稱為Netty服務端類。
下面進入到客戶端類的構造函數,首先執行的父類構造函數super(),由於Netty客戶端類和服務端類都是繼承自同一個父類SimpleChannelInboundHandler,因此可參照(一-1)父類構造函數的內容。接下來,客戶端類並沒有主線程池而只有從線程池workerGroup,即bossGroup是Netty服務端類特有的。
this.workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
構建從線程池,初始容量為當前系統中所有的可用線程數量。接下來,私鑰的讀取也可參考(一-2-①-a)。然後通過視圖對象獲取所有的節點id。
int[] currV = controller.getCurrentViewProcesses();
對節點id數組進行遍歷,向每個節點發起連接請求,secretKeyFactory是加密工具組件。
ChannelFuture future = connectToReplica(replicaId, secretKeyFactory);
客戶端指定自身id,然後向共識網絡發起請求,而不是指定節點,進入共識網絡以後,會遍歷節點分別建立連接,這與理論部分中的邏輯圖是吻合的。
a) 連接到指定節點
進入connectToReplica連接到指定節點方法。
public synchronized ChannelFuture connectToReplica(int replicaId, SecretKeyFactory fac) throws NoSuchAlgorithmException, InvalidKeySpecException, InvalidKeyException { // 2端參與的連接認證密碼,暫未使用 String str = this.clientId + ":" + replicaId; PBEKeySpec spec = TOMUtil.generateKeySpec(str.toCharArray()); SecretKey authKey = fac.generateSecret(spec); // 配置啟動程序 Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.option(ChannelOption.TCP_NODELAY, true); b.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize); b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMsec); b.handler(getChannelInitializer()); // 添加channel處理器 // 啟動連接到指定的節點replicaId,返回ChannelFuture ChannelFuture channelFuture = b.connect(controller.getRemoteAddress(replicaId)); // 緩存連接會話到sessionClientToReplica,這是一個ConcurrentHashMap容器 NettyClientServerSession ncss = new NettyClientServerSession( channelFuture.channel(), replicaId); // 構建Netty客戶端請求服務端的會話對象 sessionClientToReplica.put(replicaId, ncss); return channelFuture; }
相對Netty服務端來講,連接過程差不多但簡單很多,仍舊通過啟動程序Bootstrap完成快速構建。首先為Bootstrap添加了線程池workerGroup,然後指定了Channel類型為NioSocketChannel(注意區分服務端的channel類型為NioServerSocketChannel),接着配置參數,添加Channel處理器。然後,通過連接方法建立與指定節點的連接通信。
ChannelFuture f = b.connect(controller.getRemoteAddress(replicaId));
這對應的是Netty服務端的,
ChannelFuture f = b.bind(new InetSocketAddress(myAddress, myPort)).sync();
客戶端的connect也可以加一個阻塞等待sync(),即:
ChannelFuture f = b.connect(controller.getRemoteAddress(replicaId)).sync();
服務端成功綁定了IP端口,就開始監聽該地址了。此時客戶端通過connect方法連接指定地址的節點。connect方法的參數是通過controller.getRemoteAddress(replicaId)返回的SocketAddress類型對象,內容就是IP端口號。連接成功以後,客戶端ChannelFuture可以通過isSuccess()方法返回true來得到結果。回到客戶端類的構造函數,
future.awaitUninterruptibly();
將線程阻塞在這一行代碼,保持線程的通信可用性,直到Channel關閉,此處是通過捕捉connect方法拋出的異常而完成線程關閉。
public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } else { this.validate(); return this.doResolveAndConnect(remoteAddress, this.config.localAddress()); } }
connect方法中往下調用,會根據通道的不同情況拋出異常。
b) Channel處理器
注意觀察以上代碼,作為重要的組成部分,channel處理器是通過getChannelInitializer()構建的。
private ChannelInitializer getChannelInitializer() throws NoSuchAlgorithmException { final NettyClientPipelineFactory nettyClientPipelineFactory = new NettyClientPipelineFactory(this, sessionClientToReplica, controller, rl); ChannelInitializer channelInitializer = new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(nettyClientPipelineFactory.getDecoder()); ch.pipeline().addLast(nettyClientPipelineFactory.getEncoder()); ch.pipeline().addLast(nettyClientPipelineFactory.getHandler()); } }; return channelInitializer; }
這部分代碼與Netty服務端也很相似,最終Channel處理器也指向了客戶端類本身。
2. Netty通信原理
到此為止,我們構建了Netty服務端,也建立了客戶端並對服務端發起了連接請求。那麼通過以上Netty相關的內容,下面通過一張圖來介紹Netty的通信原理。
首先由服務端啟動程序初始化channel,然後發起綁定將channel註冊到主線程池的按順序的某一個線程的selector。Selector會輪詢channel的accept事件。這時候如果有客戶端啟動程序發起的connect連接觸發accept事件,該事件會執行一個任務並被放到任務隊列中去,等待消費。當runAllTasks消費到該任務,則建立起另一條channel並註冊到從線程池的按順序的某一個線程的selector。Selector會輪詢讀寫事件。Channel中當數據被接收完成,表示讀就緒就是讀事件;同樣的,Channel中當可以寫數據時,標識寫就緒就是寫事件。讀寫事件發生都會單獨執行一個任務並被放到任務隊列中去,等待任務消費。當runAllTasks消費到該任務,則會處理具體讀寫事件。
3. 客戶端功能
客戶端與節點之間通過Netty建立了通信。客戶端類實現了CommunicationSystemClientSide接口,與服務端同樣繼承了SimpleChannelInboundHandler<TOMMessage>,因此也可分為通用客戶端接口類和Channel生命周期方法。
① 通用接口功能
客戶端類實現了CommunicationSystemClientSide接口,該接口定義了客戶端節點通信中,節點應該具備的功能。
public interface CommunicationSystemClientSide { public void send(boolean sign, int[] targets, TOMMessage sm); public void setReplyReceiver(ReplyReceiver trr); // 設置ReplyReceiver字段 public void sign(TOMMessage sm); // 未使用 public void close(); public void updateConnections(); }
a) 發送消息準備
首先看send接口,在客戶端類的實現體中,send首先計算出基於BFT或CFT的最少確認數quorum。
int quorum; Integer[] targetArray = Arrays.stream(targets).boxed().toArray(Integer[]::new); Collections.shuffle(Arrays.asList(targetArray), new Random()); if (controller.getStaticConf().isBFT()) { quorum = (int) Math.ceil((controller.getCurrentViewN() + controller.getCurrentViewF()) / 2) + 1; } else { quorum = (int) Math.ceil((controller.getCurrentViewN()) / 2) + 1; } listener.waitForChannels(quorum); // 等待前面的傳輸完成,收集足夠的消息確認數
當共識要求的最少確認數達成以後,客戶端才可以發起請求。請求的類型是TOMMessage,被發送前要通過位元組數組輸出流序列化得到消息對象sm的序列化消息serializedMessage。(這個在EOS的合約請求中也是常見的,data對象中除明文參數以外,還會有hex作為請求的序列化消息,便於傳輸。)
if (sm.serializedMessage == null) { DataOutputStream dos = null; try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); dos = new DataOutputStream(baos); sm.wExternal(dos); dos.flush(); sm.serializedMessage = baos.toByteArray(); } catch (IOException ex) { logger.debug("Impossible to serialize message: " + sm); } }
b) 消息簽名
接下來,要通過本地私鑰給已序列化的消息進行簽名。
sm.serializedMessageSignature = signMessage(privKey, sm.serializedMessage);
調用本地signMessage方法進行簽名。
public byte[] signMessage(PrivateKey key, byte[] message) { try { // 簽名引擎,用於簽名的工具。 if (signatureEngine == null) { signatureEngine = TOMUtil.getSigEngine(); } byte[] result = null; // 載入私鑰 java.security.Signature.initSign(java.security.PrivateKey) signatureEngine.initSign(key); // 載入待簽名消息 java.security.Signature.update(byte[]) signatureEngine.update(message); // 執行簽名 java.security.Signature.sign() result = signatureEngine.sign(); // 返回簽名結果 return result; } catch (Exception e) { logger.error("Failed to sign message", e); return null; } }
簽名的底層代碼是由jdk中java.security.Signature包所完成的,感興趣的小夥伴可深入研究。簽名後得到了消息對象sm的serializedMessageSignature字段的值。
c) 發送消息
接下來,會遍歷發送終點,即所有的節點。每拿到一個節點,則將該節點作為目的地存入消息對象sm的destination字段。
sm.destination = targets[target];
接下來上鎖並獲得channel,
rl.readLock().lock(); Channel channel = ((NettyClientServerSession) sessionClientToReplica.get(targets[target])).getChannel(); rl.readLock().unlock();
sessionClientToReplica容器在前面建立Netty客戶端時談到過,是一個連接會話的容器,當再次獲取連接時不必重新構建。在這個容器中按照節點查找得到指定節點的連接channel。判斷如果該channel是可用的,則發送。
if (channel.isActive()) { sm.signed = sign; // 簽名成功後會將該標示位sign置為true。 ChannelFuture f = channel.writeAndFlush(sm); f.addListener(listener); sent++; // 發送計數器,用於共識確認數。 }
將消息寫入channel,然後為該處理添加監聽器,以便能捕捉處理狀態事件。該監聽器在Netty客戶端類構建時被賦值。
this.listener = new SyncListener();
SyncListener類是Netty客戶端類的內部類,它重寫了operationComplete事件。同時,增加了方法waitForChannels,用於共識機制中,收集回復確認數的相關通道的等待。
d) 關閉通道
close方法是用來將通道關閉的。
public void close() { this.closed = true; // 設置關閉標誌位 rl.readLock().lock(); // 上鎖 ArrayList<NettyClientServerSession> sessions = new ArrayList<>(sessionClientToReplica.values()); // 讀取連接會話容器中的所有連接 rl.readLock().unlock(); for (NettyClientServerSession ncss : sessions) { // 遍歷關閉 Channel c = ncss.getChannel(); // 從對象中獲取channel closeChannelAndEventLoop(c); // 安全關閉channel以及EventLoop線程 } }
e) 更新連接(視圖)
當有新的節點加入時,需要更新視圖。以便於客戶端遍歷節點發送消息,因此會涉及修改Netty客戶端的連接。我們知道,Netty客戶端與各個節點的連接被放在了連接會話容器sessionClientToReplica。更新連接時,說明視圖已經更新完畢。那麼要對視圖中所有節點進行遍歷。
int[] currV = controller.getCurrentViewProcesses(); for (int i = 0; i < currV.length; i++) { ... }
遍歷每一個節點,然後判斷是否在sessionClientToReplica存在連接,如果存在說明是老節點,如果不存在說明是新節點。那麼針對新節點,要建立新的連接並放到sessionClientToReplica。建立連接的方式與新建是相同的。
ChannelFuture future = connectToReplica(replicaId, secretKeyFactory); future.awaitUninterruptibly();
參考前面(二-1-②-a)。
② channel處理器
這部分主要是處理channel的生命周期,與Netty服務端的內容基本一致。也是四個方法:
- channelActive,標識
- channelInactive,調用scheduleReconnect
- channelUnregistered,調用scheduleReconnect
- exceptionCaught,輸出錯誤日誌
這四個實現與Netty服務端完全一致。這裡補充一下scheduleReconnect方法的內容。
a) 定時重連
scheduleReconnect顧名思義,是定時重連的含義。
private void scheduleReconnect(final ChannelHandlerContext ctx, int time) { if (closed) { // 如果是已關閉狀態,則走關閉流程。 closeChannelAndEventLoop(ctx.channel()); return; } // 未關閉狀態,則定時重連。 final EventLoop loop = ctx.channel().eventLoop(); // 首先獲得eventLoop線程 loop.schedule(new Runnable() { // 為線程增加定時任務 @Override public void run() { reconnect(ctx); // 任務執行reconnect方法。 } }, time, TimeUnit.SECONDS); }
下面進入重連方法reconnect,這個方法在Netty服務端也有,與新建連接差不多,但是重連一般都會在sessionClientToReplica容器已存在。
public void reconnect(final ChannelHandlerContext ctx) { rl.writeLock().lock(); ArrayList<NettyClientServerSession> sessions = new ArrayList<NettyClientServerSession>( sessionClientToReplica.values()); for (NettyClientServerSession ncss : sessions) { // 遍歷連接 if (ncss.getChannel() == ctx.channel()) { int replicaId = ncss.getReplicaId(); try { if (controller.getRemoteAddress(replicaId) != null) { ChannelFuture future; try { // 建立連接 future = connectToReplica(replicaId, secretKeyFactory); } catch (InvalidKeyException | InvalidKeySpecException e) { logger.error("Error in key.",e); } logger.info("ClientID {}, re-connection to replica {}, at address: {}", clientId, replicaId, controller.getRemoteAddress(replicaId)); } else { // 說明該節點已經刪除,則從sessionClientToReplica刪除連接。 removeClient(replicaId); } } catch (NoSuchAlgorithmException ex) { logger.error("Failed to reconnect to replica", ex); } } } rl.writeLock().unlock(); }
建立連接時仍舊調用connectToReplica方法。參考前面(二-1-②-a)。
③ 已完成內容
致此,CounterClient中通過節點id,建立客戶端服務代理的工作已完成。
4. 調用排序消息
BFT-SMaRt中經常出現的TOM前綴的內容,一般我都會歸併到共識中去,這幾篇文章也未展開。那麼TOM的含義是什麼?其實就是Total ordered multicast的含義,也就是全排序多點廣播,這就是共識的一種體現。
byte[] reply = (inc == 0) ? counterProxy.invokeUnordered(out.toByteArray()) : counterProxy.invokeOrdered(out.toByteArray());
回到CounterClient,當影響值為0時,調用無序方法invokeUnordered,對應類型為TOMMessageType.UNORDERED_REQUEST。當影響值為其他值時,調用共識方法invokeOrdered。後續我的猜測是通過Netty服務端拿到這個值,然後通過ServerViewController的lastView的值加上影響值,然後變為currentView即可,返回currentView的最新的值。這部分內容可以在下一篇詳細展開。
三、後記
經過本文以及前面幾篇BFT-SMaRt相關的文章,可靠信道的部分就全部介紹完了。後續會展開節點對於消息的共識邏輯,以及視圖更換後狀態同步的邏輯的研究。
更多文章請轉到一面千人的博客園。