Java網路編程 — NIO非阻塞網路編程

  • 2019 年 10 月 3 日
  • 筆記

從Java1.4開始,為了替代Java IO和網路相關的API,提高程式的運行速度,Java提供了新的IO操作非阻塞的API即Java NIO。NIO中有三大核心組件:Buffer(緩衝區),Channel(通道),Selector(選擇器)。NIO基於Channel(通道)和Buffer(緩衝區))進行操作,數據總是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中,而Selector(選擇器)主要用於監聽多個通道的事件,實現單個執行緒可以監聽多個數據通道。

Buffer(緩衝區)

緩衝區本質上是一個可以寫入數據的記憶體塊(類似數組),然後可以再次讀取。此記憶體塊包含在NIO Buffer對象中,該對象提供了一組方法,可以更輕鬆的使用記憶體塊。
相對於直接操作數組,Buffer API提供了更加容易的操作和管理,其進行數據的操作分為寫入和讀取,主要步驟如下:

  1. 將數據寫入緩衝區
  2. 調用buffer.flip(),轉換為讀取模式
  3. 緩衝區讀取數據
  4. 調用buffer.clear()或buffer.compact()清楚緩衝區

Buffer中有三個重要屬性:
capacity(容量):作為一個記憶體塊,Buffer具有一定的固定大小,也稱為容量
position(位置):寫入模式時代表寫數據的位置,讀取模式時代表讀取數據的位置
limit(限制):寫入模式等於Buffer的容量,讀取模式時等於寫入的數據量

img

Buffer使用程式碼示例:

public class BufferDemo {    public static void main(String[] args) {      // 構建一個byte位元組緩衝區,容量是4      ByteBuffer byteBuffer = ByteBuffer.allocate(4);      // 默認寫入模式,查看三個重要的指標      System.out.println(          String.format(              "初始化:capacity容量:%s, position位置:%s, limit限制:%s",              byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));      // 寫入數據      byteBuffer.put((byte) 1);      byteBuffer.put((byte) 2);      byteBuffer.put((byte) 3);      // 再次查看三個重要的指標      System.out.println(          String.format(              "寫入3位元組後後:capacity容量:%s, position位置:%s, limit限制:%s",              byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));        // 轉換為讀取模式(不調用flip方法,也是可以讀取數據的,但是position記錄讀取的位置不對)      System.out.println("開始讀取");      byteBuffer.flip();      byte a = byteBuffer.get();      System.out.println(a);      byte b = byteBuffer.get();      System.out.println(b);      System.out.println(          String.format(              "讀取2位元組數據後,capacity容量:%s, position位置:%s, limit限制:%s",              byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));        // 繼續寫入3位元組,此時讀模式下,limit=3,position=2.繼續寫入只能覆蓋寫入一條數據      // clear()方法清除整個緩衝區。compact()方法僅清除已閱讀的數據。轉為寫入模式      byteBuffer.compact();      // 清除了已經讀取的2位元組,剩餘1位元組,還可以寫入3位元組數據      // 多寫的話會報java.nio.BufferOverflowException異常      byteBuffer.put((byte) 3);      byteBuffer.put((byte) 4);      byteBuffer.put((byte) 5);      System.out.println(          String.format(              "最終的情況,capacity容量:%s, position位置:%s, limit限制:%s",              byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));    }  }  

ByteBuffer堆外記憶體

ByteBuffer為性能關鍵型程式碼提供了直接記憶體(direct,堆外)和非直接記憶體(heap,堆)兩種實現。堆外記憶體實現將記憶體對象分配在Java虛擬機的堆以外的記憶體,這些記憶體直接受作業系統管理,而不是虛擬機,這樣做的結果就是能夠在一定程度上減少垃圾回收對應用程式造成的影響,提供運行的速度。

堆外記憶體的獲取方式:ByteBuffer byteBuffer = ByteBuffer.allocateDirect(noBytes)

堆外記憶體的好處:

  • 進行網路IO或者文件IO時比heap buffer少一次拷貝。(file/socket — OS memory — jvm heap)在寫file和socket的過程中,GC會移動對象,JVM的實現中會把數據複製到堆外,再進行寫入。
  • GC範圍之外,降低GC壓力,但實現了自動管理,DirectByteBuffer中有一個Cleaner對象(PhantomReference),Cleaner被GC執行前會執行clean方法,觸發DirectByteBuffer中定義的Deallocator

堆外記憶體的使用建議:

  • 性能確實可觀的時候才去使用,分配給大型,長壽命的對象(網路傳輸,文件讀寫等場景)
  • 通過虛擬機參數MaxDirectMemorySize限制大小,防止耗盡整個機器的記憶體

Channel(通道)

Channel用於源節點與目標節點之間的連接,Channel類似於傳統的IO Stream,Channel本身不能直接訪問數據,Channel只能與Buffer進行交互。

Channel的API涵蓋了TCP/UDP網路和文件IO,常用的類有FileChannel,DatagramChannel,SocketChannel,ServerSocketChannel

標準IO Stream通常是單向的(InputStream/OutputStream),而Channel是一個雙向的通道,可以在一個通道內進行讀取和寫入,可以非阻塞的讀取和寫入通道,而且通道始終讀取和寫入緩衝區(即Channel必須配合Buffer進行使用)。

img

SocketChannel

SocketChannel用於建立TCP網路連接,類似java.net.Socket。有兩種創建SocketChannel的形式,一個是客戶端主動發起和伺服器的連接,還有一個就是服務端獲取的新連接。SocketChannel中有兩個重要的方法,一個是write()寫方法,write()寫方法有可能在尚未寫入內容的時候就返回了,需要在循環中調用write()方法。還有一個就是read()讀方法,read()方法可能直接返回根本不讀取任何數據,可以根據返回的int值判斷讀取了多少位元組。

核心程式碼程式碼示例片段:

// 客戶端主動發起連接  SocketChannel socketChannel = SocketChannel.open();  // 設置為非阻塞模式  socketChannel.configureBlocking(false);  socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));  // 發生請求數據 - 向通道寫入數據  socketChannel.write(byteBuffer);  // 讀取服務端返回 - 讀取緩衝區數據  int readBytes = socketChannel.read(requestBuffer);  // 關閉連接  socketChannel.close();  

ServerSocketChannel

ServerSocketChannel可以監聽新建的TCP連接通道,類似ServerSocket。ServerSocketChannel的核心方法accept()方法,如果通道處於非阻塞模式,那麼如果沒有掛起的連接,該方法將立即返回null,實際使用中必須檢查返回的SocketChannel是否為null。

核心程式碼示例片段:

// 創建網路服務端  ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  // 設置為非阻塞模式  serverSocketChannel.configureBlocking(false);  // 綁定埠  serverSocketChannel.socket().bind(new InetSocketAddress(8080));  while (true) {    // 獲取新tcp連接通道    SocketChannel socketChannel = serverSocketChannel.accept();    if (socketChannel != null) {      // tcp請求 讀取/響應    }  }  

Selector選擇器

Selector也是Java NIO核心組件,可以檢查一個或多個NIO通道,並確定哪些通道已經準備好進行讀取或寫入。實現單個執行緒可以管理多個通道,從而管理多個網路連接。

一個執行緒使用Selector可以監聽多個Channel的不同事件,其中主要有四種事件,分別對應SelectionKey中的四個常量,分別為:

  • 連接事件 SelectionKey.OP_CONNECT
  • 準備就緒事件 SelectionKey.OP_ACCEPT
  • 讀取事件 SelectionKey.OP_READ
  • 寫入事件 SelectionKey.OP_WRITE

img

Selector實現一個執行緒處理多個通道的核心在於事件驅動機制,非阻塞的網路通道下,開發者通過Selector註冊對於通道感興趣的事件類型,執行緒通過監聽事件來觸發相應的程式碼執行。(更底層其實是作業系統的多路復用機制)

核心程式碼示例片段:

// 構建一個Selector選擇器,並且將channel註冊上去  Selector selector = Selector.open();  // 將serverSocketChannel註冊到selector  SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);  // 對serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支援accept操作)  selectionKey.interestOps(SelectionKey.OP_ACCEPT);  while (true) {    // 用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會有返回    selector.select();    // 獲取事件    Set<SelectionKey> keys = selector.selectedKeys();    // 遍歷查詢結果    Iterator<SelectionKey> iterator = keys.iterator();    while (iterator.hasNext()) {      // 被封裝的查詢結果      SelectionKey key = iterator.next();      // 判斷不同的事件類型,執行對應的邏輯處理      if (key.isAcceptable()) {        // 處理連接的邏輯      }      if (key.isReadable()) {        //處理讀數據的邏輯      }        iterator.remove();    }  }  

NIO網路編程完整程式碼

服務端程式碼示例:

// 結合Selector實現的非阻塞服務端(放棄對channel的輪詢,藉助消息通知機制)  public class NIOServer {      public static void main(String[] args) throws IOException {      // 創建網路服務端ServerSocketChannel      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();      // 設置為非阻塞模式      serverSocketChannel.configureBlocking(false);        // 構建一個Selector選擇器,並且將channel註冊上去      Selector selector = Selector.open();      // 將serverSocketChannel註冊到selector      SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);      // 對serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支援accept操作)      selectionKey.interestOps(SelectionKey.OP_ACCEPT);        // 綁定埠      serverSocketChannel.socket().bind(new InetSocketAddress(8080));      System.out.println("啟動成功");        while (true) {        // 不再輪詢通道,改用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會有返回        selector.select();        // 獲取事件        Set<SelectionKey> keys = selector.selectedKeys();        // 遍歷查詢結果        Iterator<SelectionKey> iterator = keys.iterator();        while (iterator.hasNext()) {          // 被封裝的查詢結果          SelectionKey key = iterator.next();          iterator.remove();          // 關注 Read 和 Accept兩個事件          if (key.isAcceptable()) {            ServerSocketChannel server = (ServerSocketChannel) key.attachment();            // 將拿到的客戶端連接通道,註冊到selector上面            SocketChannel clientSocketChannel = server.accept();            clientSocketChannel.configureBlocking(false);            clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);            System.out.println("收到新連接 : " + clientSocketChannel.getRemoteAddress());          }          if (key.isReadable()) {            SocketChannel socketChannel = (SocketChannel) key.attachment();            try {              ByteBuffer byteBuffer = ByteBuffer.allocate(1024);              while (socketChannel.isOpen() && socketChannel.read(byteBuffer) != -1) {                // 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)                if (byteBuffer.position() > 0) break;              }                if (byteBuffer.position() == 0) continue;              byteBuffer.flip();              byte[] content = new byte[byteBuffer.limit()];              byteBuffer.get(content);              System.out.println(new String(content));              System.out.println("收到數據,來自:" + socketChannel.getRemoteAddress());                // 響應結果 200              String response = "HTTP/1.1 200 OKrn" + "Content-Length: 11rnrn" + "Hello World";              ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());              while (buffer.hasRemaining()) {                socketChannel.write(buffer);              }              } catch (Exception e) {              e.printStackTrace();              key.cancel(); // 取消事件訂閱            }          }            selector.selectNow();        }      }    }  }  

客戶端程式碼示例:

public class NIOClient {      public static void main(String[] args) throws IOException {      // 客戶端主動發起連接      SocketChannel socketChannel = SocketChannel.open();      // 設置為非阻塞模式      socketChannel.configureBlocking(false);      socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));      while (!socketChannel.finishConnect()) {        // 沒連接上,則一直等待        Thread.yield();      }        Scanner scanner = new Scanner(System.in);      System.out.println("請輸入:");      // 發送內容      String msg = scanner.nextLine();      ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());      while (byteBuffer.hasRemaining()) {        socketChannel.write(byteBuffer);      }        // 讀取響應      System.out.println("收到服務端響應:");      ByteBuffer buffer = ByteBuffer.allocate(1024);        while (socketChannel.isOpen() && socketChannel.read(buffer) != -1) {        // 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0位元組就認為請求結束了)        if (buffer.position() > 0) break;      }        buffer.flip();      byte[] content = new byte[buffer.limit()];      buffer.get(content);      System.out.println(new String(content));      scanner.close();      socketChannel.close();    }  }  

NIO與BIO的比較

img

如果程式需要支撐大量的連接,使用NIO是最好的方式。
Tomcat8中已經完全移除了BIO相關的網路處理程式碼,默認採用NIO進行網路處理。