Java網路編程與NIO詳解2:JAVA NIO 一步步構建IO多路復用的請求模型

  • 2019 年 11 月 21 日
  • 筆記

本文轉載自:htJava網路編程與NIO詳解2:JAVA NIO 一步步構建IO多路復用的請求模型

ylife.com

該系列博文會告訴你如何從電腦網路的基礎知識入手,一步步地學習Java網路基礎,從socket到nio、bio、aio和netty等網路編程知識,並且進行實戰,網路編程是每一個Java後端工程師必須要學習和理解的知識點,進一步來說,你還需要掌握Linux中的網路編程原理,包括IO模型、網路編程框架netty的進階原理,才能更完整地了解整個Java網路編程的知識體系,形成自己的知識框架。

為了更好地總結和檢驗你的學習成果,本系列文章也會提供部分知識點對應的面試題以及參考答案。

如果對本系列文章有什麼建議,或者是有什麼疑問的話,也可以關注公眾號【Java技術江湖】聯繫作者,歡迎你參與本系列博文的創作和修訂。

  • 文章一:JAVA 中原生的 socket 通訊機制

當前環境

  1. jdk == 1.8

程式碼地址

git 地址:https://github.com/jasonGeng88/java-network-programming

知識點

  • nio 下 I/O 阻塞與非阻塞實現
  • SocketChannel 介紹
  • I/O 多路復用的原理
  • 事件選擇器與 SocketChannel 的關係
  • 事件監聽類型
  • 位元組緩衝 ByteBuffer 數據結構

場景

接著上一篇中的站點訪問問題,如果我們需要並發訪問10個不同的網站,我們該如何處理?

在上一篇中,我們使用了 java.net.socket類來實現了這樣的需求,以一執行緒處理一連接的方式,並配以執行緒池的控制,貌似得到了當前的最優解。可是這裡也存在一個問題,連接處理是同步的,也就是並發數量增大後,大量請求會在隊列中等待,或直接異常拋出。

為解決這問題,我們發現元兇處在「一執行緒一請求」上,如果一個執行緒能同時處理多個請求,那麼在高並發下性能上會大大改善。這裡就借住 JAVA 中的 nio 技術來實現這一模型。

nio 的阻塞實現

關於什麼是 nio,從字面上理解為 New IO,就是為了彌補原本 I/O 上的不足,而在 JDK 1.4 中引入的一種新的 I/O 實現方式。簡單理解,就是它提供了 I/O 的阻塞與非阻塞的兩種實現方式(當然,默認實現方式是阻塞的。)。

下面,我們先來看下 nio 以阻塞方式是如何處理的。

建立連接

有了上一篇 socket 的經驗,我們的第一步一定也是建立 socket 連接。只不過,這裡不是採用 newsocket() 的方式,而是引入了一個新的概念 SocketChannel。它可以看作是 socket 的一個完善類,除了提供 Socket 的相關功能外,還提供了許多其他特性,如後面要講到的向選擇器註冊的功能。建立連接程式碼實現:

// 初始化 socket,建立 socket 與 channel 的綁定關係  SocketChannel socketChannel = SocketChannel.open();  // 初始化遠程連接地址  SocketAddress remote = new InetSocketAddress(this.host, port);  // I/O 處理設置阻塞,這也是默認的方式,可不設置  socketChannel.configureBlocking(true);  // 建立連接  socketChannel.connect(remote);

獲取 socket 連接

因為是同樣是 I/O 阻塞的實現,所以後面的關於 socket 輸入輸出流的處理,和上一篇的基本相同。唯一差別是,這裡需要通過 channel 來獲取 socket 連接。

  • 獲取 socket 連接
Socket socket = socketChannel.socket();
  • 處理輸入輸出流
PrintWriter pw = getWriter(socketChannel.socket());  BufferedReader br = getReader(socketChannel.socket());

完整示例

package com.jason.network.mode.nio;    import com.jason.network.constant.HttpConstant;  import com.jason.network.util.HttpUtil;    import java.io.*;  import java.net.InetSocketAddress;  import java.net.Socket;  import java.net.SocketAddress;  import java.nio.channels.SocketChannel;    public class NioBlockingHttpClient {        private SocketChannel socketChannel;      private String host;        public static void main(String[] args) throws IOException {            for (String host: HttpConstant.HOSTS) {                NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT);              client.request();            }        }        public NioBlockingHttpClient(String host, int port) throws IOException {          this.host = host;          socketChannel = SocketChannel.open();          socketChannel.socket().setSoTimeout(5000);          SocketAddress remote = new InetSocketAddress(this.host, port);          this.socketChannel.connect(remote);      }        public void request() throws IOException {          PrintWriter pw = getWriter(socketChannel.socket());          BufferedReader br = getReader(socketChannel.socket());            pw.write(HttpUtil.compositeRequest(host));          pw.flush();          String msg;          while ((msg = br.readLine()) != null){              System.out.println(msg);          }      }        private PrintWriter getWriter(Socket socket) throws IOException {          OutputStream out = socket.getOutputStream();          return new PrintWriter(out);      }        private BufferedReader getReader(Socket socket) throws IOException {          InputStream in = socket.getInputStream();          return new BufferedReader(new InputStreamReader(in));      }  }

nio 的非阻塞實現

原理分析

nio 的阻塞實現,基本與使用原生的 socket 類似,沒有什麼特別大的差別。

我們主要觀察圖中的前三種 I/O 模型,關於非同步 I/O,一般需要依靠作業系統的支援,這裡不討論。

  • 第一階段:等待數據就緒;
  • 第二階段:將已就緒的數據從內核緩衝區拷貝到用戶空間;

這裡產生了一個從內核到用戶空間的拷貝,主要是為了系統的性能優化考慮。假設,從網卡讀到的數據直接返回給用戶空間,那勢必會造成頻繁的系統中斷,因為從網卡讀到的數據不一定是完整的,可能斷斷續續的過來。通過內核緩衝區作為緩衝,等待緩衝區有足夠的數據,或者讀取完結後,進行一次的系統中斷,將數據返回給用戶,這樣就能避免頻繁的中斷產生。

了解了 I/O 阻塞的兩個階段,下面我們進入正題。看看一個執行緒是如何實現同時處理多個 I/O 調用的。從上圖中的非阻塞 I/O 可以看出,僅僅只有第二階段需要阻塞,第一階段的數據等待過程,我們是不需要關心的。不過該模型是頻繁地去檢查是否就緒,造成了 CPU 無效的處理,反而效果不好。如果有一種類似的好萊塢原則— 「不要給我們打電話,我們會打給你」 。這樣一個執行緒可以同時發起多個 I/O 調用,並且不需要同步等待數據就緒。在數據就緒完成的時候,會以事件的機制,來通知我們。這樣不就實現了單執行緒同時處理多個 IO 調用的問題了嗎?即所說的「I/O 多路復用模型」。


廢話講了一大堆,下面就來實際操刀一下。

創建選擇器

由上面分析可以,我們得有一個選擇器,它能監聽所有的 I/O 操作,並且以事件的方式通知我們哪些 I/O 已經就緒了。

程式碼如下:

import java.nio.channels.Selector;    ...    private static Selector selector;  static {      try {          selector = Selector.open();      } catch (IOException e) {          e.printStackTrace();      }  }

創建非阻塞 I/O

下面,我們來創建一個非阻塞的 SocketChannel,程式碼與阻塞實現類型,唯一不同是 socketChannel.configureBlocking(false)

注意:只有在 socketChannel.configureBlocking(false)之後的程式碼,才是非阻塞的,如果 socketChannel.connect()在設置非阻塞模式之前,那麼連接操作依舊是阻塞調用的。

SocketChannel socketChannel = SocketChannel.open();  SocketAddress remote = new InetSocketAddress(host, port);  // 設置非阻塞模式  socketChannel.configureBlocking(false);  socketChannel.connect(remote);

建立選擇器與 socket 的關聯

選擇器與 socket 都創建好了,下一步就是將兩者進行關聯,好讓選擇器和監聽到 Socket 的變化。這裡採用了以 SocketChannel 主動註冊到選擇器的方式進行關聯綁定,這也就解釋了,為什麼不直接 newSocket(),而是以 SocketChannel的方式來創建 socket。

程式碼如下:

socketChannel.register(selector,                          SelectionKey.OP_CONNECT                          | SelectionKey.OP_READ                          | SelectionKey.OP_WRITE);

上面程式碼,我們將 socketChannel 註冊到了選擇器中,並且對它的連接、可讀、可寫事件進行了監聽。

具體的事件監聽類型如下:

操作類型

描述

所屬對象

OP_READ

1 << 0

讀操作

SocketChannel

OP_WRITE

1 << 2

寫操作

SocketChannel

OP_CONNECT

1 << 3

連接socket操作

SocketChannel

OP_ACCEPT

1 << 4

接受socket操作

ServerSocketChannel

選擇器監聽 socket 變化

現在,選擇器已經與我們關心的 socket 進行了關聯。下面就是感知事件的變化,然後調用相應的處理機制。

這裡與 Linux 下的 selector 有點不同,nio 下的 selecotr 不會去遍歷所有關聯的 socket。我們在註冊時設置了我們關心的事件類型,每次從選擇器中獲取的,只會是那些符合事件類型,並且完成就緒操作的 socket,減少了大量無效的遍歷操作。

public void select() throws IOException {    // 獲取就緒的 socket 個數    while (selector.select() > 0){          // 獲取符合的 socket 在選擇器中對應的事件句柄 key        Set keys = selector.selectedKeys();          // 遍歷所有的key        Iterator it = keys.iterator();        while (it.hasNext()){              // 獲取對應的 key,並從已選擇的集合中移除            SelectionKey key = (SelectionKey)it.next();            it.remove();              if (key.isConnectable()){                // 進行連接操作                connect(key);            }            else if (key.isWritable()){                // 進行寫操作                write(key);            }            else if (key.isReadable()){                // 進行讀操作                receive(key);            }        }    }}

注意:這裡的 selector.select()是同步阻塞的,等待有事件發生後,才會被喚醒。這就防止了 CPU 空轉的產生。當然,我們也可以給它設置超時時間, selector.select(longtimeout)來結束阻塞過程。

處理連接就緒事件

下面,我們分別來看下,一個 socket 是如何來處理連接、寫入數據和讀取數據的(這些操作都是阻塞的過程,只是我們將等待就緒的過程變成了非阻塞的了)。

處理連接程式碼:

// SelectionKey 代表 SocketChannel 在選擇器中註冊的事件句柄  private void connect(SelectionKey key) throws IOException {      // 獲取事件句柄對應的 SocketChannel      SocketChannel channel = (SocketChannel) key.channel();       // 真正的完成 socket 連接      channel.finishConnect();       // 列印連接資訊      InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();      String host = remote.getHostName();      int port = remote.getPort();      System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port));  }

處理寫入就緒事件

// 字符集處理類  private Charset charset = Charset.forName("utf8");    private void write(SelectionKey key) throws IOException {      SocketChannel channel = (SocketChannel) key.channel();      InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();      String host = remote.getHostName();        // 獲取 HTTP 請求,同上一篇      String request = HttpUtil.compositeRequest(host);        // 向 SocketChannel 寫入事件      channel.write(charset.encode(request));        // 修改 SocketChannel 所關心的事件      key.interestOps(SelectionKey.OP_READ);  }

這裡有兩個地方需要注意:

  • 第一個是使用 channel.write(charset.encode(request)); 進行數據寫入。有人會說,為什麼不能像上面同步阻塞那樣,通過 PrintWriter包裝類進行操作。因為 PrintWriterwrite() 方法是阻塞的,也就是說要等數據真正從 socket 發送出去後才返回。

這與我們這裡所講的阻塞是不一致的,這裡的操作雖然也是阻塞的,但它發生的過程是在數據從用戶空間到內核緩衝區拷貝過程。至於系統將緩衝區的數據通過 socket 發送出去,這不在阻塞範圍內。也解釋了為什麼要用 Charset 對寫入內容進行編碼了,因為緩衝區接收的格式是 ByteBuffer

  • 第二,選擇器用來監聽事件變化的兩個參數是 interestOpsreadyOps
    • interestOps:表示 SocketChannel 所關心的事件類型,也就是告訴選擇器,當有這幾種事件發生時,才來通知我。這裡通過 key.interestOps(SelectionKey.OP_READ);告訴選擇器,之後我只關心「讀就緒」事件,其他的不用通知我了。
    • readyOps:表示 SocketChannel 當前就緒的事件類型。以 key.isReadable()為例,判斷依據就是:return(readyOps()&OP_READ)!=0;

處理讀取就緒事件

private void receive(SelectionKey key) throws IOException {      SocketChannel channel = (SocketChannel) key.channel();      ByteBuffer buffer = ByteBuffer.allocate(1024);      channel.read(buffer);      buffer.flip();      String receiveData = charset.decode(buffer).toString();        // 當再沒有數據可讀時,取消在選擇器中的關聯,並關閉 socket 連接      if ("".equals(receiveData)) {          key.cancel();          channel.close();          return;      }        System.out.println(receiveData);  }

這裡的處理基本與寫入一致,唯一要注意的是,這裡我們需要自行處理去緩衝區讀取數據的操作。首先會分配一個固定大小的緩衝區,然後從內核緩衝區中,拷貝數據至我們剛分配固定緩衝區上。這裡存在兩種情況:

  • 我們分配的緩衝區過大,那多餘的部分以0補充(初始化時,其實會自動補0)。
  • 我們分配的緩衝去過小,因為選擇器會不停的遍歷。只要 SocketChannel 處理讀就緒狀態,那下一次會繼續讀取。當然,分配過小,會增加遍歷次數。

最後,將一下 ByteBuffer 的結構,它主要有 position, limit,capacity 以及 mark 屬性。以 buffer.flip(); 為例,講下各屬性的作用(mark 主要是用來標記之前 position 的位置,是在當前 postion 無法滿足的情況下使用的,這裡不作討論)。

從圖中看出,

  • 容量(capacity):表示緩衝區可以保存的數據容量;
  • 極限(limit):表示緩衝區的當前終點,即寫入、讀取都不可超過該重點;
  • 位置(position):表示緩衝區下一個讀寫單元的位置;

完整程式碼

package com.jason.network.mode.nio;    import com.jason.network.constant.HttpConstant;  import com.jason.network.util.HttpUtil;    import java.io.IOException;  import java.net.InetSocketAddress;  import java.net.SocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.SocketChannel;  import java.nio.charset.Charset;  import java.util.Iterator;  import java.util.Set;    public class NioNonBlockingHttpClient {        private static Selector selector;      private Charset charset = Charset.forName("utf8");        static {          try {              selector = Selector.open();          } catch (IOException e) {              e.printStackTrace();          }      }        public static void main(String[] args) throws IOException {            NioNonBlockingHttpClient client = new NioNonBlockingHttpClient();            for (String host: HttpConstant.HOSTS) {                client.request(host, HttpConstant.PORT);            }            client.select();        }        public void request(String host, int port) throws IOException {          SocketChannel socketChannel = SocketChannel.open();          socketChannel.socket().setSoTimeout(5000);          SocketAddress remote = new InetSocketAddress(host, port);          socketChannel.configureBlocking(false);          socketChannel.connect(remote);          socketChannel.register(selector,                          SelectionKey.OP_CONNECT                          | SelectionKey.OP_READ                          | SelectionKey.OP_WRITE);      }        public void select() throws IOException {          while (selector.select(500) > 0){              Set keys = selector.selectedKeys();                Iterator it = keys.iterator();                while (it.hasNext()){                    SelectionKey key = (SelectionKey)it.next();                  it.remove();                    if (key.isConnectable()){                      connect(key);                  }                  else if (key.isWritable()){                      write(key);                  }                  else if (key.isReadable()){                      receive(key);                  }              }          }      }        private void connect(SelectionKey key) throws IOException {          SocketChannel channel = (SocketChannel) key.channel();          channel.finishConnect();          InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();          String host = remote.getHostName();          int port = remote.getPort();          System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port));      }        private void write(SelectionKey key) throws IOException {          SocketChannel channel = (SocketChannel) key.channel();          InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();          String host = remote.getHostName();            String request = HttpUtil.compositeRequest(host);          System.out.println(request);            channel.write(charset.encode(request));          key.interestOps(SelectionKey.OP_READ);      }        private void receive(SelectionKey key) throws IOException {          SocketChannel channel = (SocketChannel) key.channel();          ByteBuffer buffer = ByteBuffer.allocate(1024);          channel.read(buffer);          buffer.flip();          String receiveData = charset.decode(buffer).toString();            if ("".equals(receiveData)) {              key.cancel();              channel.close();              return;          }            System.out.println(receiveData);      }  }

總結

本文從 nio 的阻塞方式講起,介紹了阻塞 I/O 與非阻塞 I/O 的區別,以及在 nio 下是如何一步步構建一個 IO 多路復用的模型的客戶端。文中需要理解的內容比較多,如果有理解錯誤的地方,歡迎指正~

後續

  • Netty 下的非同步請求實現

推薦閱讀

(點擊標題可跳轉閱讀)

夯實Java基礎系列16:一文讀懂Java IO流和常見面試題

夯實Java基礎系列15:Java註解簡介和最佳實踐

夯實Java基礎系列14:深入理解Java枚舉類

夯實Java基礎系列11:深入理解Java中的回調機制

夯實Java基礎系列10:深入理解Java中的異常體系

夯實Java基礎系列9:深入理解Class類和Object類

夯實Java基礎系列8:深入理解Java內部類及其實現原理

夯實Java基礎系列7:一文讀懂Java 程式碼塊和程式碼執行順序

一文搞懂抽象類和介面,從基礎到面試題,揭秘其本質區別!

一文讀懂 Java 文件和包結構,解讀開發中常用的 jar 包

一文了解 final 關鍵字的特性、使用方法以及實現原理

點個「在看」,轉發朋友圈,都是對我最好的支援!