Java NIO、Channel、Selector 詳解
- 2019 年 11 月 11 日
- 筆記
Java NIO 有三大組件:
- Buffer
- Channel
- Selector
Buffer
Buffer 是一個特定原始類型的容器。Buffer 是一個原始類型的線性的、有限序列,除了 Buffer 存儲的內容外,關鍵屬性還包括:capacity, limit 和 position。
- capacity:Buffer 包含的元素的數量,capacity 永遠不會為負,也不會改變。
- limit:Buffer 中第一個不能讀取或寫入的元素索引。limit 永遠不會為負,且永遠小於等於 capacity
- position:下一個待讀取、寫入的元素索引。position 永遠不會為負,且永遠小於等於 limit
每個基本類型(布爾類型除外),都有一個 Buffer 的子類。Java NIO 中 Buffer 的一些實現,其中最重要的是 ByteBuffer,其餘類如 IntBuffer 的實現類未畫出。

image
我個人理解,Buffer 就是一個記憶體數組,並通過 capacity, limit 和 position 三個變數對讀寫操作進行控制。
position、limit、capacity
Buffer 的屬性主要有:
// 恆等式: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity; // 僅在 direct buffers 中使用 long address;
ByteBuffer 中額外定義了位元組數組(其餘 Buffer 的子類同理):
// 該位元組數組僅在分配在堆上時才非空(參考下面的 Direct vs. non-direct buffers) final byte[] hb;
Buffer 就是根據這 4 個 int 型欄位來配合記憶體數組的讀寫。這 4 個屬性分別為:
- mark:臨時保存 position 的值,每次調用 mark() 方法都會將 mark 設值為當前的 position
- capacity:Buffer 緩衝區容量,capacity 永遠不會為負,也不會改變。
- limit:Buffer 中第一個不能讀取或寫入的元素索引。limit 永遠不會為負,且永遠小於等於 capacity。寫模式下,limit 代表的是最大能寫入的數據,limit = capacity;讀模式下,limit = Buffer 實際寫入的數據大小。
- position:下一個待讀取、寫入的元素索引。position 永遠不會為負,且永遠小於等於 limit。

