JAVA中的NIO (New IO)

  • 2019 年 10 月 29 日
  • 筆記

簡介

標準的IO是基於位元組流和字元流進行操作的,而JAVA中的NIO是基於Channel和Buffer進行操作的。

傳統IO

graph TB; 位元組流 –> InputStream; 位元組流 –> OutputStream; 字元流 –> Reader; 字元流 –> Writer;

NIO

graph TB; A[Channel] –> B[Buffer..]; C[Channel] –> D[Buffer..]; E[Channel] –> F[Buffer..];

核心模組

NIO主要有三個核心部分:Selector、Channel、Buffer

數據總是從Channel讀取到Buffer或者從Buffer寫入到Channel中。

Selector可以監聽多個Channel的多個事件。

graph TB; Selector –> A[Channel]; Selector –> B[Channel]; Selector –> C[Channel]; A –> E1[Event…]; B –> E2[Event…]; C –> E3[Event…];

傳統的IO與Channel的區別

1.傳統的IO是BIO的,而Channel是NIO的。

*當流調用了read()、write()方法後會一直阻塞執行緒直到數據被讀取或寫入完畢。

2.傳統IO流是單向的,而Channel是雙向的。


Channel

FileChannel:從文件中進行讀取    DatagramChannel:可以通過UDP協議在網路中進行數據的傳輸    SocketChannel:可以通過TCP協議在網路中進行數據的傳輸    ServerSocketChannel:可以作為一個伺服器監聽連接  

Channel通用API:

read(buffer):將數據從Channel讀取到Buffer中,讀取完畢返回-1。    read(buffer []):將數據從Channel讀取到多個Buffer中,僅當第一個Buffer被寫滿後往第二個Buffer中進行寫入。    write(buffer):將Buffer中的數據寫入到Channel中。    write(buffer[]):將多個Buffer中的數據寫入到Channel中,僅當第一個Buffer中的數據被讀取完畢後再從第二個Buffer中進行讀取。    register(selector,interest):將Channel註冊到Selector中,同時需要向Selector傳遞要監聽此Channel的事件類型(註冊到Selector中的Channel一定要非阻塞的)    configureBlocking(boolean):設置Channel是否為阻塞。    transferFrom(position,count,channel):將其他Channel中的數據傳輸到當前Channel中。    transferTo(position,count,channel):將當前Channel中的數據傳輸到其他Channel中。  

SocketChannel API

open()靜態方法:創建SocketChannel。    connect(new InetSocketAddress(port))方法:連接伺服器。    finishConnect()方法:判斷是否已經與伺服器建立連接。  

ServerSocketChannel API

open()靜態方法:創建ServerSocketChannel。    accept()方法:該方法會一直阻塞執行緒直到有新連接到達。  

阻塞式與非阻塞式Channel

正常情況下Channel都是阻塞的,只有當調用了configureBlocking(false)方法時Channel才為非阻塞。

阻塞式Channel的connect()、accept()、read()、write()方法都會阻塞執行緒,直到處理完畢。

非阻塞式Channel的connect()、accept()、read()、write()方法都是非同步的。

*當調用了非阻塞式Channel的connect()方法後,需要使用finishConnect()方法判斷是否已經與伺服器建立連接。

*當調用了非阻塞式Channel的accept()方法後,需要根據方法的返回值是否為NULL判斷是否接收到新的連接。

*當調用了非阻塞式Channel的read()方法後,需要根據方法的返回值是否大於0判斷是否有讀取到數據。

*在使用非阻塞式Channel的write()方法時,需要藉助while循環與hasRemaining()方法保證buffer中的內容被全部寫入。

*FileChannel一定是阻塞的。

示例

public void testFileChannel() throws IOException {      RandomAccessFile randomAccessFile = new RandomAccessFile(new File("F:\筆記\nginx.txt"), "rw");      FileChannel fileChannel = randomAccessFile.getChannel();      ByteBuffer byteBuffer = ByteBuffer.allocate(64);      int count = fileChannel.read(byteBuffer);      while (count != -1) {          byteBuffer.flip();          System.out.println(new String(Arrays.copyOfRange(byteBuffer.array(),0,byteBuffer.limit()),Charset.forName("UTF-8")));          byteBuffer.clear();          count = fileChannel.read(byteBuffer);      }  }  

Buffer

Buffer是一塊可以進行讀寫操作的記憶體(順序存儲結構)

ByteBuffer:基於Byte類型進行存儲    CharBuffer:基於Char類型進行存儲    DoubleBuffer:基於Double類型進行存儲    FloatBuffer:基於Float類型進行存儲    IntBuffer:基於Int類型進行存儲    LongBuffer:基於Long類型進行存儲    ShortBuffer:基於Short類型進行存儲  

Buffer的內部結構

1.capacity:表示buffer的容量

2.position:表示當前的位置(從0開始,最大值為capacity-1)

3.limit:在寫模式中表示可以寫入的個數(與capacity一樣),在讀模式中表示可以讀取的個數。

從寫模式轉換成讀模式

limit設置為position+1,position設置為0。

從讀模式轉換成寫模式

limit設置為capacity,position設置為0。

往Buffer中寫數據

1.將數據從Channel讀取到Buffer中。

2.使用Buffer的put()方法。

從Buffer中讀數據

1.將Buffer中的數據寫入到Channel中。

2.使用Buffer的get()方法

Buffer通用API:

allocate(size)靜態靜態:初始化一個Buffer。    flip():將buffer從寫模式轉換成讀模式。    array():將Buffer中的內容轉換成數組(不受limit控制)    get():獲取Buffer中的內容。    hasRemaining():判斷Buffer中是否還有未讀的元素(limit - (postion+1) )    rewind():將positon設置為0。    clear():將limit設置為capacity,position設置為0。    compact():將所有未讀的元素移動到Buffer的起始處,position指向最後一個未讀的元素的下一位,limit設置為capacity。    *clear()和compact()方法都可以理解成將Buffer從讀模式轉換成寫模式,區別在於compact()方法會保留未讀取的元素。    mark():在當前position處打一個標記。    reset():將position恢復到標記處。  

Selector

Selector用於監聽多個Channel的多個事件(單執行緒)

graph TB; Selector –> A[Channel]; Selector –> B[Channel]; Selector –> C[Channel]; A –> E1[connect]; B –> E2[accept]; C –> E3[connect]; C –> E4[read];

Channel的事件類型

1.連接就緒:當SocketChannel、DatagramChannel成功與伺服器建立連接時將會觸發連接就緒事件。

2.接收就緒:當ServerSocketChannel有新連接到達時將會觸發接收就緒事件。

3.讀就緒:當SocketChannel、DatagramChannel有數據可讀時將會觸發讀就緒事件。

4.寫就緒:當SocketChannel、DatagramChannel可以進行數據寫入時將會觸發寫就緒事件。

SelectionKey

SelectionKey用於存儲Selector與Channel之間的相關資訊。

SelectionKey中提供了四個常量分別代表Channel的事件類型。

SelectionKey.OP_CONNECT    SelectionKey.OP_ACCEPT    SelectionKey.OP_READ    SelectionKey.OP_WRITE  

SelectableChannel提供的register(selector,interest)方法用於將Channel註冊到Selector中,同時需要向Selector傳遞要監聽此Channel的事件類型,當要監聽的事件類型不止一個時可以使用或運算,當將Channel註冊到Selector後會返回SelectionKey實例,用於存儲Selector與此Channel之間的相關資訊。

SelectionKey API:

interestOps()方法:返回Selector監聽此Channel的事件類型。    readyOps()方法:返回此Channel目前就緒的事件。    isAcceptable():判斷Channel是否接收就緒。    isConnectable():判斷Channel是否連接就緒。    isReadable():判斷Channel是否讀就緒。    isWriteable():判斷Channel是否寫就緒。    channel():返回具體的Channel實例。    selector():返回Selector實例。    attach():往SelectionKey中添加一個附加對象。    attachment():返回保存在SelectionKey中的附加對象。  

Selector API:

open()靜態方法:創建一個Selector。    select()方法:該方法會一直阻塞執行緒直到所監聽的Channel有事件就緒,返回就緒的Channel個數(只會返回新就緒的Channel個數)    selectedKeys()方法:返回就緒的Channel對應的SelectionKey。    *當Channel就緒的事件處理完畢後,需要手動刪除SelectionKey集合中該Channel對應的SelectionKey,當該Channel再次有事件就緒時會自動加入到Selectionkey集合中。  

非阻塞式Channel與Selector

非阻塞式Channel一般與Selector配合使用

當Selector監聽到ServerSocketChannel接收就緒時,那麼此時可以立即調用ServerSocketChannel的accept()方法獲取新連接。

當Selector監聽到SocketChannel讀就緒時,那麼此時可以立即調用SocketChannel的read()方法進行數據的讀取。

非阻塞式伺服器

伺服器

package com.novellatonyatt.nio;    import java.io.IOException;  import java.net.InetAddress;  import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.ServerSocketChannel;  import java.nio.channels.SocketChannel;  import java.util.Iterator;  import java.util.Set;  import java.util.concurrent.ArrayBlockingQueue;  import java.util.concurrent.ThreadPoolExecutor;  import java.util.concurrent.TimeUnit;    /**   * @Auther: Zhuang HaoTang   * @Date: 2019/10/26 16:35   * @Description:   */  public class Server {        private ServerSocketChannel createNIOServerSocketChannel() throws IOException {          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();          serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));          serverSocketChannel.configureBlocking(false);          return serverSocketChannel;      }        private void acceptHandler(SelectionKey selectionKey) throws IOException {          Selector selector = selectionKey.selector();          ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();          SocketChannel socketChannel = serverSocketChannel.accept();          socketChannel.configureBlocking(false);          socketChannel.register(selector, SelectionKey.OP_READ);          System.out.println("accept client connection " + socketChannel.getLocalAddress());      }        private void readHandler(SelectionKey selectionKey) throws IOException {          SocketChannel socketChannel = (SocketChannel) selectionKey.channel();          ByteBuffer byteBuffer = ByteBuffer.allocate(100);          int num = socketChannel.read(byteBuffer);          if(num == -1){ // 連接已斷開              System.out.println("client "+socketChannel.getLocalAddress() + " disconnection");              socketChannel.close();              return;          }          byteBuffer.flip();          while (byteBuffer.hasRemaining()) {              byte b = byteBuffer.get();              System.out.println((char) b);          }      }        public void open() throws IOException {          Selector selector = Selector.open();          ServerSocketChannel serverSocketChannel = createNIOServerSocketChannel();          System.out.println("start nio server and bind port 8888");          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);          int ready = selector.select();          while (ready > 0) {              System.out.println("ready channel count " + ready);              Set<SelectionKey> selectionKeySet = selector.selectedKeys();              for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext(); ) {                  SelectionKey selectionKey = iterator.next();                  if (selectionKey.isAcceptable()) {                      System.out.println("acceptable");                      acceptHandler(selectionKey);                  } else if (selectionKey.isReadable()) {                      System.out.println("readable");                      readHandler(selectionKey);                  }                  iterator.remove();              }              ready = selector.select();          }      }        public static void main(String[] args) throws IOException {          Server server = new Server();          server.open();      }      }  

*一個Channel不會同時有多個事件就緒,以事件為單位。

*當客戶端斷開連接,那麼將會觸發讀就緒,並且channel的read()方法返回-1,表示連接已斷開,伺服器應該要做出處理,關閉這個連接。

客戶端

package com.novellatonyatt.nio;    import java.io.IOException;  import java.net.InetAddress;  import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SocketChannel;    /**   * @Auther: Zhuang HaoTang   * @Date: 2019/10/26 16:36   * @Description:   */  public class Client {        public static void main(String[] args) throws IOException, InterruptedException {          SocketChannel socketChannel = SocketChannel.open();          socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(),8888));            String message = "today is sunday";          ByteBuffer byteBuffer = ByteBuffer.allocate(message.getBytes().length);          byteBuffer.put(message.getBytes());          byteBuffer.flip();          socketChannel.write(byteBuffer);          Thread.sleep(5000);      }    }    

運行結果


Reactor模式

1.單執行緒的事件分發器。

2.具體事件類型的Handler執行緒池。

3.業務執行緒池。

graph TB; A[單執行緒的事件分發器] –> B[接收就緒]; A –> C[讀就緒]; A –> D[寫就緒]; C –> F[Handler執行緒池]; D –> G[Handler執行緒池]; F –> H[業務執行緒池]; G –> I[業務執行緒池];

NIO指的是事件分化器,當應用程式發起IO操作後,可以利用等待的時間做一些處理,JAVA NIO使用了IO多路復用中的Select模型。

*主執行緒不需要等待具體事件類型的Handler處理完畢,直接非同步返回,那麼將會導致事件重複就緒,程式做出相應的控制即可。

*具體事件類型的Handler是非同步的(注意這並不是AIO)

伺服器

package com.novellatonyatt.nio;    import java.io.IOException;  import java.net.InetAddress;  import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.ServerSocketChannel;  import java.nio.channels.SocketChannel;  import java.util.Iterator;  import java.util.Set;  import java.util.concurrent.ArrayBlockingQueue;  import java.util.concurrent.ThreadPoolExecutor;  import java.util.concurrent.TimeUnit;    /**   * @author: Zhuang HaoTang   * @create: 2019-10-28 17:00   * @description:   */  public class ReactorServer {        private ThreadPoolExecutor eventHandlerPool = new ThreadPoolExecutor(10, 50, 2, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(200), new ThreadPoolExecutor.CallerRunsPolicy());        private ServerSocketChannel createNIOServerSocketChannel() throws IOException {          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();          serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));          serverSocketChannel.configureBlocking(false);          return serverSocketChannel;      }        private void readHandler(SelectionKey selectionKey) {          SocketChannel socketChannel = (SocketChannel) selectionKey.channel();          ByteBuffer byteBuffer = ByteBuffer.allocate(100);          try {              int num = socketChannel.read(byteBuffer);              if (num == -1) {                  System.out.println("client " + socketChannel.getLocalAddress() + " disconnection");                  socketChannel.close(); // 底層有些邏輯                  return;              }              byteBuffer.flip();              while (byteBuffer.hasRemaining()) {                  byte b = byteBuffer.get();                  System.out.println((char) b);              }          } catch (Exception e) {              System.out.println("由於連接關閉導致並發執行緒讀取異常");          }      }        private void open() throws IOException {          Selector selector = Selector.open();          ServerSocketChannel serverSocketChannel = createNIOServerSocketChannel();          System.out.println("start nio server and bind port 8888");          serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);          int ready = selector.select();          while (ready > 0) {              System.out.println("ready channel count " + ready);              Set<SelectionKey> selectionKeySet = selector.selectedKeys();              for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext(); ) {                  final SelectionKey selectionKey = iterator.next();                  if (selectionKey.isAcceptable()) {                      System.out.println("acceptable");                      serverSocketChannel = (ServerSocketChannel) selectionKey.channel();                      SocketChannel socketChannel = serverSocketChannel.accept();                      socketChannel.configureBlocking(false);                      socketChannel.register(selector, SelectionKey.OP_READ);                      System.out.println("accept client connection " + socketChannel.getLocalAddress());                  } else if (selectionKey.isReadable()) {                      System.out.println("readable");                      eventHandlerPool.submit(new Runnable() {                          @Override                          public void run() {                              readHandler(selectionKey);                          }                      });                  }                  iterator.remove();              }              ready = selector.select();          }      }        public static void main(String[] args) throws IOException {          ReactorServer reactorServer = new ReactorServer();          reactorServer.open();      }    }  

客戶端

package com.novellatonyatt.nio;    import java.io.IOException;  import java.net.InetAddress;  import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SocketChannel;    /**   * @Auther: Zhuang HaoTang   * @Date: 2019/10/26 16:36   * @Description:   */  public class Client {        public static void main(String[] args) throws IOException, InterruptedException {          SocketChannel socketChannel = SocketChannel.open();          socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888));            String message = "today is sunday";          ByteBuffer byteBuffer = ByteBuffer.allocate(message.getBytes().length);          byteBuffer.put(message.getBytes());          byteBuffer.flip();          socketChannel.write(byteBuffer);          Thread.sleep(5000);      }    }    

運行結果

*當channel有數據可讀時,將會觸發讀就緒,那麼主執行緒將會不停的向執行緒池提交任務,直到某個執行緒讀取完畢,此時將會停止讀就緒,其他執行緒讀取到的個數為0。

*當客戶端斷開連接時,將會觸發讀就緒,那麼主執行緒將會不停的向執行緒池提交任務,直到某個執行緒關閉連接,此時將會停止讀就緒,其他執行緒要做相應的異常處理。

一般不會這麼去使用JAVA NIO,只是通過JAVA NIO學習他的設計思想,如果要想搭建NIO伺服器那麼應該使用Netty等NIO框架。