Netty源碼分析 (四)—– ChannelPipeline

  • 2019 年 10 月 3 日
  • 筆記

netty在服務端埠綁定和新連接建立的過程中會建立相應的channel,而與channel的動作密切相關的是pipeline這個概念,pipeline像是可以看作是一條流水線,原始的原料(位元組流)進來,經過加工,最後輸出

pipeline 初始化

在上一篇文章中,我們已經知道了創建NioSocketChannel的時候會將netty的核心組件創建出來

pipeline是其中的一員,在下面這段程式碼中被創建

protected AbstractChannel(Channel parent) {      this.parent = parent;      id = newId();      unsafe = newUnsafe();      pipeline = newChannelPipeline();  }

protected DefaultChannelPipeline newChannelPipeline() {      return new DefaultChannelPipeline(this);  }

NioSocketChannel中保存了pipeline的引用

DefaultChannelPipeline

protected DefaultChannelPipeline(Channel channel) {      this.channel = ObjectUtil.checkNotNull(channel, "channel");      tail = new TailContext(this);      head = new HeadContext(this);        head.next = tail;      tail.prev = head;  }

pipeline中保存了channel的引用,創建完pipeline之後,整個pipeline是這個樣子的

 

pipeline中的每個節點是一個ChannelHandlerContext對象,每個context節點保存了它包裹的執行器 ChannelHandler 執行操作所需要的上下文,其實就是pipeline,因為pipeline包含了channel的引用,可以拿到所有的context資訊

pipeline添加節點

下面是一段非常常見的客戶端程式碼

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {       @Override       public void initChannel(SocketChannel ch) throws Exception {           ChannelPipeline p = ch.pipeline();           p.addLast(new Spliter())           p.addLast(new Decoder());           p.addLast(new BusinessHandler())           p.addLast(new Encoder());       }  });

首先,用一個spliter將來源TCP數據包拆包,然後將拆出來的包進行decoder,傳入業務處理器BusinessHandler,業務處理完encoder,輸出

整個pipeline結構如下

 

我用兩種顏色區分了一下pipeline中兩種不同類型的節點,一個是 ChannelInboundHandler,處理inBound事件,最典型的就是讀取數據流,加工處理;還有一種類型的Handler是 ChannelOutboundHandler, 處理outBound事件,比如當調用writeAndFlush()類方法時,就會經過該種類型的handler

不管是哪種類型的handler,其外層對象 ChannelHandlerContext 之間都是通過雙向鏈表連接,而區分一個 ChannelHandlerContext到底是in還是out,在添加節點的時候我們就可以看到netty是怎麼處理的

DefaultChannelPipeline

@Override  public final ChannelPipeline addLast(ChannelHandler... handlers) {      return addLast(null, handlers);  }

@Override  public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {      for (ChannelHandler h: handlers) {          addLast(executor, null, h);      }      return this;  }

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {      final AbstractChannelHandlerContext newCtx;      synchronized (this) {          // 1.檢查是否有重複handler          checkMultiplicity(handler);          // 2.創建節點          newCtx = newContext(group, filterName(name, handler), handler);          // 3.添加節點          addLast0(newCtx);      }        // 4.回調用戶方法      callHandlerAdded0(handler);        return this;  }

這裡簡單地用synchronized方法是為了防止多執行緒並發操作pipeline底層的雙向鏈表

我們還是逐步分析上面這段程式碼

檢查是否有重複handler

在用戶程式碼添加一條handler的時候,首先會查看該handler有沒有添加過

private static void checkMultiplicity(ChannelHandler handler) {      if (handler instanceof ChannelHandlerAdapter) {          ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;          if (!h.isSharable() && h.added) {              throw new ChannelPipelineException(                      h.getClass().getName() +                      " is not a @Sharable handler, so can't be added or removed multiple times.");          }          h.added = true;      }  }

netty使用一個成員變數added標識一個channel是否已經添加,上面這段程式碼很簡單,如果當前要添加的Handler是非共享的,並且已經添加過,那就拋出異常,否則,標識該handler已經添加

由此可見,一個Handler如果是sharable的,就可以無限次被添加到pipeline中,我們客戶端程式碼如果要讓一個Handler被共用,只需要加一個@Sharable標註即可,如下

@Sharable  public class BusinessHandler {    }

而如果Handler是sharable的,一般就通過spring的注入的方式使用,不需要每次都new 一個

isSharable() 方法正是通過該Handler對應的類是否標註@Sharable來實現的

ChannelHandlerAdapter

public boolean isSharable() {     Class<?> clazz = getClass();      Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();      Boolean sharable = cache.get(clazz);      if (sharable == null) {          sharable = clazz.isAnnotationPresent(Sharable.class);          cache.put(clazz, sharable);      }      return sharable;  }

通過反射判斷是否有Sharable.class註解

創建節點

回到主流程,看創建上下文這段程式碼

newCtx = newContext(group, filterName(name, handler), handler);

這裡我們需要先分析 filterName(name, handler) 這段程式碼,這個函數用於給handler創建一個唯一性的名字

private String filterName(String name, ChannelHandler handler) {      if (name == null) {          return generateName(handler);      }      checkDuplicateName(name);      return name;  }

顯然,我們傳入的name為null,netty就給我們生成一個默認的name,否則,檢查是否有重名,檢查通過的話就返回

netty創建默認name的規則為 簡單類名#0,下面我們來看些具體是怎麼實現的

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =          new FastThreadLocal<Map<Class<?>, String>>() {      @Override      protected Map<Class<?>, String> initialValue() throws Exception {          return new WeakHashMap<Class<?>, String>();      }  };    private String generateName(ChannelHandler handler) {      // 先查看快取中是否有生成過默認name      Map<Class<?>, String> cache = nameCaches.get();      Class<?> handlerType = handler.getClass();      String name = cache.get(handlerType);      // 沒有生成過,就生成一個默認name,加入快取       if (name == null) {          name = generateName0(handlerType);          cache.put(handlerType, name);      }        // 生成完了,還要看默認name有沒有衝突      if (context0(name) != null) {          String baseName = name.substring(0, name.length() - 1);          for (int i = 1;; i ++) {              String newName = baseName + i;              if (context0(newName) == null) {                  name = newName;                  break;              }          }      }      return name;  }

netty使用一個 FastThreadLocal(後面的文章會細說)變數來快取Handler的類和默認名稱的映射關係,在生成name的時候,首先查看快取中有沒有生成過默認name(簡單類名#0),如果沒有生成,就調用generateName0()生成默認name,然後加入快取

接下來還需要檢查name是否和已有的name有衝突,調用context0(),查找pipeline裡面有沒有對應的context

private AbstractChannelHandlerContext context0(String name) {      AbstractChannelHandlerContext context = head.next;      while (context != tail) {          if (context.name().equals(name)) {              return context;          }          context = context.next;      }      return null;  }

context0()方法鏈表遍歷每一個 ChannelHandlerContext,只要發現某個context的名字與待添加的name相同,就返回該context,最後拋出異常,可以看到,這個其實是一個線性搜索的過程

如果context0(name) != null 成立,說明現有的context裡面已經有了一個默認name,那麼就從 簡單類名#1 往上一直找,直到找到一個唯一的name,比如簡單類名#3

如果用戶程式碼在添加Handler的時候指定了一個name,那麼要做到事僅僅為檢查一下是否有重複

private void checkDuplicateName(String name) {      if (context0(name) != null) {          throw new IllegalArgumentException("Duplicate handler name: " + name);      }  }

處理完name之後,就進入到創建context的過程,由前面的調用鏈得知,group為null,因此childExecutor(group)也返回null

DefaultChannelPipeline

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {      return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);  }    private EventExecutor childExecutor(EventExecutorGroup group) {      if (group == null) {          return null;      }      //..  }

DefaultChannelHandlerContext

DefaultChannelHandlerContext(          DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {      super(pipeline, executor, name, isInbound(handler), isOutbound(handler));      if (handler == null) {          throw new NullPointerException("handler");      }      this.handler = handler;  }

構造函數中,DefaultChannelHandlerContext將參數回傳到父類,保存Handler的引用,進入到其父類

AbstractChannelHandlerContext

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,                                boolean inbound, boolean outbound) {      this.name = ObjectUtil.checkNotNull(name, "name");      this.pipeline = pipeline;      this.executor = executor;      this.inbound = inbound;      this.outbound = outbound;  }

netty中用兩個欄位來表示這個channelHandlerContext屬於inBound還是outBound,或者兩者都是,兩個boolean是通過下面兩個小函數來判斷(見上面一段程式碼)

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {      return handler instanceof ChannelInboundHandler;  }    private static boolean isOutbound(ChannelHandler handler) {      return handler instanceof ChannelOutboundHandler;  }

通過instanceof關鍵字根據介面類型來判斷,因此,如果一個Handler實現了兩類介面,那麼他既是一個inBound類型的Handler,又是一個outBound類型的Handler,比如下面這個類

 

常用的,將decode操作和encode操作合併到一起的codec,一般會繼承 MessageToMessageCodec,而MessageToMessageCodec就是繼承ChannelDuplexHandler

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {        protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)          throws Exception;        protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)          throws Exception;  }

context 創建完了之後,接下來終於要將創建完畢的context加入到pipeline中去了

添加節點

private void addLast0(AbstractChannelHandlerContext newCtx) {      AbstractChannelHandlerContext prev = tail.prev;      newCtx.prev = prev; // 1      newCtx.next = tail; // 2      prev.next = newCtx; // 3      tail.prev = newCtx; // 4  }

用下面這幅圖可見簡單的表示這段過程,說白了,其實就是一個雙向鏈表的插入操作

操作完畢,該context就加入到pipeline中

到這裡,pipeline添加節點的操作就完成了,你可以根據此思路掌握所有的addxxx()系列方法

回調用戶方法

AbstractChannelHandlerContext

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {      ctx.handler().handlerAdded(ctx);      ctx.setAddComplete();  }

到了第四步,pipeline中的新節點添加完成,於是便開始回調用戶程式碼 ctx.handler().handlerAdded(ctx);,常見的用戶程式碼如下

public class DemoHandler extends SimpleChannelInboundHandler<...> {      @Override      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {          // 節點被添加完畢之後回調到此          // do something      }  }

接下來,設置該節點的狀態

AbstractChannelHandlerContext

final void setAddComplete() {      for (;;) {          int oldState = handlerState;          if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {              return;          }      }  }

用cas修改節點的狀態至:REMOVE_COMPLETE(說明該節點已經被移除) 或者 ADD_COMPLETE

pipeline刪除節點

netty 有個最大的特性之一就是Handler可插拔,做到動態編織pipeline,比如在首次建立連接的時候,需要通過進行許可權認證,在認證通過之後,就可以將此context移除,下次pipeline在傳播事件的時候就就不會調用到許可權認證處理器

下面是許可權認證Handler最簡單的實現,第一個數據包傳來的是認證資訊,如果校驗通過,就刪除此Handler,否則,直接關閉連接

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {      @Override      protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {          if (verify(authDataPacket)) {              ctx.pipeline().remove(this);          } else {              ctx.close();          }      }        private boolean verify(ByteBuf byteBuf) {          //...      }  }

重點就在 ctx.pipeline().remove(this) 這段程式碼

@Override  public final ChannelPipeline remove(ChannelHandler handler) {      remove(getContextOrDie(handler));        return this;  }

remove操作相比add簡單不少,分為三個步驟:

1.找到待刪除的節點
2.調整雙向鏈表指針刪除
3.回調用戶函數

找到待刪除的節點

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {      AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);      if (ctx == null) {          throw new NoSuchElementException(handler.getClass().getName());      } else {          return ctx;      }  }    @Override  public final ChannelHandlerContext context(ChannelHandler handler) {      if (handler == null) {          throw new NullPointerException("handler");      }        AbstractChannelHandlerContext ctx = head.next;      for (;;) {            if (ctx == null) {              return null;          }            if (ctx.handler() == handler) {              return ctx;          }            ctx = ctx.next;      }  }

這裡為了找到Handler對應的context,照樣是通過依次遍歷雙向鏈表的方式,直到某一個context的Handler和當前Handler相同,便找到了該節點

調整雙向鏈表指針刪除

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {      assert ctx != head && ctx != tail;        synchronized (this) {          // 2.調整雙向鏈表指針刪除          remove0(ctx);      }      // 3.回調用戶函數      callHandlerRemoved0(ctx);      return ctx;  }    private static void remove0(AbstractChannelHandlerContext ctx) {      AbstractChannelHandlerContext prev = ctx.prev;      AbstractChannelHandlerContext next = ctx.next;      prev.next = next; // 1      next.prev = prev; // 2  }

經歷的過程要比添加節點要簡單,可以用下面一幅圖來表示

 

最後的結果為

 

結合這兩幅圖,可以很清晰地了解許可權驗證Handler的工作原理,另外,被刪除的節點因為沒有對象引用到,果過段時間就會被gc自動回收

回調用戶函數

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {      try {          ctx.handler().handlerRemoved(ctx);      } finally {          ctx.setRemoved();      }  }

到了第三步,pipeline中的節點刪除完成,於是便開始回調用戶程式碼 ctx.handler().handlerRemoved(ctx);,常見的程式碼如下

public class DemoHandler extends SimpleChannelInboundHandler<...> {      @Override      public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {          // 節點被刪除完畢之後回調到此,可做一些資源清理          // do something      }  }

最後,將該節點的狀態設置為removed

final void setRemoved() {      handlerState = REMOVE_COMPLETE;  }

總結

1、在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應。

2、ChannelPipeline是一個維護了一個以 AbstractChannelHandlerContext 為節點的雙向鏈表,其中此鏈表是 以head(HeadContext)作為頭,以tail(TailContext)作為尾的雙向鏈表.

3、pipeline中的每個節點包著具體的處理器ChannelHandler,節點根據ChannelHandler的類型是ChannelInboundHandler還是ChannelOutboundHandler來判斷該節點屬於in還是out或者兩者都是