image
ByteBuffer
從上圖中我們可以看到,ByteBuffer 類有 2 個實現類:
- MappedByteBuffer:DirectByteBuffer 的抽象類,JVM 會儘可能交給本地方法操作 I/O,其記憶體不會分配在堆上,不會佔用應用程式的記憶體。
- HeapByteBuffer:顧名思義是存儲在堆上的 Buffer,我們直接調用
ByteBuffer.allocate(1024);
時會創建此類 Buffer。
public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); }
Direct vs. non-direct buffers
一個 byte buffer 可以是 direct,也可以是非 direct 的。對於 direct byte buffer,JVM 將盡量在本機上執行 I/O 操作。也就是說,JVM 盡量避免每次在調用作業系統 I/O 操作前,將緩衝區內容複製到中間緩衝區。
可以通過類中的 allocateDirect 工廠方法創建 direct buffer,這個方法創建的 direct buffer 通常比 non-direct buffer 具有更高的分配和釋放成本。Direct buffer 記憶體可能分配在 GC 堆的外部,所以對應用程式的記憶體佔用影響並不明顯。所以建議將 direct buffer 分配給大型、壽命長的、受底層作業系統 I/O 操作約束的緩衝區。
可以通過調用 isDirect 方法判斷 byte buffer 是否是 direct 的。
Buffer 初始化
Buffer 可以通過 allocation 方法創建,也可以通過位元組數組的 wrapping 方法創建並填充。
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
public static ByteBuffer wrap(byte[] array) { ... }
填充 Buffer
// 填充一個 byte public abstract ByteBuffer put(byte b); // 在指定位置填充一個 byte public abstract ByteBuffer put(int index, byte b); // 批量將 src buffer 填充到本 buffer public ByteBuffer put(ByteBuffer src) { ... } // 批量將 src 數組的特定區間填充到本 buffer public ByteBuffer put(byte[] src, int offset, int length) { ... } // 批量將 src 數組填充到本 buffer public final ByteBuffer put(byte[] src) { ... }
我們還可以將 Channel 的數據填充到 Buffer 中,數據是從外部(文件、網路)讀到記憶體中。
int read = channel.read(buffer);
讀取 Buffer
對於前面的寫操作,每寫一個值,position 都會自增 1,所以 position 會指向最後寫入位置的後面一位。
如果要讀取 Buffer 的值,需要調用 flip() 方法,從寫模式切換到讀模式。
public final Buffer flip() { limit = position; // 將 limit 設置為實際寫入的數據數量 position = 0; // 重置 position mark = -1; // 將 mark 設置為未標記 return this; }
讀操作的 get 方法如下:
// 讀取當前 position 的位元組,然後 position 自增 1 public abstract byte get(); // 讀取 index 的位元組( position 不會自增!) public abstract byte get(int index); // 批量將緩衝區數據傳遞到 dst 數組中,position 自增 dst 的長度 public ByteBuffer get(byte[] dst) { ... } // 批量將緩衝區數據傳遞到 dst 數組中 public ByteBuffer get(byte[] dst, int offset, int length) { ... }
我們可以將緩衝區的數據傳輸到 Channel 中:
- 通過 FileChannel 將數據寫到文件中
- 通過 SocketChannel 將數據寫入網路,發送到遠程機器
int write = channel.write(buffer);
mark(), reset()
mark 用於臨時保存 position 的值,每次調用 mark() 方法都會將 position 賦值給 mark。
public final Buffer mark() { mark = position; return this; }
reset() 方法就是將 position 賦值到上次 mark 的位置上(也就是上一次調用 mark() 方法的時候),通過 mark(), reset() 兩個方法的配合,我們可以重複讀取某個區間的數據。
public final Buffer reset() { int m = mark; if (m < 0) throw new InvalidMarkException(); position = m; return this; }
注意 mark 構造初始化時數值是 -1,如果 >= 0 則表示可以讀取。
rewind(), clear(), compact()
rewind() 重置 position 為 0。通常在 channel-write 和 get 前調用此方法。
public final Buffer rewind() { position = 0; mark = -1; return this; }
clear() 會重置 position,將 limit 設置為最大值 capacity,並將 mark 置成 -1。通常在 channel-read 和填充此 buffer 時,會先調用此方法。
public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; }
compact() 方法並不常用,忽略。
public abstract ByteBuffer compact();
恆等式
mark, position, limit和 capacity 永遠遵循以下關係:
0 <= mark <= position <= limit <= capacity
新創建的 buffer position = 0,mark 是未定義的(-1)。limit 的初始值可能是 0,也可能是構造時傳入的其他值。新分配的緩衝區元素都初始化為 0。
Channel
Channel 是 I/O 操作的「橋樑」。Channel 可以是對硬體設備、文件、網路套接字、程式組件等實體的連接,該實體能夠執行不同的 I/O 操作(讀取或寫入)。
Channel 只有 2 種狀態:開啟和關閉。在創建時就是開啟的,一旦關閉就不會再回到打開狀態。Channel 一旦關閉,任何對 Channel 調用的 I/O 操作都會拋出 ClosedChannelException 異常,可以通過方法 isOpen() 來檢測 Channel 是否開啟。
Channel 介面定義如下:
public interface Channel extends Closeable { public boolean isOpen(); public void close() throws IOException; }

