SOFAJRaft—初次使用

  • 2019 年 10 月 6 日
  • 筆記

SOFAJRaft 是基於 Raft 演算法的生產級高性能 Java 實現,支援 MULTI-RAFT-GROUP。應用場景有 Leader 選舉、分散式鎖服務、高可靠的元資訊管理、分散式存儲系統。

如果不了解Raft演算法的朋友可以去看看這篇文章:Raft 為什麼是更易理解的分散式一致性演算法,寫的很詳細了。

這張圖是SOFAJRaft的設計圖,其中Node 代表了一個 SOFAJRaft Server 節點。

由於SOFAJRaft的Node節點是一個分散式的結構,所以Node節點需要將資訊傳遞給其他Node,所以Replicator的作用就是用來複制資訊給其他的Node。多個Replicator共同組成一個ReplicatorGroup。

Snapshot是表示一個快照,就是對數據當前值的一個記錄,會存檔保存,提供冷備數據功能。
Leader 生成快照有這麼幾個作用:

  • 當有新的 Node 加入集群的時候,不用只靠日誌複製、回放去和 Leader 保持數據一致,而是通過安裝 Leader 的快照來跳過早期大量日誌的回放;
  • Leader 用快照替代 Log 複製可以減少網路上的數據量;
  • 用快照替代早期的 Log 可以節省存儲空間;

StateMachine 介面是用來給用戶去實現的部分。通過用戶實現具體的業務邏輯從而在分散式系統中達成共識。
在 StateMachine 上,我們要去實現狀態機暴露給我們待實現的幾個介面,最重要的是 onApply 介面,要在這個介面里將 Cilent 的請求指令進行運算,轉換成具體的計數器值。而 onSnapshotSave 和 onSnapshotLoad 介面則是負責快照的生成和載入。

Client也是需要用戶去實現的部分,用戶需要去定義不同的消息類型和客戶端的處理邏輯。

實現Counter分散式計數器

下面我們給出個需求: 提供一個 Counter,Client 每次計數時可以指定步幅,也可以隨時發起查詢。
將它翻譯成具體的功能點,主要有三部分:

  1. 實現:Counter server,具備計數功能,具體運算公式為:Cn = Cn-1 + delta;
  2. 提供寫服務,寫入 delta 觸發計數器運算;
  3. 提供讀服務,讀取當前 Cn 值;

具體程式碼:Counter

在這個demo中,我們啟動三個server作為一個group,傳入下面的參數:

/tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083  /tmp/server2 counter 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083  /tmp/server3 counter 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

表示使用/tmp/server1 ,/tmp/server2,/tmp/server3三個目錄用來存儲數據,raft group名稱為 counter,節點ip也分別為

127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

然後啟動客戶端,並傳入下面參數:

counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

表示綁定的raft group名稱為 counter,集群為:

127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

服務端

CounterServer

