Netty 學習(十):ChannelPipeline源碼說明
Netty 學習(十):ChannelPipeline源碼說明
作者: Grey
原文地址:
部落格園:Netty 學習(十):ChannelPipeline源碼說明
CSDN:Netty 學習(十):ChannelPipeline源碼說明
ChannelPipeline可以看作一條流水線,原料(位元組流)進來,經過加工,形成一個個Java對象,然後基於這些對象進行處理,最後輸出二進位位元組流。
ChannelPipeline 在創建 NioSocketChannel 的時候創建, 其默認實現是 DefaultChannelPipeline
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
ChannelPipeline 中保存了 Channel 的引用,且其中每個節點都是一個 ChannelHandlerContext 對象。每個 ChannelHandlerContext 節點都保存了執行器(即:ChannelHandler)。
ChannelPipeline里有兩種不同的節點,一種是 ChannelInboundHandler,處理 inbound 事件(例如:讀取數據流),還有一種是 ChannelOutboundHandler,處理 Outbound 事件,比如調用writeAndFlush()類方法時,就會調用該 handler。
添加 handler 的邏輯如下
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 檢查是否有重複的 handler
checkMultiplicity(handler);
// 創建 節點
newCtx = newContext(group, filterName(name, handler), handler);
// 添加節點
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 回調用戶方法
callHandlerAdded0(newCtx);
return this;
}
如上程式碼,整個添加過程見注釋說明,其實主要就是四步:
第一步:檢查是否有重複的 handler,核心邏輯見
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
// 非共享的且添加過,就拋出異常,反之,如果一個 handler 支援共享,就可以無限次被添加到 ChannelPipeline 中
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
第二步:創建節點,即把 handler 包裹成 ChannelHandlerContext,核心邏輯如下
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() {
return new WeakHashMap<Class<?>, String>();
}
};
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
// It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
// any name conflicts. Note that we don't cache the names generated here.
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
註:Netty 使用 FastThreadLocal 變數來快取 Handler 的類和名稱的映射關係,在生成 name 的時候,首先看快取中有沒有生成過默認 name,如果沒有生成,就調用generateName0()
生成默認名稱,加入快取。
第三步:把 ChannelHandlerContext 作為節點添加到 pipeline 中,核心程式碼如下
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
其本質就是一個雙向鏈表的插入節點過程,而且,ChannelPipeline 刪除 ChannelHandler 的方法,本質就是把這個雙向鏈表的某個節點刪掉!
第四步:回調用戶方法,核心程式碼如下
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
final void callHandlerRemoved() throws Exception {
try {
// Only call handlerRemoved(...) if we called handlerAdded(...) before.
if (handlerState == ADD_COMPLETE) {
handler().handlerRemoved(this);
}
} finally {
// Mark the handler as removed in any case.
setRemoved();
}
}
其中ctx.callHandlerAdded();
就是回調用戶的handlerAdded
方法,然後通過 CAS 方式修改節點的狀態為 REMOVE_COMPLETE (說明該節點已經被移除),或者 ADD_COMPLETE (添加完成)。
完整程式碼見:hello-netty
本文所有圖例見:processon: Netty學習筆記
更多內容見:Netty專欄