淺談NIO

  • 2019 年 10 月 27 日
  • 筆記

淺談NIO

說到NIO大家都不會陌生,它是JDK中提供的IO工具集。 它又被稱作為New I/ONon Blocking I/O。相較於傳統面向流的java.io,nio是完全面向緩衝的I/O,它提供了更底層的操作。

如果你了解C語言那麼你一定接觸過標準I/O庫(stdio.h),其中實現的函數都是面向流的,而操作系統底層I/O函數(系統調用)都是基於緩衝去實現的(感興趣可以參閱一些標準庫的實現,例如glibc),標準I/O是在其上實現的高級API,它能對每個流的緩衝自動地進行管理,屏蔽掉了我們在管理緩衝上的複雜。實際上流也可以看作是一種很特殊的「緩衝」,比如將流看作一段段連續地緩衝塊。

基於流的I/O是獨立於操作系統的設計,它依賴操作系統底層的I/O模型,也帶入了I/O阻塞的問題。值得一提的是它的出現比socket要早很多年,或者說在此之前I/O的阻塞可能都不構成一個問題。為什麼這樣說呢?我們先來討論下什麼是阻塞。

阻塞IO

我們都知道,CPU從寄存器中讀取是最快的,其次CPU上的緩存,而讀取磁盤相對來說是非常慢的。拿3.3GHz主頻CPU為例,它的時鐘周期為3ns,假設固態硬盤一次順序讀取需要50μs,那就是相當於16.6萬個時鐘周期,如果換算成秒的話CPU需要等待2天才有數據進入。

下圖出自《Systems Performance》,表中列出一些事件的延遲時間。

所以調用一些直接與設備交互的函數(系統調用或者系統函數)CPU都會產生空閑,這種空閑會「阻塞」住進程。實際上大多數設備的操作,比如從硬盤上的文件系統中讀取一個文本,除非是遇到硬件錯誤,它們都是能很快返回的。我們常常討論的阻塞問題都是一些低速設備,例如網卡終端通道等,它們大多是一些被動的IO,如果不能從對端讀取到數據很可能就一直阻塞下去。

阻塞類似於一個長時間的睡眠,在阻塞發生時進程處於一種假死狀態,這時進程除了能被信號中斷外CPU將不會繼續往下執行指令。

考慮有以下服務器程序,serverSocket為服務器套接字實例,readMsg函數負責讀取客戶端套接字的邏輯。

while (true) {     Socket socket = serverSocket.accpet();     readMsg(socket);  }

我們可以使用telnet連接上這個服務,但如果我們什麼都不輸入,進程將被一直阻塞在readMsg函數。嚴重的是,我們的程序會成為了一對一的服務器程序。如果此時的連接不斷開,其他用戶試圖連入時也會被阻塞住,這樣的用戶體驗是非常糟糕的。

看到CPU會被讀取函數阻塞住,可能有人就會想到現代CPU都是多核架構,我們可以使用其它核去建立一個新連接就解決了。確實,可以使用多線程去改進它,考慮到創建線程的開銷我們會用到線程池。

while (true) {     Socket socket = serverSocket.accpet();     threadPool.submit(() -> readMsg(socket));  }

這樣,我們就可以在新的線程中處理連接了,僅實現一個簡單的web服務器也能有比較不錯的性能。但是資源始終是有限的,如果處理請求的函數都是需要長時間等待的又或者根本就是惡意的連接,它們還是會佔滿所有的資源,後續連接依然會被阻塞。

在套接字(socket)的實現中,提供了相關的選項可以讓發送端或接收端超時。它能讓socket在超過指定時間沒有收到響應就返回一個錯誤而不是一直阻塞。JDKSocket API也提供一個方法給套接字設置超時時間 – setSoTimeout(int),如果函數超出指定時間沒有返回,那麼將會拋出一個SocketTimeoutException,經過修改我們得到以下的加強版。

serverSocket.setSoTimeout(200);  while (true) {     Socket socket = null;     try {         socket = serverSocket.accpet();     } catch (SocketTimeoutException e) {         continue;     }       threadPool.submit(() -> {         socket.setSoTimeout(50);         readMsg(socket);     });  }

通過設置超時時間得到一種非阻塞的假象,吞吐量得到稍微改善,但是依然沒有本質上解決阻塞問題。

接下來我們來試試用NIO來解決阻塞帶來的問題。

NIO

在JDK標準實現中,NIO提供了與傳統IO完全不同的API來完成同樣的事,它也提供了更多、更複雜的IO模型。NIO主要包括四大基本組件 – 通道(Channel)選擇器(Selector)緩衝(Buffer)字符集(Charsets),本文會簡單介紹前三個。

通道(Channel)

Nio為打開的IO設備提供了不同的抽象 – Channel,要用nio操縱一個設備需要一個Channel對象。

關於Channel的能力可以參閱官方文檔中java.nio.channels包下的接口介紹。需要提一下的是ByteChannel,它實現了ReadableByteChannelWritableByteChannel,也就是說它同時具備讀和寫的能力,這是有別於的設計,因為大多數流得實現都只具備輸入或輸出中的一種能力。當然,從命名上已經能看出區別了,現實中的是單向的,而通道可以是雙向的。在NIO中操縱套接字的SocketChannel也實現了ByteChannel,所以我們可以直接使用它讀寫套接字。

ServerSocketChannelSocketChannel這兩個抽象類分別作為服務器套接字通道和客戶端套接字通道的抽象,他們都繼承了SelectableChannel,這關係著套接字通道的另外兩項非常重要的能力 – 非阻塞I/O多路復用(multiplexing)。 我們先討論下非阻塞I/O,它提供了一個configureBlocking(blooean)方法,它用於設置套接字操作是否阻塞。這意味着當在打開的ServerSocketSocket上設置非阻塞,之前會被阻塞的地方都能立即返回,例如,在ServerSocket使用accept,沒有請求時會立即返回一個null

值得一提的是,ServerSocketChannelSocketChannel具體實現並沒有包含在java.*包中而是在sun.nio.*,這部分的源碼在Oracle提供的JDK中並沒有公開。 感興趣的同學可以去OpenJDK的源碼中參照實現。

