源码分析 RocketMQ DLedger(多副本) 之日志复制-下篇
- 2019 年 10 月 7 日
- 筆記
温馨提示:由于微信单篇文章的字数限制,RocketMQ DLedger 日志复制分为两篇文章介绍。本篇紧接着上文源码分析 RocketMQ DLedger(多副本) 之日志复制-上篇。
3、EntryHandler 详解
EntryHandler 同样是一个线程,当节点状态为从节点时激活。
3.1 核心类图

其核心属性如下:
- long lastCheckFastForwardTimeMs 上一次检查主服务器是否有 push 消息的时间戳。
- ConcurrentMap>> writeRequestMap append 请求处理队列。
- BlockingQueue>> compareOrTruncateRequests COMMIT、COMPARE、TRUNCATE 相关请求
3.2 handlePush
从上文得知,主节点会主动向从节点传播日志,从节点会通过网络接受到请求数据进行处理,其调用链如图所示:

最终会调用 EntryHandler 的 handlePush 方法。
EntryHandler#handlePush
public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception { //The timeout should smaller than the remoting layer's request timeout CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(); // @1 switch (request.getType()) { case APPEND: // @2 PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT); long index = request.getEntry().getIndex(); Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future)); if (old != null) { logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo()); future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode())); } break; case COMMIT: // @3 compareOrTruncateRequests.put(new Pair<>(request, future)); break; case COMPARE: case TRUNCATE: // @4 PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT); writeRequestMap.clear(); compareOrTruncateRequests.put(new Pair<>(request, future)); break; default: logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo()); future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode())); break; } return future; }
从几点处理主节点的 push 请求,其实现关键点如下。
代码@1:首先构建一个响应结果Future,默认超时时间 1s。
代码@2:如果是 APPEND 请求,放入到 writeRequestMap 集合中,如果已存在该数据结构,说明主节点重复推送,构建返回结果,其状态码为 REPEATED_PUSH。放入到 writeRequestMap 中,由 doWork 方法定时去处理待写入的请求。
代码@3:如果是提交请求, 将请求存入 compareOrTruncateRequests 请求处理中,由 doWork 方法异步处理。
代码@4:如果是 COMPARE 或 TRUNCATE 请求,将待写入队列 writeRequestMap 清空,并将请求放入 compareOrTruncateRequests 请求队列中,由 doWork 方法异步处理。
接下来,我们重点来分析 doWork 方法的实现。
3.3 doWork 方法详解
EntryHandler#doWork
public void doWork() { try { if (!memberState.isFollower()) { // @1 waitForRunning(); return; } if (compareOrTruncateRequests.peek() != null) { // @2 Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll(); PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN); switch (pair.getKey().getType()) { case TRUNCATE: handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue()); break; case COMPARE: handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue()); break; case COMMIT: handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue()); break; default: break; } } else { // @3 long nextIndex = dLedgerStore.getLedgerEndIndex() + ; Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex); if (pair == null) { checkAbnormalFuture(dLedgerStore.getLedgerEndIndex()); waitForRunning(); return; } PushEntryRequest request = pair.getKey(); handleDoAppend(nextIndex, request, pair.getValue()); } } catch (Throwable t) { DLedgerEntryPusher.logger.error("Error in {}", getName(), t); DLedgerUtils.sleep(); } }
代码@1:如果当前节点的状态不是从节点,则跳出。
代码@2:如果 compareOrTruncateRequests 队列不为空,说明有COMMIT、COMPARE、TRUNCATE 等请求,这类请求优先处理。值得注意的是这里使用是 peek、poll 等非阻塞方法,然后根据请求的类型,调用对应的方法。稍后详细介绍。
代码@3:如果只有 append 类请求,则根据当前节点最大的消息序号,尝试从 writeRequestMap 容器中,获取下一个消息复制请求(ledgerEndIndex + 1) 为 key 去查找。如果不为空,则执行 doAppend 请求,如果为空,则调用 checkAbnormalFuture 来处理异常情况。
接下来我们来重点分析各个处理细节。
3.3.1 handleDoCommit
处理提交请求,其处理比较简单,就是调用 DLedgerStore 的 updateCommittedIndex 更新其已提交偏移量,故我们还是具体看一下DLedgerStore 的 updateCommittedIndex 方法。
DLedgerMmapFileStore#updateCommittedIndex
public void updateCommittedIndex(long term, long newCommittedIndex) { // @1 if (newCommittedIndex == - || ledgerEndIndex == - || term < memberState.currTerm() || newCommittedIndex == this.committedIndex) { // @2 return; } if (newCommittedIndex < this.committedIndex || newCommittedIndex < this.ledgerBeginIndex) { // @3 logger.warn("[MONITOR]Skip update committed index for new={} < old={} or new={} < beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex); return; } long endIndex = ledgerEndIndex; if (newCommittedIndex > endIndex) { // @4 //If the node fall behind too much, the committedIndex will be larger than enIndex. newCommittedIndex = endIndex; } DLedgerEntry dLedgerEntry = get(newCommittedIndex); // @5 PreConditions.check(dLedgerEntry != null, DLedgerResponseCode.DISK_ERROR); this.committedIndex = newCommittedIndex; this.committedPos = dLedgerEntry.getPos() + dLedgerEntry.getSize(); // @6 }
代码@1:首先介绍一下方法的参数:
- long term 主节点当前的投票轮次。
- long newCommittedIndex: 主节点发送日志复制请求时的已提交日志序号。
代码@2:如果待更新提交序号为 -1 或 投票轮次小于从节点的投票轮次或主节点投票轮次等于从节点的已提交序号,则直接忽略本次提交动作。
代码@3:如果主节点的已提交日志序号小于从节点的已提交日志序号或待提交序号小于当前节点的最小有效日志序号,则输出警告日志[MONITOR],并忽略本次提交动作。
代码@4:如果从节点落后主节点太多,则重置 提交索引为从节点当前最大有效日志序号。
代码@5:尝试根据待提交序号从从节点查找数据,如果数据不存在,则抛出 DISK_ERROR 错误。
代码@6:更新 commitedIndex、committedPos 两个指针,DledgerStore会定时将已提交指针刷入 checkpoint 文件,达到持久化 commitedIndex 指针的目的。
3.3.2 handleDoCompare
处理主节点发送过来的 COMPARE 请求,其实现也比较简单,最终调用 buildResponse 方法构造响应结果。
EntryHandler#buildResponse
private PushEntryResponse buildResponse(PushEntryRequest request, int code) { PushEntryResponse response = new PushEntryResponse(); response.setGroup(request.getGroup()); response.setCode(code); response.setTerm(request.getTerm()); if (request.getType() != PushEntryRequest.Type.COMMIT) { response.setIndex(request.getEntry().getIndex()); } response.setBeginIndex(dLedgerStore.getLedgerBeginIndex()); response.setEndIndex(dLedgerStore.getLedgerEndIndex()); return response; }
主要也是返回当前从几点的 ledgerBeginIndex、ledgerEndIndex 以及投票轮次,供主节点进行判断比较。
3.3.3 handleDoTruncate
handleDoTruncate 方法实现比较简单,删除从节点上 truncateIndex 日志序号之后的所有日志,具体调用dLedgerStore 的 truncate 方法,由于其存储与 RocketMQ 的存储设计基本类似故本文就不在详细介绍,简单介绍其实现要点:根据日志序号,去定位到日志文件,如果命中具体的文件,则修改相应的读写指针、刷盘指针等,并将所在在物理文件之后的所有文件删除。大家如有兴趣,可以查阅笔者的《RocketMQ技术内幕》第4章:RocketMQ 存储相关内容。
3.3.4 handleDoAppend
private void handleDoAppend(long writeIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) { try { PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE); DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId()); PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE); future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex()); } catch (Throwable t) { logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t); future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode())); } }
其实现也比较简单,调用DLedgerStore 的 appendAsFollower 方法进行日志的追加,与appendAsLeader 在日志存储部分相同,只是从节点无需再转发日志。
3.3.5 checkAbnormalFuture
该方法是本节的重点,doWork 的从服务器存储的最大有效日志序号(ledgerEndIndex) + 1 序号,尝试从待写请求中获取不到对应的请求时调用,这种情况也很常见,例如主节点并么有将最新的数据 PUSH 给从节点。接下来我们详细来看看该方法的实现细节。
EntryHandler#checkAbnormalFuture
if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < ) { return; } lastCheckFastForwardTimeMs = System.currentTimeMillis(); if (writeRequestMap.isEmpty()) { return; }
Step1:如果上一次检查的时间距现在不到1s,则跳出;如果当前没有积压的append请求,同样跳出,因为可以同样明确的判断出主节点还未推送日志。
EntryHandler#checkAbnormalFuture
for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) { long index = pair.getKey().getEntry().getIndex(); // @1 //Fall behind if (index <= endIndex) { // @2 try { DLedgerEntry local = dLedgerStore.get(index); PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE); pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode())); logger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex); } catch (Throwable t) { logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t); pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode())); } writeRequestMap.remove(index); continue; } //Just OK if (index == endIndex + ) { // @3 //The next entry is coming, just return return; } //Fast forward TimeoutFuture<PushEntryResponse> future = (TimeoutFuture<PushEntryResponse>) pair.getValue(); // @4 if (!future.isTimeOut()) { continue; } if (index < minFastForwardIndex) { // @5 minFastForwardIndex = index; } }
Step2:遍历当前待写入的日志追加请求(主服务器推送过来的日志复制请求),找到需要快速快进的的索引。其关键实现点如下:
- 代码@1:首先获取待写入日志的序号。
- 代码@2:如果待写入的日志序号小于从节点已追加的日志(endIndex),并且日志的确已存储在从节点,则返回成功,并输出警告日志【PushFallBehind】,继续监测下一条待写入日志。
- 代码@3:如果待写入 index 等于 endIndex + 1,则结束循环,因为下一条日志消息已经在待写入队列中,即将写入。
- 代码@4:如果待写入 index 大于 endIndex + 1,并且未超时,则直接检查下一条待写入日志。
- 代码@5:如果待写入 index 大于 endIndex + 1,并且已经超时,则记录该索引,使用 minFastForwardIndex 存储。
EntryHandler#checkAbnormalFuture
if (minFastForwardIndex == Long.MAX_VALUE) { return; } Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.get(minFastForwardIndex); if (pair == null) { return; }
Step3:如果未找到需要快速失败的日志序号或 writeRequestMap 中未找到其请求,则直接结束检测。
EntryHandler#checkAbnormalFuture
logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex); pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
Step4:则向主节点报告从节点已经与主节点发生了数据不一致,从节点并没有写入序号 minFastForwardIndex 的日志。如果主节点收到此种响应,将会停止日志转发,转而向各个从节点发送 COMPARE 请求,从而使数据恢复一致。
行为至此,已经详细介绍了主服务器向从服务器发送请求,从服务做出响应,那接下来就来看一下,服务端收到响应结果后的处理,我们要知道主节点会向它所有的从节点传播日志,主节点需要在指定时间内收到超过集群一半节点的确认,才能认为日志写入成功,那我们接下来看一下其实现过程。
4、QuorumAckChecker
日志复制投票器,一个日志写请求只有得到集群内的的大多数节点的响应,日志才会被提交。
4.1 类图

