從同步阻塞到非同步非阻塞角度看網路編程

  • 2019 年 11 月 14 日
  • 筆記

1. 最簡單的網路通訊——同步阻塞通訊(BIO)

  首先來看一個傳統簡單的網路通訊案例,該案例是基於同步阻塞的I/O,服務端程式碼如下

public class Server extends Thread{      private ServerSocket serverSocket;      public Server(int port) throws IOException      {          serverSocket = new ServerSocket(port, 1000);    //埠號,以及運行連接可以保存的最長隊列          serverSocket.setSoTimeout(1000000);      }      public void run()      {          while(true)          {              try              {                  System.out.println("等待遠程連接,埠號為:" + serverSocket.getLocalPort() + "...");                  Socket server = serverSocket.accept();                  System.out.println("遠程主機地址:" + server.getRemoteSocketAddress());                  DataInputStream in = new DataInputStream(server.getInputStream());                  Thread.sleep(2000);                  System.out.println(in.readUTF());                  DataOutputStream out = new DataOutputStream(server.getOutputStream());                  out.writeUTF("0101, 主機收到:" + server.getLocalSocketAddress() + "nGoodbye!");                  server.close();              }catch(SocketTimeoutException s)              {                  System.out.println("Socket timed out!");                  break;              }catch(IOException e)              {                  e.printStackTrace();                  break;              } catch (InterruptedException e) {                  e.printStackTrace();              }          }      }      public static void main(String [] args) throws IOException {              Thread t = new Server(6666);              t.run();      }  }  

客戶端程式碼如下:

public class Client implements Runnable{      private int id;      public Client(int id){          this.id = id;      }      public static void main(String[] args) throws InterruptedException, IOException {          ExecutorService es = Executors.newFixedThreadPool(100);          for (int i = 0; i < 100; i++) {              es.execute(new Client(i+1));          }          es.shutdown();      }  ​      @Override      public void run() {          Socket client = null;          try {              client = new Socket("127.0.0.1", 6666);              OutputStream outToServer = client.getOutputStream();              DataOutputStream out = new DataOutputStream(outToServer);              out.writeUTF("Hello, I am the " + id + "-client and I come from " + client.getLocalSocketAddress());              InputStream inFromServer = client.getInputStream();              DataInputStream in = new DataInputStream(inFromServer);             System.out.println("client-" + id + " : response : " + in.readUTF());              client.close();          } catch (Exception e) {              e.printStackTrace();          }      }  }  

  看到當假設100個客戶端同時連接伺服器的時候,單執行緒下服務端對接收的請求只會一個一個去處理,導致很多客戶端請求被阻塞,處於等待情況,這個時候,通常的服務端優化的解決辦法是開啟利用執行緒池開啟多個執行緒去處理。如下:

public class BlockServer implements Runnable{  ​      private Socket server;      public  BlockServer(Socket server){          this.server = server;      }  ​      @Override      public void run() {          DataInputStream in = null;          DataOutputStream out = null;          try {              in = new DataInputStream(server.getInputStream());              System.out.println(server.getInetAddress() + ":" + in.readUTF());              out = new DataOutputStream(server.getOutputStream());              Thread.sleep(2000);              out.writeUTF("server receive your message." );              in.close();              out.close();              server.close();          } catch (Exception e) {              e.printStackTrace();          }      }  ​      public static void main(String[] args) throws IOException {          ExecutorService es = Executors.newFixedThreadPool(100);          ServerSocket serverSocket = new ServerSocket(6666, 1000);          System.out.println("等待遠程連接,埠號為:" + serverSocket.getLocalPort() + "...");          while (!Thread.currentThread().isInterrupted()){              Socket socket = serverSocket.accept();              es.execute(new BlockServer(socket));          }          es.shutdown();      }  }

