RocketMQ源碼詳解 | Broker篇 · 其一:執行緒模型與接收鏈路

概述

在上一節 RocketMQ源碼詳解 | Producer篇 · 其二:消息組成、發送鏈路 中,我們終於將消息發送出了 Producer,在短暫的 tcp 握手後,很快它就會進入目的 Broker。這次我們來自底向上的看下 Broker 端是如何接收然後分發處理消息,同時了解 RocketMQ 的 Broker 的執行緒模型。

Netty 組件

如果你還記得上一節的內容的話那應該知道,NettyRomotingAbstract 有兩個實現類,分別是 NettyRemotingClientNettyRemotingServer ,我們已經知道了前者的實現,現在我們再來看看後者

NettyRemotingServer



這個類很長,我們先來看它的屬性

/*    引導類和dispatch執行緒與select執行緒池   */
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
// 配置類
private final NettyServerConfig nettyServerConfig;

// 用來執行 callback 函數的執行緒池
private final ExecutorService publicExecutor;
// 自定義的 Channel 事件監聽器
private final ChannelEventListener channelEventListener;

// 掃描已經超時的 ResponseFeature
private final Timer timer = new Timer("ServerHouseKeepingService", true);
// 工作執行緒
private DefaultEventExecutorGroup defaultEventExecutorGroup;


private int port = 0;

private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";

// sharable handlers
private HandshakeHandler handshakeHandler;
private NettyEncoder encoder;
private NettyConnectManageHandler connectionManageHandler;
private NettyServerHandler serverHandler;

我們主要關心 serverBootStrap 的啟動

首先是它的初始化,初始化程式碼較長,主要做了三件事:

  1. 初始化 callback 函數執行執行緒池
  2. 在 Linux 平台上啟用 epoll
  3. 使用可能存在的 SSL

然後是重頭戲,其具體的創建

ServerBootstrap childHandler =
  this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
  .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
  // 半連接隊列長度
  .option(ChannelOption.SO_BACKLOG, 1024)
  // 開啟內核中的 net.ipv4.tcp_tw_reuse 選項
  .option(ChannelOption.SO_REUSEADDR, true)
  // 關閉作業系統的連接維護,由自己去干
  .option(ChannelOption.SO_KEEPALIVE, false)
  // 禁用 Nagle 演算法
  .childOption(ChannelOption.TCP_NODELAY, true)
  // 設定發送緩衝區和接收緩衝區大小
  .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
  .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
  // 設置監聽埠(0.0.0.0:xx)
  .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
  .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline()
        // 設置握手處理器
        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
        .addLast(defaultEventExecutorGroup,
                 // 設置編解碼器
                 encoder,
                 new NettyDecoder(),
                 // 註冊 Netty 的心跳檢查
                 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                 // 管理連接,超時處理,維護channelTables與存活的連接
                 connectionManageHandler,
                 // 實際上的處理收到的請求
                 serverHandler
                );
    }
  });

這裡需要關注的點很多,我們按照順序來看

首先是執行緒模型,在這裡我們可以看出它是 1(eventLoopGroupBoss) – N(eventLoopGroupSelector) – M(defaultEventExecutorGroup) 的執行緒模型,即有 一個 Acceptor,N 個 Select 執行緒,和 M 個 IO 執行緒。

如果了解過 Reactor 模型的話可以看出這屬於主從多 Reactor 模式,在 Nginx、Kakfa、Tomcat 都能看到類似的設計。

然後需要關注的是 SO_BACKLOG,這裡指定了半隊列的長度為 1024

backlog

在 TCP 的三次握手中,backlog 用於處理從 SYN RECEIVED 到 ESTABLISHED 狀態之間的套接字。

