RocketMQ源碼詳解 | Broker篇 · 其一:執行緒模型與接收鏈路
概述
在上一節 RocketMQ源碼詳解 | Producer篇 · 其二:消息組成、發送鏈路 中,我們終於將消息發送出了 Producer,在短暫的 tcp 握手後,很快它就會進入目的 Broker。這次我們來自底向上的看下 Broker 端是如何接收然後分發處理消息,同時了解 RocketMQ 的 Broker 的執行緒模型。
Netty 組件
如果你還記得上一節的內容的話那應該知道,NettyRomotingAbstract
有兩個實現類,分別是 NettyRemotingClient
和 NettyRemotingServer
,我們已經知道了前者的實現,現在我們再來看看後者
NettyRemotingServer

這個類很長,我們先來看它的屬性
/* 引導類和dispatch執行緒與select執行緒池 */
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
// 配置類
private final NettyServerConfig nettyServerConfig;
// 用來執行 callback 函數的執行緒池
private final ExecutorService publicExecutor;
// 自定義的 Channel 事件監聽器
private final ChannelEventListener channelEventListener;
// 掃描已經超時的 ResponseFeature
private final Timer timer = new Timer("ServerHouseKeepingService", true);
// 工作執行緒
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private int port = 0;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// sharable handlers
private HandshakeHandler handshakeHandler;
private NettyEncoder encoder;
private NettyConnectManageHandler connectionManageHandler;
private NettyServerHandler serverHandler;
我們主要關心 serverBootStrap 的啟動
首先是它的初始化,初始化程式碼較長,主要做了三件事:
- 初始化 callback 函數執行執行緒池
- 在 Linux 平台上啟用 epoll
- 使用可能存在的 SSL
然後是重頭戲,其具體的創建
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// 半連接隊列長度
.option(ChannelOption.SO_BACKLOG, 1024)
// 開啟內核中的 net.ipv4.tcp_tw_reuse 選項
.option(ChannelOption.SO_REUSEADDR, true)
// 關閉作業系統的連接維護,由自己去干
.option(ChannelOption.SO_KEEPALIVE, false)
// 禁用 Nagle 演算法
.childOption(ChannelOption.TCP_NODELAY, true)
// 設定發送緩衝區和接收緩衝區大小
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
// 設置監聽埠(0.0.0.0:xx)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 設置握手處理器
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
// 設置編解碼器
encoder,
new NettyDecoder(),
// 註冊 Netty 的心跳檢查
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
// 管理連接,超時處理,維護channelTables與存活的連接
connectionManageHandler,
// 實際上的處理收到的請求
serverHandler
);
}
});
這裡需要關注的點很多,我們按照順序來看
首先是執行緒模型,在這裡我們可以看出它是 1(eventLoopGroupBoss) – N(eventLoopGroupSelector) – M(defaultEventExecutorGroup) 的執行緒模型,即有 一個 Acceptor,N 個 Select 執行緒,和 M 個 IO 執行緒。
如果了解過 Reactor 模型的話可以看出這屬於主從多 Reactor 模式,在 Nginx、Kakfa、Tomcat 都能看到類似的設計。
然後需要關注的是 SO_BACKLOG
,這裡指定了半隊列的長度為 1024
backlog
在 TCP 的三次握手中,backlog 用於處理從 SYN RECEIVED 到 ESTABLISHED 狀態之間的套接字。
其中具有 SYN 隊列和 accept 隊列:
SYN 隊列
長度由系統調整。
當伺服器端收到一個 SYN 包時,將其放入 SYN 隊列並返回 ACK+SYN。隊滿則拋棄,客戶端超時後重發。
accept 隊列
長度由程式調整(也就是我們通過
SO_BACKLOG
設置的長度)。當伺服器端收到之前自己發送的 SYN 的 ACK 時,會將套接字放入這裡。大多數時候這裡的數據可以很快的被程式通過
accept()
取出。隊滿時拋棄到來的 ACK 包(雖然客戶端已經進入了 ESTABLISHED 狀態,但由於 tcp 的慢啟動,並不會造成太大影響),客戶端重發到一定次數仍未被放入 accept 隊列時會被發送 RST 包。同時在 Linux 中,這裡隊滿時會對 SYN 隊列的接收速率進行控制。
再通過 SO_REUSEADDR
開啟了內核的 net.ipv4.tcp_tw_reuse
選項
net.ipv4.tcp_tw_reuse
這個選項主要用在具有大量短連接的應用。
問題:
在具有大量短連接時,伺服器端上具有太多屬於同一個客戶端的處於
TIME_WAIT
狀態的連接,而導致該客戶端不能建立新的連接。處理方法:
在 Linux 中,TCP 的
TIME_WAIT
時間默認為 1 分鐘,而TIME_WAIT
被設計出來的主要目的有兩個:
- 避免新的連接收到舊的連接的重發數據包
- 確保遠程端不是在
LAST_ACK
狀態在開啟這個選項後,如果
TIME_WAIT
狀態的連接過多,會使用在 TCP 可選頭部中的時間戳選項,來和之前存儲的時間戳對比,若該大,則從TIME_WAIT
狀態的存活連接中隨機選取一個並分配給該 TCP 連接。對於需要解決問題 1,由於舊的連接的重發包具有過期的時間戳,所以會被丟棄;
對於問題 2 ,當處於
LAST_ACK
的一端收到新的 TCP 連接的 SYN 包後,會將其丟棄,然後重發 FIN 包,處於SYN_SEND
狀態的一端收到這種錯誤的包後會發送 RST 包,然後再發送 SYN 包重試。
然後使用 SO_KEEPALIVE
關閉作業系統自帶的 KeepAlive 機制。
這是因為作業系統的連接維護默認為 2 小時,對其修改需要系統調用,且當協議被切換為 UDP 時會失效,故我們在後面使用了 IdleStateHandler 來註冊 Netty 自己實現的心跳檢測
接著將 TCP_NODELAY
設置為 True 來禁用 Nagle 演算法。
這是因為 Nagle 演算法會等待當前 TCP 的包到達了足夠的大小才會發送,這會造成發送延遲
再往後看可以發現是先註冊了 HandshakeHandler
,我們來看它幹了什麼
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// 標記當前位置以便恢復。因為我們接下來需要查看第一個位元組以確定內容是否以 TLS 握手開始
msg.markReaderIndex();
byte b = msg.getByte(0);
// 握手的魔數,如果是說明這是個tls握手
if (b == HANDSHAKE_MAGIC_CODE) {
switch (tlsMode) {
// 禁用 SSL
case DISABLED:
ctx.close();
log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
break;
// 可用或必須使用 SSL
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
// 添加 SSL handler
ctx.pipeline()
// SSL 隧道
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
// 用來保證文件在零拷貝時也進入能被 SSL 加密
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error("Trying to establish an SSL connection but sslContext is null");
}
break;
default:
log.warn("Unknown TLS mode");
break;
}
} else if (tlsMode == TlsMode.ENFORCING) {
ctx.close();
log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
}
// 恢復read索引,以便握手協商可以正常進行。
msg.resetReaderIndex();
try {
// 完成 SSL 的判定後將被於本 pipeline 中移除
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
// 交給下一個 handler
ctx.fireChannelRead(msg.retain());
}
從程式碼我們可以知道,這個 Handler 用於判斷是否使用 SSL 對連接進行加密,有的話則使用
然後是我們之前提到過的 IdleStateHandler
,它的幾個參數分別是:
- 讀超時時間
- 寫超時時間
- 讀寫超時時間
而我們在這將 1 和 2 都設置為了 0,即不進行觸發
一旦超時,它將會產生 IdleStateEvent
,在下一個 Handler NettyConnectManageHandler
中,我們可以看到它被捕獲了
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
最後其他的組件都和上一章講過差不多,故不再重複。接下來主要看一個和 Client 不同的地方。
ChannelEventListener
在上一章了解 Client 時,NettyConnectManageHandler
中在每一個狀態中都有以下程式碼
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType./* XXX */, remoteAddress, ctx.channel()));
}
Client 由於沒有註冊 channelEventListener
而沒有使用,在 NettyRemotingServer
中則在執行構造器時註冊了 ClientHousekeepingService
,當然是 Broekr 端,還有一個是 BrokerHousekeepingService
,用於 NameServer
public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ClientHousekeepingService.this.scanExceptionChannel();
} catch (Throwable e) {
log.error("Error occurred when scan not active client channels.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}
private void scanExceptionChannel() {
this.brokerController.getProducerManager().scanNotActiveChannel();
this.brokerController.getConsumerManager().scanNotActiveChannel();
this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
從實現就能看出來,這個類是在定期掃描過期的 Channel 並移除,同時通過監聽事件在其 close、exception、idle 時移除
NettyRemotingAbstract
最後回到 NettyRemotingAbstract
的 processRequestCommand
方法,雖然在上一節中已經看過了,不過我們再來詳細看一次
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
首先我們可以知道在 processorTable
中存放著響應碼和其對應的請求處理器與執行執行緒池,如果沒有會使用默認處理器。
然後是使用其對應的執行緒池來執行業務請求,並使用處理回調函數
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = response -> { /* xxx */ };
// 如果是非同步請求處理器,則將回調函數交給其
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
// 否則進行同步的調用
callback.callback(response);
}
} catch (Throwable e) {
/* xxx */
}
那麼,這些響應函數和執行緒池是在什麼時候放入的呢?通過追蹤,我們發現了 BrokerController
類,其在初始化時調用的 registerProcessor
函數如下:
// 用於處理消息的發送請求
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* ReplyMessageProcessor
*/
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
/* 以下略 */
我們主要觀察到了幾個重點:
-
每一類業務處理都由該業務類型對應的執行緒池來處理
-
同時維護 remotingServer 和 fastRemotingServer 兩個處理服務
如果你對在第一節提到過的 VIP 還有印象的話,應該可以想起 VIP 埠就是 普通埠號-2。而這裡的 fastRemotingServer,監控的就是 VIP 埠
至此,我們終於可以畫出 RocketMQ 在 Broker 端的執行緒模型了