public CounterServer(final String dataPath, final String groupId, final PeerId serverId,                       final NodeOptions nodeOptions) throws IOException {      // 初始化路徑      FileUtils.forceMkdir(new File(dataPath));        // 這裡讓 raft RPC 和業務 RPC 使用同一個 RPC server, 通常也可以分開      final RpcServer rpcServer = new RpcServer(serverId.getPort());      RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);      // 註冊業務處理器      rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));      rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));      // 初始化狀態機      this.fsm = new CounterStateMachine();      // 設置狀態機到啟動參數      nodeOptions.setFsm(this.fsm);      // 設置存儲路徑      // 日誌, 必須      nodeOptions.setLogUri(dataPath + File.separator + "log");      // 元資訊, 必須      nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");      // snapshot, 可選, 一般都推薦      nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");      // 初始化 raft group 服務框架      this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);      // 啟動      this.node = this.raftGroupService.start();  }

服務端CounterServer在實例化的時候會設置相應的處理器,這裡設置了GetValueRequestProcessor和 IncrementAndGetRequestProcessor。
GetValueRequestProcessor用來提供讀服務,讀取當前 Cn 值;
IncrementAndGetRequestProcessor提供寫服務,寫入 delta 觸發計數器運算;
GetValueRequestProcessor

@Override  public Object handleRequest(final BizContext bizCtx, final GetValueRequest request) throws Exception {      if (!this.counterServer.getFsm().isLeader()) {          return this.counterServer.redirect();      }        final ValueResponse response = new ValueResponse();      response.setSuccess(true);      response.setValue(this.counterServer.getFsm().getValue());      return response;  }

GetValueRequestProcessor的處理非常的簡單,直接獲取狀態機的值然後返回。

IncrementAndGetRequestProcessor

public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx,                            final IncrementAndGetRequest request) {      //判斷當前節點是否是leader      if (!this.counterServer.getFsm().isLeader()) {          asyncCtx.sendResponse(this.counterServer.redirect());          return;      }      //設置響應數據      final ValueResponse response = new ValueResponse();      //封裝請求數據,並回調響應結果      final IncrementAndAddClosure closure = new IncrementAndAddClosure(counterServer, request, response,              status -> {                  //響應成功                  if (!status.isOk()) {                      response.setErrorMsg(status.getErrorMsg());                      response.setSuccess(false);                  }                  //發送響應請求                  asyncCtx.sendResponse(response);              });        try {          final Task task = new Task();          task.setDone(closure);          //序列化請求          task.setData(ByteBuffer                  .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(request)));          //調用node處理請求          // apply task to raft group.          counterServer.getNode().apply(task);      } catch (final CodecException e) {          LOG.error("Fail to encode IncrementAndGetRequest", e);          //請求失敗,則立即響應          response.setSuccess(false);          response.setErrorMsg(e.getMessage());          asyncCtx.sendResponse(response);      }  }

這裡使用IncrementAndAddClosure來封裝響應和請求,並通過回調的方式進行非同步回寫數據到client。然後實例化Task實例,序列化請求數據,調用node的apply方法。

然後設置了CounterStateMachine狀態機,並設值了日誌,元資訊和快照的存儲路徑。
CounterStateMachine實現了StateMachineAdapter抽象類,並重寫了3個方法:
onApply用來處理具體的業務
onSnapshotSave保存快照
onSnapshotLoad載入快照
在保存和載入快照的地方使用了CounterSnapshotFile類來進行輔助。

CounterStateMachine

public class CounterStateMachine extends StateMachineAdapter {      ...      private final AtomicLong    value      = new AtomicLong(0);        public void onApply(final Iterator iter) {          //獲取processor中封裝的數據          while (iter.hasNext()) {              long delta = 0;                //用於封裝請求數據和回調結果              IncrementAndAddClosure closure = null;              if (iter.done() != null) {                  // This task is applied by this node, get value from closure to avoid additional parsing.                  closure = (IncrementAndAddClosure) iter.done();                  delta = closure.getRequest().getDelta();              } else {                  // Have to parse FetchAddRequest from this user log.                  final ByteBuffer data = iter.getData();                  try {                      final IncrementAndGetRequest request = SerializerManager.getSerializer(SerializerManager.Hessian2)                              .deserialize(data.array(), IncrementAndGetRequest.class.getName());                      delta = request.getDelta();                  } catch (final CodecException e) {                      LOG.error("Fail to decode IncrementAndGetRequest", e);                  }              }              //獲取當前值              final long prev = this.value.get();              //將當前值加上delta              final long updated = value.addAndGet(delta);              //設置響應,並調用run方法回寫響應方法              if (closure != null) {                  closure.getResponse().setValue(updated);                  closure.getResponse().setSuccess(true);                  closure.run(Status.OK());              }              LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());              iter.next();          }      }  }

這裡的onApply方法首先會獲取processor中封裝的數據,然後獲取processor中傳入的closure實例,然後處理好業務邏輯後調用closure的run進行回調返回數據到客戶端。

客戶端

CounterClient

public static void main(final String[] args) throws Exception {      if (args.length != 2) {          System.out.println("Useage : java com.alipay.sofa.jraft.example.counter.CounterClient {groupId} {conf}");          System.out              .println("Example: java com.alipay.sofa.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");          System.exit(1);      }      final String groupId = args[0];      final String confStr = args[1];        final Configuration conf = new Configuration();      if (!conf.parse(confStr)) {          throw new IllegalArgumentException("Fail to parse conf:" + confStr);      }      // 更新raft group配置      RouteTable.getInstance().updateConfiguration(groupId, conf);      //接下來初始化 RPC 客戶端並更新路由表      final BoltCliClientService cliClientService = new BoltCliClientService();      cliClientService.init(new CliOptions());        if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {          throw new IllegalStateException("Refresh leader failed");      }      //獲取 leader 後發送請求      final PeerId leader = RouteTable.getInstance().selectLeader(groupId);      System.out.println("Leader is " + leader);      final int n = 1000;      final CountDownLatch latch = new CountDownLatch(n);      final long start = System.currentTimeMillis();      for (int i = 0; i < n; i++) {          incrementAndGet(cliClientService, leader, i, latch);      }      latch.await();      System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");      System.exit(0);  }

客戶端先是根據groupId和IP綁定server,然後更新路由表,獲取leader

private static void incrementAndGet(final BoltCliClientService cliClientService, final PeerId leader,                                      final long delta, CountDownLatch latch) throws RemotingException,                                                                             InterruptedException {      final IncrementAndGetRequest request = new IncrementAndGetRequest();      request.setDelta(delta);      cliClientService.getRpcClient().invokeWithCallback(leader.getEndpoint().toString(), request,          new InvokeCallback() {                @Override              public void onResponse(Object result) {                  latch.countDown();                  System.out.println("incrementAndGet result:" + result);              }                @Override              public void onException(Throwable e) {                  e.printStackTrace();                  latch.countDown();                }                @Override              public Executor getExecutor() {                  return null;              }          }, 5000);  }

然後調用incrementAndGet方法。incrementAndGet方法中使用cliClientService獲取client然後傳入request請求並設值回調函數。

總體流程

這裡總結一下整個server和client的調用流程

首先是CounterClient綁定server後,獲取server的leader節點,然後發送一個IncrementAndGetRequest的request請求到server。

Server接收到請求後根據請求的類型交給IncrementAndGetRequestProcessor處理,並調用handleRequest方法。

然後handleRequest會將數據封裝調用狀態機的onApply方法,處理業務數據後調用closure進行回調。

closure回調後會封裝一個ValueResponse發送響應請求給客戶端。

客戶端會回調onResponse方法。

到這裡整個counter的例子就講解完畢了