其中具有 SYN 隊列和 accept 隊列:

  • SYN 隊列

    長度由系統調整。

    當伺服器端收到一個 SYN 包時,將其放入 SYN 隊列並返回 ACK+SYN。隊滿則拋棄,客戶端超時後重發。

  • accept 隊列

    長度由程式調整(也就是我們通過 SO_BACKLOG 設置的長度)。

    當伺服器端收到之前自己發送的 SYN 的 ACK 時,會將套接字放入這裡。大多數時候這裡的數據可以很快的被程式通過 accept() 取出。隊滿時拋棄到來的 ACK 包(雖然客戶端已經進入了 ESTABLISHED 狀態,但由於 tcp 的慢啟動,並不會造成太大影響),客戶端重發到一定次數仍未被放入 accept 隊列時會被發送 RST 包。同時在 Linux 中,這裡隊滿時會對 SYN 隊列的接收速率進行控制。

再通過 SO_REUSEADDR 開啟了內核的 net.ipv4.tcp_tw_reuse 選項

net.ipv4.tcp_tw_reuse

這個選項主要用在具有大量短連接的應用。

問題:

在具有大量短連接時,伺服器端上具有太多屬於同一個客戶端的處於 TIME_WAIT 狀態的連接,而導致該客戶端不能建立新的連接。

處理方法:

在 Linux 中,TCP 的 TIME_WAIT 時間默認為 1 分鐘,而 TIME_WAIT 被設計出來的主要目的有兩個:

  1. 避免新的連接收到舊的連接的重發數據包
  2. 確保遠程端不是在 LAST_ACK 狀態

在開啟這個選項後,如果 TIME_WAIT 狀態的連接過多,會使用在 TCP 可選頭部中的時間戳選項,來和之前存儲的時間戳對比,若該大,則從 TIME_WAIT 狀態的存活連接中隨機選取一個並分配給該 TCP 連接。

對於需要解決問題 1,由於舊的連接的重發包具有過期的時間戳,所以會被丟棄;

對於問題 2 ,當處於 LAST_ACK 的一端收到新的 TCP 連接的 SYN 包後,會將其丟棄,然後重發 FIN 包,處於 SYN_SEND 狀態的一端收到這種錯誤的包後會發送 RST 包,然後再發送 SYN 包重試。

然後使用 SO_KEEPALIVE 關閉作業系統自帶的 KeepAlive 機制。

這是因為作業系統的連接維護默認為 2 小時,對其修改需要系統調用,且當協議被切換為 UDP 時會失效,故我們在後面使用了 IdleStateHandler 來註冊 Netty 自己實現的心跳檢測

接著將 TCP_NODELAY 設置為 True 來禁用 Nagle 演算法。

這是因為 Nagle 演算法會等待當前 TCP 的包到達了足夠的大小才會發送,這會造成發送延遲

再往後看可以發現是先註冊了 HandshakeHandler,我們來看它幹了什麼

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  // 標記當前位置以便恢復。因為我們接下來需要查看第一個位元組以確定內容是否以 TLS 握手開始
  msg.markReaderIndex();

  byte b = msg.getByte(0);

  // 握手的魔數,如果是說明這是個tls握手
  if (b == HANDSHAKE_MAGIC_CODE) {
    switch (tlsMode) {
        // 禁用 SSL
      case DISABLED:
        ctx.close();
        log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
        break;
        // 可用或必須使用 SSL
      case PERMISSIVE:
      case ENFORCING:
        if (null != sslContext) {
          // 添加 SSL handler
          ctx.pipeline()
            // SSL 隧道
            .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
            // 用來保證文件在零拷貝時也進入能被 SSL 加密
            .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
          log.info("Handlers prepended to channel pipeline to establish SSL connection");
        } else {
          ctx.close();
          log.error("Trying to establish an SSL connection but sslContext is null");
        }
        break;

      default:
        log.warn("Unknown TLS mode");
        break;
    }
  } else if (tlsMode == TlsMode.ENFORCING) {
    ctx.close();
    log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
  }

  // 恢復read索引,以便握手協商可以正常進行。
  msg.resetReaderIndex();

  try {
    // 完成 SSL 的判定後將被於本 pipeline 中移除
    ctx.pipeline().remove(this);
  } catch (NoSuchElementException e) {
    log.error("Error while removing HandshakeHandler", e);
  }

  // 交給下一個 handler
  ctx.fireChannelRead(msg.retain());
}