image
- FileChannel:文件通道,用於文件讀寫
- DatagramChannel:UDP 連接
- SocketChannel:TCP 連接通道,TCP 客戶端
- ServerSocketChannel:TCP 對應的服務端,用於監聽某個埠進來的請求
讀操作:將數據從 Channel 讀取到 Buffer 中
int read = channel.read(buffer);
寫操作:將數據從 Buffer 寫入到 Channel 中
int write = channel.write(buffer);
FileChannel
讀取文件內容,詳細說明見注釋。
@Test public void testFileChannelRead() throws IOException { // 獲取文件的 FileChannel FileInputStream fileInputStream = new FileInputStream("/abc"); FileChannel channel = fileInputStream.getChannel(); // 創建 ByteBuffer ByteBuffer buffer = ByteBuffer.allocate(30); // 將文件內容讀取到 buffer 中 channel.read(buffer); // buffer 從寫模式,切換到讀模式 buffer.flip(); // 列印 buffer(文件)的內容 while (buffer.hasRemaining()) { System.out.print((char)buffer.get()); } }
寫入文件內容,詳細說明見注釋。
@Test public void testFileChannelWrite() throws IOException { // 獲取文件的 FileChannel FileOutputStream fileOutputStream = new FileOutputStream("/abc"); FileChannel channel = fileOutputStream.getChannel(); // 創建 ByteBuffer ByteBuffer buffer = ByteBuffer.allocate(30); buffer.put("123456".getBytes()); // Buffer 切換為讀模式 buffer.flip(); while(buffer.hasRemaining()) { // 將 Buffer 中的內容寫入文件 channel.write(buffer); } }
SocketChannel
SocketChannel 顧名思義,就是 Socket 的 Channel,能夠讀寫 Socket。操作緩衝區同 FileChannel。
@Test public void testSocketChannel() throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 80)); // 創建 ByteBuffer ByteBuffer buffer = ByteBuffer.allocate(30); // 讀取數據 socketChannel.read(buffer); // 寫入數據到網路連接中 while(buffer.hasRemaining()) { socketChannel.write(buffer); } }
ServerSocketChannel
ServerSocketChannel 用於監聽機器埠,管理從這個埠進來的 TCP 連接。
// 實例化 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 監聽 8080 埠 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); while (true) { // 一旦有一個 TCP 連接進來,就對應創建一個 SocketChannel 進行處理 SocketChannel socketChannel = serverSocketChannel.accept(); }
這裡可以看到 SocketChannel 的另一種實例化方式,SocketChannel 可讀可寫,操作一個網路通道。
ServerSocketChannel 不和 Buffer 打交道了,因為它並不實際處理數據,它一旦接收到請求後,實例化 SocketChannel,之後在這個連接通道上的數據傳遞它就不管了,因為它需要繼續監聽埠,等待下一個連接。
DatagramChannel
處理 UDP 連接(面向無連接的,不需要握手,只要把數據丟出去就好了),操作位元組數組,同 FileChannel,不作過多介紹。
Selector
Selector 是非阻塞的,多路復用就是基於 Selector 的,Java 能通過 Selector 實現一個執行緒管理多個 Channel。
基本操作
- 開啟一個 Selector(經常被翻譯成選擇器、多路復用器)
Selector selector = Selector.open();
- 將 Channel 註冊到 Selector 上。前面我們說了,Selector 建立在非阻塞模式之上,所以註冊到 Selector 的 Channel 必須要支援非阻塞模式,FileChannel 不支援非阻塞,我們這裡討論最常見的 SocketChannel 和 ServerSocketChannel。
// 將通道設置為非阻塞模式,因為默認都是阻塞模式的 channel.configureBlocking(false); // 註冊 SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
register 方法的第二個參數是 SelectionKey 中的常量,代表要監聽感興趣的事件,總共有以下 4 種:
// 通道有數據可讀 public static final int OP_READ = 1 << 0; // 可以向通道中寫數據 public static final int OP_WRITE = 1 << 2; // 成功建立 TCP 連接 public static final int OP_CONNECT = 1 << 3; // 接受 TCP 連接 public static final int OP_ACCEPT = 1 << 4;
註冊方法返回值是 SelectionKey 實例,它包含了 Channel 和 Selector 資訊,也包括了一個叫做 Interest Set 的資訊,即我們設置的我們感興趣的正在監聽的事件集合。
- 調用 select() 方法獲取通道資訊。用於判斷是否有我們感興趣的事件已經發生了。
基本用法
Selector selector = Selector.open(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while(true) { // 判斷是否有事件準備好 int readyChannels = selector.select(); if(readyChannels == 0) continue; // 遍歷 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } }
I/O 多路復用原理
這裡放上一張原來總結的思維導圖截圖,具體原理需要另行寫篇文章。

總結
- Buffer 和數組差不多,它有 position、limit、capacity 幾個重要屬性。put() 一下數據、flip() 切換到讀模式、然後用 get() 獲取數據、clear() 一下清空數據、重新回到 put() 寫入數據。
- Channel 基本上只和 Buffer 打交道,最重要的介面就是 channel.read(buffer) 和 channel.write(buffer)。
- Selector 用於實現非阻塞 IO,這裡僅僅介紹介面使用,後續請關注非阻塞 IO 的介紹。