SOFAJRaft—初次使用
- 2019 年 10 月 6 日
- 筆記
SOFAJRaft 是基於 Raft 演算法的生產級高性能 Java 實現,支援 MULTI-RAFT-GROUP。應用場景有 Leader 選舉、分散式鎖服務、高可靠的元資訊管理、分散式存儲系統。
如果不了解Raft演算法的朋友可以去看看這篇文章:Raft 為什麼是更易理解的分散式一致性演算法,寫的很詳細了。
這張圖是SOFAJRaft的設計圖,其中Node 代表了一個 SOFAJRaft Server 節點。
由於SOFAJRaft的Node節點是一個分散式的結構,所以Node節點需要將資訊傳遞給其他Node,所以Replicator的作用就是用來複制資訊給其他的Node。多個Replicator共同組成一個ReplicatorGroup。
Snapshot是表示一個快照,就是對數據當前值的一個記錄,會存檔保存,提供冷備數據功能。
Leader 生成快照有這麼幾個作用:
- 當有新的 Node 加入集群的時候,不用只靠日誌複製、回放去和 Leader 保持數據一致,而是通過安裝 Leader 的快照來跳過早期大量日誌的回放;
- Leader 用快照替代 Log 複製可以減少網路上的數據量;
- 用快照替代早期的 Log 可以節省存儲空間;
StateMachine 介面是用來給用戶去實現的部分。通過用戶實現具體的業務邏輯從而在分散式系統中達成共識。
在 StateMachine 上,我們要去實現狀態機暴露給我們待實現的幾個介面,最重要的是 onApply 介面,要在這個介面里將 Cilent 的請求指令進行運算,轉換成具體的計數器值。而 onSnapshotSave 和 onSnapshotLoad 介面則是負責快照的生成和載入。
Client也是需要用戶去實現的部分,用戶需要去定義不同的消息類型和客戶端的處理邏輯。
實現Counter分散式計數器
下面我們給出個需求: 提供一個 Counter,Client 每次計數時可以指定步幅,也可以隨時發起查詢。
將它翻譯成具體的功能點,主要有三部分:
- 實現:Counter server,具備計數功能,具體運算公式為:Cn = Cn-1 + delta;
- 提供寫服務,寫入 delta 觸發計數器運算;
- 提供讀服務,讀取當前 Cn 值;
具體程式碼:Counter
在這個demo中,我們啟動三個server作為一個group,傳入下面的參數:
/tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 /tmp/server2 counter 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083 /tmp/server3 counter 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
表示使用/tmp/server1 ,/tmp/server2,/tmp/server3三個目錄用來存儲數據,raft group名稱為 counter,節點ip也分別為
127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
然後啟動客戶端,並傳入下面參數:
counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
表示綁定的raft group名稱為 counter,集群為:
127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
服務端
CounterServer
public CounterServer(final String dataPath, final String groupId, final PeerId serverId, final NodeOptions nodeOptions) throws IOException { // 初始化路徑 FileUtils.forceMkdir(new File(dataPath)); // 這裡讓 raft RPC 和業務 RPC 使用同一個 RPC server, 通常也可以分開 final RpcServer rpcServer = new RpcServer(serverId.getPort()); RaftRpcServerFactory.addRaftRequestProcessors(rpcServer); // 註冊業務處理器 rpcServer.registerUserProcessor(new GetValueRequestProcessor(this)); rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this)); // 初始化狀態機 this.fsm = new CounterStateMachine(); // 設置狀態機到啟動參數 nodeOptions.setFsm(this.fsm); // 設置存儲路徑 // 日誌, 必須 nodeOptions.setLogUri(dataPath + File.separator + "log"); // 元資訊, 必須 nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta"); // snapshot, 可選, 一般都推薦 nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); // 初始化 raft group 服務框架 this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer); // 啟動 this.node = this.raftGroupService.start(); }
服務端CounterServer在實例化的時候會設置相應的處理器,這裡設置了GetValueRequestProcessor和 IncrementAndGetRequestProcessor。
GetValueRequestProcessor用來提供讀服務,讀取當前 Cn 值;
IncrementAndGetRequestProcessor提供寫服務,寫入 delta 觸發計數器運算;
GetValueRequestProcessor
@Override public Object handleRequest(final BizContext bizCtx, final GetValueRequest request) throws Exception { if (!this.counterServer.getFsm().isLeader()) { return this.counterServer.redirect(); } final ValueResponse response = new ValueResponse(); response.setSuccess(true); response.setValue(this.counterServer.getFsm().getValue()); return response; }
GetValueRequestProcessor的處理非常的簡單,直接獲取狀態機的值然後返回。
IncrementAndGetRequestProcessor
public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx, final IncrementAndGetRequest request) { //判斷當前節點是否是leader if (!this.counterServer.getFsm().isLeader()) { asyncCtx.sendResponse(this.counterServer.redirect()); return; } //設置響應數據 final ValueResponse response = new ValueResponse(); //封裝請求數據,並回調響應結果 final IncrementAndAddClosure closure = new IncrementAndAddClosure(counterServer, request, response, status -> { //響應成功 if (!status.isOk()) { response.setErrorMsg(status.getErrorMsg()); response.setSuccess(false); } //發送響應請求 asyncCtx.sendResponse(response); }); try { final Task task = new Task(); task.setDone(closure); //序列化請求 task.setData(ByteBuffer .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(request))); //調用node處理請求 // apply task to raft group. counterServer.getNode().apply(task); } catch (final CodecException e) { LOG.error("Fail to encode IncrementAndGetRequest", e); //請求失敗,則立即響應 response.setSuccess(false); response.setErrorMsg(e.getMessage()); asyncCtx.sendResponse(response); } }
這裡使用IncrementAndAddClosure來封裝響應和請求,並通過回調的方式進行非同步回寫數據到client。然後實例化Task實例,序列化請求數據,調用node的apply方法。
然後設置了CounterStateMachine狀態機,並設值了日誌,元資訊和快照的存儲路徑。
CounterStateMachine實現了StateMachineAdapter抽象類,並重寫了3個方法:
onApply用來處理具體的業務
onSnapshotSave保存快照
onSnapshotLoad載入快照
在保存和載入快照的地方使用了CounterSnapshotFile類來進行輔助。
CounterStateMachine
public class CounterStateMachine extends StateMachineAdapter { ... private final AtomicLong value = new AtomicLong(0); public void onApply(final Iterator iter) { //獲取processor中封裝的數據 while (iter.hasNext()) { long delta = 0; //用於封裝請求數據和回調結果 IncrementAndAddClosure closure = null; if (iter.done() != null) { // This task is applied by this node, get value from closure to avoid additional parsing. closure = (IncrementAndAddClosure) iter.done(); delta = closure.getRequest().getDelta(); } else { // Have to parse FetchAddRequest from this user log. final ByteBuffer data = iter.getData(); try { final IncrementAndGetRequest request = SerializerManager.getSerializer(SerializerManager.Hessian2) .deserialize(data.array(), IncrementAndGetRequest.class.getName()); delta = request.getDelta(); } catch (final CodecException e) { LOG.error("Fail to decode IncrementAndGetRequest", e); } } //獲取當前值 final long prev = this.value.get(); //將當前值加上delta final long updated = value.addAndGet(delta); //設置響應,並調用run方法回寫響應方法 if (closure != null) { closure.getResponse().setValue(updated); closure.getResponse().setSuccess(true); closure.run(Status.OK()); } LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex()); iter.next(); } } }
這裡的onApply方法首先會獲取processor中封裝的數據,然後獲取processor中傳入的closure實例,然後處理好業務邏輯後調用closure的run進行回調返回數據到客戶端。
客戶端
CounterClient
public static void main(final String[] args) throws Exception { if (args.length != 2) { System.out.println("Useage : java com.alipay.sofa.jraft.example.counter.CounterClient {groupId} {conf}"); System.out .println("Example: java com.alipay.sofa.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083"); System.exit(1); } final String groupId = args[0]; final String confStr = args[1]; final Configuration conf = new Configuration(); if (!conf.parse(confStr)) { throw new IllegalArgumentException("Fail to parse conf:" + confStr); } // 更新raft group配置 RouteTable.getInstance().updateConfiguration(groupId, conf); //接下來初始化 RPC 客戶端並更新路由表 final BoltCliClientService cliClientService = new BoltCliClientService(); cliClientService.init(new CliOptions()); if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) { throw new IllegalStateException("Refresh leader failed"); } //獲取 leader 後發送請求 final PeerId leader = RouteTable.getInstance().selectLeader(groupId); System.out.println("Leader is " + leader); final int n = 1000; final CountDownLatch latch = new CountDownLatch(n); final long start = System.currentTimeMillis(); for (int i = 0; i < n; i++) { incrementAndGet(cliClientService, leader, i, latch); } latch.await(); System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms."); System.exit(0); }
客戶端先是根據groupId和IP綁定server,然後更新路由表,獲取leader
private static void incrementAndGet(final BoltCliClientService cliClientService, final PeerId leader, final long delta, CountDownLatch latch) throws RemotingException, InterruptedException { final IncrementAndGetRequest request = new IncrementAndGetRequest(); request.setDelta(delta); cliClientService.getRpcClient().invokeWithCallback(leader.getEndpoint().toString(), request, new InvokeCallback() { @Override public void onResponse(Object result) { latch.countDown(); System.out.println("incrementAndGet result:" + result); } @Override public void onException(Throwable e) { e.printStackTrace(); latch.countDown(); } @Override public Executor getExecutor() { return null; } }, 5000); }
然後調用incrementAndGet方法。incrementAndGet方法中使用cliClientService獲取client然後傳入request請求並設值回調函數。
總體流程
這裡總結一下整個server和client的調用流程
首先是CounterClient綁定server後,獲取server的leader節點,然後發送一個IncrementAndGetRequest的request請求到server。
Server接收到請求後根據請求的類型交給IncrementAndGetRequestProcessor處理,並調用handleRequest方法。
然後handleRequest會將數據封裝調用狀態機的onApply方法,處理業務數據後調用closure進行回調。
closure回調後會封裝一個ValueResponse發送響應請求給客戶端。
客戶端會回調onResponse方法。
到這裡整個counter的例子就講解完畢了