下面是我們使用NIO創建的一個服務端程序:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  serverSocketChannel.bind(new InetSocketAddress(PORT));  serverSocketChannel.configureBlocking(false);  while (true) {     SocketChannel socketChannel = null;     while ((socketChannel = serverSocketChannel.accept()) == null) {         // 沒有請求進入時     }       socketChannel.configureBlocking(false);     socketChannel.read(buff);  }

我這裡是使用一個輪詢(polling),這樣的缺點是如果沒有連接進入時,CPU就會不停的執行指令,通過top等工具我們也能很清楚的看到這時候CPU的負荷很高。由於大多數的服務器程序都不是計算密集型的,我們需要適當的讓CPU空閑而不是一直去執行無意義的指令。這種模型是不適合直接在實際應用中使用的,一般它都會和多路復用模型搭配使用。

多路復用(Selector)

其實操作系統設計者們早就考慮到阻塞式I/O帶的一些問題,所以在很早的時候操作系統中就有一種multiplexing的I/O模型。早在1983年發佈的BSD4.2中就引入了系統調用select,值得一提的是這個版本還首次引入了socket套接字API,很難不讓人聯想它的存在就是為了解決套接字這類「低速」設備上的阻塞I/O問題的。

在Unix標準實現中提供的selectpoll系統調用都可以說否是多路復用的實現,它們提供了讓內核通知進程I/O已準備就緒的能力,這樣我們就能串行操縱多個打開I/O設備

比較有意思的是某些Unix操作系統中,比如LinuxBSD中都包含一些非標準的實現,它們具備更優的性能,感興趣的同學可以參照epoll_create(2)kqueue(2)

在Jdk的實現中,java.nio提供一個名為Selector的抽象類,從名字可以看出它是具備類似於select系統調用的功能。與SocketChannel同樣它的實現也在sun.nio.*包中,它會根據操作系統提供不同的實現,例如,在linux會使用epollBSD中會使用kqueue,從而提供更好的性能。

我們可以直接使用它提供的靜態工廠方法即可創建selector:

Selector selector = Selector.open();

既然是要操作系統內核去通知進程,那自然需要有對應事件的處理。做過awt或者web的都應該清楚,如果要處理某一種事件(例如,點擊一個表單上的按鈕),就需要註冊對應事件到事件監聽器上。Selector也同樣,我們需要對創建的套接字註冊監聽事件。

前文提到SocketChannel所繼承的SelectableChannel是為套接字channel提供多路復用能力。

通過文檔我們在SelectableChannel中找到一個SelectionKey register(Selector sel, int ops)方法。注意,這裡要註冊一個事件到Selector是使用SelectableChannel的能力。

  • 返回值SelectionKey代表着一個已經被註冊到Selector中的SelectableChannel實例。
  • sel自然是需要我們傳入上面創建的selector
  • ops代表需要監聽的事件, 它們被定義在中SelectionKey中,例如需要監聽accept事件只需要調用register(selector, SelectionKey.OP_ACCEPT),也可以傳入多個事件,需要用或運算符|串起來 SelectionKey.OP_READ | SelectionKey.OP_WRITE,這是*nix系統函數的慣用傳參法。

接下來使用Selector中的int select()方法,它將返回到來事件的個數。這個方法將會一直阻塞到有事件發生,所以一般會使用另一個帶long參數的版本int select(long timeout)。它接收一個超時時間,當阻塞超時方法就會立即返回0

下面來演示一個多路復用服務端的實現:

serverSocketChannel.configureBlocking(false);  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  while (true ) {     if (selector.select(TIME_OUT) == 0) {         //...         continue;     }     //...  }

當有事件到來時,例如有客戶端連入,select()方法的返回事件個數使得程序跳出if分支。

這時候調用Selector.selectedKeys()會返回一個SelectionKey集合,代表着需要處理的事件。

SelectionKey中定義了一系列boolean is*()方法對了應事件類型,這樣我們就可以根據事件類型定義不同的操作

while (true) {     ...     Set<SelectionKey> set = selector.selectedKeys();       set.forEach((selectionKey) -> {         if (selectionKey.isAcceptable()) {             try {                 SocketChannel socketChannel = serverSocketChannel.accept();                 socketChannel.configureBlocking(false);                 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(16));             } catch (IOException e) {                 //             }         }         if (selectionKey.isReadable()) {             SocketChannel socketChannel = selectionKey.channel();             ByteBuffer buf = (ByteBuffer) selectionKey.attachment();             readMsg(socketChannel, buff);         }         ...     });       // 使用完畢需要清空     set.clear();  }

在處理accept事件時我們可以把建立的客戶端套接字也註冊到Selector上,它使用SelectionKey.selector()獲取與之關聯的Selector。這裡使用了之前register(Selector sel, int ops)的一個重載版本, 它在原本基礎上還需要額外的第三個參數 – 一個Object數據,代表一個與SelectorKey相關聯的附件。這樣我們在下次需要處理讀取寫入事件時直接取出它即可,節省了每次創建緩衝的開銷。

使用多路復用我們就可以在串行中處理很多個連接,當然這也不是沒有上限,這個取決於操作系統中文件描述符的上限,雖然說這個值也是可以修改的-_-

緩衝(Buffer)

對於操作系統來說,I/O是一個非常昂貴的操作,比如在磁盤中讀取,相較於隨機訪問內存(RAM)磁盤的訪問是非常緩慢的過程。所以,操作系統的設計中有很多機制去優化讀和寫,例如,在讀取一個文件時會事先將部分讀入內存中,這個步驟一般由內核來完成的。內核的代碼運行在內核空間,通常內核在讀取數據時會事先將數據讀入在內核空間的緩衝區中。

我們知道用戶所編寫的代碼是運行在用戶空間,是不能直接訪問內核空間的,內核提供一種名為系統調用(syscall)的接口去完成內核空間的訪問。

內核空間的執行代碼的代價也是相當昂貴的,因為稍有不慎就可能導致系統崩潰,所以內核中在系統調用的實現中有很多的檢查機制。大量使用系統調用會對我們的程序性能大打折扣,所以我們會考慮減少這類函數的訪問。比如在I/O系統調用中使用一個內存緩衝區比如數組,這樣可以避免每讀取一個位元組都調用一次系統函數,所以用戶緩衝區的大小關聯着程序的性能,當然性能也不是線性增長。

下表來自《The Linux Programming Interface》,作者對100m文本讀取時使用不同大小緩衝區的測試結果:

BUF_SIZE

Elapsed

Total CPU

User CPU

System CPU

1

107.43

107.32

8.20

99.12

2

54.16

53.89

4.13

49.76

4

31.72

30.96

2.30

28.66

8

15.59

14.34

1.08

13.26

16

7.50

7.14

0.51

6.63

32

3.76

3.68

0.26

3.41

64

2.19

2.04

0.13

1.91

128

2.16

1.59

0.11

1.48

256

2.06

1.75

0.10

1.65

512

2.06

1.03

0.05

0.98

1024

2.05

0.65

0.02

0.63

4096

2.05

0.38

0.01

0.38

16384

2.05

0.34

0.00

0.33

65536

2.06

0.32

0.00

0.32

這也就是為什麼很多編程語言中的IO處理函數都需要提供一個數組結構,IO本質上也就是用戶空間內核空間的拷貝。扯遠了,讀取Channel中的數據也是需要提供一個緩衝區的。NIO中的緩衝是一個Buffer對象,簡單點理解它底層就是一個數組,提供比數組更多的方式去管理和操作數組中的元素,但是它的實現也不是完全依賴於數組。

Buffer是一個抽象類,它有幾種基本數據類型的實現,例如,HeapByteBufferHeapIntBuffer,它們的底層也維護着一個與之對應的數組結構,注意這個Heap所表達的含義,它對應了我們使用new運算符在Heap中所創建的數組。Buffer一般通過靜態方法allocate去創建,當然創建方法還有可以一個數組入參的wrap(array)方法。

ByteBuffer byteBuff1 = ByteBuffer.allocate(1024);  ByteBuffer byteBuff2 = ByteBuffer.wrap(new byte[1024]);

在Java中數組是不能訪問索引超過數組大小的元素,如果超過則拋出索引越界異常。Buffer提供get(ini)/put(int, T)方法用於獲取或放置指定位置的數據。既然底層是數組,那麼Buffer就有一個最大容量,它和底層數組的大小等價,是一個不會改變的值。在Buffer中有一個capacity屬性代表着容量大小,它通過Buffer.capacity()方法直接獲取。

Buffer還有一個核心概念limit,它的值可以通過Buffer.limit()方法直接獲取,官方為其定義為第一個不可讀/寫的元素。也就是說從limit開始到capacity這段空間是不會被讀取或寫入的,用戶只能訪問索引從起始位置0limit - 1中的元素,使用大於(或等於)limit位置上的元素會拋出索引越界異常。limit的大小可以通過Buffer.limit(int)進行修改但不能超過capacitylimit初始值和capacity相等。

byte[] array = new byte[1024];  ByteBuffer byteBuff = ByteBuffer.wrap(array);  array.length == byteBuff.capacity(); // true  byteBuff.capacity() == byteBuff.limit(); // true  byteBuff.get(1023);byteBuff.limit(1023); // limit = 1023  byteBuff.get(1023); // java.lang.IndexOutOfBoundsException

在linux內核中維護着一個打開文件表,一個打開文件對應着表中的一條記錄,其中維護打開文件的偏移量狀態等信息。一些系統調用可以改變這些信息。比如,在read一個文件描述符(file descriptor)時會隱式將偏移量作調整,下次讀取時就會從該位置開始操作。

下圖為文件描述符表、打開文件表、inode表之間的關係:

Buffer中也有這麼一個類似偏移量的概念叫做position,它的值可以通過Buffer.position()方法直接獲取,官方對其的定義是下一個要讀或寫的索引值。每次讀取都會改變position的值,但是無論如何都不會超過limit,也就是說當position抵達limit時就無法用這個Buffer實例讀入或寫出數據。可以通過Buffer.flip()方法將limit設置為當前position位置並將position初始化為0,此時就可以使用這個Buffer去完成操作,這個過程也叫做讀寫切換。

while (fileChannel.read(byteBuff) > 0) {     int p = byteBuff.position();     byteBuff.flip(); // limit = position; position = 0;     p == byteBuff.limit(); // true     process(byteBuff);  }

還有一個和position相關的核心概念mark,在讀取數據的過程中可以通過Buffer.mark()mark的值置為positionmark的值沒有辦法直接獲取,但可以通過使用reset()position重置為mark的值,reset()方法不能在mark()之前調用。

以上就是Buffer中的幾個核心概念,它們的之間的關係為:

0⩽mark⩽position⩽limit

⩽capacity

多了Buffer這一層,我們就不用關心它們底層緩衝區具體是什麼,通過實現一套API就能完成基本的I/O操作。比如,通過DirectByteBuffer.allocateDirect()創建的DirectByteBuffer,它使用了堆外內存實現Buffer。又或者用FileChannel.map()方法創建的MappedByteBuffer,它將文件直接映射到內存中,是使用內存映射實現的Buffer,也是零拷貝的一種實現。

Buffer有很多高級用法就不一一敘述了,畢竟本文也不是介紹API的文章。

推薦讀物

寫到最後,我要推薦(安利)幾本個人覺得非常不錯的讀物:

首先當然是Steven大神的 《UNIX網絡編程 卷1 + 卷2》,這是網絡編程必讀書目之一。

系統編程首先推薦 《Linux/Unix系統編程手冊》,系統編程專家級讀物。當然apue也非常不錯,不過感覺這本可以與apue互補。

Java 網絡編程 《Java TCP/IP Socket編程》,關於Java Socket API 和 NIO API做了很全面的介紹,雖然內容有點老。