從程式碼我們可以知道,這個 Handler 用於判斷是否使用 SSL 對連接進行加密,有的話則使用

然後是我們之前提到過的 IdleStateHandler ,它的幾個參數分別是:

  1. 讀超時時間
  2. 寫超時時間
  3. 讀寫超時時間

而我們在這將 1 和 2 都設置為了 0,即不進行觸發

一旦超時,它將會產生 IdleStateEvent ,在下一個 Handler NettyConnectManageHandler 中,我們可以看到它被捕獲了

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt instanceof IdleStateEvent) {
    IdleStateEvent event = (IdleStateEvent) evt;
    if (event.state().equals(IdleState.ALL_IDLE)) {
      final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
      log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
      RemotingUtil.closeChannel(ctx.channel());
      if (NettyRemotingServer.this.channelEventListener != null) {
        NettyRemotingServer.this
          .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
      }
    }
  }

  ctx.fireUserEventTriggered(evt);
}

最後其他的組件都和上一章講過差不多,故不再重複。接下來主要看一個和 Client 不同的地方。

ChannelEventListener

在上一章了解 Client 時,NettyConnectManageHandler 中在每一個狀態中都有以下程式碼

if (NettyRemotingServer.this.channelEventListener != null) {
  NettyRemotingServer.this
    .putNettyEvent(new NettyEvent(NettyEventType./*  XXX  */, remoteAddress, ctx.channel()));
}

Client 由於沒有註冊 channelEventListener 而沒有使用,在 NettyRemotingServer 中則在執行構造器時註冊了 ClientHousekeepingService ,當然是 Broekr 端,還有一個是 BrokerHousekeepingService ,用於 NameServer

public void start() {
  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
      try {
        ClientHousekeepingService.this.scanExceptionChannel();
      } catch (Throwable e) {
        log.error("Error occurred when scan not active client channels.", e);
      }
    }
  }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}

private void scanExceptionChannel() {
  this.brokerController.getProducerManager().scanNotActiveChannel();
  this.brokerController.getConsumerManager().scanNotActiveChannel();
  this.brokerController.getFilterServerManager().scanNotActiveChannel();
}

從實現就能看出來,這個類是在定期掃描過期的 Channel 並移除,同時通過監聽事件在其 close、exception、idle 時移除

NettyRemotingAbstract

最後回到 NettyRemotingAbstractprocessRequestCommand 方法,雖然在上一節中已經看過了,不過我們再來詳細看一次

final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;

首先我們可以知道在 processorTable 中存放著響應碼和其對應的請求處理器與執行執行緒池,如果沒有會使用默認處理器。

然後是使用其對應的執行緒池來執行業務請求,並使用處理回調函數

try {
  doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
  final RemotingResponseCallback callback = response -> { /* xxx */ };
  // 如果是非同步請求處理器,則將回調函數交給其
  if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
    AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
    processor.asyncProcessRequest(ctx, cmd, callback);
  } else {
    NettyRequestProcessor processor = pair.getObject1();
    RemotingCommand response = processor.processRequest(ctx, cmd);
    // 否則進行同步的調用
    callback.callback(response);
  }
} catch (Throwable e) {
	/* xxx  */
}

那麼,這些響應函數和執行緒池是在什麼時候放入的呢?通過追蹤,我們發現了 BrokerController 類,其在初始化時調用的 registerProcessor 函數如下:

// 用於處理消息的發送請求
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/**
 * PullMessageProcessor
 */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

/**
 * ReplyMessageProcessor
 */
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

/*  以下略   */

我們主要觀察到了幾個重點:

  1. 每一類業務處理都由該業務類型對應的執行緒池來處理

  2. 同時維護 remotingServer 和 fastRemotingServer 兩個處理服務

    如果你對在第一節提到過的 VIP 還有印象的話,應該可以想起 VIP 埠就是 普通埠號-2。而這裡的 fastRemotingServer,監控的就是 VIP 埠

至此,我們終於可以畫出 RocketMQ 在 Broker 端的執行緒模型了




Tags: