Java NIO、Channel、Selector 詳解

  • 2019 年 11 月 11 日
  • 筆記

Java NIO 有三大組件:

  1. Buffer
  2. Channel
  3. Selector

Buffer

Buffer 是一個特定原始類型的容器。Buffer 是一個原始類型的線性的、有限序列,除了 Buffer 存儲的內容外,關鍵屬性還包括:capacity, limit 和 position。

  • capacity:Buffer 包含的元素的數量,capacity 永遠不會為負,也不會改變。
  • limit:Buffer 中第一個不能讀取或寫入的元素索引。limit 永遠不會為負,且永遠小於等於 capacity
  • position:下一個待讀取、寫入的元素索引。position 永遠不會為負,且永遠小於等於 limit

每個基本類型(布爾類型除外),都有一個 Buffer 的子類。Java NIO 中 Buffer 的一些實現,其中最重要的是 ByteBuffer,其餘類如 IntBuffer 的實現類未畫出。

image

我個人理解,Buffer 就是一個記憶體數組,並通過 capacity, limit 和 position 三個變數對讀寫操作進行控制。

position、limit、capacity

Buffer 的屬性主要有:

// 恆等式: mark <= position <= limit <= capacity  private int mark = -1;  private int position = 0;  private int limit;  private int capacity;    // 僅在 direct buffers 中使用  long address;

ByteBuffer 中額外定義了位元組數組(其餘 Buffer 的子類同理):

// 該位元組數組僅在分配在堆上時才非空(參考下面的 Direct vs. non-direct buffers)  final byte[] hb;

Buffer 就是根據這 4 個 int 型欄位來配合記憶體數組的讀寫。這 4 個屬性分別為:

  • mark:臨時保存 position 的值,每次調用 mark() 方法都會將 mark 設值為當前的 position
  • capacity:Buffer 緩衝區容量,capacity 永遠不會為負,也不會改變。
  • limit:Buffer 中第一個不能讀取或寫入的元素索引。limit 永遠不會為負,且永遠小於等於 capacity。寫模式下,limit 代表的是最大能寫入的數據,limit = capacity;讀模式下,limit = Buffer 實際寫入的數據大小。
  • position:下一個待讀取、寫入的元素索引。position 永遠不會為負,且永遠小於等於 limit。

image

ByteBuffer

從上圖中我們可以看到,ByteBuffer 類有 2 個實現類:

  1. MappedByteBuffer:DirectByteBuffer 的抽象類,JVM 會儘可能交給本地方法操作 I/O,其記憶體不會分配在堆上,不會佔用應用程式的記憶體。
  2. HeapByteBuffer:顧名思義是存儲在堆上的 Buffer,我們直接調用 ByteBuffer.allocate(1024); 時會創建此類 Buffer。
public static ByteBuffer allocate(int capacity) {      if (capacity < 0)          throw new IllegalArgumentException();      return new HeapByteBuffer(capacity, capacity);  }

Direct vs. non-direct buffers

一個 byte buffer 可以是 direct,也可以是非 direct 的。對於 direct byte buffer,JVM 將盡量在本機上執行 I/O 操作。也就是說,JVM 盡量避免每次在調用作業系統 I/O 操作前,將緩衝區內容複製到中間緩衝區。

可以通過類中的 allocateDirect 工廠方法創建 direct buffer,這個方法創建的 direct buffer 通常比 non-direct buffer 具有更高的分配和釋放成本。Direct buffer 記憶體可能分配在 GC 堆的外部,所以對應用程式的記憶體佔用影響並不明顯。所以建議將 direct buffer 分配給大型、壽命長的、受底層作業系統 I/O 操作約束的緩衝區。

可以通過調用 isDirect 方法判斷 byte buffer 是否是 direct 的。

Buffer 初始化

Buffer 可以通過 allocation 方法創建,也可以通過位元組數組的 wrapping 方法創建並填充。

ByteBuffer byteBuf = ByteBuffer.allocate(1024);
public static ByteBuffer wrap(byte[] array) {      ...  }

填充 Buffer

// 填充一個 byte  public abstract ByteBuffer put(byte b);    // 在指定位置填充一個 byte  public abstract ByteBuffer put(int index, byte b);    // 批量將 src buffer 填充到本 buffer  public ByteBuffer put(ByteBuffer src) {      ...  }    // 批量將 src 數組的特定區間填充到本 buffer  public ByteBuffer put(byte[] src, int offset, int length) {      ...  }    // 批量將 src 數組填充到本 buffer  public final ByteBuffer put(byte[] src) {      ...  }

我們還可以將 Channel 的數據填充到 Buffer 中,數據是從外部(文件、網路)讀到記憶體中。

int read = channel.read(buffer);

讀取 Buffer

對於前面的寫操作,每寫一個值,position 都會自增 1,所以 position 會指向最後寫入位置的後面一位。

如果要讀取 Buffer 的值,需要調用 flip() 方法,從寫模式切換到讀模式

public final Buffer flip() {      limit = position; // 將 limit 設置為實際寫入的數據數量      position = 0; // 重置 position      mark = -1; // 將 mark 設置為未標記      return this;  }

讀操作的 get 方法如下:

// 讀取當前 position 的位元組,然後 position 自增 1  public abstract byte get();    // 讀取 index 的位元組( position 不會自增!)  public abstract byte get(int index);    // 批量將緩衝區數據傳遞到 dst 數組中,position 自增 dst 的長度  public ByteBuffer get(byte[] dst) {      ...  }    // 批量將緩衝區數據傳遞到 dst 數組中  public ByteBuffer get(byte[] dst, int offset, int length) {      ...  }

我們可以將緩衝區的數據傳輸到 Channel 中:

  1. 通過 FileChannel 將數據寫到文件中
  2. 通過 SocketChannel 將數據寫入網路,發送到遠程機器
int write = channel.write(buffer);

mark(), reset()

mark 用於臨時保存 position 的值,每次調用 mark() 方法都會將 position 賦值給 mark。

public final Buffer mark() {      mark = position;      return this;  }

reset() 方法就是將 position 賦值到上次 mark 的位置上(也就是上一次調用 mark() 方法的時候),通過 mark(), reset() 兩個方法的配合,我們可以重複讀取某個區間的數據。

public final Buffer reset() {      int m = mark;      if (m < 0)          throw new InvalidMarkException();      position = m;      return this;  }

注意 mark 構造初始化時數值是 -1,如果 >= 0 則表示可以讀取。

rewind(), clear(), compact()

rewind() 重置 position 為 0。通常在 channel-write 和 get 前調用此方法。

public final Buffer rewind() {      position = 0;      mark = -1;      return this;  }

clear() 會重置 position,將 limit 設置為最大值 capacity,並將 mark 置成 -1。通常在 channel-read 和填充此 buffer 時,會先調用此方法。

public final Buffer clear() {      position = 0;      limit = capacity;      mark = -1;      return this;  }

compact() 方法並不常用,忽略。

public abstract ByteBuffer compact();

恆等式

mark, position, limit和 capacity 永遠遵循以下關係:

0 <= mark <= position <= limit <= capacity

新創建的 buffer position = 0,mark 是未定義的(-1)。limit 的初始值可能是 0,也可能是構造時傳入的其他值。新分配的緩衝區元素都初始化為 0。

Channel

Channel 是 I/O 操作的「橋樑」。Channel 可以是對硬體設備、文件、網路套接字、程式組件等實體的連接,該實體能夠執行不同的 I/O 操作(讀取或寫入)。

Channel 只有 2 種狀態:開啟和關閉。在創建時就是開啟的,一旦關閉就不會再回到打開狀態。Channel 一旦關閉,任何對 Channel 調用的 I/O 操作都會拋出 ClosedChannelException 異常,可以通過方法 isOpen() 來檢測 Channel 是否開啟。

Channel 介面定義如下:

public interface Channel extends Closeable {        public boolean isOpen();        public void close() throws IOException;  }

image

  • FileChannel:文件通道,用於文件讀寫
  • DatagramChannel:UDP 連接
  • SocketChannel:TCP 連接通道,TCP 客戶端
  • ServerSocketChannel:TCP 對應的服務端,用於監聽某個埠進來的請求

讀操作:將數據從 Channel 讀取到 Buffer 中

int read = channel.read(buffer);

寫操作:將數據從 Buffer 寫入到 Channel 中

int write = channel.write(buffer);

FileChannel

讀取文件內容,詳細說明見注釋。

@Test  public void testFileChannelRead() throws IOException {      // 獲取文件的 FileChannel      FileInputStream fileInputStream = new FileInputStream("/abc");      FileChannel channel = fileInputStream.getChannel();        // 創建 ByteBuffer      ByteBuffer buffer = ByteBuffer.allocate(30);        // 將文件內容讀取到 buffer 中      channel.read(buffer);        // buffer 從寫模式,切換到讀模式      buffer.flip();        // 列印 buffer(文件)的內容      while (buffer.hasRemaining()) {          System.out.print((char)buffer.get());      }  }

寫入文件內容,詳細說明見注釋。

@Test  public void testFileChannelWrite() throws IOException {      // 獲取文件的 FileChannel      FileOutputStream fileOutputStream = new FileOutputStream("/abc");      FileChannel channel = fileOutputStream.getChannel();        // 創建 ByteBuffer      ByteBuffer buffer = ByteBuffer.allocate(30);      buffer.put("123456".getBytes());        // Buffer 切換為讀模式      buffer.flip();      while(buffer.hasRemaining()) {          // 將 Buffer 中的內容寫入文件          channel.write(buffer);      }  }

SocketChannel

SocketChannel 顧名思義,就是 Socket 的 Channel,能夠讀寫 Socket。操作緩衝區同 FileChannel。

@Test  public void testSocketChannel() throws IOException {      SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 80));        // 創建 ByteBuffer      ByteBuffer buffer = ByteBuffer.allocate(30);      // 讀取數據      socketChannel.read(buffer);        // 寫入數據到網路連接中      while(buffer.hasRemaining()) {          socketChannel.write(buffer);      }  }

ServerSocketChannel

ServerSocketChannel 用於監聽機器埠,管理從這個埠進來的 TCP 連接。

// 實例化  ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  // 監聽 8080 埠  serverSocketChannel.socket().bind(new InetSocketAddress(8080));    while (true) {      // 一旦有一個 TCP 連接進來,就對應創建一個 SocketChannel 進行處理      SocketChannel socketChannel = serverSocketChannel.accept();  }
這裡可以看到 SocketChannel 的另一種實例化方式,SocketChannel 可讀可寫,操作一個網路通道。

ServerSocketChannel 不和 Buffer 打交道了,因為它並不實際處理數據,它一旦接收到請求後,實例化 SocketChannel,之後在這個連接通道上的數據傳遞它就不管了,因為它需要繼續監聽埠,等待下一個連接。

DatagramChannel

處理 UDP 連接(面向無連接的,不需要握手,只要把數據丟出去就好了),操作位元組數組,同 FileChannel,不作過多介紹。

Selector

Selector 是非阻塞的,多路復用就是基於 Selector 的,Java 能通過 Selector 實現一個執行緒管理多個 Channel。

基本操作

  1. 開啟一個 Selector(經常被翻譯成選擇器、多路復用器)
Selector selector = Selector.open();
  1. 將 Channel 註冊到 Selector 上。前面我們說了,Selector 建立在非阻塞模式之上,所以註冊到 Selector 的 Channel 必須要支援非阻塞模式,FileChannel 不支援非阻塞,我們這裡討論最常見的 SocketChannel 和 ServerSocketChannel。
// 將通道設置為非阻塞模式,因為默認都是阻塞模式的  channel.configureBlocking(false);  // 註冊  SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

register 方法的第二個參數是 SelectionKey 中的常量,代表要監聽感興趣的事件,總共有以下 4 種:

// 通道有數據可讀  public static final int OP_READ = 1 << 0;  // 可以向通道中寫數據  public static final int OP_WRITE = 1 << 2;  // 成功建立 TCP 連接  public static final int OP_CONNECT = 1 << 3;  // 接受 TCP 連接  public static final int OP_ACCEPT = 1 << 4;

註冊方法返回值是 SelectionKey 實例,它包含了 Channel 和 Selector 資訊,也包括了一個叫做 Interest Set 的資訊,即我們設置的我們感興趣的正在監聽的事件集合。

  1. 調用 select() 方法獲取通道資訊。用於判斷是否有我們感興趣的事件已經發生了。

基本用法

Selector selector = Selector.open();    channel.configureBlocking(false);    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);    while(true) {    // 判斷是否有事件準備好    int readyChannels = selector.select();    if(readyChannels == 0) continue;      // 遍歷    Set<SelectionKey> selectedKeys = selector.selectedKeys();    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();    while(keyIterator.hasNext()) {      SelectionKey key = keyIterator.next();        if(key.isAcceptable()) {          // a connection was accepted by a ServerSocketChannel.        } else if (key.isConnectable()) {          // a connection was established with a remote server.        } else if (key.isReadable()) {          // a channel is ready for reading        } else if (key.isWritable()) {          // a channel is ready for writing      }        keyIterator.remove();    }  }

I/O 多路復用原理

這裡放上一張原來總結的思維導圖截圖,具體原理需要另行寫篇文章。

總結

  • Buffer 和數組差不多,它有 position、limit、capacity 幾個重要屬性。put() 一下數據、flip() 切換到讀模式、然後用 get() 獲取數據、clear() 一下清空數據、重新回到 put() 寫入數據。
  • Channel 基本上只和 Buffer 打交道,最重要的介面就是 channel.read(buffer) 和 channel.write(buffer)。
  • Selector 用於實現非阻塞 IO,這裡僅僅介紹介面使用,後續請關注非阻塞 IO 的介紹。