 兩種結果的輸出可以看出基於多執行緒的網路通訊效率遠遠高於單執行緒。不過多執行緒通訊有一個很大的缺陷——嚴重依賴執行緒,通常在Linux環境下並沒有執行緒的概念,此時,執行緒的本質就是進程了,此時執行緒的創建銷毀,以及執行緒(上下文)的切換將導致很大的開銷,因此,基於這些原因,導致了執行緒資源不能隨便的使用,當我們面對大量的客戶端連接伺服器的時候,並不能一味的去瘋狂創建執行緒。此時,NIO就可以幫助我們解決此類問題。

2. 多路復用的NIO(New IO)——同步非阻塞

     BIO模型中,因為在進行IO操作的時候,程式無法知道數據到底準備好沒有,能否可讀,只能一直乾等著,而且即便我們可以猜到什麼時候數據準備好了,但我們也沒有辦法通過socket.read()或者socket.write()函數去返回,而NIO卻可以通過I/O復用技術把這些連接請求註冊到多路復用器Selector中去,用一個執行緒去監聽和處理多個SocketChannel上的事件。

BufferByte和Channel

      在NIO中並不是以流的方式來處理數據的,而是以buffer緩衝區和Channel管道(全雙工)配合使用來處理數據。這裡可以用鐵路交通來類比兩者的關係,假設現在有一批貨物要從北京運到上海且用鐵路運輸,則要有一條從北京到上海的鐵路,以及一列運輸貨物的火車,這裡貨物就是客戶端和服務端的交流的資訊,Channel管道則是從北京到上海的鐵路,而buffer緩衝區則是這列運輸火車。 其中Channel分為四類:

  • FileChannel: 用於文件IO,支援阻塞模式。可以通過InputStream/OutputStream/RandomAccssFile去獲取該對象。該Channel的用法在後面的文件傳輸示例程式碼中有展示,

  • DatagramChannel: 用於UDP通訊。

  • SocketChannel: 用於TCP的客戶端通訊。客戶端通過SocketChannel.open()獲得該對象。

  • ServerSocketChannel: 用於TCP的服務端通訊。服務端通過ServerSocketChannel.open()獲得該對象。

服務端ServerSocketChannel可以通過調用accept方法返回新建立的SocketChannel對象,通過該對象調用wriet/read(ByteBuffer)來將數據寫入通道或從通道中讀取數據。而ByteBuffer的用法,主要涉及到幾個變數:capacity,position,limit和mark,具體含義如下程式碼所示,如果要讀取buffer中的數據必須調用flip方法,通過改變position和limit的值,來讀取兩個下標之間數據。如下所示:

public class Test1 {      public static void main(String[] args) {          // 創建一個緩衝區          ByteBuffer byteBuffer = ByteBuffer.allocate(1024);  ​          // 看一下初始時4個核心變數的值          //limit 緩衝區里的數據的總數          System.out.println("初始時-->limit--->"+byteBuffer.limit());          //position 下一個要被讀或寫的元素的位置          System.out.println("初始時-->position--->"+byteBuffer.position());          //capacity 緩衝區能夠容納的數據元素的最大數量。          System.out.println("初始時-->capacity--->"+byteBuffer.capacity());          //mark 一個備忘位置。用於記錄上一次讀寫的位置。          System.out.println("初始時-->mark--->" + byteBuffer.mark());  ​          System.out.println("--------------------------------------");  ​          // 添加一些數據到緩衝區中          String s = "testing.....";          byteBuffer.put(s.getBytes());  ​          // 看一下初始時4個核心變數的值          System.out.println("put完之後-->limit--->"+byteBuffer.limit());          System.out.println("put完之後-->position--->"+byteBuffer.position());          System.out.println("put完之後-->capacity--->"+byteBuffer.capacity());          System.out.println("put完之後-->mark--->" + byteBuffer.mark());            //讀數據前要調用,可以指示讀數據的操作從position讀到limit之間的數據          byteBuffer.flip();  ​          System.out.println("--------------------------------------");          System.out.println("flip完之後-->limit--->"+byteBuffer.limit());          System.out.println("flip完之後-->position--->"+byteBuffer.position());          System.out.println("flip完之後-->capacity--->"+byteBuffer.capacity());          System.out.println("flip完之後-->mark--->" + byteBuffer.mark());  ​          // 創建一個limit()大小的位元組數組(因為就只有limit這麼多個數據可讀)          byte[] bytes = new byte[byteBuffer.limit()];  ​          // 將讀取的數據裝進我們的位元組數組中          byteBuffer.get(bytes);  ​          // 輸出數據          System.out.println(new String(bytes, 0, bytes.length));      }  }  /*output  初始時-->limit--->1024  初始時-->position--->0  初始時-->capacity--->1024  初始時-->mark--->java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]  --------------------------------------  put完之後-->limit--->1024  put完之後-->position--->12  put完之後-->capacity--->1024  put完之後-->mark--->java.nio.HeapByteBuffer[pos=12 lim=1024 cap=1024]  --------------------------------------  flip完之後-->limit--->12  flip完之後-->position--->0  flip完之後-->capacity--->1024  flip完之後-->mark--->java.nio.HeapByteBuffer[pos=0 lim=12 cap=1024]  testing.....  */  

 一些用NIO模型實現的簡單demo,可以查看[github地址],有文件傳輸以及多客戶端廣播的demo。​

 NIO是Java SE 1.4版,為了提升網路傳輸性能而設計的新版本的IO,注意,這裡的優化主要針對的是網路通訊方面的socket的優化。如下程式可以測試針對本地文件IO,兩者的異同。

public class FileTransformCompare {      //傳統方式      private long transferFile(File source, File dest) throws IOException {          long startTime = System.currentTimeMillis();          if(!dest.exists())              dest.createNewFile();          BufferedInputStream bis = new BufferedInputStream(new FileInputStream(source));          BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(dest));          //將數據從源讀到目的文件          byte[] bytes = new byte[1024];          int len = 0;          while ((len = bis.read(bytes))>0){              bos.write(bytes, 0, len);          }          long endTime = System.currentTimeMillis();  ​          return endTime - startTime;      }      //NIO方式      private long transferFileFileWithNio(File source, File dest) throws IOException {          long startTime = System.currentTimeMillis();          if(!dest.exists())              dest.createNewFile();          RandomAccessFile sourceRAF = new RandomAccessFile(source, "rw");          RandomAccessFile destRAF = new RandomAccessFile(dest, "rw");          FileChannel readChannel = sourceRAF.getChannel();          FileChannel writeChannel = destRAF.getChannel();          ByteBuffer byteBuffer = ByteBuffer.allocate(1024*1024); //1M緩衝區          while (readChannel.read(byteBuffer) > 0){              byteBuffer.flip();              writeChannel.write(byteBuffer);              byteBuffer.clear();          }          writeChannel.close();          readChannel.close();          long endTime = System.currentTimeMillis();          return endTime - startTime;      }  ​      public static void main(String[] args) throws IOException {          FileTransformCompare ftc = new FileTransformCompare();  //        File source = new File("F:\apache-maven-3.6.2-bin.tar.gz");  //        File dest1 = new File("G:\迅雷下載\apache1.tar.gz");  //        File dest2 = new File("G:\迅雷下載\apache2.tar.gz");          File source = new File("G:\迅雷下載\影視\戰爭之王.BD1280超清國英雙語中英雙字.mp4");          File dest1 = new File("G:\迅雷下載\test1.mp4");          File dest2 = new File("G:\迅雷下載\test2.mp4");          long time = ftc.transferFile(source, dest1);          System.out.println("普通位元組流時間: " + time);          long timeNio = ftc.transferFileFileWithNio(source, dest2);          System.out.println("NIO時間: " + timeNio);      }  }  ​  /*      當文件的大小較小的時候,NIO會比傳統IO好一點,但是文件較大的時候,則NIO不如傳統IO      下面結果是複製一部2.6G的電影的結果:       普通位元組流時間: 79745      NIO時間: 80160  */

也就是說,通常談到NIO的時候,只會針對網路編程來說。

3. AIO 非同步非阻塞I/O

  NIO的非阻塞模式採用多路復用器(Selector),用一個執行緒不斷的去輪詢所有的通道,一旦某個通道有數據可讀(或可寫),則表示該通道數據以及準備好(通道可寫),那麼這個通道就會被選擇出來,對它進行讀寫操作,但是要注意的是在執行讀寫操作的執行緒本身就是堵塞的,要等待該對該通道的數據操作完成,執行緒才可以去操作其他通道。

而AIO(Asynchronous IO)則是由作業系統在IO操作完成之後再去通知調用者,這就意味著執行程式的執行緒再發起讀寫操作的時候總是立即返回的,這個時候可以去做其他的事情,當底層讀寫操作完成的時候,將由作業系統通過調用相應的回調函數將已經讀到的函數交給程式進行處理(寫入過程一樣)。正因如此,會導致不同的作業系統上的性能表現會不同,在Linux系統中AIO的底層系統實現是epoll函數(NIO的底層實現是select函數或者poll函數——兩者的區別在於能存儲文件描述符的數量有關,因為**select存放文件描述符的地方是一個數組,而poll則是用鏈表去存儲**)

AIO主要針對一些非同步的IO操作,作業系統執行完讀寫事件後就會調用程式的回調函數—— java.util.concurrent.Future對象和java.nio.channels.CompletionHandler,而Future是基於CompletionHandler的封裝。因為該過數據的讀寫都是由作業系統負責,則回調函數只需要負責準備發送數據或者解析讀取的數據即可。

主要的API如下

1. AsynchronousChannelGroup——非同步通訊組,非同步通道在處理 I/O請求時,需要使用一個AsynchronousChannelGroup類,該類的對象表示的是一個非同步通道的分組,每一個分組都有一個執行緒池與之對應,需要使用AsynchronousChannelGroup類的靜態工廠方法withThreadPool(ExectorService es); withFixedThreadPool();withCachedThreadPool()設置執行緒池。

AsynchronousServerSocketChannel: 非同步版的ServerSocketChannel,其accpet方法有兩種:

//第一種  AsynchronousServerSocketChannel server      = AsynchronousServerSocketChannel.open().bind(null);  Future<AsynchronousSocketChannel> future = server.accept();  ​  future.isDone();        //返回對象來查詢操作的狀態  future.isCancelled();   //明確檢查操作是否被取消,如果操作在正常完成之前被取消,則它返回true  future.cancel(true);    //取消操作  AsynchronousSocketChannel client= future.get(); //使用get()方法,該方法將阻塞等待結果的返回:  AsynchronousSocketChannel worker = future.get(10, TimeUnit.SECONDS); //也可以設置阻塞時間  ​  //第二種  AsynchronousServerSocketChannel listener    = AsynchronousServerSocketChannel.open().bind(null);    listener.accept(    attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {      public void completed(        AsynchronousSocketChannel client, Object attachment) {            // do whatever with client        }      public void failed(Throwable exc, Object attachment) {            // handle failure        }    });  ​

2.AsynchronousSocketChannel非同步版的SocketChannel,提供了兩種的read()和write()方法。

    • void read(ByteBuffer buffer, A attachment, CompletionHandler handler);

    • void write(ByteBuffer buffer, A attachment, CompletionHandler handler);

    • Future<Integer> read(ByteBuffer buffer);

    • Future<Integer> write(ByteBuffer buffer);

3. CompletionHandler的回調介面,當IO操作完成的時候,即會調用這兩個方法:

  • void complete(V result, A attachment)

    當IO操作順利完成的時候被調用,對於accept方法返回Socket通道,對於read/write操作,則返回本次寫入或讀取的位元組數。

  • void failed(Throwable exe, A attachment)

    當IO操作失敗的時候被調用,建議在此方法中對連接等資源進行關閉和釋放。

關於AIO的demo可以參照github地址上的程式碼,實現一個前台輸入表達式,後端計算後返回結果的功能。

參考文獻

  1. 如何學習Java的NIO?

  2. Java NIO淺析