其核心属性如下:
- long lastPrintWatermarkTimeMs 上次打印水位线的时间戳,单位为毫秒。
- long lastCheckLeakTimeMs 上次检测泄漏的时间戳,单位为毫秒。
- long lastQuorumIndex 已投票仲裁的日志序号。
4.2 doWork 详解
QuorumAckChecker#doWork
if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > ) { logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}", memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm)); lastPrintWatermarkTimeMs = System.currentTimeMillis(); }
Step1:如果离上一次打印 watermak 的时间超过3s,则打印一下当前的 term、ledgerBegin、ledgerEnd、committed、peerWaterMarksByTerm 这些数据日志。
QuorumAckChecker#doWork
if (!memberState.isLeader()) { // @2 waitForRunning(); return; }
Step2:如果当前节点不是主节点,直接返回,不作为。
QuorumAckChecker#doWork
if (pendingAppendResponsesByTerm.size() > ) { // @1 for (Long term : pendingAppendResponsesByTerm.keySet()) { if (term == currTerm) { continue; } for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) { AppendEntryResponse response = new AppendEntryResponse(); response.setGroup(memberState.getGroup()); response.setIndex(futureEntry.getKey()); response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode()); response.setLeaderId(memberState.getLeaderId()); logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm); futureEntry.getValue().complete(response); } pendingAppendResponsesByTerm.remove(term); } } if (peerWaterMarksByTerm.size() > ) { for (Long term : peerWaterMarksByTerm.keySet()) { if (term == currTerm) { continue; } logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm); peerWaterMarksByTerm.remove(term); } }
Step3:清理pendingAppendResponsesByTerm、peerWaterMarksByTerm 中本次投票轮次的数据,避免一些不必要的内存使用。
Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm); long quorumIndex = -; for (Long index : peerWaterMarks.values()) { // @1 int num = ; for (Long another : peerWaterMarks.values()) { // @2 if (another >= index) { num++; } } if (memberState.isQuorum(num) && index > quorumIndex) { // @3 quorumIndex = index; } } dLedgerStore.updateCommittedIndex(currTerm, quorumIndex); // @4
Step4:根据各个从节点反馈的进度,进行仲裁,确定已提交序号。为了加深对这段代码的理解,再来啰嗦一下 peerWaterMarks 的作用,存储的是各个从节点当前已成功追加的日志序号。例如一个三节点的 DLedger 集群,peerWaterMarks 数据存储大概如下:
{ “dledger_group_01_0” : , "dledger_group_01_1" : , }
其中 dledger_group_01_0 为从节点1的ID,当前已复制的序号为 100,而 dledger_group_01_1 为节点2的ID,当前已复制的序号为 101。再加上主节点,如何确定可提交序号呢?
- 代码@1:首先遍历 peerWaterMarks 的 value 集合,即上述示例中的 {100, 101},用临时变量 index 来表示待投票的日志序号,需要集群内超过半数的节点的已复制序号超过该值,则该日志能被确认提交。
- 代码@2:遍历 peerWaterMarks 中的所有已提交序号,与当前值进行比较,如果节点的已提交序号大于等于待投票的日志序号(index),num 加一,表示投赞成票。
- 代码@3:对 index 进行仲裁,如果超过半数 并且 index 大于 quorumIndex,更新 quorumIndex 的值为 index。quorumIndex 经过遍历的,得出当前最大的可提交日志序号。
- 代码@4:更新 committedIndex 索引,方便 DLedgerStore 定时将 committedIndex 写入 checkpoint 中。
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm); boolean needCheck = false; int ackNum = ; if (quorumIndex >= ) { for (Long i = quorumIndex; i >= ; i--) { // @1 try { CompletableFuture<AppendEntryResponse> future = responses.remove(i); // @2 if (future == null) { // @3 needCheck = lastQuorumIndex != - && lastQuorumIndex != quorumIndex && i != lastQuorumIndex; break; } else if (!future.isDone()) { // @4 AppendEntryResponse response = new AppendEntryResponse(); response.setGroup(memberState.getGroup()); response.setTerm(currTerm); response.setIndex(i); response.setLeaderId(memberState.getSelfId()); response.setPos(((AppendFuture) future).getPos()); future.complete(response); } ackNum++; // @5 } catch (Throwable t) { logger.error("Error in ack to index={} term={}", i, currTerm, t); } } }
Step5:处理 quorumIndex 之前的挂起请求,需要发送响应到客户端,其实现步骤:
- 代码@1:从 quorumIndex 开始处理,没处理一条,该序号减一,直到大于0或主动退出,请看后面的退出逻辑。
- 代码@2:responses 中移除该日志条目的挂起请求。
- 代码@3:如果未找到挂起请求,说明前面挂起的请求已经全部处理完毕,准备退出,退出之前再 设置 needCheck 的值,其依据如下(三个条件必须同时满足):
- 最后一次仲裁的日志序号不等于-1
- 并且最后一次不等于本次新仲裁的日志序号
- 最后一次仲裁的日志序号不等于最后一次仲裁的日志。正常情况一下,条件一、条件二通常为true,但这一条大概率会返回false。
- 代码@4:向客户端返回结果。
- 代码@5:ackNum,表示本次确认的数量。
if (ackNum == ) { for (long i = quorumIndex + ; i < Integer.MAX_VALUE; i++) { TimeoutFuture<AppendEntryResponse> future = responses.get(i); if (future == null) { break; } else if (future.isTimeOut()) { AppendEntryResponse response = new AppendEntryResponse(); response.setGroup(memberState.getGroup()); response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode()); response.setTerm(currTerm); response.setIndex(i); response.setLeaderId(memberState.getSelfId()); future.complete(response); } else { break; } } waitForRunning(); }
Step6:如果本次确认的个数为0,则尝试去判断超过该仲裁序号的请求,是否已经超时,如果已超时,则返回超时响应结果。
if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > || needCheck) { updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex()); for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) { if (futureEntry.getKey() < quorumIndex) { AppendEntryResponse response = new AppendEntryResponse(); response.setGroup(memberState.getGroup()); response.setTerm(currTerm); response.setIndex(futureEntry.getKey()); response.setLeaderId(memberState.getSelfId()); response.setPos(((AppendFuture) futureEntry.getValue()).getPos()); futureEntry.getValue().complete(response); responses.remove(futureEntry.getKey()); } } lastCheckLeakTimeMs = System.currentTimeMillis(); }
Step7:检查是否发送泄漏。其判断泄漏的依据是如果挂起的请求的日志序号小于已提交的序号,则移除。
Step8:一次日志仲裁就结束了,最后更新 lastQuorumIndex 为本次仲裁的的新的提交值。